Compare commits

...

7 Commits

Author SHA1 Message Date
dependabot[bot]
e4cf6fb52a Merge a172446767 into 2fb0fce0b1 2024-05-09 16:34:36 +08:00
Javen
2fb0fce0b1 perf: The new metrics of getTransferredTps for rocketmq5.x and the old metrics of getTransferedTps for rocketmq4.x (#197)
Co-authored-by: jinwei2 <jinwei2@enmonster.com>
2024-03-26 17:02:16 +08:00
Abhijeet Mishra
6456630324 [#148] Throwables.propagate in deprecated for making runtime exception more verbose (#160) 2023-04-19 20:33:17 +08:00
Abhijeet Mishra
a25ccd6337 5.1.0 rocketmq version update (#155)
* update rocketmq version to 5.1.0
2023-04-07 08:30:18 +08:00
Abhijeet Mishra
538d1c1c45 [ISSUE apache#149] updated lombok version in pom.xml because of this compilation was failing (#151) 2023-03-20 15:33:45 +08:00
zhangjidi2016
86bdb06364 [ISSUE #123]Optimize groupList.query (#124)
Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
2022-11-28 16:37:02 +08:00
dependabot[bot]
a172446767 Bump cross-fetch from 3.1.4 to 3.1.5 in /frontend
Bumps [cross-fetch](https://github.com/lquixada/cross-fetch) from 3.1.4 to 3.1.5.
- [Release notes](https://github.com/lquixada/cross-fetch/releases)
- [Commits](https://github.com/lquixada/cross-fetch/compare/v3.1.4...v3.1.5)

---
updated-dependencies:
- dependency-name: cross-fetch
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-04-28 23:19:59 +00:00
44 changed files with 653 additions and 249 deletions

1
.gitignore vendored
View File

@@ -5,3 +5,4 @@
.project .project
.factorypath .factorypath
.settings/ .settings/
.vscode

View File

@@ -3587,11 +3587,11 @@ create-hmac@^1.1.0, create-hmac@^1.1.4, create-hmac@^1.1.7:
sha.js "^2.4.8" sha.js "^2.4.8"
cross-fetch@^3.0.4: cross-fetch@^3.0.4:
version "3.1.4" version "3.1.5"
resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.4.tgz#9723f3a3a247bf8b89039f3a380a9244e8fa2f39" resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.5.tgz#e1389f44d9e7ba767907f7af8454787952ab534f"
integrity sha512-1eAtFWdIubi6T4XPy6ei9iUFoKpUkIF971QLN8lIvvvwueI65+Nw5haMNKUwfJxabqlIIDODJKGrQ66gxC0PbQ== integrity sha512-lvb1SBsI0Z7GDwmuid+mU3kWVBwTVUbe7S0H52yaaAdQOXq2YktTCZdlAcNKFzE6QtRz0snpw9bNiPeOIkkQvw==
dependencies: dependencies:
node-fetch "2.6.1" node-fetch "2.6.7"
cross-spawn@7.0.3, cross-spawn@^7.0.0, cross-spawn@^7.0.2: cross-spawn@7.0.3, cross-spawn@^7.0.0, cross-spawn@^7.0.2:
version "7.0.3" version "7.0.3"
@@ -7440,10 +7440,12 @@ no-case@^3.0.4:
lower-case "^2.0.2" lower-case "^2.0.2"
tslib "^2.0.3" tslib "^2.0.3"
node-fetch@2.6.1: node-fetch@2.6.7:
version "2.6.1" version "2.6.7"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.1.tgz#045bd323631f76ed2e2b55573394416b639a0052" resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.7.tgz#24de9fba827e3b4ae44dc8b20256a379160052ad"
integrity sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw== integrity sha512-ZjMPFEfVx5j+y2yF35Kzx5sF7kDzxuDj6ziH4FFbOp87zKDZNx8yExJIb05OGF4Nlt9IHFIMBkRl41VdvcNdbQ==
dependencies:
whatwg-url "^5.0.0"
node-forge@^0.10.0: node-forge@^0.10.0:
version "0.10.0" version "0.10.0"
@@ -10669,6 +10671,11 @@ tr46@^2.0.2:
dependencies: dependencies:
punycode "^2.1.1" punycode "^2.1.1"
tr46@~0.0.3:
version "0.0.3"
resolved "https://registry.yarnpkg.com/tr46/-/tr46-0.0.3.tgz#8184fd347dac9cdc185992f3a6622e14b9d9ab6a"
integrity sha1-gYT9NH2snNwYWZLzpmIuFLnZq2o=
tryer@^1.0.1: tryer@^1.0.1:
version "1.0.1" version "1.0.1"
resolved "https://registry.yarnpkg.com/tryer/-/tryer-1.0.1.tgz#f2c85406800b9b0f74c9f7465b81eaad241252f8" resolved "https://registry.yarnpkg.com/tryer/-/tryer-1.0.1.tgz#f2c85406800b9b0f74c9f7465b81eaad241252f8"
@@ -11114,6 +11121,11 @@ web-vitals@^1.0.1:
resolved "https://registry.yarnpkg.com/web-vitals/-/web-vitals-1.1.2.tgz#06535308168986096239aa84716e68b4c6ae6d1c" resolved "https://registry.yarnpkg.com/web-vitals/-/web-vitals-1.1.2.tgz#06535308168986096239aa84716e68b4c6ae6d1c"
integrity sha512-PFMKIY+bRSXlMxVAQ+m2aw9c/ioUYfDgrYot0YUa+/xa0sakubWhSDyxAKwzymvXVdF4CZI71g06W+mqhzu6ig== integrity sha512-PFMKIY+bRSXlMxVAQ+m2aw9c/ioUYfDgrYot0YUa+/xa0sakubWhSDyxAKwzymvXVdF4CZI71g06W+mqhzu6ig==
webidl-conversions@^3.0.0:
version "3.0.1"
resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-3.0.1.tgz#24534275e2a7bc6be7bc86611cc16ae0a5654871"
integrity sha1-JFNCdeKnvGvnvIZhHMFq4KVlSHE=
webidl-conversions@^5.0.0: webidl-conversions@^5.0.0:
version "5.0.0" version "5.0.0"
resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-5.0.0.tgz#ae59c8a00b121543a2acc65c0434f57b0fc11aff" resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-5.0.0.tgz#ae59c8a00b121543a2acc65c0434f57b0fc11aff"
@@ -11260,6 +11272,14 @@ whatwg-mimetype@^2.3.0:
resolved "https://registry.yarnpkg.com/whatwg-mimetype/-/whatwg-mimetype-2.3.0.tgz#3d4b1e0312d2079879f826aff18dbeeca5960fbf" resolved "https://registry.yarnpkg.com/whatwg-mimetype/-/whatwg-mimetype-2.3.0.tgz#3d4b1e0312d2079879f826aff18dbeeca5960fbf"
integrity sha512-M4yMwr6mAnQz76TbJm914+gPpB/nCwvZbJU28cUD6dR004SAxDLOOSUaB1JDRqLtaOV/vi0IC5lEAGFgrjGv/g== integrity sha512-M4yMwr6mAnQz76TbJm914+gPpB/nCwvZbJU28cUD6dR004SAxDLOOSUaB1JDRqLtaOV/vi0IC5lEAGFgrjGv/g==
whatwg-url@^5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-5.0.0.tgz#966454e8765462e37644d3626f6742ce8b70965d"
integrity sha1-lmRU6HZUYuN2RNNib2dCzotwll0=
dependencies:
tr46 "~0.0.3"
webidl-conversions "^3.0.0"
whatwg-url@^8.0.0: whatwg-url@^8.0.0:
version "8.4.0" version "8.4.0"
resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-8.4.0.tgz#50fb9615b05469591d2b2bd6dfaed2942ed72837" resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-8.4.0.tgz#50fb9615b05469591d2b2bd6dfaed2942ed72837"

View File

@@ -91,10 +91,10 @@
<commons-io.version>2.4</commons-io.version> <commons-io.version>2.4</commons-io.version>
<commons-cli.version>1.2</commons-cli.version> <commons-cli.version>1.2</commons-cli.version>
<commons-collections.version>3.2.2</commons-collections.version> <commons-collections.version>3.2.2</commons-collections.version>
<rocketmq.version>4.9.3</rocketmq.version> <rocketmq.version>5.1.0</rocketmq.version>
<surefire.version>2.19.1</surefire.version> <surefire.version>2.19.1</surefire.version>
<aspectj.version>1.9.6</aspectj.version> <aspectj.version>1.9.6</aspectj.version>
<lombok.version>1.18.12</lombok.version> <lombok.version>1.18.22</lombok.version>
<main.basedir>${basedir}/../..</main.basedir> <main.basedir>${basedir}/../..</main.basedir>
<docker.image.prefix>apacherocketmq</docker.image.prefix> <docker.image.prefix>apacherocketmq</docker.image.prefix>
<spring.boot.version>2.6.0</spring.boot.version> <spring.boot.version>2.6.0</spring.boot.version>

View File

@@ -21,7 +21,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
@Slf4j @Slf4j

View File

@@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.controller;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.model.ConnectionInfo;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;

View File

@@ -18,7 +18,7 @@ package org.apache.rocketmq.dashboard.controller;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.model.request.MessageQuery;

View File

@@ -17,7 +17,7 @@
package org.apache.rocketmq.dashboard.controller; package org.apache.rocketmq.dashboard.controller;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.model.ConnectionInfo;
import org.apache.rocketmq.dashboard.permisssion.Permission; import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ProducerService; import org.apache.rocketmq.dashboard.service.ProducerService;

View File

@@ -20,7 +20,7 @@ import com.google.common.collect.Sets;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
public class ConnectionInfo extends Connection { public class ConnectionInfo extends Connection {
private String versionDesc; private String versionDesc;

View File

@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.List; import java.util.List;

View File

@@ -17,8 +17,8 @@
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import lombok.Data; import lombok.Data;
import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
@Data @Data
public class DlqMessageResendResult { public class DlqMessageResendResult {

View File

@@ -16,8 +16,8 @@
*/ */
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private String group; private String group;

View File

@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;

View File

@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.dashboard.model.request; package org.apache.rocketmq.dashboard.model.request;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import java.util.List; import java.util.List;

View File

@@ -15,7 +15,6 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.dashboard.model.request; package org.apache.rocketmq.dashboard.model.request;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import java.util.List; import java.util.List;

View File

@@ -19,8 +19,8 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
@@ -28,7 +28,7 @@ import org.apache.commons.collections.CollectionUtils;
public abstract class AbstractCommonService { public abstract class AbstractCommonService {
@Resource @Resource
protected MQAdminExt mqAdminExt; protected MQAdminExt mqAdminExt;
protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable, protected final Set<String> changeToBrokerNameSet(Map<String, Set<String>> clusterAddrTable,
List<String> clusterNameList, List<String> brokerNameList) { List<String> clusterNameList, List<String> brokerNameList) {
Set<String> finalBrokerNameList = Sets.newHashSet(); Set<String> finalBrokerNameList = Sets.newHashSet();
if (CollectionUtils.isNotEmpty(clusterNameList)) { if (CollectionUtils.isNotEmpty(clusterNameList)) {
@@ -38,7 +38,8 @@ public abstract class AbstractCommonService {
} }
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
if (CollectionUtils.isNotEmpty(brokerNameList)) { if (CollectionUtils.isNotEmpty(brokerNameList)) {

View File

@@ -17,8 +17,8 @@
package org.apache.rocketmq.dashboard.service; package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo; import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;

View File

@@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.api.MessageTrack;

View File

@@ -17,7 +17,7 @@
package org.apache.rocketmq.dashboard.service; package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
public interface ProducerService { public interface ProducerService {
ProducerConnection getProducerConnection(String producerGroup, String topic); ProducerConnection getProducerConnection(String producerGroup, String topic);

View File

@@ -19,10 +19,10 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;

View File

@@ -29,31 +29,41 @@ import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.util.JsonUtil; import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -63,7 +73,9 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.common.AdminToolResult;
import org.joor.Reflect; import org.joor.Reflect;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -81,7 +93,7 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override @Override
public void updateBrokerConfig(String brokerAddr, Properties properties) public void updateBrokerConfig(String brokerAddr, Properties properties)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
UnsupportedEncodingException, InterruptedException, MQBrokerException { UnsupportedEncodingException, InterruptedException, MQBrokerException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties); MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
} }
@@ -128,7 +140,7 @@ public class MQAdminExtImpl implements MQAdminExt {
} }
@Override @Override
public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) throws MQBrokerException {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = null; RemotingCommand response = null;
@@ -136,7 +148,8 @@ public class MQAdminExtImpl implements MQAdminExt {
response = remotingClient.invokeSync(addr, request, 3000); response = remotingClient.invokeSync(addr, request, 3000);
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
assert response != null; assert response != null;
switch (response.getCode()) { switch (response.getCode()) {
@@ -145,12 +158,12 @@ public class MQAdminExtImpl implements MQAdminExt {
return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group); return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
} }
default: default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); throw new MQBrokerException(response.getCode(), response.getRemark());
} }
} }
@Override @Override
public TopicConfig examineTopicConfig(String addr, String topic) { public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null; RemotingCommand response = null;
@@ -158,7 +171,8 @@ public class MQAdminExtImpl implements MQAdminExt {
response = remotingClient.invokeSync(addr, request, 3000); response = remotingClient.invokeSync(addr, request, 3000);
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
@@ -166,7 +180,7 @@ public class MQAdminExtImpl implements MQAdminExt {
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
} }
default: default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); throw new MQBrokerException(response.getCode(), response.getRemark());
} }
} }
@@ -376,14 +390,14 @@ public class MQAdminExtImpl implements MQAdminExt {
} }
@Override @Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum); MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, attributes);
} }
@Override @Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes)
throws MQClientException { throws MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag); MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
} }
@Override @Override
@@ -572,4 +586,257 @@ public class MQAdminExtImpl implements MQAdminExt {
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return false; return false;
} }
@Override
public void addBrokerToContainer(String brokerContainerAddr, String brokerConfig) throws InterruptedException,
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'addBrokerToContainer'");
}
@Override
public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,
long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'removeBrokerFromContainer'");
}
@Override
public void updateGlobalWhiteAddrConfig(String addr, String globalWhiteAddrs, String aclFileFullPath)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'updateGlobalWhiteAddrConfig'");
}
@Override
public TopicStatsTable examineTopicStats(String brokerAddr, String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineTopicStats'");
}
@Override
public AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(String topic) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineTopicStatsConcurrent'");
}
@Override
public ConsumeStats examineConsumeStats(String brokerAddr, String consumerGroup, String topicName,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'");
}
@Override
public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String consumerGroup, String topic) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStatsConcurrent'");
}
@Override
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'");
}
@Override
public ProducerTableInfo getAllProducerInfo(String brokerAddr)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getAllProducerInfo'");
}
@Override
public void deleteTopic(String topicName, String clusterName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteTopic'");
}
@Override
public AdminToolResult<BrokerOperatorResult> deleteTopicInBrokerConcurrent(Set<String> addrs, String topic) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInBrokerConcurrent'");
}
@Override
public void deleteTopicInNameServer(Set<String> addrs, String clusterName, String topic)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInNameServer'");
}
@Override
public AdminToolResult<BrokerOperatorResult> resetOffsetNewConcurrent(String group, String topic, long timestamp) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'resetOffsetNewConcurrent'");
}
@Override
public TopicList queryTopicsByConsumer(String group)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumer'");
}
@Override
public AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(String group) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumerConcurrent'");
}
@Override
public SubscriptionData querySubscription(String group, String topic)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'querySubscription'");
}
@Override
public AdminToolResult<List<QueueTimeSpan>> queryConsumeTimeSpanConcurrent(String topic, String group) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryConsumeTimeSpanConcurrent'");
}
@Override
public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLog'");
}
@Override
public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLogByAddr'");
}
@Override
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack,
boolean metrics) throws RemotingException, MQClientException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getConsumerRunningInfo'");
}
@Override
public List<MessageTrack> messageTrackDetailConcurrent(MessageExt msg)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'messageTrackDetailConcurrent'");
}
@Override
public void setMessageRequestMode(String brokerAddr, String topic, String consumerGroup, MessageRequestMode mode,
int popWorkGroupSize, long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'setMessageRequestMode'");
}
@Override
public long searchOffset(String brokerAddr, String topicName, int queueId, long timestamp, long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'searchOffset'");
}
@Override
public void resetOffsetByQueueId(String brokerAddr, String consumerGroup, String topicName, int queueId,
long resetOffset) throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'resetOffsetByQueueId'");
}
@Override
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig,
TopicQueueMappingDetail mappingDetail, boolean force)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'createStaticTopic'");
}
@Override
public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String groupName, String topicName,
Boolean readable) throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'updateAndGetGroupReadForbidden'");
}
@Override
public MessageExt queryMessage(String clusterName, String topic, String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryMessage'");
}
@Override
public HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getBrokerHAStatus'");
}
@Override
public BrokerReplicasInfo getInSyncStateData(String controllerAddress, List<String> brokers)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getInSyncStateData'");
}
@Override
public EpochEntryCache getBrokerEpochCache(String brokerAddr)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getBrokerEpochCache'");
}
@Override
public GetMetaDataResponseHeader getControllerMetaData(String controllerAddr)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getControllerMetaData'");
}
@Override
public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException,
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'resetMasterFlushOffset'");
}
@Override
public Map<String, Properties> getControllerConfig(List<String> controllerServers)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException, UnsupportedEncodingException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getControllerConfig'");
}
@Override
public void updateControllerConfig(Properties properties, List<String> controllers)
throws InterruptedException, RemotingConnectException, UnsupportedEncodingException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'updateControllerConfig'");
}
@Override
public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName,
String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'electMaster'");
}
@Override
public void cleanControllerBrokerData(String controllerAddr, String clusterName, String brokerName,
String brokerAddr, boolean isCleanLivingBroker)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'cleanControllerBrokerData'");
}
} }

