[ISSUE #19]Message track query enhancement (#21)

* [ISSUE #19]Message track query enhancement

* traceTopic can be null and system Topic is used by default

* add unit test

* select messageTrack topic in messageTrace page

Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
zhangjidi2016
2021-09-23 21:01:32 +08:00
committed by GitHub
parent dc67c660ff
commit 58336d951a
16 changed files with 97 additions and 65 deletions

View File

@@ -18,7 +18,6 @@ package org.apache.rocketmq.dashboard.config;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -48,8 +47,6 @@ public class RMQConfigure {
private boolean enableDashBoardCollect;
private String msgTrackTopicName;
private boolean loginRequired = false;
private String accessKey;
@@ -123,17 +120,6 @@ public class RMQConfigure {
this.enableDashBoardCollect = Boolean.valueOf(enableDashBoardCollect);
}
public String getMsgTrackTopicNameOrDefault() {
if (StringUtils.isEmpty(msgTrackTopicName)) {
return TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
return msgTrackTopicName;
}
public void setMsgTrackTopicName(String msgTrackTopicName) {
this.msgTrackTopicName = msgTrackTopicName;
}
public boolean isLoginRequired() {
return loginRequired;
}

View File

@@ -64,7 +64,8 @@ public class MessageTraceController {
@RequestMapping(value = "/viewMessageTraceGraph.query", method = RequestMethod.GET)
@ResponseBody
public MessageTraceGraph viewMessageTraceGraph(@RequestParam String msgId) {
return messageTraceService.queryMessageTraceGraph(msgId);
public MessageTraceGraph viewMessageTraceGraph(@RequestParam String msgId,
@RequestParam(required = false) String traceTopic) {
return messageTraceService.queryMessageTraceGraph(msgId, traceTopic);
}
}

View File

@@ -51,12 +51,9 @@ public class TopicController {
@RequestMapping(value = "/list.query", method = RequestMethod.GET)
@ResponseBody
public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess) {
boolean flag = false;
if ("true".equals(skipSysProcess)) {
flag = true;
}
return topicService.fetchAllTopicList(flag);
public Object list(@RequestParam(value = "skipSysProcess", required = false) boolean skipSysProcess,
@RequestParam(value = "skipRetryAndDlq", required = false) boolean skipRetryAndDlq) {
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
}
@RequestMapping(value = "/stats.query", method = RequestMethod.GET)

View File

@@ -27,5 +27,5 @@ public interface MessageTraceService {
List<MessageTraceView> queryMessageTraceByTopicAndKey(final String topic, final String key);
MessageTraceGraph queryMessageTraceGraph(final String key);
MessageTraceGraph queryMessageTraceGraph(final String key, final String traceTopic);
}

View File

@@ -29,7 +29,7 @@ import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import java.util.List;
public interface TopicService {
TopicList fetchAllTopicList(boolean skipSysProcess);
TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq);
TopicStatsTable stats(String topic);

View File

@@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.MessageTraceView;
import org.apache.rocketmq.dashboard.model.trace.ProducerNode;
@@ -65,7 +66,7 @@ public class MessageTraceServiceImpl implements MessageTraceService {
@Override
public List<MessageTraceView> queryMessageTraceKey(String key) {
String queryTopic = configure.getMsgTrackTopicNameOrDefault();
String queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
logger.info("query data topic name is:{}", queryTopic);
return queryMessageTraceByTopicAndKey(queryTopic, key);
}
@@ -86,8 +87,11 @@ public class MessageTraceServiceImpl implements MessageTraceService {
}
@Override
public MessageTraceGraph queryMessageTraceGraph(String key) {
List<MessageTraceView> messageTraceViews = queryMessageTraceKey(key);
public MessageTraceGraph queryMessageTraceGraph(String key, String topic) {
if (StringUtils.isEmpty(topic)) {
topic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
List<MessageTraceView> messageTraceViews = queryMessageTraceByTopicAndKey(topic, key);
return buildMessageTraceGraph(messageTraceViews);
}

View File

@@ -20,6 +20,7 @@ package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -36,6 +37,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
@@ -61,23 +63,23 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
private RMQConfigure configure;
@Override
public TopicList fetchAllTopicList(boolean skipSysProcess) {
public TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq) {
try {
TopicList allTopics = mqAdminExt.fetchAllTopicList();
if (skipSysProcess) {
return allTopics;
}
TopicList sysTopics = getSystemTopicList();
Set<String> topics = new HashSet<>();
for (String topic : allTopics.getTopicList()) {
if (sysTopics.getTopicList().contains(topic)) {
topics.add(String.format("%s%s", "%SYS%", topic));
} else {
topics.add(topic);
}
}
Set<String> topics =
allTopics.getTopicList().stream().map(topic -> {
if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) {
topic = String.format("%s%s", "%SYS%", topic);
}
return topic;
}).filter(topic -> {
if (skipRetryAndDlq) {
return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
}
return true;
}).collect(Collectors.toSet());
allTopics.getTopicList().clear();
allTopics.getTopicList().addAll(topics);
return allTopics;
@@ -209,7 +211,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
}
public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, configure.getMsgTrackTopicNameOrDefault());
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC);
defaultMQProducer.setUseTLS(configure.isUseTLS());
return defaultMQProducer;
}