[ISSUE#770] Send message with message trace (#771)

This commit is contained in:
StyleTang
2021-08-06 08:57:04 +08:00
committed by GitHub
parent 7868b2b463
commit 1ea15b511c
11 changed files with 78 additions and 54 deletions

View File

@@ -18,6 +18,7 @@ package org.apache.rocketmq.console.config;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -120,7 +121,10 @@ public class RMQConfigure {
this.enableDashBoardCollect = Boolean.valueOf(enableDashBoardCollect); this.enableDashBoardCollect = Boolean.valueOf(enableDashBoardCollect);
} }
public String getMsgTrackTopicName() { public String getMsgTrackTopicNameOrDefault() {
if (StringUtils.isEmpty(msgTrackTopicName)) {
return TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
return msgTrackTopicName; return msgTrackTopicName;
} }

View File

@@ -16,41 +16,13 @@
*/ */
package org.apache.rocketmq.console.model.request; package org.apache.rocketmq.console.model.request;
import lombok.Data;
@Data
public class SendTopicMessageRequest { public class SendTopicMessageRequest {
private String topic; private String topic;
private String key; private String key;
private String tag; private String tag;
private String messageBody; private String messageBody;
private boolean traceEnabled;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getMessageBody() {
return messageBody;
}
public void setMessageBody(String messageBody) {
this.messageBody = messageBody;
}
} }

View File

@@ -66,10 +66,7 @@ public class MessageTraceServiceImpl implements MessageTraceService {
@Override @Override
public List<MessageTraceView> queryMessageTraceKey(String key) { public List<MessageTraceView> queryMessageTraceKey(String key) {
String queryTopic = configure.getMsgTrackTopicName(); String queryTopic = configure.getMsgTrackTopicNameOrDefault();
if (StringUtils.isEmpty(queryTopic)) {
queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
logger.info("query data topic name is:{}", queryTopic); logger.info("query data topic name is:{}", queryTopic);
return queryMessageTraceByTopicAndKey(queryTopic, key); return queryMessageTraceByTopicAndKey(queryTopic, key);
} }

View File

@@ -25,6 +25,8 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
@@ -41,6 +43,7 @@ import org.apache.rocketmq.console.service.AbstractCommonService;
import org.apache.rocketmq.console.service.TopicService; import org.apache.rocketmq.console.service.TopicService;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.CommandUtil;
import org.joor.Reflect;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -49,6 +52,7 @@ import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@Service @Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService { public class TopicServiceImpl extends AbstractCommonService implements TopicService {
@@ -200,10 +204,14 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
return true; return true;
} }
public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean useTLS) { public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook) {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup, rpcHook); return buildDefaultMQProducer(producerGroup, rpcHook, false);
producer.setUseTLS(useTLS); }
return producer;
public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, configure.getMsgTrackTopicNameOrDefault());
defaultMQProducer.setUseTLS(configure.isUseTLS());
return defaultMQProducer;
} }
private TopicList getSystemTopicList() { private TopicList getSystemTopicList() {
@@ -212,7 +220,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
if (isEnableAcl) { if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
} }
DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, configure.isUseTLS()); DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
producer.setInstanceName(String.valueOf(System.currentTimeMillis())); producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr()); producer.setNamesrvAddr(configure.getNamesrvAddr());
@@ -229,16 +237,14 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
@Override @Override
public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
DefaultMQProducer producer = null; DefaultMQProducer producer = null;
AclClientRPCHook rpcHook = null;
if (configure.isACLEnabled()) { if (configure.isACLEnabled()) {
AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials( rpcHook = new AclClientRPCHook(new SessionCredentials(
configure.getAccessKey(), configure.getAccessKey(),
configure.getSecretKey() configure.getSecretKey()
)); ));
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, configure.isUseTLS());
} else {
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, null, configure.isUseTLS());
} }
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
producer.setInstanceName(String.valueOf(System.currentTimeMillis())); producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr()); producer.setNamesrvAddr(configure.getNamesrvAddr());
try { try {
@@ -252,8 +258,28 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} finally { } finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown(); producer.shutdown();
} }
} }
private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
if (!traceEnabled) {
return;
}
try {
TraceDispatcher traceDispatcher = Reflect.on(producer).field("traceDispatcher").get();
if (traceDispatcher != null) {
ArrayBlockingQueue<TraceContext> traceContextQueue = Reflect.on(traceDispatcher).field("traceContextQueue").get();
while (traceContextQueue.size() > 0) {
Thread.sleep(1);
}
}
// wait another 150ms until async request send finish
// after new RocketMQ version released, this logic can be removed
// https://github.com/apache/rocketmq/pull/2989
Thread.sleep(150);
} catch (Exception ignore) {
}
}
} }

View File

@@ -84,5 +84,6 @@ var en = {
"PASSWORD":"Password", "PASSWORD":"Password",
"NO_DATA":"Don't have ", "NO_DATA":"Don't have ",
"SYSTEM":"SYSTEM", "SYSTEM":"SYSTEM",
"WELCOME":"Hi, welcome using RocketMQ Console" "WELCOME":"Hi, welcome using RocketMQ Console",
"ENABLE_MESSAGE_TRACE":"Enable Message Trace"
} }