View File

@@ -36,8 +36,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.dashboard.model.request.AclRequest; import org.apache.rocketmq.dashboard.model.request.AclRequest;
import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.AclService; import org.apache.rocketmq.dashboard.service.AclService;
@@ -68,7 +68,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} catch (Exception e) { } catch (Exception e) {
log.error("getAclConfig error.", e); log.error("getAclConfig error.", e);
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
AclConfig aclConfig = new AclConfig(); AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(Collections.emptyList()); aclConfig.setGlobalWhiteAddrs(Collections.emptyList());
@@ -100,7 +101,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -116,7 +118,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr); log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -142,7 +145,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -174,7 +178,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -206,7 +211,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -249,7 +255,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -261,7 +268,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -281,7 +289,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -297,7 +306,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -311,7 +321,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ",")); mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ","));
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }

View File

@@ -17,9 +17,9 @@
package org.apache.rocketmq.dashboard.service.impl; package org.apache.rocketmq.dashboard.service.impl;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.dashboard.service.ClusterService; import org.apache.rocketmq.dashboard.service.ClusterService;
import org.apache.rocketmq.dashboard.util.JsonUtil; import org.apache.rocketmq.dashboard.util.JsonUtil;
@@ -59,7 +59,8 @@ public class ClusterServiceImpl implements ClusterService {
return resultMap; return resultMap;
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -70,7 +71,8 @@ public class ClusterServiceImpl implements ClusterService {
return mqAdminExt.getBrokerConfig(brokerAddr); return mqAdminExt.getBrokerConfig(brokerAddr);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
} }

