From ea8834bacd895236633df136b55111b33b43bde6 Mon Sep 17 00:00:00 2001 From: yukon Date: Wed, 19 Oct 2022 16:22:23 +0800 Subject: [PATCH 01/15] Update .asf.yaml --- .asf.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.asf.yaml b/.asf.yaml index 685f086..02bc9f1 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -9,3 +9,10 @@ github: squash: true merge: false rebase: false + +notifications: + commits: commits@rocketmq.apache.org + issues: commits@rocketmq.apache.org + pullrequests: commits@rocketmq.apache.org + jobs: commits@rocketmq.apache.org + discussions: dev@rocketmq.apache.org From fc9781e6fc11678e1b35c2f6fead97f6d37580fc Mon Sep 17 00:00:00 2001 From: yannizhou05 <61256379+yannizhou05@users.noreply.github.com> Date: Tue, 22 Nov 2022 18:30:31 -0600 Subject: [PATCH 02/15] Fix flaky testList (#120) --- .../rocketmq/dashboard/controller/TopicControllerTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java index abe6ac7..6b5cb56 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java @@ -54,6 +54,7 @@ import org.springframework.http.MediaType; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; @@ -135,8 +136,9 @@ public class TopicControllerTest extends BaseControllerTest { // 3、filter system topic requestBuilder = MockMvcRequestBuilders.get(url); perform = mockMvc.perform(requestBuilder); + String[] topicString = {"%SYS%system_topic2","common_topic2","%SYS%system_topic1","common_topic1"}; perform.andExpect(status().isOk()) - .andExpect(jsonPath("$.data.topicList[2]").value("%SYS%system_topic1")); + .andExpect(jsonPath("$.data.topicList").value(containsInAnyOrder(topicString))); } @Test From 7a54427d9c7c900ac4ec96dabc5125e8bf830526 Mon Sep 17 00:00:00 2001 From: Oliver Date: Wed, 23 Nov 2022 08:32:33 +0800 Subject: [PATCH 03/15] [ISSUE #111] Update log name (#112) --- src/main/resources/logback.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 76823e8..87ffc3b 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -25,10 +25,10 @@ - ${user.home}/logs/consolelogs/rocketmq-console.log + ${user.home}/logs/dashboardlogs/rocketmq-dashboard.log true - ${user.home}/logs/consolelogs/rocketmq-console-%d{yyyy-MM-dd}.%i.log + ${user.home}/logs/dashboardlogs/rocketmq-dashboard-%d{yyyy-MM-dd}.%i.log From 86bdb0636494bc23751f5df90fdb56a87b928ca7 Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Mon, 28 Nov 2022 16:37:02 +0800 Subject: [PATCH 04/15] [ISSUE #123]Optimize groupList.query (#124) Co-authored-by: zhangjidi --- .../service/impl/ConsumerServiceImpl.java | 58 ++++++++++++++++++- .../controller/ConsumerControllerTest.java | 4 +- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index 3ad85d4..b1011b7 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -29,6 +29,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; @@ -48,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; @@ -60,12 +69,14 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.ConsumerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; import static com.google.common.base.Throwables.propagate; @Service -public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { +public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); @Resource @@ -73,6 +84,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum private static final Set SYSTEM_GROUP_SET = new HashSet<>(); + private ExecutorService executorService; + + @Override + public void afterPropertiesSet() { + Runtime runtime = Runtime.getRuntime(); + int corePoolSize = Math.max(10, runtime.availableProcessors() * 2); + int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2); + ThreadFactory threadFactory = new ThreadFactory() { + private final AtomicLong threadIndex = new AtomicLong(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet()); + } + }; + RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); + this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(5000), threadFactory, handler); + } + + @Override + public void destroy() { + ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS); + } + static { SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP); SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP); @@ -97,10 +133,26 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum catch (Exception err) { throw Throwables.propagate(err); } - List groupConsumeInfoList = Lists.newArrayList(); + List groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); + CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size()); for (String consumerGroup : consumerGroupSet) { - groupConsumeInfoList.add(queryGroup(consumerGroup)); + executorService.submit(() -> { + try { + GroupConsumeInfo consumeInfo = queryGroup(consumerGroup); + groupConsumeInfoList.add(consumeInfo); + } catch (Exception e) { + logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); + } finally { + countDownLatch.countDown(); + } + }); } + try { + countDownLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("query consumerGroup countDownLatch await Exception", e); + } + if (!skipSysGroup) { groupConsumeInfoList.stream().map(group -> { if (SYSTEM_GROUP_SET.contains(group.getGroup())) { diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java index dfc3c22..b95e80a 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java @@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest { @Before public void init() throws Exception { + consumerService.afterPropertiesSet(); super.mockRmqConfigure(); ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); @@ -93,9 +94,10 @@ public class ConsumerControllerTest extends BaseControllerTest { perform = mockMvc.perform(requestBuilder); perform.andExpect(status().isOk()) .andExpect(jsonPath("$.data", hasSize(2))) - .andExpect(jsonPath("$.data[0].group").value("group_test")) .andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) .andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name())); + // executorService shutdown + consumerService.destroy(); } @Test From 538d1c1c459d67153efe079bb9586da1f91a62e6 Mon Sep 17 00:00:00 2001 From: Abhijeet Mishra Date: Mon, 20 Mar 2023 13:03:45 +0530 Subject: [PATCH 05/15] [ISSUE apache#149] updated lombok version in pom.xml because of this compilation was failing (#151) --- .gitignore | 1 + pom.xml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 8f1890a..f5d6d52 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ .project .factorypath .settings/ +.vscode \ No newline at end of file diff --git a/pom.xml b/pom.xml index 31d4968..62da76c 100644 --- a/pom.xml +++ b/pom.xml @@ -94,7 +94,7 @@ 4.9.3 2.19.1 1.9.6 - 1.18.12 + 1.18.22 ${basedir}/../.. apacherocketmq 2.6.0 From a25ccd63378ee4e750543d6115eab0e6fb8dbba7 Mon Sep 17 00:00:00 2001 From: Abhijeet Mishra Date: Fri, 7 Apr 2023 06:00:18 +0530 Subject: [PATCH 06/15] 5.1.0 rocketmq version update (#155) * update rocketmq version to 5.1.0 --- pom.xml | 2 +- .../admin/MQAdminPooledObjectFactory.java | 2 +- .../controller/ConsumerController.java | 2 +- .../controller/MessageController.java | 2 +- .../controller/ProducerController.java | 2 +- .../dashboard/model/ConnectionInfo.java | 2 +- .../model/ConsumerGroupRollBackStat.java | 2 +- .../model/DlqMessageResendResult.java | 4 +- .../dashboard/model/GroupConsumeInfo.java | 4 +- .../dashboard/model/QueueStatInfo.java | 2 +- .../model/request/ConsumerConfigInfo.java | 2 +- .../model/request/TopicConfigInfo.java | 1 - .../service/AbstractCommonService.java | 4 +- .../dashboard/service/ConsumerService.java | 4 +- .../dashboard/service/MessageService.java | 2 +- .../dashboard/service/ProducerService.java | 2 +- .../dashboard/service/TopicService.java | 8 +- .../service/client/MQAdminExtImpl.java | 319 ++++++++++++++++-- .../service/impl/AclServiceImpl.java | 4 +- .../service/impl/ClusterServiceImpl.java | 6 +- .../service/impl/ConsumerServiceImpl.java | 22 +- .../service/impl/DlqMessageServiceImpl.java | 4 +- .../service/impl/MessageServiceImpl.java | 6 +- .../service/impl/ProducerServiceImpl.java | 2 +- .../service/impl/TopicServiceImpl.java | 12 +- .../dashboard/task/CollectTaskRunnble.java | 8 +- .../dashboard/task/DashboardCollectTask.java | 10 +- .../dashboard/admin/MQAdminExtImplTest.java | 49 +-- .../dashboard/admin/MQAdminPoolTest.java | 2 +- .../controller/AclControllerTest.java | 2 +- .../controller/ClusterControllerTest.java | 4 +- .../controller/ConsumerControllerTest.java | 20 +- .../controller/DlqMessageControllerTest.java | 8 +- .../controller/MessageControllerTest.java | 6 +- .../controller/ProducerControllerTest.java | 4 +- .../controller/TopicControllerTest.java | 16 +- .../task/DashboardCollectTaskTest.java | 12 +- .../testbase/RocketMQConsoleTestBase.java | 2 +- .../dashboard/util/MockObjectUtil.java | 46 +-- 39 files changed, 439 insertions(+), 172 deletions(-) diff --git a/pom.xml b/pom.xml index 62da76c..2b61bfc 100644 --- a/pom.xml +++ b/pom.xml @@ -91,7 +91,7 @@ 2.4 1.2 3.2.2 - 4.9.3 + 5.1.0 2.19.1 1.9.6 1.18.22 diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java index b68f931..0c495be 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java +++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java @@ -21,7 +21,7 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.tools.admin.MQAdminExt; @Slf4j diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java index 68becd1..d9f22e4 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.controller; import com.google.common.base.Preconditions; import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java index e4dfcd9..9eb08f6 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.dashboard.controller; import com.google.common.collect.Maps; import org.apache.rocketmq.common.Pair; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.request.MessageQuery; diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java index 9c1d79d..389506e 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.dashboard.controller; import javax.annotation.Resource; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.permisssion.Permission; import org.apache.rocketmq.dashboard.service.ProducerService; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java index 9070542..a100f92 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java @@ -20,7 +20,7 @@ import com.google.common.collect.Sets; import java.util.Collection; import java.util.HashSet; import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.Connection; public class ConnectionInfo extends Connection { private String versionDesc; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java b/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java index f7e4a4e..a42037b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.dashboard.model; -import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import com.google.common.collect.Lists; import java.util.List; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java index 44bf55f..b93978d 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.dashboard.model; import lombok.Data; -import org.apache.rocketmq.common.protocol.body.CMResult; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.CMResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; @Data public class DlqMessageResendResult { diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java index 6429ba7..5c86572 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java @@ -16,8 +16,8 @@ */ package org.apache.rocketmq.dashboard.model; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; public class GroupConsumeInfo implements Comparable { private String group; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java index 38daddd..29dc542 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.dashboard.model; -import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper; import org.apache.rocketmq.common.message.MessageQueue; import org.springframework.beans.BeanUtils; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java index acebafc..2a7e9c0 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.dashboard.model.request; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import java.util.List; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java index 32572fe..2c633cd 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java @@ -15,7 +15,6 @@ * limitations under the License. */ package org.apache.rocketmq.dashboard.model.request; - import com.google.common.base.Objects; import java.util.List; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java b/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java index 1f85796..004ece4 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java @@ -19,8 +19,8 @@ package org.apache.rocketmq.dashboard.service; import org.apache.rocketmq.tools.admin.MQAdminExt; import com.google.common.base.Throwables; import com.google.common.collect.Sets; -import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; @@ -28,7 +28,7 @@ import org.apache.commons.collections.CollectionUtils; public abstract class AbstractCommonService { @Resource protected MQAdminExt mqAdminExt; - protected final Set changeToBrokerNameSet(HashMap> clusterAddrTable, + protected final Set changeToBrokerNameSet(Map> clusterAddrTable, List clusterNameList, List brokerNameList) { Set finalBrokerNameList = Sets.newHashSet(); if (CollectionUtils.isNotEmpty(clusterNameList)) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java index 6f4965c..c475931 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.dashboard.service; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.TopicConsumerInfo; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java b/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java index 802ca45..36fb5cd 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.tools.admin.api.MessageTrack; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java index cd9f582..ac0e731 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.dashboard.service; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; public interface ProducerService { ProducerConnection getProducerConnection(String producerGroup, String topic); diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java index 57f0dea..3a28444 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java @@ -19,10 +19,10 @@ package org.apache.rocketmq.dashboard.service; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.TopicStatsTable; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 6788522..2e45a8b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -29,31 +29,41 @@ import org.apache.rocketmq.client.impl.MQAdminImpl; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; -import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; -import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; -import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.message.MessageRequestMode; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; +import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo; +import org.apache.rocketmq.remoting.protocol.body.QueryConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.dashboard.util.JsonUtil; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -63,7 +73,9 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult; import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.apache.rocketmq.tools.admin.common.AdminToolResult; import org.joor.Reflect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +93,7 @@ public class MQAdminExtImpl implements MQAdminExt { @Override public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - UnsupportedEncodingException, InterruptedException, MQBrokerException { + UnsupportedEncodingException, InterruptedException, MQBrokerException, MQClientException { MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties); } @@ -376,14 +388,14 @@ public class MQAdminExtImpl implements MQAdminExt { } @Override - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum); + public void createTopic(String key, String newTopic, int queueNum, Map attributes) throws MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, attributes); } @Override - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map attributes) throws MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag); + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag, attributes); } @Override @@ -572,4 +584,257 @@ public class MQAdminExtImpl implements MQAdminExt { String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return false; } + + @Override + public void addBrokerToContainer(String brokerContainerAddr, String brokerConfig) throws InterruptedException, + MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'addBrokerToContainer'"); + } + + @Override + public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName, + long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'removeBrokerFromContainer'"); + } + + @Override + public void updateGlobalWhiteAddrConfig(String addr, String globalWhiteAddrs, String aclFileFullPath) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'updateGlobalWhiteAddrConfig'"); + } + + @Override + public TopicStatsTable examineTopicStats(String brokerAddr, String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineTopicStats'"); + } + + @Override + public AdminToolResult examineTopicStatsConcurrent(String topic) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineTopicStatsConcurrent'"); + } + + @Override + public ConsumeStats examineConsumeStats(String brokerAddr, String consumerGroup, String topicName, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'"); + } + + @Override + public AdminToolResult examineConsumeStatsConcurrent(String consumerGroup, String topic) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStatsConcurrent'"); + } + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr) + throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'"); + } + + @Override + public ProducerTableInfo getAllProducerInfo(String brokerAddr) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getAllProducerInfo'"); + } + + @Override + public void deleteTopic(String topicName, String clusterName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteTopic'"); + } + + @Override + public AdminToolResult deleteTopicInBrokerConcurrent(Set addrs, String topic) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInBrokerConcurrent'"); + } + + @Override + public void deleteTopicInNameServer(Set addrs, String clusterName, String topic) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInNameServer'"); + } + + @Override + public AdminToolResult resetOffsetNewConcurrent(String group, String topic, long timestamp) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'resetOffsetNewConcurrent'"); + } + + @Override + public TopicList queryTopicsByConsumer(String group) + throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumer'"); + } + + @Override + public AdminToolResult queryTopicsByConsumerConcurrent(String group) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumerConcurrent'"); + } + + @Override + public SubscriptionData querySubscription(String group, String topic) + throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'querySubscription'"); + } + + @Override + public AdminToolResult> queryConsumeTimeSpanConcurrent(String topic, String group) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryConsumeTimeSpanConcurrent'"); + } + + @Override + public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLog'"); + } + + @Override + public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLogByAddr'"); + } + + @Override + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack, + boolean metrics) throws RemotingException, MQClientException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getConsumerRunningInfo'"); + } + + @Override + public List messageTrackDetailConcurrent(MessageExt msg) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'messageTrackDetailConcurrent'"); + } + + @Override + public void setMessageRequestMode(String brokerAddr, String topic, String consumerGroup, MessageRequestMode mode, + int popWorkGroupSize, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'setMessageRequestMode'"); + } + + @Override + public long searchOffset(String brokerAddr, String topicName, int queueId, long timestamp, long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'searchOffset'"); + } + + @Override + public void resetOffsetByQueueId(String brokerAddr, String consumerGroup, String topicName, int queueId, + long resetOffset) throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'resetOffsetByQueueId'"); + } + + @Override + public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, + TopicQueueMappingDetail mappingDetail, boolean force) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'createStaticTopic'"); + } + + @Override + public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String groupName, String topicName, + Boolean readable) throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'updateAndGetGroupReadForbidden'"); + } + + @Override + public MessageExt queryMessage(String clusterName, String topic, String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryMessage'"); + } + + @Override + public HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getBrokerHAStatus'"); + } + + @Override + public BrokerReplicasInfo getInSyncStateData(String controllerAddress, List brokers) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getInSyncStateData'"); + } + + @Override + public EpochEntryCache getBrokerEpochCache(String brokerAddr) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getBrokerEpochCache'"); + } + + @Override + public GetMetaDataResponseHeader getControllerMetaData(String controllerAddr) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getControllerMetaData'"); + } + + @Override + public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException, + MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'resetMasterFlushOffset'"); + } + + @Override + public Map getControllerConfig(List controllerServers) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQClientException, UnsupportedEncodingException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getControllerConfig'"); + } + + @Override + public void updateControllerConfig(Properties properties, List controllers) + throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'updateControllerConfig'"); + } + + @Override + public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName, + String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'electMaster'"); + } + + @Override + public void cleanControllerBrokerData(String controllerAddr, String clusterName, String brokerName, + String brokerAddr, boolean isCleanLivingBroker) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'cleanControllerBrokerData'"); + } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java index 1e7e294..c16392c 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java @@ -36,8 +36,8 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.dashboard.model.request.AclRequest; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.AclService; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java index c57f7e1..3512ec2 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java @@ -17,9 +17,9 @@ package org.apache.rocketmq.dashboard.service.impl; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.dashboard.service.ClusterService; import org.apache.rocketmq.dashboard.util.JsonUtil; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index b1011b7..a5367a3 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -44,18 +44,18 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java index 006f1c2..5a22643 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java @@ -25,8 +25,8 @@ import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.dashboard.model.DlqMessageResendResult; import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.MessagePage; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java index 6cb6432..b80864b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java @@ -37,9 +37,9 @@ import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.exception.ServiceException; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java index 3ce408b..8918060 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service.impl; import com.google.common.base.Throwables; import javax.annotation.Resource; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; import org.apache.rocketmq.dashboard.service.ProducerService; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.springframework.stereotype.Service; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index 9dfde72..cd0dd89 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -30,13 +30,13 @@ import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java index 2e96566..b772176 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java @@ -24,10 +24,10 @@ import java.util.List; import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.service.DashboardCollectService; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.tools.admin.MQAdminExt; diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java index cbc08da..c943284 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java @@ -34,10 +34,10 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import javax.annotation.Resource; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.service.DashboardCollectService; @@ -250,7 +250,7 @@ public class DashboardCollectTask { private void addSystemTopic() throws Exception { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - HashMap> clusterTable = clusterInfo.getClusterAddrTable(); + Map> clusterTable = clusterInfo.getClusterAddrTable(); for (Map.Entry> entry : clusterTable.entrySet()) { String clusterName = entry.getKey(); TopicValidator.addSystemTopic(clusterName); diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java index 1b6a33b..b2264bd 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java @@ -34,27 +34,27 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; -import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; -import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl; import org.apache.rocketmq.dashboard.service.client.MQAdminInstance; import org.apache.rocketmq.dashboard.util.MockObjectUtil; @@ -82,6 +82,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -570,11 +571,13 @@ public class MQAdminExtImplTest { public void testCreateTopic() throws Exception { assertNotNull(mqAdminExtImpl); { - doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt()); - doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt()); + doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyMap()); + doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt(), anyMap()); } - mqAdminExtImpl.createTopic("key", "topic_test", 8); - mqAdminExtImpl.createTopic("key", "topic_test", 8, 1); + Map map = new HashMap<>(); + map.put("message.type", "FIFO"); + mqAdminExtImpl.createTopic("key", "topic_test", 8, map); + mqAdminExtImpl.createTopic("key", "topic_test", 8, 1, map); } @Test diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java index 6859927..48f5265 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.dashboard.admin; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.tools.admin.MQAdminExt; diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/AclControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/AclControllerTest.java index 8899b84..4bd81c7 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/AclControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/AclControllerTest.java @@ -21,7 +21,7 @@ import com.google.common.collect.Lists; import java.util.List; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.dashboard.model.request.AclRequest; import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl; import org.apache.rocketmq.dashboard.util.MockObjectUtil; diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ClusterControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ClusterControllerTest.java index e9b9995..96adedc 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/ClusterControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ClusterControllerTest.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.dashboard.controller; import java.util.HashMap; import java.util.Properties; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.dashboard.service.impl.ClusterServiceImpl; import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.junit.Test; diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java index b95e80a..3bff28a 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java @@ -23,17 +23,17 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest; diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java index 482b314..d7bb976 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java @@ -21,10 +21,10 @@ import com.google.common.collect.Lists; import java.util.List; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.CMResult; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.CMResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessageView; diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/MessageControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/MessageControllerTest.java index 8edc2de..cffb38a 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/MessageControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/MessageControllerTest.java @@ -32,9 +32,9 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.body.CMResult; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.CMResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl; diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ProducerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ProducerControllerTest.java index ef1d39b..b0c6608 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/ProducerControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ProducerControllerTest.java @@ -19,8 +19,8 @@ package org.apache.rocketmq.dashboard.controller; import java.util.HashSet; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; import org.apache.rocketmq.dashboard.interceptor.AuthInterceptor; import org.apache.rocketmq.dashboard.service.impl.LoginServiceImpl; import org.apache.rocketmq.dashboard.service.impl.ProducerServiceImpl; diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java index 6b5cb56..7e50c56 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/TopicControllerTest.java @@ -29,16 +29,16 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl; diff --git a/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java b/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java index c820655..e36d79c 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java @@ -33,12 +33,12 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.BaseTest; import org.apache.rocketmq.dashboard.config.CollectExecutorConfig; import org.apache.rocketmq.dashboard.config.RMQConfigure; diff --git a/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java b/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java index 9814f66..29d6b8f 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java +++ b/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java @@ -29,7 +29,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.service.ConsumerService; diff --git a/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java b/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java index fe7ac23..ece2507 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java +++ b/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java @@ -33,39 +33,39 @@ import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.trace.TraceConstants; import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.common.AclConfig; -import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.OffsetWrapper; -import org.apache.rocketmq.common.admin.TopicOffset; -import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper; +import org.apache.rocketmq.remoting.protocol.admin.TopicOffset; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.BrokerStatsItem; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ConsumeStatus; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsItem; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.checkerframework.checker.units.qual.A; -import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; +import static org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; public class MockObjectUtil { From 6456630324e3ff22217a6da221a64c09bea82a72 Mon Sep 17 00:00:00 2001 From: Abhijeet Mishra Date: Wed, 19 Apr 2023 18:03:17 +0530 Subject: [PATCH 07/15] [#148] Throwables.propagate in deprecated for making runtime exception more verbose (#160) --- .../service/AbstractCommonService.java | 3 +- .../service/client/MQAdminExtImpl.java | 14 ++++---- .../service/impl/AclServiceImpl.java | 33 +++++++++++------ .../service/impl/ClusterServiceImpl.java | 6 ++-- .../service/impl/ConsumerServiceImpl.java | 29 +++++++++------ .../impl/DashboardCollectServiceImpl.java | 3 +- .../service/impl/DlqMessageServiceImpl.java | 6 ++-- .../service/impl/MessageServiceImpl.java | 18 ++++++---- .../service/impl/MonitorServiceImpl.java | 3 +- .../service/impl/ProducerServiceImpl.java | 3 +- .../service/impl/TopicServiceImpl.java | 36 ++++++++++++------- .../dashboard/task/CollectTaskRunnble.java | 3 +- .../dashboard/task/DashboardCollectTask.java | 15 +++++--- .../testbase/RocketMQConsoleTestBase.java | 9 +++-- 14 files changed, 118 insertions(+), 63 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java b/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java index 004ece4..a546fbf 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java @@ -38,7 +38,8 @@ public abstract class AbstractCommonService { } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } if (CollectionUtils.isNotEmpty(brokerNameList)) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 2e45a8b..360c0e3 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -140,7 +140,7 @@ public class MQAdminExtImpl implements MQAdminExt { } @Override - public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) throws MQBrokerException { RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = null; @@ -148,7 +148,8 @@ public class MQAdminExtImpl implements MQAdminExt { response = remotingClient.invokeSync(addr, request, 3000); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } assert response != null; switch (response.getCode()) { @@ -157,12 +158,12 @@ public class MQAdminExtImpl implements MQAdminExt { return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group); } default: - throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + throw new MQBrokerException(response.getCode(), response.getRemark()); } } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { + public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException { RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand response = null; @@ -170,7 +171,8 @@ public class MQAdminExtImpl implements MQAdminExt { response = remotingClient.invokeSync(addr, request, 3000); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -178,7 +180,7 @@ public class MQAdminExtImpl implements MQAdminExt { return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); } default: - throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + throw new MQBrokerException(response.getCode(), response.getRemark()); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java index c16392c..0c0177f 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java @@ -68,7 +68,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } catch (Exception e) { log.error("getAclConfig error.", e); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } AclConfig aclConfig = new AclConfig(); aclConfig.setGlobalWhiteAddrs(Collections.emptyList()); @@ -100,7 +101,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -116,7 +118,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -142,7 +145,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -174,7 +178,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -206,7 +211,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -249,7 +255,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -261,7 +268,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -281,7 +289,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -297,7 +306,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -311,7 +321,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ",")); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java index 3512ec2..facf448 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java @@ -59,7 +59,8 @@ public class ClusterServiceImpl implements ClusterService { return resultMap; } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -70,7 +71,8 @@ public class ClusterServiceImpl implements ClusterService { return mqAdminExt.getBrokerConfig(brokerAddr); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index a5367a3..9fb51d6 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -73,8 +73,6 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; -import static com.google.common.base.Throwables.propagate; - @Service public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); @@ -131,7 +129,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } List groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size()); @@ -218,7 +217,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } List mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate() { @Override @@ -278,7 +278,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum return group2ConsumerInfoMap; } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -341,7 +342,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return consumerConfigInfoList; } @@ -366,7 +368,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return true; } @@ -393,7 +396,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } return true; } @@ -408,7 +412,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return brokerNameSet; @@ -420,7 +425,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -430,7 +436,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java index fa8f073..75cebd4 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java @@ -107,7 +107,8 @@ public class DashboardCollectServiceImpl implements DashboardCollectService { strings = Files.readLines(file, Charsets.UTF_8); } catch (IOException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } StringBuffer sb = new StringBuffer(); for (String string : strings) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java index 5a22643..4d3c3f7 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java @@ -62,10 +62,12 @@ public class DlqMessageServiceImpl implements DlqMessageService { && e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) { return new MessagePage(new PageImpl<>(messageViews, page, 0), query.getTaskId()); } else { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return messageService.queryMessageByPage(query); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java index b80864b..d51197f 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java @@ -115,7 +115,8 @@ public class MessageServiceImpl implements MessageService { if (err instanceof MQClientException) { throw new ServiceException(-1, ((MQClientException) err).getErrorMessage()); } - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -185,7 +186,8 @@ public class MessageServiceImpl implements MessageService { }); return messageViewList; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { consumer.shutdown(); } @@ -209,7 +211,8 @@ public class MessageServiceImpl implements MessageService { try { return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -223,7 +226,8 @@ public class MessageServiceImpl implements MessageService { return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } throw new IllegalStateException("NO CONSUMER"); @@ -388,7 +392,8 @@ public class MessageServiceImpl implements MessageService { PageImpl page = new PageImpl<>(messageViews, query.page(), total); return new MessagePageTask(page, queueOffsetInfos); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { consumer.shutdown(); } @@ -455,7 +460,8 @@ public class MessageServiceImpl implements MessageService { } return new PageImpl<>(messageViews, query.page(), total); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { consumer.shutdown(); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java index ea4dd58..d0e44c3 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java @@ -82,7 +82,8 @@ public class MonitorServiceImpl implements MonitorService { MixAll.string2File(dataStr, path); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java index 8918060..5f5e491 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java @@ -35,7 +35,8 @@ public class ProducerServiceImpl implements ProducerService { return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index cd0dd89..1d7d571 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -84,7 +84,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ allTopics.getTopicList().addAll(topics); return allTopics; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -93,7 +94,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { return mqAdminExt.examineTopicStats(topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -102,7 +104,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { return mqAdminExt.examineTopicRouteInfo(topic); } catch (Exception ex) { - throw Throwables.propagate(ex); + Throwables.throwIfUnchecked(ex); + throw new RuntimeException(ex); } } @@ -111,7 +114,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { return mqAdminExt.queryTopicConsumeByWho(topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -126,7 +130,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); } } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -137,7 +142,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ clusterInfo = mqAdminExt.examineBrokerClusterInfo(); return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -170,7 +176,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } mqAdminExt.deleteTopicInNameServer(nameServerSet, topic); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } return true; } @@ -181,7 +188,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { clusterInfo = mqAdminExt.examineBrokerClusterInfo(); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) { deleteTopic(topic, clusterName); @@ -197,11 +205,13 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { clusterInfo = mqAdminExt.examineBrokerClusterInfo(); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return true; } @@ -230,7 +240,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ producer.start(); return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { producer.shutdown(); } @@ -258,7 +269,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ ); return producer.send(msg); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); producer.shutdown(); diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java index b772176..28fd7a5 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java @@ -93,7 +93,8 @@ public class CollectTaskRunnble implements Runnable { try { list = dashboardCollectService.getTopicMap().get(topic); } catch (ExecutionException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } if (null == list) { list = Lists.newArrayList(); diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java index c943284..d58668b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java @@ -84,7 +84,8 @@ public class DashboardCollectTask { } } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -128,7 +129,8 @@ public class DashboardCollectTask { log.debug("Broker Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap())); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -144,10 +146,12 @@ public class DashboardCollectTask { Thread.sleep(1000); } catch (InterruptedException e1) { - throw Throwables.propagate(e1); + Throwables.throwIfUnchecked(e1); + throw new RuntimeException(e1); } fetchBrokerRuntimeStats(brokerAddr, retryTime - 1); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -197,7 +201,8 @@ public class DashboardCollectTask { } catch (IOException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java b/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java index 29d6b8f..d4eff14 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java +++ b/src/test/java/org/apache/rocketmq/dashboard/testbase/RocketMQConsoleTestBase.java @@ -78,7 +78,8 @@ public abstract class RocketMQConsoleTestBase { } } } - throw Throwables.propagate(exception); + Throwables.throwIfUnchecked(exception); + throw new RuntimeException(exception); } } @@ -91,7 +92,8 @@ public abstract class RocketMQConsoleTestBase { producer.start(); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -137,7 +139,8 @@ public abstract class RocketMQConsoleTestBase { consumer.start(); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } From 2fb0fce0b16fe04ff7ec0181d4127267e9c9af31 Mon Sep 17 00:00:00 2001 From: Javen Date: Tue, 26 Mar 2024 17:02:16 +0800 Subject: [PATCH 08/15] perf: The new metrics of getTransferredTps for rocketmq5.x and the old metrics of getTransferedTps for rocketmq4.x (#197) Co-authored-by: jinwei2 --- src/main/resources/static/view/pages/cluster.html | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/resources/static/view/pages/cluster.html b/src/main/resources/static/view/pages/cluster.html index d308ab9..aa778b7 100644 --- a/src/main/resources/static/view/pages/cluster.html +++ b/src/main/resources/static/view/pages/cluster.html @@ -50,7 +50,14 @@ {{instance.address}} {{instance.brokerVersionDesc}} {{instance.putTps.split(' ')[0]| number:2}} - {{instance.getTransferedTps.split(' ')[0]| number:2}} + + + {{instance.getTransferredTps.split(' ')[0] | number:2}} + + + {{instance.getTransferedTps.split(' ')[0] | number:2}} + + {{instance.msgPutTotalTodayMorning - instance.msgPutTotalYesterdayMorning}} From 823bce2b8b770f15ea63674d5e6d66a8d43479d9 Mon Sep 17 00:00:00 2001 From: guangdashao <54162228+guangdashao@users.noreply.github.com> Date: Tue, 4 Jun 2024 11:40:46 +0800 Subject: [PATCH 09/15] feat: add topic message type add message type --- .../model/request/TopicConfigInfo.java | 18 ++- .../service/impl/ClusterServiceImpl.java | 6 + .../service/impl/TopicServiceImpl.java | 151 +++++++++++++----- src/main/resources/application.yml | 8 +- src/main/resources/static/src/i18n/en.js | 8 +- src/main/resources/static/src/i18n/zh.js | 8 +- src/main/resources/static/src/topic.js | 5 +- .../resources/static/view/pages/topic.html | 13 ++ 8 files changed, 168 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java index 2c633cd..6b9eb67 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java @@ -31,6 +31,7 @@ public class TopicConfigInfo { private int perm; private boolean order; + private String messageType; public List getClusterNameList() { return clusterNameList; } @@ -91,6 +92,18 @@ public class TopicConfigInfo { this.order = order; } + + public String getMessageType() { + return messageType; + } + + public void setMessageType(String messageType) { + this.messageType = messageType; + } + + + + @Override public boolean equals(Object o) { if (this == o) @@ -102,12 +115,13 @@ public class TopicConfigInfo { readQueueNums == that.readQueueNums && perm == that.perm && order == that.order && - Objects.equal(topicName, that.topicName); + Objects.equal(topicName, that.topicName) && + Objects.equal(messageType, that.messageType); } @Override public int hashCode() { - return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order); + return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order,messageType); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java index facf448..12e7f71 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.dashboard.service.impl; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.route.BrokerData; @@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.Arrays; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; @Service public class ClusterServiceImpl implements ClusterService { @@ -56,6 +59,9 @@ public class ClusterServiceImpl implements ClusterService { } resultMap.put("clusterInfo", clusterInfo); resultMap.put("brokerServer", brokerServer); + // add messageType + resultMap.put("messageTypes", Arrays.stream(TopicMessageType.values()).sorted() + .collect(Collectors.toMap(TopicMessageType::getValue, messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue())))); return resultMap; } catch (Exception err) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index 1d7d571..ecd08de 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -18,25 +18,24 @@ package org.apache.rocketmq.dashboard.service.impl; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; -import org.apache.rocketmq.remoting.protocol.body.GroupList; -import org.apache.rocketmq.remoting.protocol.body.TopicList; -import org.apache.rocketmq.remoting.protocol.route.BrokerData; -import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; @@ -44,6 +43,12 @@ import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.TopicService; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.tools.command.CommandUtil; import org.joor.Reflect; import org.springframework.beans.BeanUtils; @@ -55,6 +60,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE; @Service public class TopicServiceImpl extends AbstractCommonService implements TopicService { @@ -68,18 +78,18 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ TopicList allTopics = mqAdminExt.fetchAllTopicList(); TopicList sysTopics = getSystemTopicList(); Set topics = - allTopics.getTopicList().stream().map(topic -> { - if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) { - topic = String.format("%s%s", "%SYS%", topic); - } - return topic; - }).filter(topic -> { - if (skipRetryAndDlq) { - return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) - || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)); - } - return true; - }).collect(Collectors.toSet()); + allTopics.getTopicList().stream().map(topic -> { + if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) { + topic = String.format("%s%s", "%SYS%", topic); + } + return topic; + }).filter(topic -> { + if (skipRetryAndDlq) { + return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) + || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)); + } + return true; + }).collect(Collectors.toSet()); allTopics.getTopicList().clear(); allTopics.getTopicList().addAll(topics); return allTopics; @@ -123,10 +133,15 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { TopicConfig topicConfig = new TopicConfig(); BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); + String messageType = topicCreateOrUpdateRequest.getMessageType(); + if (StringUtils.isBlank(messageType)) { + messageType = TopicMessageType.NORMAL.name(); + } + topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()), messageType)); try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), - topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { + topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); } } catch (Exception err) { @@ -156,6 +171,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName()); BeanUtils.copyProperties(topicConfig, topicConfigInfo); topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName())); + String messageType = topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()); + if (StringUtils.isBlank(messageType)) { + messageType = TopicMessageType.UNSPECIFIED.name(); + } + topicConfigInfo.setMessageType(messageType); topicConfigInfoList.add(topicConfigInfo); } return topicConfigInfoList; @@ -226,6 +246,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ return defaultMQProducer; } + public TransactionMQProducer buildTransactionMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) { + TransactionMQProducer defaultMQProducer = new TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC); + defaultMQProducer.setUseTLS(configure.isUseTLS()); + return defaultMQProducer; + } + private TopicList getSystemTopicList() { RPCHook rpcHook = null; boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); @@ -249,32 +275,61 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { - DefaultMQProducer producer = null; + List topicConfigInfos = examineTopicConfig(sendTopicMessageRequest.getTopic()); + String messageType = topicConfigInfos.get(0).getMessageType(); AclClientRPCHook rpcHook = null; if (configure.isACLEnabled()) { rpcHook = new AclClientRPCHook(new SessionCredentials( - configure.getAccessKey(), - configure.getSecretKey() + configure.getAccessKey(), + configure.getSecretKey() )); } - producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); - producer.setInstanceName(String.valueOf(System.currentTimeMillis())); - producer.setNamesrvAddr(configure.getNamesrvAddr()); - try { - producer.start(); - Message msg = new Message(sendTopicMessageRequest.getTopic(), - sendTopicMessageRequest.getTag(), - sendTopicMessageRequest.getKey(), - sendTopicMessageRequest.getMessageBody().getBytes() - ); - return producer.send(msg); - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } finally { - waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); - producer.shutdown(); + if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) { + // transaction message + TransactionListener transactionListener = new TransactionListenerImpl(); + + TransactionMQProducer producer = buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); + producer.setNamesrvAddr(configure.getNamesrvAddr()); + producer.setTransactionListener(transactionListener); + try { + producer.start(); + Message msg = new Message(sendTopicMessageRequest.getTopic(), + sendTopicMessageRequest.getTag(), + sendTopicMessageRequest.getKey(), + sendTopicMessageRequest.getMessageBody().getBytes() + ); + return producer.sendMessageInTransaction(msg, null); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } finally { + waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); + producer.shutdown(); + } + } else { + // no transaction message + DefaultMQProducer producer = null; + producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); + producer.setNamesrvAddr(configure.getNamesrvAddr()); + try { + producer.start(); + Message msg = new Message(sendTopicMessageRequest.getTopic(), + sendTopicMessageRequest.getTag(), + sendTopicMessageRequest.getKey(), + sendTopicMessageRequest.getMessageBody().getBytes() + ); + return producer.send(msg); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } finally { + waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); + producer.shutdown(); + } } + } private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { @@ -296,4 +351,20 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } catch (Exception ignore) { } } + + static class TransactionListenerImpl implements TransactionListener { + private AtomicInteger transactionIndex = new AtomicInteger(0); + + private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); + + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0ab405e..090e421 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -42,7 +42,9 @@ rocketmq: # configure multiple namesrv addresses to manage multiple different clusters namesrvAddrs: - 127.0.0.1:9876 - - 127.0.0.2:9876 + # - 127.0.0.2:9876 + # - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876 + # - 10.151.47.30:9876 # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true isVIPChannel: # timeout for mqadminExt, default 5000ms @@ -58,8 +60,8 @@ rocketmq: loginRequired: false useTLS: false # set the accessKey and secretKey if you used acl - accessKey: # if version > 4.4.0 - secretKey: # if version > 4.4.0 +# accessKey: rocketmq2 +# secretKey: 12345678 threadpool: config: diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index f9a4e3c..943ce48 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -123,5 +123,11 @@ var en = { "GROUP_PERM":"Group Permission", "SYNCHRONIZE":"Synchronize Data", "SHOW":"Show", - "HIDE":"Hide" + "HIDE":"Hide", + "MESSAGE_TYPE":"messageType", + "MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL", + "MESSAGE_TYPE_NORMAL": "NORMAL", + "MESSAGE_TYPE_FIFO": "FIFO", + "MESSAGE_TYPE_DELAY": "DELAY", + "MESSAGE_TYPE_TRANSACTION": "TRANSACTION", } diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index b6fa589..8a3b3ff 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -124,5 +124,11 @@ var zh = { "GROUP_PERM":"消费组权限", "SYNCHRONIZE":"同步", "SHOW":"显示", - "HIDE":"隐藏" + "HIDE":"隐藏", + "MESSAGE_TYPE":"消息类型", + "MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息", + "MESSAGE_TYPE_NORMAL": "普通消息", + "MESSAGE_TYPE_FIFO": "顺序消息", + "MESSAGE_TYPE_DELAY": "定时/延时消息", + "MESSAGE_TYPE_TRANSACTION": "事务消息", } \ No newline at end of file diff --git a/src/main/resources/static/src/topic.js b/src/main/resources/static/src/topic.js index 998f219..bce0df8 100644 --- a/src/main/resources/static/src/topic.js +++ b/src/main/resources/static/src/topic.js @@ -328,8 +328,8 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati var bIsUpdate = true; if (request == null) { request = [{ - writeQueueNums: 16, - readQueueNums: 16, + writeQueueNums: 8, + readQueueNums: 8, perm: 6, order: false, topicName: "", @@ -355,6 +355,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati topicRequestList: request, allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable), allBrokerNameList: Object.keys(resp.data.brokerServer), + allMessageTypeList: resp.data.messageTypes, bIsUpdate: bIsUpdate, writeOperationEnabled: $scope.writeOperationEnabled } diff --git a/src/main/resources/static/view/pages/topic.html b/src/main/resources/static/view/pages/topic.html index bea5ac7..7547c10 100644 --- a/src/main/resources/static/view/pages/topic.html +++ b/src/main/resources/static/view/pages/topic.html @@ -63,6 +63,7 @@ + - diff --git a/src/main/resources/static/view/pages/proxy.html b/src/main/resources/static/view/pages/proxy.html new file mode 100644 index 0000000..43f34ce --- /dev/null +++ b/src/main/resources/static/view/pages/proxy.html @@ -0,0 +1,67 @@ + +
+
+

ProxyServerAddressList

+
+ +
+
+ +
+
+
+ + + +
+
+
+
+ + From 2bc59db3409ebc7293b1c8f68bd4bc149036cb13 Mon Sep 17 00:00:00 2001 From: Akai <91858554+1294566108@users.noreply.github.com> Date: Thu, 13 Jun 2024 15:22:38 +0800 Subject: [PATCH 13/15] Supplement UserGuide for RocketMQ 5.0 (#208) * Support UserGuide for new feature * update userGuide md * update userGuide md --------- Co-authored-by: yuanziwei --- docs/1_0_0/UserGuide_CN.md | 12 ++++++++++++ docs/1_0_0/UserGuide_EN.md | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/docs/1_0_0/UserGuide_CN.md b/docs/1_0_0/UserGuide_CN.md index 0bf62a4..291b1c1 100755 --- a/docs/1_0_0/UserGuide_CN.md +++ b/docs/1_0_0/UserGuide_CN.md @@ -63,6 +63,18 @@ * 根据消息主题和消息Id进行消息的查询 * 消息详情可以展示这条消息的详细信息,查看消息对应到具体消费组的消费情况(如果异常,可以查看具体的异常信息)。可以向指定的消费组重发消息。 +## RocketMQ-V5.0 仪表盘 +* 版本切换 + * RocketMQ右上角可切换不同版本,用户可以自主选择 RocketMQ-5.x 或 RocketMQ-4.x 版本 +* 主题页面 + * 支持延迟/顺序/事务消息的筛选 + * 支持延迟/顺序/事物/普通等多种消息类型主题的新增与更新 +* 消费页面 + * 支持顺序消费类型订阅组的过滤 + * 提供顺序消费类型订阅组的新增与更新,如果需要开启顺序消费,FIFO类型的订阅组一定需要打开consumeOrderlyEnable选项 +* 代理页面(RocketMQ 5.0新增) + * 支持代理节点的新增与查询 + * 支持代理节点地址配置:在application.yml中可对proxyAddr和proxyAddrs属性进行预配置 ## HTTPS 方式访问Dashboard * HTTPS功能实际上是使用SpringBoot提供的配置功能即可完成,首先,需要有一个SSL KeyStore来存放服务端证书,可以使用本工程所提供的测试密钥库: diff --git a/docs/1_0_0/UserGuide_EN.md b/docs/1_0_0/UserGuide_EN.md index fd469d2..b896dbc 100644 --- a/docs/1_0_0/UserGuide_EN.md +++ b/docs/1_0_0/UserGuide_EN.md @@ -64,6 +64,18 @@ * look over this message's detail info.you can see the message's consume state(each group has one line),show the exception message if has exception. you can send this message to the group you selected +## RocketMQ-V5.0 dashboard +* Version switching + * RocketMQ can switch between different versions in the upper right corner, and users can freely choose between RocketMQ-5.X or RocketMQ-4.X versions +* Theme page + * Support filtering of delayed/sequential/transaction messages + * Support the addition and update of multiple message types such as delay, sequence, object, and ordinary themes +* Consumption page + * Support filtering of subscription groups for fifo consumption types + * Provide the addition and update of subscription groups for sequential consumption types. If fifo consumption needs to be enabled, FIFO type subscription groups must have the consumeOrderlyEnable option enabled +* Proxy page (Added in RocketMQ 5.0) + * Support for adding and querying proxy nodes + * Support proxy node address configuration: ProxyAddr and proxyAddrs properties can be pre configured in application.yml ## Access Dashboard with HTTPS * SpringBoot itself has provided the SSL configuration. You can use the project test Keystore:resources/rmqcngkeystore.jks. The store is generated with the following unix keytool commands: From d9fc76d3a31c15d3d92fef563b905b8a72aded5d Mon Sep 17 00:00:00 2001 From: Akai <91858554+1294566108@users.noreply.github.com> Date: Wed, 24 Jul 2024 10:56:47 +0800 Subject: [PATCH 14/15] fix:Fixed the issue that normal messages in version v4 are not showed (#222) Co-authored-by: yuanziwei --- src/main/resources/static/src/topic.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/resources/static/src/topic.js b/src/main/resources/static/src/topic.js index a455572..4ef6720 100644 --- a/src/main/resources/static/src/topic.js +++ b/src/main/resources/static/src/topic.js @@ -141,6 +141,9 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati if (type.includes("NORMAL")) { return true } + if (!localStorage.getItem('isV5') && type.includes("UNSPECIFIED")) { + return true + } } if ($scope.filterDelay) { if (type.includes("DELAY")) { From 5d08d3b122462b9303da6b2af52a2f2627a2286e Mon Sep 17 00:00:00 2001 From: Akai <91858554+1294566108@users.noreply.github.com> Date: Wed, 24 Jul 2024 10:57:04 +0800 Subject: [PATCH 15/15] Support Unspecified Topic Add & Update & Query (#223) * fix:Fixed the issue that normal messages in version v4 are not showed * feat:support unspecified topic --------- Co-authored-by: yuanziwei --- src/main/resources/static/src/i18n/en.js | 1 + src/main/resources/static/src/i18n/zh.js | 1 + src/main/resources/static/src/topic.js | 15 ++++++++++++--- src/main/resources/static/view/pages/topic.html | 2 ++ 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index 83083d7..2c1450d 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -60,6 +60,7 @@ var en = { "RETRY": "RETRY", "FIFO": "FIFO", "TRANSACTION": "TRANSACTION", + "UNSPECIFIED": "UNSPECIFIED", "DLQ": "DLQ", "QUANTITY":"Quantity", "TYPE":"Type", diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index f8c3c1d..2f0e6f3 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -61,6 +61,7 @@ var zh = { "RETRY": "重试", "FIFO": "顺序", "TRANSACTION": "事务", + "UNSPECIFIED": "未指定", "DLQ": "死信", "QUANTITY":"数量", "TYPE":"类型", diff --git a/src/main/resources/static/src/topic.js b/src/main/resources/static/src/topic.js index 4ef6720..81383b3 100644 --- a/src/main/resources/static/src/topic.js +++ b/src/main/resources/static/src/topic.js @@ -48,6 +48,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati $scope.filterDelay = false $scope.filterFifo = false $scope.filterTransaction = false + $scope.filterUnspecified = false $scope.filterRetry = false $scope.filterDLQ = false $scope.filterSystem = false @@ -91,6 +92,9 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati $scope.$watch('filterTransaction', function () { $scope.filterList(1); }); + $scope.$watch('filterUnspecified', function () { + $scope.filterList(1); + }); $scope.$watch('filterDelay', function () { $scope.filterList(1); }); @@ -137,6 +141,11 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati return true } } + if (localStorage.getItem('isV5') && $scope.filterUnspecified) { + if (type.includes("UNSPECIFIED")) { + return true + } + } if ($scope.filterNormal) { if (type.includes("NORMAL")) { return true @@ -145,17 +154,17 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati return true } } - if ($scope.filterDelay) { + if (localStorage.getItem('isV5') && $scope.filterDelay) { if (type.includes("DELAY")) { return true } } - if ($scope.filterFifo) { + if (localStorage.getItem('isV5') && $scope.filterFifo) { if (type.includes("FIFO")) { return true } } - if ($scope.filterTransaction) { + if (localStorage.getItem('isV5') && $scope.filterTransaction) { if (type.includes("TRANSACTION")) { return true } diff --git a/src/main/resources/static/view/pages/topic.html b/src/main/resources/static/view/pages/topic.html index fb0ccf6..f12d352 100644 --- a/src/main/resources/static/view/pages/topic.html +++ b/src/main/resources/static/view/pages/topic.html @@ -30,6 +30,8 @@ {{'TRANSACTION' | translate}} + {{'UNSPECIFIED' | translate}} + {{'RETRY' | translate}} {{'DLQ' | translate}}