diff --git a/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java b/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java index 5e2debd..fabe9cd 100644 --- a/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/console/service/impl/MessageTraceServiceImpl.java @@ -55,6 +55,9 @@ public class MessageTraceServiceImpl implements MessageTraceService { private Logger logger = LoggerFactory.getLogger(MessageTraceServiceImpl.class); private final static int QUERY_MESSAGE_MAX_NUM = 64; + + private final static String UNKNOWN_GROUP_NAME = "%UNKNOWN_GROUP%"; + private final static int MESSAGE_TRACE_MISSING_VALUE = -1; @Resource private MQAdminExt mqAdminExt; @@ -127,7 +130,7 @@ public class MessageTraceServiceImpl implements MessageTraceService { private TraceNode buildTransactionNode(MessageTraceView messageTraceView) { TraceNode transactionNode = buildTraceNode(messageTraceView); - transactionNode.setCostTime(-1); + transactionNode.setCostTime(MESSAGE_TRACE_MISSING_VALUE); return transactionNode; } @@ -135,21 +138,9 @@ public class MessageTraceServiceImpl implements MessageTraceService { Map> requestIdTracePairMap) { Map> subscriptionTraceNodeMap = Maps.newHashMap(); for (Pair traceNodePair : requestIdTracePairMap.values()) { - traceNodePair = makeBeforeOrAfterMissCompatible(traceNodePair); - MessageTraceView subBeforeTrace = traceNodePair.getObject1(); - MessageTraceView subAfterTrace = traceNodePair.getObject2(); - List traceNodeList = subscriptionTraceNodeMap.computeIfAbsent(subBeforeTrace.getGroupName(), - (o) -> Lists.newArrayList()); - TraceNode consumeNode = new TraceNode(); - consumeNode.setRequestId(subBeforeTrace.getRequestId()); - consumeNode.setStoreHost(subBeforeTrace.getStoreHost()); - consumeNode.setClientHost(subBeforeTrace.getClientHost()); - consumeNode.setRetryTimes(subBeforeTrace.getRetryTimes()); - consumeNode.setBeginTimestamp(subBeforeTrace.getTimeStamp()); - consumeNode.setCostTime(subAfterTrace.getCostTime()); - consumeNode.setEndTimestamp(subBeforeTrace.getTimeStamp() + Math.max(0, subAfterTrace.getCostTime())); - consumeNode.setStatus(subAfterTrace.getStatus()); - traceNodeList.add(consumeNode); + List traceNodeList = subscriptionTraceNodeMap + .computeIfAbsent(buildGroupName(traceNodePair), (o) -> Lists.newArrayList()); + traceNodeList.add(buildConsumeMessageTraceNode(traceNodePair)); } return subscriptionTraceNodeMap.entrySet().stream() .map((Function>, SubscriptionNode>) subscriptionEntry -> { @@ -161,23 +152,57 @@ public class MessageTraceServiceImpl implements MessageTraceService { }).collect(Collectors.toList()); } - private Pair makeBeforeOrAfterMissCompatible(Pair traceNodePair) { - if (traceNodePair.getObject1() != null && traceNodePair.getObject2() != null) { - return traceNodePair; + private E getTraceValue(Pair traceNodePair, Function function) { + if (traceNodePair.getObject1() != null) { + return function.apply(traceNodePair.getObject1()); } - MessageTraceView subBeforeTrace = traceNodePair.getObject1(); - MessageTraceView subAfterTrace = traceNodePair.getObject2(); - if (subBeforeTrace == null) { - subBeforeTrace = new MessageTraceView(); - BeanUtils.copyProperties(subAfterTrace, subBeforeTrace); + return function.apply(traceNodePair.getObject2()); + } + + private String buildGroupName(Pair traceNodePair) { + String groupName = getTraceValue(traceNodePair, MessageTraceView::getGroupName); + if (StringUtils.isNoneBlank(groupName)) { + return groupName; } - if (subAfterTrace == null) { - subAfterTrace = new MessageTraceView(); - BeanUtils.copyProperties(subBeforeTrace, subAfterTrace); - subAfterTrace.setStatus(MessageTraceStatusEnum.UNKNOWN.getStatus()); - subAfterTrace.setCostTime(-1); + return UNKNOWN_GROUP_NAME; + } + + private TraceNode buildConsumeMessageTraceNode(Pair pair) { + MessageTraceView subBeforeTrace = pair.getObject1(); + MessageTraceView subAfterTrace = pair.getObject2(); + TraceNode consumeNode = new TraceNode(); + consumeNode.setRequestId(getTraceValue(pair, MessageTraceView::getRequestId)); + consumeNode.setStoreHost(getTraceValue(pair, MessageTraceView::getStoreHost)); + consumeNode.setClientHost(getTraceValue(pair, MessageTraceView::getClientHost)); + if (subBeforeTrace != null) { + consumeNode.setRetryTimes(subBeforeTrace.getRetryTimes()); + consumeNode.setBeginTimestamp(subBeforeTrace.getTimeStamp()); + } else { + consumeNode.setRetryTimes(MESSAGE_TRACE_MISSING_VALUE); + consumeNode.setBeginTimestamp(MESSAGE_TRACE_MISSING_VALUE); } - return new Pair<>(subBeforeTrace, subAfterTrace); + if (subAfterTrace != null) { + consumeNode.setCostTime(subAfterTrace.getCostTime()); + consumeNode.setStatus(subAfterTrace.getStatus()); + if (subAfterTrace.getTimeStamp() > 0) { + consumeNode.setEndTimestamp(subAfterTrace.getTimeStamp()); + } else { + if (subBeforeTrace != null) { + if (subAfterTrace.getCostTime() >= 0) { + consumeNode.setEndTimestamp(subBeforeTrace.getTimeStamp() + subAfterTrace.getCostTime()); + } else { + consumeNode.setEndTimestamp(subBeforeTrace.getTimeStamp()); + } + } else { + consumeNode.setEndTimestamp(MESSAGE_TRACE_MISSING_VALUE); + } + } + } else { + consumeNode.setCostTime(MESSAGE_TRACE_MISSING_VALUE); + consumeNode.setEndTimestamp(MESSAGE_TRACE_MISSING_VALUE); + consumeNode.setStatus(MessageTraceStatusEnum.UNKNOWN.getStatus()); + } + return consumeNode; } private void putIntoMessageTraceViewGroupMap(MessageTraceView messageTraceView, diff --git a/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java b/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java index 1efabda..e6105e9 100644 --- a/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java +++ b/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java @@ -52,7 +52,7 @@ public class MsgTraceDecodeUtil { for (String context : contextList) { String[] line = context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR)); if (line[0].equals(Pub.name())) { - TraceContext pubContext = new TraceContext(); + TraceContext pubContext = initTraceContext(); pubContext.setTraceType(Pub); pubContext.setTimeStamp(Long.parseLong(line[1])); pubContext.setRegionId(line[2]); @@ -94,7 +94,7 @@ public class MsgTraceDecodeUtil { pubContext.getTraceBeans().add(bean); resList.add(pubContext); } else if (line[0].equals(TraceType.SubBefore.name())) { - TraceContext subBeforeContext = new TraceContext(); + TraceContext subBeforeContext = initTraceContext(); subBeforeContext.setTraceType(TraceType.SubBefore); subBeforeContext.setTimeStamp(Long.parseLong(line[1])); subBeforeContext.setRegionId(line[2]); @@ -108,7 +108,7 @@ public class MsgTraceDecodeUtil { subBeforeContext.getTraceBeans().add(bean); resList.add(subBeforeContext); } else if (line[0].equals(TraceType.SubAfter.name())) { - TraceContext subAfterContext = new TraceContext(); + TraceContext subAfterContext = initTraceContext(); subAfterContext.setTraceType(TraceType.SubAfter); subAfterContext.setRequestId(line[1]); TraceBean bean = new TraceBean(); @@ -128,16 +128,18 @@ public class MsgTraceDecodeUtil { case TRACE_MSG_SUBAFTER_V3_LEN: subAfterContext.setContextCode(Integer.parseInt(line[6])); subAfterContext.setTimeStamp(Long.parseLong(line[7])); + subAfterContext.setGroupName(line[8]); break; default: subAfterContext.setContextCode(Integer.parseInt(line[6])); subAfterContext.setTimeStamp(Long.parseLong(line[7])); + subAfterContext.setGroupName(line[8]); log.warn("Detect new version trace msg of {} type", TraceType.SubAfter.name()); break; } resList.add(subAfterContext); } else if (line[0].equals(TraceType.EndTransaction.name())) { - TraceContext endTransactionContext = new TraceContext(); + TraceContext endTransactionContext = initTraceContext(); endTransactionContext.setTraceType(TraceType.EndTransaction); endTransactionContext.setTimeStamp(Long.parseLong(line[1])); endTransactionContext.setRegionId(line[2]); @@ -159,4 +161,12 @@ public class MsgTraceDecodeUtil { } return resList; } + + private static TraceContext initTraceContext() { + TraceContext traceContext = new TraceContext(); + traceContext.setTimeStamp(0L); + traceContext.setCostTime(-1); + traceContext.setRequestId(null); + return traceContext; + } } diff --git a/src/main/resources/static/src/messageTrace.js b/src/main/resources/static/src/messageTrace.js index 56e5088..a0fd7e8 100644 --- a/src/main/resources/static/src/messageTrace.js +++ b/src/main/resources/static/src/messageTrace.js @@ -129,12 +129,6 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout if (messageTraceGraph.producerNode) { startTime = +messageTraceGraph.producerNode.traceNode.beginTimestamp; endTime = +messageTraceGraph.producerNode.traceNode.endTimestamp; - } else { - messageTraceGraph.subscriptionNodeList.forEach(subscriptionNode => { - subscriptionNode.consumeNodeList.forEach(consumeNode => { - startTime = Math.min(startTime, consumeNode.beginTimestamp); - }) - }) } function buildNodeColor(traceNode) { @@ -203,17 +197,23 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout } return `costTime: ${formatCostTimeStr(costTime)}
` } + function buildTimeStamp(timestamp){ + if(timestamp < 0){ + return 'N/A'; + } + return new moment(timestamp).format(TIME_FORMAT_PATTERN); + } function formatNodeToolTip(params) { let traceNode = params.data.traceData.traceNode; return ` ${buildCostTimeInfo(traceNode.costTime)} status: ${traceNode.status}
- beginTimestamp: ${new moment(traceNode.beginTimestamp).format(TIME_FORMAT_PATTERN)}
- endTimestamp: ${new moment(traceNode.endTimestamp).format(TIME_FORMAT_PATTERN)}
+ ${buildTraceInfo('beginTimestamp', buildTimeStamp(traceNode.beginTimestamp))} + ${buildTraceInfo('endTimestamp', buildTimeStamp(traceNode.endTimestamp))} clientHost: ${traceNode.clientHost}
storeHost: ${traceNode.storeHost}
- retryTimes: ${traceNode.retryTimes}
+ retryTimes: ${traceNode.retryTimes < 0 ? 'N/A' : traceNode.retryTimes}
${buildTraceInfo('msgType', traceNode.msgType)} ${buildTraceInfo('transactionId', traceNode.transactionId)} ${buildTraceInfo('transactionState', traceNode.transactionState)} @@ -221,12 +221,31 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout `; } + function calcGraphTimestamp(timestamp, relativeTimeStamp, duration, addDuration) { + if (timestamp > 0) { + return timestamp; + } + if (duration < 0) { + return relativeTimeStamp; + } + return addDuration ? relativeTimeStamp + duration : relativeTimeStamp - duration; + } + function addTraceData(traceNode, index) { + if (traceNode.beginTimestamp < 0 && traceNode.endTimestamp < 0) { + return; + } + let beginTimestamp = calcGraphTimestamp(traceNode.beginTimestamp, traceNode.endTimestamp, traceNode.costTime, false); + let endTimestamp = calcGraphTimestamp(traceNode.endTimestamp, traceNode.beginTimestamp, traceNode.costTime, true); + if (endTimestamp === beginTimestamp) { + endTimestamp = beginTimestamp + 1; + } + console.log("beginTimestamp",beginTimestamp,'endTimestamp',endTimestamp); data.push({ value: [ index, - traceNode.beginTimestamp, - traceNode.endTimestamp === traceNode.beginTimestamp ? traceNode.beginTimestamp + 1 : traceNode.endTimestamp, + beginTimestamp, + endTimestamp, traceNode.costTime ], itemStyle: { @@ -239,7 +258,8 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout traceNode: traceNode } }); - endTime = Math.max(traceNode.endTimestamp, endTime); + startTime = Math.min(startTime, beginTimestamp); + endTime = Math.max(endTime, endTimestamp); } messageTraceGraph.subscriptionNodeList.forEach(item => { diff --git a/src/main/resources/static/view/pages/messageTrace.html b/src/main/resources/static/view/pages/messageTrace.html index 4e4a754..8ae6d35 100644 --- a/src/main/resources/static/view/pages/messageTrace.html +++ b/src/main/resources/static/view/pages/messageTrace.html @@ -273,16 +273,20 @@ - {{consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss'}} + {{consumeNode.beginTimestamp < 0 ? 'N/A' : + (consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}} - {{consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss'}} + {{consumeNode.endTimestamp < 0 ? 'N/A' : + (consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}} - {{consumeNode.costTime < 0 ? '--' : + {{consumeNode.costTime < 0 ? 'N/A' : ((consumeNode.costTime === 0 ? '<1' : consumeNode.costTime) + 'ms')}} {{consumeNode.status}} - {{consumeNode.retryTimes}} + + {{consumeNode.retryTimes < 0 ? 'N/A' : consumeNode.retryTimes}} + {{consumeNode.clientHost}} {{consumeNode.storeHost}}