View File

@@ -29,6 +29,14 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
@@ -36,18 +44,19 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
@@ -60,12 +69,12 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.ConsumerService; import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import static com.google.common.base.Throwables.propagate;
@Service @Service
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Resource @Resource
@@ -73,6 +82,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>(); private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
private ExecutorService executorService;
@Override
public void afterPropertiesSet() {
Runtime runtime = Runtime.getRuntime();
int corePoolSize = Math.max(10, runtime.availableProcessors() * 2);
int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2);
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicLong threadIndex = new AtomicLong(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
}
};
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5000), threadFactory, handler);
}
@Override
public void destroy() {
ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS);
}
static { static {
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP); SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP); SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
@@ -95,12 +129,29 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList(); List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
for (String consumerGroup : consumerGroupSet) { for (String consumerGroup : consumerGroupSet) {
groupConsumeInfoList.add(queryGroup(consumerGroup)); executorService.submit(() -> {
try {
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
} finally {
countDownLatch.countDown();
}
});
} }
try {
countDownLatch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("query consumerGroup countDownLatch await Exception", e);
}
if (!skipSysGroup) { if (!skipSysGroup) {
groupConsumeInfoList.stream().map(group -> { groupConsumeInfoList.stream().map(group -> {
if (SYSTEM_GROUP_SET.contains(group.getGroup())) { if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
@@ -166,7 +217,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
@Override @Override
@@ -226,7 +278,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
return group2ConsumerInfoMap; return group2ConsumerInfoMap;
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -289,7 +342,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return consumerConfigInfoList; return consumerConfigInfoList;
} }
@@ -314,7 +368,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return true; return true;
} }
@@ -341,7 +396,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
return true; return true;
} }
@@ -356,7 +412,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return brokerNameSet; return brokerNameSet;
@@ -368,7 +425,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); return mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -378,7 +436,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
} }

