From 1ea15b511cef2b5984a87f22edf31d3229c2acee Mon Sep 17 00:00:00 2001 From: StyleTang Date: Fri, 6 Aug 2021 08:57:04 +0800 Subject: [PATCH] [ISSUE#770] Send message with message trace (#771) --- .../rocketmq/console/config/RMQConfigure.java | 6 ++- .../request/SendTopicMessageRequest.java | 36 ++------------- .../service/impl/MessageTraceServiceImpl.java | 5 +- .../service/impl/TopicServiceImpl.java | 46 +++++++++++++++---- src/main/resources/static/src/i18n/en.js | 3 +- src/main/resources/static/src/i18n/zh.js | 3 +- src/main/resources/static/src/topic.js | 5 +- .../resources/static/view/pages/topic.html | 8 +++- .../MessageTraceControllerTest.java | 3 +- .../impl/MessageTraceServiceImplTest.java | 3 +- .../service/impl/TopicServiceImplTest.java | 14 ++++++ 11 files changed, 78 insertions(+), 54 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java index 7718ffa..62df95b 100644 --- a/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java +++ b/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.console.config; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -120,7 +121,10 @@ public class RMQConfigure { this.enableDashBoardCollect = Boolean.valueOf(enableDashBoardCollect); } - public String getMsgTrackTopicName() { + public String getMsgTrackTopicNameOrDefault() { + if (StringUtils.isEmpty(msgTrackTopicName)) { + return TopicValidator.RMQ_SYS_TRACE_TOPIC; + } return msgTrackTopicName; } diff --git a/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java b/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java index c7ffa8a..c3b1bd8 100644 --- a/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java +++ b/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java @@ -16,41 +16,13 @@ */ package org.apache.rocketmq.console.model.request; +import lombok.Data; + +@Data public class SendTopicMessageRequest { private String topic; private String key; private String tag; private String messageBody; - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - public String getTag() { - return tag; - } - - public void setTag(String tag) { - this.tag = tag; - } - - public String getMessageBody() { - return messageBody; - } - - public void setMessageBody(String messageBody) { - this.messageBody = messageBody; - } + private boolean traceEnabled; } diff --git a/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java b/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java index fabe9cd..f837f69 100644 --- a/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java @@ -66,10 +66,7 @@ public class MessageTraceServiceImpl implements MessageTraceService { @Override public List queryMessageTraceKey(String key) { - String queryTopic = configure.getMsgTrackTopicName(); - if (StringUtils.isEmpty(queryTopic)) { - queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC; - } + String queryTopic = configure.getMsgTrackTopicNameOrDefault(); logger.info("query data topic name is:{}", queryTopic); return queryMessageTraceByTopicAndKey(queryTopic, key); } diff --git a/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java index 6400cfb..700ee5b 100644 --- a/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java @@ -25,6 +25,8 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -41,6 +43,7 @@ import org.apache.rocketmq.console.service.AbstractCommonService; import org.apache.rocketmq.console.service.TopicService; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.CommandUtil; +import org.joor.Reflect; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -49,6 +52,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; @Service public class TopicServiceImpl extends AbstractCommonService implements TopicService { @@ -200,10 +204,14 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ return true; } - public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean useTLS) { - DefaultMQProducer producer = new DefaultMQProducer(producerGroup, rpcHook); - producer.setUseTLS(useTLS); - return producer; + public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook) { + return buildDefaultMQProducer(producerGroup, rpcHook, false); + } + + public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) { + DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, configure.getMsgTrackTopicNameOrDefault()); + defaultMQProducer.setUseTLS(configure.isUseTLS()); + return defaultMQProducer; } private TopicList getSystemTopicList() { @@ -212,7 +220,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ if (isEnableAcl) { rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); } - DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, configure.isUseTLS()); + DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook); producer.setInstanceName(String.valueOf(System.currentTimeMillis())); producer.setNamesrvAddr(configure.getNamesrvAddr()); @@ -229,16 +237,14 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { DefaultMQProducer producer = null; + AclClientRPCHook rpcHook = null; if (configure.isACLEnabled()) { - AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials( + rpcHook = new AclClientRPCHook(new SessionCredentials( configure.getAccessKey(), configure.getSecretKey() )); - producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, configure.isUseTLS()); - } else { - producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, null, configure.isUseTLS()); } - + producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); producer.setInstanceName(String.valueOf(System.currentTimeMillis())); producer.setNamesrvAddr(configure.getNamesrvAddr()); try { @@ -252,8 +258,28 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } catch (Exception e) { throw Throwables.propagate(e); } finally { + waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); producer.shutdown(); } } + private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { + if (!traceEnabled) { + return; + } + try { + TraceDispatcher traceDispatcher = Reflect.on(producer).field("traceDispatcher").get(); + if (traceDispatcher != null) { + ArrayBlockingQueue traceContextQueue = Reflect.on(traceDispatcher).field("traceContextQueue").get(); + while (traceContextQueue.size() > 0) { + Thread.sleep(1); + } + } + // wait another 150ms until async request send finish + // after new RocketMQ version released, this logic can be removed + // https://github.com/apache/rocketmq/pull/2989 + Thread.sleep(150); + } catch (Exception ignore) { + } + } } diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index 520f4bc..14b61b4 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -84,5 +84,6 @@ var en = { "PASSWORD":"Password", "NO_DATA":"Don't have ", "SYSTEM":"SYSTEM", - "WELCOME":"Hi, welcome using RocketMQ Console" + "WELCOME":"Hi, welcome using RocketMQ Console", + "ENABLE_MESSAGE_TRACE":"Enable Message Trace" } \ No newline at end of file diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index 1ec28b3..c90f537 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -85,5 +85,6 @@ var zh = { "PASSWORD":"密码", "NO_DATA":"不存在 ", "SYSTEM":"系统", - "WELCOME":"您好,欢迎使用RocketMQ控制台" + "WELCOME":"您好,欢迎使用RocketMQ控制台", + "ENABLE_MESSAGE_TRACE":"开启消息轨迹" } \ No newline at end of file diff --git a/src/main/resources/static/src/topic.js b/src/main/resources/static/src/topic.js index ab1bf9a..7450f1a 100644 --- a/src/main/resources/static/src/topic.js +++ b/src/main/resources/static/src/topic.js @@ -439,8 +439,9 @@ module.controller('sendTopicMessageDialogController', ['$scope', 'ngDialog', '$h $scope.sendTopicMessage = { topic: $scope.ngDialogData.topic, key: "key", - tag:"tag", - messageBody:"messageBody" + tag: "tag", + messageBody: "messageBody", + traceEnabled: false }; $scope.send = function () { $http({ diff --git a/src/main/resources/static/view/pages/topic.html b/src/main/resources/static/view/pages/topic.html index 8b0bdde..f74c3e8 100644 --- a/src/main/resources/static/view/pages/topic.html +++ b/src/main/resources/static/view/pages/topic.html @@ -527,7 +527,13 @@ ng-disabled> - +
+ +
+ + +
+