View File

@@ -85,5 +85,6 @@ var zh = {
"PASSWORD":"密码", "PASSWORD":"密码",
"NO_DATA":"不存在 ", "NO_DATA":"不存在 ",
"SYSTEM":"系统", "SYSTEM":"系统",
"WELCOME":"您好欢迎使用RocketMQ控制台" "WELCOME":"您好欢迎使用RocketMQ控制台",
"ENABLE_MESSAGE_TRACE":"开启消息轨迹"
} }

View File

@@ -439,8 +439,9 @@ module.controller('sendTopicMessageDialogController', ['$scope', 'ngDialog', '$h
$scope.sendTopicMessage = { $scope.sendTopicMessage = {
topic: $scope.ngDialogData.topic, topic: $scope.ngDialogData.topic,
key: "key", key: "key",
tag:"tag", tag: "tag",
messageBody:"messageBody" messageBody: "messageBody",
traceEnabled: false
}; };
$scope.send = function () { $scope.send = function () {
$http({ $http({

View File

@@ -527,7 +527,13 @@
ng-disabled></textarea> ng-disabled></textarea>
</div> </div>
</div> </div>
<div class="form-group">
<label class="control-label col-sm-2">{{'ENABLE_MESSAGE_TRACE'|translate}}:</label>
<div class="col-sm-10">
<md-switch class="md-primary" md-no-ink ng-model="sendTopicMessage.traceEnabled">
</md-switch>
</div>
</div>
</form> </form>
</div> </div>
<div class="modal-footer"> <div class="modal-footer">

View File

@@ -22,6 +22,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.console.service.impl.MessageServiceImpl; import org.apache.rocketmq.console.service.impl.MessageServiceImpl;
import org.apache.rocketmq.console.service.impl.MessageTraceServiceImpl; import org.apache.rocketmq.console.service.impl.MessageTraceServiceImpl;
import org.apache.rocketmq.console.util.MockObjectUtil; import org.apache.rocketmq.console.util.MockObjectUtil;
@@ -56,7 +57,7 @@ public class MessageTraceControllerTest extends BaseControllerTest {
@Before @Before
public void init() throws MQClientException, InterruptedException { public void init() throws MQClientException, InterruptedException {
super.mockRmqConfigure(); super.mockRmqConfigure();
when(configure.getMsgTrackTopicName()).thenReturn(null); when(configure.getMsgTrackTopicNameOrDefault()).thenReturn(TopicValidator.RMQ_SYS_TRACE_TOPIC);
List<MessageExt> messageList = new ArrayList<>(2); List<MessageExt> messageList = new ArrayList<>(2);
MessageExt messageExt = MockObjectUtil.createMessageExt(); MessageExt messageExt = MockObjectUtil.createMessageExt();
messageExt.setBody(MockObjectUtil.createTraceData().getBytes()); messageExt.setBody(MockObjectUtil.createTraceData().getBytes());

View File

@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.console.config.RMQConfigure; import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.model.MessageTraceView; import org.apache.rocketmq.console.model.MessageTraceView;
import org.apache.rocketmq.console.model.trace.MessageTraceGraph; import org.apache.rocketmq.console.model.trace.MessageTraceGraph;
@@ -69,7 +70,7 @@ public class MessageTraceServiceImplTest {
@BeforeEach @BeforeEach
public void init() { public void init() {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
Mockito.when(rmqConfigure.getMsgTrackTopicName()).thenReturn(null); Mockito.when(rmqConfigure.getMsgTrackTopicNameOrDefault()).thenReturn(TopicValidator.RMQ_SYS_TRACE_TOPIC);
fakeMessageExt = new MessageExt(); fakeMessageExt = new MessageExt();
fakeMessageExt.setKeys(Lists.newArrayList(TEST_KEY)); fakeMessageExt.setKeys(Lists.newArrayList(TEST_KEY));
fakeMessageExt.setBody(PUB_TRACE.getBytes(StandardCharsets.UTF_8)); fakeMessageExt.setBody(PUB_TRACE.getBytes(StandardCharsets.UTF_8));

View File

@@ -158,4 +158,18 @@ public class TopicServiceImplTest extends RocketMQConsoleTestBase {
Assert.assertTrue(StringUtils.isNoneBlank(sendResult.getMsgId())); Assert.assertTrue(StringUtils.isNoneBlank(sendResult.getMsgId()));
} }
@Test
public void sendTopicMessageRequestWithMessageTrace() throws Exception {
SendTopicMessageRequest sendTopicMessageRequest = new SendTopicMessageRequest();
sendTopicMessageRequest.setTopic(TEST_CONSOLE_TOPIC);
sendTopicMessageRequest.setMessageBody("sendTopicMessageRequestMessageBody");
sendTopicMessageRequest.setKey("sendTopicMessageRequestKey");
sendTopicMessageRequest.setTag("sendTopicMessageRequestTag");
sendTopicMessageRequest.setTraceEnabled(true);
SendResult sendResult= topicService.sendTopicMessageRequest(sendTopicMessageRequest);
Assert.assertNotNull(sendResult);
Assert.assertTrue(StringUtils.isNoneBlank(sendResult.getMsgId()));
}
} }