View File

@@ -107,7 +107,8 @@ public class DashboardCollectServiceImpl implements DashboardCollectService {
strings = Files.readLines(file, Charsets.UTF_8); strings = Files.readLines(file, Charsets.UTF_8);
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
for (String string : strings) { for (String string : strings) {

View File

@@ -25,8 +25,8 @@ import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult; import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
@@ -62,10 +62,12 @@ public class DlqMessageServiceImpl implements DlqMessageService {
&& e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) { && e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
return new MessagePage(new PageImpl<>(messageViews, page, 0), query.getTaskId()); return new MessagePage(new PageImpl<>(messageViews, page, 0), query.getTaskId());
} else { } else {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return messageService.queryMessageByPage(query); return messageService.queryMessageByPage(query);
} }

View File

@@ -37,9 +37,9 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException; import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
@@ -115,7 +115,8 @@ public class MessageServiceImpl implements MessageService {
if (err instanceof MQClientException) { if (err instanceof MQClientException) {
throw new ServiceException(-1, ((MQClientException) err).getErrorMessage()); throw new ServiceException(-1, ((MQClientException) err).getErrorMessage());
} }
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -185,7 +186,8 @@ public class MessageServiceImpl implements MessageService {
}); });
return messageViewList; return messageViewList;
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
consumer.shutdown(); consumer.shutdown();
} }
@@ -209,7 +211,8 @@ public class MessageServiceImpl implements MessageService {
try { try {
return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -223,7 +226,8 @@ public class MessageServiceImpl implements MessageService {
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
throw new IllegalStateException("NO CONSUMER"); throw new IllegalStateException("NO CONSUMER");
@@ -388,7 +392,8 @@ public class MessageServiceImpl implements MessageService {
PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total); PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total);
return new MessagePageTask(page, queueOffsetInfos); return new MessagePageTask(page, queueOffsetInfos);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
consumer.shutdown(); consumer.shutdown();
} }
@@ -455,7 +460,8 @@ public class MessageServiceImpl implements MessageService {
} }
return new PageImpl<>(messageViews, query.page(), total); return new PageImpl<>(messageViews, query.page(), total);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
consumer.shutdown(); consumer.shutdown();
} }

