[ISSUE #743] Implement RocketMQ message trace UI (#744)

* Backend
1. Add viewMessageTraceGraph method (aggregate MessageTraceView list)
2. MessageTraceView add filed requestId and retryTimes
3. Add Unit Test MessageTraceServiceImplTest
4. Add lombok dependency

Frontend
1. implement message trace graph UI
2. upgrade echarts to 5.1.2
3. message trace page can fetch messageId from routeParams

* 1. fix code style issue
2. fix pom duplicate maven-compiler-plugin issue

* 1. subAfterTrace groupName use subBeforeTrace groupName instead
2. fix MessageTraceServiceImplTest UT

* 1. implement features
  1.1 support transaction message trace
  1.2 support only producer trace enabled case
  1.3 support only consumer trace enabled case
2. add(fix) trace type (message type)
3. use rocketmq.version = 4.9.0

* [ISSUE #743] fix trace UI issue (when costTime is 0, will not show trace node)
Solution: use minimum width 1
This commit is contained in:
StyleTang
2021-07-13 14:39:00 +08:00
committed by GitHub
parent 2e252a54e6
commit 7f413e10ee
16 changed files with 840 additions and 99 deletions

16
pom.xml
View File

@@ -63,9 +63,10 @@
<commons-lang.version>2.6</commons-lang.version>
<commons-io.version>2.4</commons-io.version>
<commons-cli.version>1.2</commons-cli.version>
<rocketmq.version>4.8.0</rocketmq.version>
<rocketmq.version>4.9.0</rocketmq.version>
<surefire.version>2.19.1</surefire.version>
<aspectj.version>1.9.6</aspectj.version>
<lombok.version>1.18.12</lombok.version>
<main.basedir>${basedir}/../..</main.basedir>
<docker.image.prefix>apacherocketmq</docker.image.prefix>
<spring.boot.version>2.2.2.RELEASE</spring.boot.version>
@@ -182,7 +183,11 @@
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.68</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
@@ -195,6 +200,13 @@
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>

View File

@@ -18,19 +18,17 @@
package org.apache.rocketmq.console.controller;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.model.MessageView;
import org.apache.rocketmq.console.model.trace.MessageTraceGraph;
import org.apache.rocketmq.console.service.MessageService;
import org.apache.rocketmq.console.service.MessageTraceService;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -41,16 +39,12 @@ import org.springframework.web.bind.annotation.ResponseBody;
@RequestMapping("/messageTrace")
public class MessageTraceController {
private Logger logger = LoggerFactory.getLogger(MessageController.class);
@Resource
private MessageService messageService;
@Resource
private MessageTraceService messageTraceService;
@Resource
private RMQConfigure rmqConfigure;
@RequestMapping(value = "/viewMessage.query", method = RequestMethod.GET)
@ResponseBody
public Object viewMessage(@RequestParam(required = false) String topic, @RequestParam String msgId) {
@@ -62,12 +56,13 @@ public class MessageTraceController {
@RequestMapping(value = "/viewMessageTraceDetail.query", method = RequestMethod.GET)
@ResponseBody
public Object viewTraceMessages(@RequestParam(required = false) String topic, @RequestParam String msgId) {
String queryTopic = rmqConfigure.getMsgTrackTopicName();
if (StringUtils.isEmpty(queryTopic)) {
queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
logger.info("query data topic name is:{}",queryTopic);
return messageTraceService.queryMessageTraceByTopicAndKey(queryTopic, msgId);
public Object viewTraceMessages(@RequestParam String msgId) {
return messageTraceService.queryMessageTraceKey(msgId);
}
@RequestMapping(value = "/viewMessageTraceGraph.query", method = RequestMethod.GET)
@ResponseBody
public MessageTraceGraph viewMessageTraceGraph(@RequestParam String msgId) {
return messageTraceService.queryMessageTraceGraph(msgId);
}
}

View File

@@ -18,15 +18,16 @@
package org.apache.rocketmq.console.model;
import com.google.common.base.Charsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.console.util.MsgTraceDecodeUtil;
public class MessageTraceView {
import java.util.ArrayList;
import java.util.List;
public class MessageTraceView {
private String requestId;
private String msgId;
private String tags;
private String keys;
@@ -38,7 +39,12 @@ public class MessageTraceView {
private long timeStamp;
private String topic;
private String groupName;
private int retryTimes;
private String status;
private String transactionState;
private String transactionId;
private boolean fromTransactionCheck;
private String traceType;
public MessageTraceView() {
}
@@ -51,7 +57,6 @@ public class MessageTraceView {
}
List<TraceContext> traceContextList = MsgTraceDecodeUtil.decoderFromTraceDataString(messageBody);
for (TraceContext context : traceContextList) {
MessageTraceView messageTraceView = new MessageTraceView();
TraceBean traceBean = context.getTraceBeans().get(0);
@@ -69,11 +74,17 @@ public class MessageTraceView {
messageTraceView.setMsgId(traceBean.getMsgId());
messageTraceView.setTags(traceBean.getTags());
messageTraceView.setTopic(traceBean.getTopic());
messageTraceView.setMsgType(context.getTraceType().name());
messageTraceView.setMsgType(traceBean.getMsgType() == null ? null : traceBean.getMsgType().name());
messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId());
messageTraceView.setTimeStamp(context.getTimeStamp());
messageTraceView.setStoreHost(traceBean.getStoreHost());
messageTraceView.setClientHost(messageExt.getBornHostString());
messageTraceView.setRequestId(context.getRequestId());
messageTraceView.setRetryTimes(traceBean.getRetryTimes());
messageTraceView.setTransactionState(traceBean.getTransactionState() == null ? null : traceBean.getTransactionState().name());
messageTraceView.setTransactionId(traceBean.getTransactionId());
messageTraceView.setFromTransactionCheck(traceBean.isFromTransactionCheck());
messageTraceView.setTraceType(context.getTraceType() == null ? null : context.getTraceType().name());
messageTraceViewList.add(messageTraceView);
}
return messageTraceViewList;
@@ -174,4 +185,52 @@ public class MessageTraceView {
public void setClientHost(String clientHost) {
this.clientHost = clientHost;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
public String getTransactionState() {
return transactionState;
}
public void setTransactionState(String transactionState) {
this.transactionState = transactionState;
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}
public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
public String getTraceType() {
return traceType;
}
public void setTraceType(String traceType) {
this.traceType = traceType;
}
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model.trace;
import lombok.Data;
import org.apache.rocketmq.console.model.MessageTraceView;
import java.util.List;
@Data
public class MessageTraceGraph {
private ProducerNode producerNode;
private List<SubscriptionNode> subscriptionNodeList;
private List<MessageTraceView> messageTraceViews;
}

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model.trace;
import lombok.Data;
import java.util.List;
@Data
public class ProducerNode {
private String msgId;
private String tags;
private String keys;
private String offSetMsgId;
private String topic;
private String groupName;
private TraceNode traceNode;
private List<TraceNode> transactionNodeList;
}

View File

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model.trace;
import lombok.Data;
import java.util.List;
@Data
public class SubscriptionNode {
private String subscriptionGroup;
private List<TraceNode> consumeNodeList;
}

View File

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model.trace;
import lombok.Data;
@Data
public class TraceNode {
private String requestId;
private String storeHost;
private String clientHost;
private int costTime;
private long beginTimeStamp;
private long endTimeStamp;
private int retryTimes;
private String status;
private String transactionState;
private String transactionId;
private boolean fromTransactionCheck;
private String msgType;
}

View File

@@ -19,9 +19,13 @@ package org.apache.rocketmq.console.service;
import java.util.List;
import org.apache.rocketmq.console.model.MessageTraceView;
import org.apache.rocketmq.console.model.trace.MessageTraceGraph;
public interface MessageTraceService {
List<MessageTraceView> queryMessageTraceKey(final String key);
List<MessageTraceView> queryMessageTraceByTopicAndKey(final String topic, final String key);
MessageTraceGraph queryMessageTraceGraph(final String key);
}

View File

@@ -271,6 +271,11 @@ public class MQAdminExtImpl implements MQAdminExt {
MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName);
}
@Override
public void deleteSubscriptionGroup(String addr, String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
throw new UnsupportedOperationException();
}
@Override
public void createAndUpdateKvConfig(String namespace, String key, String value)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

View File

@@ -17,17 +17,36 @@
package org.apache.rocketmq.console.service.impl;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.model.MessageTraceView;
import org.apache.rocketmq.console.model.trace.ProducerNode;
import org.apache.rocketmq.console.model.trace.MessageTraceGraph;
import org.apache.rocketmq.console.model.trace.SubscriptionNode;
import org.apache.rocketmq.console.model.trace.TraceNode;
import org.apache.rocketmq.console.service.MessageTraceService;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@Service
public class MessageTraceServiceImpl implements MessageTraceService {
@@ -38,7 +57,21 @@ public class MessageTraceServiceImpl implements MessageTraceService {
@Resource
private MQAdminExt mqAdminExt;
@Override public List<MessageTraceView> queryMessageTraceByTopicAndKey(String topic, String key) {
@Resource
private RMQConfigure rmqConfigure;
@Override
public List<MessageTraceView> queryMessageTraceKey(String key) {
String queryTopic = rmqConfigure.getMsgTrackTopicName();
if (StringUtils.isEmpty(queryTopic)) {
queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
logger.info("query data topic name is:{}", queryTopic);
return queryMessageTraceByTopicAndKey(queryTopic, key);
}
@Override
public List<MessageTraceView> queryMessageTraceByTopicAndKey(String topic, String key) {
try {
List<MessageTraceView> messageTraceViews = new ArrayList<MessageTraceView>();
List<MessageExt> messageTraceList = mqAdminExt.queryMessage(topic, key, QUERY_MESSAGE_MAX_NUM, 0, System.currentTimeMillis()).getMessageList();
@@ -51,4 +84,106 @@ public class MessageTraceServiceImpl implements MessageTraceService {
throw Throwables.propagate(err);
}
}
@Override
public MessageTraceGraph queryMessageTraceGraph(String key) {
List<MessageTraceView> messageTraceViews = queryMessageTraceKey(key);
return buildMessageTraceGraph(messageTraceViews);
}
private MessageTraceGraph buildMessageTraceGraph(List<MessageTraceView> messageTraceViews) {
MessageTraceGraph messageTraceGraph = new MessageTraceGraph();
messageTraceGraph.setMessageTraceViews(messageTraceViews);
if (CollectionUtils.isEmpty(messageTraceViews)) {
return messageTraceGraph;
}
ProducerNode producerNode = null;
List<TraceNode> transactionNodeList = new ArrayList<>();
Map<String, Pair<MessageTraceView, MessageTraceView>> requestIdTracePairMap = Maps.newHashMap();
for (MessageTraceView messageTraceView : messageTraceViews) {
switch (TraceType.valueOf(messageTraceView.getTraceType())) {
case Pub:
producerNode = buildMessageRoot(messageTraceView);
break;
case EndTransaction:
transactionNodeList.add(buildTransactionNode(messageTraceView));
break;
case SubBefore:
case SubAfter:
putIntoMessageTraceViewGroupMap(messageTraceView, requestIdTracePairMap);
break;
default:
break;
}
}
if (producerNode != null) {
producerNode.setTransactionNodeList(transactionNodeList);
}
messageTraceGraph.setProducerNode(producerNode);
messageTraceGraph.setSubscriptionNodeList(buildSubscriptionNodeList(requestIdTracePairMap));
return messageTraceGraph;
}
private TraceNode buildTransactionNode(MessageTraceView messageTraceView) {
return buildTraceNode(messageTraceView);
}
private List<SubscriptionNode> buildSubscriptionNodeList(
Map<String, Pair<MessageTraceView, MessageTraceView>> requestIdTracePairMap) {
Map<String, List<TraceNode>> subscriptionTraceNodeMap = Maps.newHashMap();
for (Pair<MessageTraceView, MessageTraceView> traceNodePair : requestIdTracePairMap.values()) {
MessageTraceView subBeforeTrace = traceNodePair.getObject1();
MessageTraceView subAfterTrace = traceNodePair.getObject2();
List<TraceNode> traceNodeList = subscriptionTraceNodeMap.computeIfAbsent(subBeforeTrace.getGroupName(),
(o) -> Lists.newArrayList());
TraceNode consumeNode = new TraceNode();
consumeNode.setRequestId(subBeforeTrace.getRequestId());
consumeNode.setStoreHost(subBeforeTrace.getStoreHost());
consumeNode.setClientHost(subBeforeTrace.getClientHost());
consumeNode.setRetryTimes(subBeforeTrace.getRetryTimes());
consumeNode.setBeginTimeStamp(subBeforeTrace.getTimeStamp());
consumeNode.setCostTime(subAfterTrace.getCostTime());
consumeNode.setEndTimeStamp(subBeforeTrace.getTimeStamp() + subAfterTrace.getCostTime());
consumeNode.setStatus(subAfterTrace.getStatus());
traceNodeList.add(consumeNode);
}
return subscriptionTraceNodeMap.entrySet().stream()
.map((Function<Map.Entry<String, List<TraceNode>>, SubscriptionNode>) subscriptionEntry -> {
SubscriptionNode subscriptionNode = new SubscriptionNode();
subscriptionNode.setSubscriptionGroup(subscriptionEntry.getKey());
subscriptionNode.setConsumeNodeList(subscriptionEntry.getValue());
return subscriptionNode;
}).collect(Collectors.toList());
}
private void putIntoMessageTraceViewGroupMap(MessageTraceView messageTraceView,
Map<String, Pair<MessageTraceView, MessageTraceView>> messageTraceViewGroupMap) {
Pair<MessageTraceView, MessageTraceView> messageTracePair = messageTraceViewGroupMap
.computeIfAbsent(messageTraceView.getRequestId(), (o) -> new Pair<>(null, null));
switch (TraceType.valueOf(messageTraceView.getTraceType())) {
case SubBefore:
messageTracePair.setObject1(messageTraceView);
break;
case SubAfter:
messageTracePair.setObject2(messageTraceView);
break;
default:
break;
}
}
private ProducerNode buildMessageRoot(MessageTraceView messageTraceView) {
ProducerNode root = new ProducerNode();
BeanUtils.copyProperties(messageTraceView, root);
root.setTraceNode(buildTraceNode(messageTraceView));
return root;
}
private TraceNode buildTraceNode(MessageTraceView messageTraceView) {
TraceNode traceNode = new TraceNode();
BeanUtils.copyProperties(messageTraceView, traceNode);
traceNode.setBeginTimeStamp(messageTraceView.getTimeStamp());
traceNode.setEndTimeStamp(messageTraceView.getTimeStamp() + messageTraceView.getCostTime());
return traceNode;
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.rocketmq.console.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.TraceContext;
@@ -127,16 +128,33 @@ public class MsgTraceDecodeUtil {
case TRACE_MSG_SUBAFTER_V3_LEN:
subAfterContext.setContextCode(Integer.parseInt(line[6]));
subAfterContext.setTimeStamp(Long.parseLong(line[7]));
subAfterContext.setGroupName(line[8]);
break;
default:
subAfterContext.setContextCode(Integer.parseInt(line[6]));
subAfterContext.setTimeStamp(Long.parseLong(line[7]));
subAfterContext.setGroupName(line[8]);
log.warn("Detect new version trace msg of {} type", TraceType.SubAfter.name());
break;
}
resList.add(subAfterContext);
} else if (line[0].equals(TraceType.EndTransaction.name())) {
TraceContext endTransactionContext = new TraceContext();
endTransactionContext.setTraceType(TraceType.EndTransaction);
endTransactionContext.setTimeStamp(Long.parseLong(line[1]));
endTransactionContext.setRegionId(line[2]);
endTransactionContext.setGroupName(line[3]);
TraceBean bean = new TraceBean();
bean.setTopic(line[4]);
bean.setMsgId(line[5]);
bean.setTags(line[6]);
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
bean.setTransactionId(line[10]);
bean.setTransactionState(LocalTransactionState.valueOf(line[11]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[12]));
endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
endTransactionContext.getTraceBeans().add(bean);
resList.add(endTransactionContext);
}
}
return resList;

View File

@@ -16,34 +16,43 @@
*/
var module = app;
module.controller('messageTraceController', ['$scope', 'ngDialog', '$http','Notification',function ($scope, ngDialog, $http,Notification) {
const PRODUCER_COLOR = '#029e02'
const SUCCESS_COLOR = '#75d874';
const ERROR_COLOR = 'red';
const TRANSACTION_COMMIT_COLOR = SUCCESS_COLOR;
const TRANSACTION_ROLLBACK_COLOR = ERROR_COLOR;
const TRANSACTION_UNKNOWN_COLOR = 'grey'
const TIME_FORMAT_PATTERN = "YYYY-MM-DD HH:mm:ss.SSS";
const DEFAULT_DISPLAY_DURATION = 10 * 1000
// transactionTraceNode do not have costTime, assume it cost 50ms
const transactionCheckCostTime = 50;
module.controller('messageTraceController', ['$scope', '$routeParams', 'ngDialog', '$http', 'Notification', function ($scope, $routeParams, ngDialog, $http, Notification) {
$scope.allTopicList = [];
$scope.selectedTopic =[];
$scope.key ="";
$scope.messageId ="";
$scope.queryMessageByTopicAndKeyResult=[];
$scope.queryMessageByMessageIdResult={};
$scope.queryMessageTraceListsByTopicAndKeyResult=[];
$scope.selectedTopic = [];
$scope.key = "";
$scope.messageId = $routeParams.messageId;
$scope.queryMessageByTopicAndKeyResult = [];
$scope.queryMessageByMessageIdResult = {};
$scope.queryMessageTraceListsByTopicAndKeyResult = [];
$http({
method: "GET",
url: "topic/list.query",
params: {
skipSysProcess:"true"
skipSysProcess: "true"
}
}).success(function (resp) {
if(resp.status ==0){
if (resp.status == 0) {
$scope.allTopicList = resp.data.topicList.sort();
console.log($scope.allTopicList);
}else {
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
$scope.timepickerBegin = moment().subtract(1, 'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerEnd = moment().add(1,'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerOptions ={format: 'YYYY-MM-DD HH:mm', showClear: true};
$scope.timepickerEnd = moment().add(1, 'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerOptions = {format: 'YYYY-MM-DD HH:mm', showClear: true};
$scope.queryMessageByTopicAndKey = function () {
console.log($scope.selectedTopic);
console.log($scope.key);
@@ -52,45 +61,45 @@ module.controller('messageTraceController', ['$scope', 'ngDialog', '$http','Noti
url: "message/queryMessageByTopicAndKey.query",
params: {
topic: $scope.selectedTopic,
key:$scope.key
key: $scope.key
}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);
$scope.queryMessageByTopicAndKeyResult = resp.data;
console.log($scope.queryMessageByTopicAndKeyResult);
}else {
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
$scope.queryMessageByMessageId = function (messageId,topic) {
$scope.queryMessageByMessageId = function (messageId, topic) {
$http({
method: "GET",
url: "messageTrace/viewMessage.query",
params: {
msgId: messageId,
topic:topic
topic: topic
}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);
$scope.queryMessageByMessageIdResult = resp.data;
console.log($scope.queryMessageByTopicAndKeyResult);
}else {
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
$scope.queryMessageTraceByMessageId = function (messageId,topic) {
$scope.queryMessageTraceByMessageId = function (messageId, topic) {
$http({
method: "GET",
url: "messageTrace/viewMessageTraceDetail.query",
url: "messageTrace/viewMessageTraceGraph.query",
params: {
msgId: messageId,
topic:topic
topic: topic
}
}).success(function (resp) {
if (resp.status == 0) {
@@ -98,27 +107,247 @@ module.controller('messageTraceController', ['$scope', 'ngDialog', '$http','Noti
ngDialog.open({
template: 'messageTraceDetailViewDialog',
controller: 'messageTraceDetailViewDialogController',
data:resp.data
data: resp.data
});
}else {
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
}]);
module.controller('messageTraceDetailViewDialogController',['$scope', 'ngDialog', '$http','Notification', function ($scope, ngDialog, $http,Notification) {
$scope.showExceptionDesc = function (errmsg) {
if(errmsg == null){
errmsg = "Don't have Exception"
module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout', 'ngDialog', '$http', 'Notification', function ($scope, $timeout, ngDialog, $http, Notification) {
$scope.displayGraph = false;
$scope.graphButtonName = 'Show Graph';
$scope.displayMessageTraceGraph = function (messageTraceGraph) {
let dom = document.getElementById("messageTraceGraph");
$scope.messageTraceGraph = echarts.init(dom);
let option;
let data = [];
let dataZoomEnd = 100;
let startTime = Number.MAX_VALUE;
let endTime = 0;
let messageGroups = [];
if (messageTraceGraph.producerNode) {
startTime = +messageTraceGraph.producerNode.traceNode.beginTimeStamp;
endTime = +messageTraceGraph.producerNode.traceNode.endTimeStamp;
} else {
messageTraceGraph.subscriptionNodeList.forEach(subscriptionNode => {
subscriptionNode.consumeNodeList.forEach(consumeNode => {
startTime = Math.min(startTime, consumeNode.beginTimeStamp);
})
})
}
ngDialog.open({
template: 'operationResultDialog',
data:{
result:errmsg
function buildNodeColor(traceNode, index) {
let nodeColor = SUCCESS_COLOR;
if (traceNode.transactionState) {
switch (traceNode.transactionState) {
case 'COMMIT_MESSAGE':
return TRANSACTION_COMMIT_COLOR;
case 'ROLLBACK_MESSAGE':
return TRANSACTION_ROLLBACK_COLOR;
case 'UNKNOW':
return TRANSACTION_UNKNOWN_COLOR;
default:
return ERROR_COLOR;
}
}
});
if (traceNode.status !== 'success') {
nodeColor = ERROR_COLOR;
}
if (index === messageGroups.length - 1) {
nodeColor = PRODUCER_COLOR;
}
return nodeColor;
}
function formatXAxisTime(value) {
let duration = Math.max(0, value - startTime);
if (duration < 1000)
return duration + 'ms';
duration /= 1000;
if (duration < 60)
return duration + 's';
duration /= 60;
if (duration < 60)
return duration + 'm';
duration /= 60;
return duration + 'h';
}
function buildTraceInfo(itemName, itemValue) {
if (itemValue) {
return `${itemName}: ${itemValue}<br />`
}
return "";
}
function formatNodeToolTip(params) {
let traceNode = params.data.traceData.traceNode;
return `
costTime: ${traceNode.costTime}ms<br />
status: ${traceNode.status}<br />
beginTimeStamp: ${new moment(traceNode.beginTimeStamp).format(TIME_FORMAT_PATTERN)}<br />
endTimeStamp: ${new moment(traceNode.endTimeStamp).format(TIME_FORMAT_PATTERN)}<br />
clientHost: ${traceNode.clientHost}<br />
storeHost: ${traceNode.storeHost}<br />
retryTimes: ${traceNode.retryTimes}<br />
${buildTraceInfo('msgType', traceNode.msgType)}
${buildTraceInfo('transactionId', traceNode.transactionId)}
${buildTraceInfo('transactionState', traceNode.transactionState)}
${buildTraceInfo('fromTransactionCheck', traceNode.fromTransactionCheck)}
`;
}
function addTraceData(traceNode, index) {
data.push({
value: [
index,
traceNode.beginTimeStamp,
traceNode.endTimeStamp,
traceNode.costTime
],
itemStyle: {
normal: {
color: buildNodeColor(traceNode, index),
opacity: 1
}
},
traceData: {
traceNode: traceNode
}
});
endTime = Math.max(traceNode.endTimeStamp, endTime);
}
messageTraceGraph.subscriptionNodeList.forEach(item => {
messageGroups.push(item.subscriptionGroup)
})
messageTraceGraph.subscriptionNodeList.forEach((subscriptionNode, index) => {
subscriptionNode.consumeNodeList.forEach(traceNode => addTraceData(traceNode, index))
})
if (messageTraceGraph.producerNode) {
messageGroups.push(messageTraceGraph.producerNode.groupName)
let producerNodeIndex = messageGroups.length - 1;
addTraceData(messageTraceGraph.producerNode.traceNode, producerNodeIndex);
messageTraceGraph.producerNode.transactionNodeList.forEach(transactionNode => {
transactionNode.beginTimeStamp = Math.max(messageTraceGraph.producerNode.traceNode.endTimeStamp,
transactionNode.endTimeStamp - transactionCheckCostTime);
addTraceData(transactionNode, producerNodeIndex)
endTime = Math.max(endTime, transactionNode.endTimeStamp);
})
}
let totalDuration = endTime - startTime;
if (totalDuration > DEFAULT_DISPLAY_DURATION) {
dataZoomEnd = DEFAULT_DISPLAY_DURATION / totalDuration * 100
}
function renderItem(params, api) {
let messageGroup = api.value(0);
let start = api.coord([api.value(1), messageGroup]);
let end = api.coord([api.value(2), messageGroup]);
let height = api.size([0, 1])[1] * 0.6;
let rectShape = echarts.graphic.clipRectByRect({
x: start[0],
y: start[1] - height / 2,
width: Math.max(end[0] - start[0], 1),
height: height
}, {
x: params.coordSys.x,
y: params.coordSys.y,
width: params.coordSys.width,
height: params.coordSys.height
});
return rectShape && {
type: 'rect',
transition: ['shape'],
shape: rectShape,
style: api.style({
text: `${api.value(3)}ms`,
textFill: '#fff'
})
};
}
option = {
tooltip: {
formatter: function (params) {
return formatNodeToolTip(params);
}
},
title: {
text: messageTraceGraph.producerNode ? messageTraceGraph.producerNode.topic : "",
left: 'center'
},
dataZoom: [{
type: 'slider',
filterMode: 'weakFilter',
showDataShadow: false,
top: 400,
start: 0,
end: dataZoomEnd,
labelFormatter: ''
}, {
type: 'inside',
filterMode: 'weakFilter'
}
],
grid: {
height: 300
},
xAxis: {
min: startTime,
scale: true,
axisLabel: {
formatter: function (value) {
return formatXAxisTime(value)
}
}
},
yAxis: {
data: messageGroups
},
series: [{
type: 'custom',
renderItem: renderItem,
itemStyle: {
opacity: 0.8
},
encode: {
x: [1, 2],
y: 0
},
data: data
}]
};
$scope.messageTraceGraph.setOption(option);
}
$scope.showGraph = function () {
$scope.displayGraph = !$scope.displayGraph;
if ($scope.displayGraph) {
$scope.graphButtonName = 'Hide Graph';
$scope.displayMessageTraceGraph($scope.ngDialogData);
} else {
$scope.messageTraceGraph.dispose();
$scope.graphButtonName = 'Show Graph';
}
console.log("here is my data", $scope.ngDialogData)
};
function initGraph() {
$timeout(function () {
if (document.getElementById('messageTraceGraph') == null) {
initGraph();
} else {
$scope.showGraph();
}
}, 50);
}
initGraph();
}]
);

File diff suppressed because one or more lines are too long

View File

@@ -131,7 +131,13 @@
<script type="text/ng-template" id="messageTraceDetailViewDialog">
<md-content class="md-padding">
<div>
<div class="row" >
<button class="ngdialog-button ngdialog-button-primary" type="button"
ng-click="showGraph()">{{graphButtonName}}
</button>
</div>
<div class="row" ng-hide="!displayGraph" id="messageTraceGraph" style="height: 500px; width: 1024px"></div>
<div class="row">
<table class="table table-bordered">
<tr>
<th class="text-center">Message ID</th>
@@ -143,8 +149,10 @@
<th class="text-center">costTime</th>
<th class="text-center">status</th>
<th class="text-center">traceType</th>
<th class="text-center">msgType</th>
<th class="text-center">transactionState</th>
</tr>
<tr ng-repeat="item in ngDialogData">
<tr ng-repeat="item in ngDialogData.messageTraceViews">
<td class="text-center">{{item.msgId}}</td>
<td class="text-center">{{item.tags}}</td>
<td class="text-center">{{item.keys}}</td>
@@ -153,7 +161,9 @@
<td class="text-center">{{item.clientHost}}</td>
<td class="text-center">{{item.costTime}}ms</td>
<td class="text-center">{{item.status}}</td>
<th class="text-center">{{item.traceType}}</th>
<th class="text-center">{{item.msgType}}</th>
<th class="text-center">{{item.transactionState}}</th>
</tr>
</table>
</div>

View File

@@ -380,7 +380,7 @@ public class TopicControllerTest extends BaseControllerTest {
queueData.setPerm(6);
queueData.setReadQueueNums(4);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;

View File

@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.service.impl;
import lombok.SneakyThrows;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.model.MessageTraceView;
import org.apache.rocketmq.console.model.trace.MessageTraceGraph;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
public class MessageTraceServiceImplTest {
@InjectMocks
private MessageTraceServiceImpl messageTraceService;
@Mock
private MQAdminExt mqAdminExt;
@Mock
private RMQConfigure rmqConfigure;
private static final String TEST_MESSAGE_ID = "7F0000016D48512DDF172E788E5E0000";
private static final String FAKE_SUBSCRIPTION_GROUP = "CID_JODIE";
private static final String TEST_KEY = "TEST_KEY";
private static final String PUB_TRACE = "Pub\u00011625848452706\u0001DefaultRegion\u0001sendGroup\u0001" +
"TopicTraceTest\u0001" + TEST_MESSAGE_ID + "\u0001TagA\u0001OrderID188\u0001" +
"192.168.0.101:10911\u000111\u000117\u00010\u0001C0A8006500002A9F0000000000003866\u0001true\u0002";
private static final String SUB_TRACE1 = "SubBefore\u00011625848452722\u0001null\u0001" + FAKE_SUBSCRIPTION_GROUP +
"\u00017F0000016801512DDF172E788E720014\u0001" + TEST_MESSAGE_ID + "\u00010\u0001OrderID188\u0002";
private static final String SUB_TRACE2 = "SubAfter\u00017F0000016801512DDF172E788E720014\u0001" + TEST_MESSAGE_ID +
"\u000140\u0001true\u0001OrderID188\u00010\u0002";
private static final String END_TRANSACTION_TRACE = "EndTransaction\u00011625913838389\u0001DefaultRegion\u0001" +
FAKE_SUBSCRIPTION_GROUP + "\u0001TopicTraceTest\u0001" + TEST_MESSAGE_ID +
"\u0001TagA\u0001OrderID188\u0001192.168.0.101:10911\u00012\u00017F000001ACFE512DDF17325DBAEA0000" +
"\u0001UNKNOW\u0001true\u0002";
private MessageExt fakeMessageExt;
private MessageExt fakeMessageExt2;
private MessageExt fakeMessageExt3;
private MessageExt fakeMessageExt4;
@BeforeEach
public void init() {
MockitoAnnotations.initMocks(this);
Mockito.when(rmqConfigure.getMsgTrackTopicName()).thenReturn(null);
fakeMessageExt = new MessageExt();
fakeMessageExt.setKeys(Lists.newArrayList(TEST_KEY));
fakeMessageExt.setBody(PUB_TRACE.getBytes(StandardCharsets.UTF_8));
fakeMessageExt2 = new MessageExt();
fakeMessageExt2.setKeys(Lists.newArrayList(TEST_KEY));
fakeMessageExt2.setBody(SUB_TRACE1.getBytes(StandardCharsets.UTF_8));
fakeMessageExt3 = new MessageExt();
fakeMessageExt3.setKeys(Lists.newArrayList(TEST_KEY));
fakeMessageExt3.setBody(SUB_TRACE2.getBytes(StandardCharsets.UTF_8));
fakeMessageExt4 = new MessageExt();
fakeMessageExt4.setKeys(Lists.newArrayList(TEST_KEY));
fakeMessageExt4.setBody(END_TRANSACTION_TRACE.getBytes(StandardCharsets.UTF_8));
}
@Test
@SneakyThrows
public void queryMessageTraceKeyTest() {
List<MessageExt> messageTraceList = Lists.newArrayList(fakeMessageExt);
QueryResult queryResult = new QueryResult(1, messageTraceList);
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(queryResult);
List<MessageTraceView> views = messageTraceService.queryMessageTraceKey(TEST_MESSAGE_ID);
Assertions.assertEquals(1, views.size());
}
@Test
@SneakyThrows
public void queryMessageTraceKeyWithExceptionTest() {
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenThrow(new RuntimeException());
Assertions.assertThrows(RuntimeException.class, () -> messageTraceService.queryMessageTraceKey(TEST_MESSAGE_ID));
}
@Test
@SneakyThrows
public void queryMessageTraceWithNoResultTest() {
QueryResult queryResult = new QueryResult(1, Collections.emptyList());
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(queryResult);
MessageTraceGraph messageTraceGraph = messageTraceService.queryMessageTraceGraph(TEST_MESSAGE_ID);
Assertions.assertNotNull(messageTraceGraph);
Assertions.assertEquals(0, messageTraceGraph.getMessageTraceViews().size());
}
@Test
@SneakyThrows
public void queryMessageTraceTest() {
List<MessageExt> messageTraceList = Lists.newArrayList(fakeMessageExt, fakeMessageExt2, fakeMessageExt3, fakeMessageExt4);
QueryResult queryResult = new QueryResult(1, messageTraceList);
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(queryResult);
MessageTraceGraph messageTraceGraph = messageTraceService.queryMessageTraceGraph(TEST_MESSAGE_ID);
Assertions.assertNotNull(messageTraceGraph);
Assertions.assertEquals(TEST_MESSAGE_ID, messageTraceGraph.getProducerNode().getMsgId());
Assertions.assertEquals(1, messageTraceGraph.getSubscriptionNodeList().size());
Assertions.assertEquals(FAKE_SUBSCRIPTION_GROUP, messageTraceGraph.getSubscriptionNodeList()
.get(0).getSubscriptionGroup());
Assertions.assertEquals(4, messageTraceGraph.getMessageTraceViews().size());
Assertions.assertEquals(1, messageTraceGraph.getProducerNode().getTransactionNodeList().size());
Assertions.assertEquals(LocalTransactionState.UNKNOW.name(),
messageTraceGraph.getProducerNode().getTransactionNodeList().get(0).getTransactionState());
for (MessageTraceView view : messageTraceGraph.getMessageTraceViews()) {
Assertions.assertEquals(0, view.getRetryTimes());
}
}
}