View File

@@ -82,7 +82,8 @@ public class MonitorServiceImpl implements MonitorService {
MixAll.string2File(dataStr, path); MixAll.string2File(dataStr, path);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }

View File

@@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.dashboard.service.ProducerService; import org.apache.rocketmq.dashboard.service.ProducerService;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -35,7 +35,8 @@ public class ProducerServiceImpl implements ProducerService {
return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic); return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
} }

View File

@@ -30,13 +30,13 @@ import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
@@ -84,7 +84,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
allTopics.getTopicList().addAll(topics); allTopics.getTopicList().addAll(topics);
return allTopics; return allTopics;
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -93,7 +94,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
return mqAdminExt.examineTopicStats(topic); return mqAdminExt.examineTopicStats(topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -102,7 +104,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
return mqAdminExt.examineTopicRouteInfo(topic); return mqAdminExt.examineTopicRouteInfo(topic);
} catch (Exception ex) { } catch (Exception ex) {
throw Throwables.propagate(ex); Throwables.throwIfUnchecked(ex);
throw new RuntimeException(ex);
} }
} }
@@ -111,7 +114,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
return mqAdminExt.queryTopicConsumeByWho(topic); return mqAdminExt.queryTopicConsumeByWho(topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -126,7 +130,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
} }
} catch (Exception err) { } catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -137,7 +142,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
clusterInfo = mqAdminExt.examineBrokerClusterInfo(); clusterInfo = mqAdminExt.examineBrokerClusterInfo();
return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic); return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -170,7 +176,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
} }
mqAdminExt.deleteTopicInNameServer(nameServerSet, topic); mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
} catch (Exception err) { } catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
return true; return true;
} }
@@ -181,7 +188,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
clusterInfo = mqAdminExt.examineBrokerClusterInfo(); clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception err) { } catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) { for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
deleteTopic(topic, clusterName); deleteTopic(topic, clusterName);
@@ -197,11 +205,13 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
clusterInfo = mqAdminExt.examineBrokerClusterInfo(); clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic); mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return true; return true;
} }
@@ -230,7 +240,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
producer.start(); producer.start();
return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L); return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
producer.shutdown(); producer.shutdown();
} }
@@ -258,7 +269,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
); );
return producer.send(msg); return producer.send(msg);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown(); producer.shutdown();

View File

@@ -24,10 +24,10 @@ import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.service.DashboardCollectService; import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -93,7 +93,8 @@ public class CollectTaskRunnble implements Runnable {
try { try {
list = dashboardCollectService.getTopicMap().get(topic); list = dashboardCollectService.getTopicMap().get(topic);
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
if (null == list) { if (null == list) {
list = Lists.newArrayList(); list = Lists.newArrayList();

View File

@@ -34,10 +34,10 @@ import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.DashboardCollectService; import org.apache.rocketmq.dashboard.service.DashboardCollectService;
@@ -84,7 +84,8 @@ public class DashboardCollectTask {
} }
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -128,7 +129,8 @@ public class DashboardCollectTask {
log.debug("Broker Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap())); log.debug("Broker Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap()));
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -144,10 +146,12 @@ public class DashboardCollectTask {
Thread.sleep(1000); Thread.sleep(1000);
} }
catch (InterruptedException e1) { catch (InterruptedException e1) {
throw Throwables.propagate(e1); Throwables.throwIfUnchecked(e1);
throw new RuntimeException(e1);
} }
fetchBrokerRuntimeStats(brokerAddr, retryTime - 1); fetchBrokerRuntimeStats(brokerAddr, retryTime - 1);
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -197,7 +201,8 @@ public class DashboardCollectTask {
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -250,7 +255,7 @@ public class DashboardCollectTask {
private void addSystemTopic() throws Exception { private void addSystemTopic() throws Exception {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterTable = clusterInfo.getClusterAddrTable(); Map<String, Set<String>> clusterTable = clusterInfo.getClusterAddrTable();
for (Map.Entry<String, Set<String>> entry : clusterTable.entrySet()) { for (Map.Entry<String, Set<String>> entry : clusterTable.entrySet()) {
String clusterName = entry.getKey(); String clusterName = entry.getKey();
TopicValidator.addSystemTopic(clusterName); TopicValidator.addSystemTopic(clusterName);

View File

@@ -50,7 +50,14 @@
<td class="text-center">{{instance.address}}</td> <td class="text-center">{{instance.address}}</td>
<td class="text-center">{{instance.brokerVersionDesc}}</td> <td class="text-center">{{instance.brokerVersionDesc}}</td>
<td class="text-center">{{instance.putTps.split(' ')[0]| number:2}}</td> <td class="text-center">{{instance.putTps.split(' ')[0]| number:2}}</td>
<td class="text-center">{{instance.getTransferedTps.split(' ')[0]| number:2}}</td> <td class="text-center">
<span ng-if="!instance.getTransferedTps || !instance.getTransferedTps.trim()">
{{instance.getTransferredTps.split(' ')[0] | number:2}}
</span>
<span ng-if="instance.getTransferedTps && instance.getTransferedTps.trim()">
{{instance.getTransferedTps.split(' ')[0] | number:2}}
</span>
</td>
<td class="text-center">{{instance.msgPutTotalTodayMorning - <td class="text-center">{{instance.msgPutTotalTodayMorning -
instance.msgPutTotalYesterdayMorning}} instance.msgPutTotalYesterdayMorning}}
</td> </td>

View File

@@ -34,27 +34,27 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl; import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl;
import org.apache.rocketmq.dashboard.service.client.MQAdminInstance; import org.apache.rocketmq.dashboard.service.client.MQAdminInstance;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;
@@ -82,6 +82,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@@ -570,11 +571,13 @@ public class MQAdminExtImplTest {
public void testCreateTopic() throws Exception { public void testCreateTopic() throws Exception {
assertNotNull(mqAdminExtImpl); assertNotNull(mqAdminExtImpl);
{ {
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt()); doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyMap());
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt()); doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt(), anyMap());
} }
mqAdminExtImpl.createTopic("key", "topic_test", 8); Map<String, String> map = new HashMap<>();
mqAdminExtImpl.createTopic("key", "topic_test", 8, 1); map.put("message.type", "FIFO");
mqAdminExtImpl.createTopic("key", "topic_test", 8, map);
mqAdminExtImpl.createTopic("key", "topic_test", 8, 1, map);
} }
@Test @Test

View File

@@ -18,7 +18,7 @@ package org.apache.rocketmq.dashboard.admin;
import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;

View File

@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import java.util.List; import java.util.List;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.dashboard.model.request.AclRequest; import org.apache.rocketmq.dashboard.model.request.AclRequest;
import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl; import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;

View File

@@ -18,8 +18,8 @@ package org.apache.rocketmq.dashboard.controller;
import java.util.HashMap; import java.util.HashMap;
import java.util.Properties; import java.util.Properties;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.dashboard.service.impl.ClusterServiceImpl; import org.apache.rocketmq.dashboard.service.impl.ClusterServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.junit.Test; import org.junit.Test;

View File

@@ -23,17 +23,17 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest; import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
@@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest {
@Before @Before
public void init() throws Exception { public void init() throws Exception {
consumerService.afterPropertiesSet();
super.mockRmqConfigure(); super.mockRmqConfigure();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
@@ -93,9 +94,10 @@ public class ConsumerControllerTest extends BaseControllerTest {
perform = mockMvc.perform(requestBuilder); perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk()) perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(2))) .andExpect(jsonPath("$.data", hasSize(2)))
.andExpect(jsonPath("$.data[0].group").value("group_test"))
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) .andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name())); .andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
// executorService shutdown
consumerService.destroy();
} }
@Test @Test

View File

@@ -21,10 +21,10 @@ import com.google.common.collect.Lists;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.MessageView;

View File

@@ -32,9 +32,9 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl; import org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl;

View File

@@ -19,8 +19,8 @@ package org.apache.rocketmq.dashboard.controller;
import java.util.HashSet; import java.util.HashSet;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.dashboard.interceptor.AuthInterceptor; import org.apache.rocketmq.dashboard.interceptor.AuthInterceptor;
import org.apache.rocketmq.dashboard.service.impl.LoginServiceImpl; import org.apache.rocketmq.dashboard.service.impl.LoginServiceImpl;
import org.apache.rocketmq.dashboard.service.impl.ProducerServiceImpl; import org.apache.rocketmq.dashboard.service.impl.ProducerServiceImpl;

View File

@@ -29,16 +29,16 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl; import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl;

View File

@@ -33,12 +33,12 @@ import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.BaseTest; import org.apache.rocketmq.dashboard.BaseTest;
import org.apache.rocketmq.dashboard.config.CollectExecutorConfig; import org.apache.rocketmq.dashboard.config.CollectExecutorConfig;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;

View File

@@ -29,7 +29,7 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.ConsumerService; import org.apache.rocketmq.dashboard.service.ConsumerService;
@@ -78,7 +78,8 @@ public abstract class RocketMQConsoleTestBase {
} }
} }
} }
throw Throwables.propagate(exception); Throwables.throwIfUnchecked(exception);
throw new RuntimeException(exception);
} }
} }
@@ -91,7 +92,8 @@ public abstract class RocketMQConsoleTestBase {
producer.start(); producer.start();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -137,7 +139,8 @@ public abstract class RocketMQConsoleTestBase {
consumer.start(); consumer.start();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }

View File

@@ -33,39 +33,39 @@ import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceConstants; import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.BrokerStatsItem; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsItem;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.checkerframework.checker.units.qual.A; import org.checkerframework.checker.units.qual.A;
import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; import static org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY;
public class MockObjectUtil { public class MockObjectUtil {