mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2025-09-10 11:40:01 +08:00
[ISSUE #768] Console message trace compatibility Optimization
This commit is contained in:
@@ -55,6 +55,9 @@ public class MessageTraceServiceImpl implements MessageTraceService {
|
||||
private Logger logger = LoggerFactory.getLogger(MessageTraceServiceImpl.class);
|
||||
|
||||
private final static int QUERY_MESSAGE_MAX_NUM = 64;
|
||||
|
||||
private final static String UNKNOWN_GROUP_NAME = "%UNKNOWN_GROUP%";
|
||||
private final static int MESSAGE_TRACE_MISSING_VALUE = -1;
|
||||
@Resource
|
||||
private MQAdminExt mqAdminExt;
|
||||
|
||||
@@ -127,7 +130,7 @@ public class MessageTraceServiceImpl implements MessageTraceService {
|
||||
|
||||
private TraceNode buildTransactionNode(MessageTraceView messageTraceView) {
|
||||
TraceNode transactionNode = buildTraceNode(messageTraceView);
|
||||
transactionNode.setCostTime(-1);
|
||||
transactionNode.setCostTime(MESSAGE_TRACE_MISSING_VALUE);
|
||||
return transactionNode;
|
||||
}
|
||||
|
||||
@@ -135,21 +138,9 @@ public class MessageTraceServiceImpl implements MessageTraceService {
|
||||
Map<String, Pair<MessageTraceView, MessageTraceView>> requestIdTracePairMap) {
|
||||
Map<String, List<TraceNode>> subscriptionTraceNodeMap = Maps.newHashMap();
|
||||
for (Pair<MessageTraceView, MessageTraceView> traceNodePair : requestIdTracePairMap.values()) {
|
||||
traceNodePair = makeBeforeOrAfterMissCompatible(traceNodePair);
|
||||
MessageTraceView subBeforeTrace = traceNodePair.getObject1();
|
||||
MessageTraceView subAfterTrace = traceNodePair.getObject2();
|
||||
List<TraceNode> traceNodeList = subscriptionTraceNodeMap.computeIfAbsent(subBeforeTrace.getGroupName(),
|
||||
(o) -> Lists.newArrayList());
|
||||
TraceNode consumeNode = new TraceNode();
|
||||
consumeNode.setRequestId(subBeforeTrace.getRequestId());
|
||||
consumeNode.setStoreHost(subBeforeTrace.getStoreHost());
|
||||
consumeNode.setClientHost(subBeforeTrace.getClientHost());
|
||||
consumeNode.setRetryTimes(subBeforeTrace.getRetryTimes());
|
||||
consumeNode.setBeginTimestamp(subBeforeTrace.getTimeStamp());
|
||||
consumeNode.setCostTime(subAfterTrace.getCostTime());
|
||||
consumeNode.setEndTimestamp(subBeforeTrace.getTimeStamp() + Math.max(0, subAfterTrace.getCostTime()));
|
||||
consumeNode.setStatus(subAfterTrace.getStatus());
|
||||
traceNodeList.add(consumeNode);
|
||||
List<TraceNode> traceNodeList = subscriptionTraceNodeMap
|
||||
.computeIfAbsent(buildGroupName(traceNodePair), (o) -> Lists.newArrayList());
|
||||
traceNodeList.add(buildConsumeMessageTraceNode(traceNodePair));
|
||||
}
|
||||
return subscriptionTraceNodeMap.entrySet().stream()
|
||||
.map((Function<Map.Entry<String, List<TraceNode>>, SubscriptionNode>) subscriptionEntry -> {
|
||||
@@ -161,23 +152,57 @@ public class MessageTraceServiceImpl implements MessageTraceService {
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private Pair<MessageTraceView, MessageTraceView> makeBeforeOrAfterMissCompatible(Pair<MessageTraceView, MessageTraceView> traceNodePair) {
|
||||
if (traceNodePair.getObject1() != null && traceNodePair.getObject2() != null) {
|
||||
return traceNodePair;
|
||||
private <E> E getTraceValue(Pair<MessageTraceView, MessageTraceView> traceNodePair, Function<MessageTraceView, E> function) {
|
||||
if (traceNodePair.getObject1() != null) {
|
||||
return function.apply(traceNodePair.getObject1());
|
||||
}
|
||||
MessageTraceView subBeforeTrace = traceNodePair.getObject1();
|
||||
MessageTraceView subAfterTrace = traceNodePair.getObject2();
|
||||
if (subBeforeTrace == null) {
|
||||
subBeforeTrace = new MessageTraceView();
|
||||
BeanUtils.copyProperties(subAfterTrace, subBeforeTrace);
|
||||
return function.apply(traceNodePair.getObject2());
|
||||
}
|
||||
|
||||
private String buildGroupName(Pair<MessageTraceView, MessageTraceView> traceNodePair) {
|
||||
String groupName = getTraceValue(traceNodePair, MessageTraceView::getGroupName);
|
||||
if (StringUtils.isNoneBlank(groupName)) {
|
||||
return groupName;
|
||||
}
|
||||
if (subAfterTrace == null) {
|
||||
subAfterTrace = new MessageTraceView();
|
||||
BeanUtils.copyProperties(subBeforeTrace, subAfterTrace);
|
||||
subAfterTrace.setStatus(MessageTraceStatusEnum.UNKNOWN.getStatus());
|
||||
subAfterTrace.setCostTime(-1);
|
||||
return UNKNOWN_GROUP_NAME;
|
||||
}
|
||||
|
||||
private TraceNode buildConsumeMessageTraceNode(Pair<MessageTraceView, MessageTraceView> pair) {
|
||||
MessageTraceView subBeforeTrace = pair.getObject1();
|
||||
MessageTraceView subAfterTrace = pair.getObject2();
|
||||
TraceNode consumeNode = new TraceNode();
|
||||
consumeNode.setRequestId(getTraceValue(pair, MessageTraceView::getRequestId));
|
||||
consumeNode.setStoreHost(getTraceValue(pair, MessageTraceView::getStoreHost));
|
||||
consumeNode.setClientHost(getTraceValue(pair, MessageTraceView::getClientHost));
|
||||
if (subBeforeTrace != null) {
|
||||
consumeNode.setRetryTimes(subBeforeTrace.getRetryTimes());
|
||||
consumeNode.setBeginTimestamp(subBeforeTrace.getTimeStamp());
|
||||
} else {
|
||||
consumeNode.setRetryTimes(MESSAGE_TRACE_MISSING_VALUE);
|
||||
consumeNode.setBeginTimestamp(MESSAGE_TRACE_MISSING_VALUE);
|
||||
}
|
||||
return new Pair<>(subBeforeTrace, subAfterTrace);
|
||||
if (subAfterTrace != null) {
|
||||
consumeNode.setCostTime(subAfterTrace.getCostTime());
|
||||
consumeNode.setStatus(subAfterTrace.getStatus());
|
||||
if (subAfterTrace.getTimeStamp() > 0) {
|
||||
consumeNode.setEndTimestamp(subAfterTrace.getTimeStamp());
|
||||
} else {
|
||||
if (subBeforeTrace != null) {
|
||||
if (subAfterTrace.getCostTime() >= 0) {
|
||||
consumeNode.setEndTimestamp(subBeforeTrace.getTimeStamp() + subAfterTrace.getCostTime());
|
||||
} else {
|
||||
consumeNode.setEndTimestamp(subBeforeTrace.getTimeStamp());
|
||||
}
|
||||
} else {
|
||||
consumeNode.setEndTimestamp(MESSAGE_TRACE_MISSING_VALUE);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
consumeNode.setCostTime(MESSAGE_TRACE_MISSING_VALUE);
|
||||
consumeNode.setEndTimestamp(MESSAGE_TRACE_MISSING_VALUE);
|
||||
consumeNode.setStatus(MessageTraceStatusEnum.UNKNOWN.getStatus());
|
||||
}
|
||||
return consumeNode;
|
||||
}
|
||||
|
||||
private void putIntoMessageTraceViewGroupMap(MessageTraceView messageTraceView,
|
||||
|
@@ -52,7 +52,7 @@ public class MsgTraceDecodeUtil {
|
||||
for (String context : contextList) {
|
||||
String[] line = context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
|
||||
if (line[0].equals(Pub.name())) {
|
||||
TraceContext pubContext = new TraceContext();
|
||||
TraceContext pubContext = initTraceContext();
|
||||
pubContext.setTraceType(Pub);
|
||||
pubContext.setTimeStamp(Long.parseLong(line[1]));
|
||||
pubContext.setRegionId(line[2]);
|
||||
@@ -94,7 +94,7 @@ public class MsgTraceDecodeUtil {
|
||||
pubContext.getTraceBeans().add(bean);
|
||||
resList.add(pubContext);
|
||||
} else if (line[0].equals(TraceType.SubBefore.name())) {
|
||||
TraceContext subBeforeContext = new TraceContext();
|
||||
TraceContext subBeforeContext = initTraceContext();
|
||||
subBeforeContext.setTraceType(TraceType.SubBefore);
|
||||
subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
|
||||
subBeforeContext.setRegionId(line[2]);
|
||||
@@ -108,7 +108,7 @@ public class MsgTraceDecodeUtil {
|
||||
subBeforeContext.getTraceBeans().add(bean);
|
||||
resList.add(subBeforeContext);
|
||||
} else if (line[0].equals(TraceType.SubAfter.name())) {
|
||||
TraceContext subAfterContext = new TraceContext();
|
||||
TraceContext subAfterContext = initTraceContext();
|
||||
subAfterContext.setTraceType(TraceType.SubAfter);
|
||||
subAfterContext.setRequestId(line[1]);
|
||||
TraceBean bean = new TraceBean();
|
||||
@@ -128,16 +128,18 @@ public class MsgTraceDecodeUtil {
|
||||
case TRACE_MSG_SUBAFTER_V3_LEN:
|
||||
subAfterContext.setContextCode(Integer.parseInt(line[6]));
|
||||
subAfterContext.setTimeStamp(Long.parseLong(line[7]));
|
||||
subAfterContext.setGroupName(line[8]);
|
||||
break;
|
||||
default:
|
||||
subAfterContext.setContextCode(Integer.parseInt(line[6]));
|
||||
subAfterContext.setTimeStamp(Long.parseLong(line[7]));
|
||||
subAfterContext.setGroupName(line[8]);
|
||||
log.warn("Detect new version trace msg of {} type", TraceType.SubAfter.name());
|
||||
break;
|
||||
}
|
||||
resList.add(subAfterContext);
|
||||
} else if (line[0].equals(TraceType.EndTransaction.name())) {
|
||||
TraceContext endTransactionContext = new TraceContext();
|
||||
TraceContext endTransactionContext = initTraceContext();
|
||||
endTransactionContext.setTraceType(TraceType.EndTransaction);
|
||||
endTransactionContext.setTimeStamp(Long.parseLong(line[1]));
|
||||
endTransactionContext.setRegionId(line[2]);
|
||||
@@ -159,4 +161,12 @@ public class MsgTraceDecodeUtil {
|
||||
}
|
||||
return resList;
|
||||
}
|
||||
|
||||
private static TraceContext initTraceContext() {
|
||||
TraceContext traceContext = new TraceContext();
|
||||
traceContext.setTimeStamp(0L);
|
||||
traceContext.setCostTime(-1);
|
||||
traceContext.setRequestId(null);
|
||||
return traceContext;
|
||||
}
|
||||
}
|
||||
|
@@ -129,12 +129,6 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout
|
||||
if (messageTraceGraph.producerNode) {
|
||||
startTime = +messageTraceGraph.producerNode.traceNode.beginTimestamp;
|
||||
endTime = +messageTraceGraph.producerNode.traceNode.endTimestamp;
|
||||
} else {
|
||||
messageTraceGraph.subscriptionNodeList.forEach(subscriptionNode => {
|
||||
subscriptionNode.consumeNodeList.forEach(consumeNode => {
|
||||
startTime = Math.min(startTime, consumeNode.beginTimestamp);
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function buildNodeColor(traceNode) {
|
||||
@@ -203,17 +197,23 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout
|
||||
}
|
||||
return `costTime: ${formatCostTimeStr(costTime)}<br/>`
|
||||
}
|
||||
function buildTimeStamp(timestamp){
|
||||
if(timestamp < 0){
|
||||
return 'N/A';
|
||||
}
|
||||
return new moment(timestamp).format(TIME_FORMAT_PATTERN);
|
||||
}
|
||||
|
||||
function formatNodeToolTip(params) {
|
||||
let traceNode = params.data.traceData.traceNode;
|
||||
return `
|
||||
${buildCostTimeInfo(traceNode.costTime)}
|
||||
status: ${traceNode.status}<br />
|
||||
beginTimestamp: ${new moment(traceNode.beginTimestamp).format(TIME_FORMAT_PATTERN)}<br />
|
||||
endTimestamp: ${new moment(traceNode.endTimestamp).format(TIME_FORMAT_PATTERN)}<br />
|
||||
${buildTraceInfo('beginTimestamp', buildTimeStamp(traceNode.beginTimestamp))}
|
||||
${buildTraceInfo('endTimestamp', buildTimeStamp(traceNode.endTimestamp))}
|
||||
clientHost: ${traceNode.clientHost}<br />
|
||||
storeHost: ${traceNode.storeHost}<br />
|
||||
retryTimes: ${traceNode.retryTimes}<br />
|
||||
retryTimes: ${traceNode.retryTimes < 0 ? 'N/A' : traceNode.retryTimes}<br />
|
||||
${buildTraceInfo('msgType', traceNode.msgType)}
|
||||
${buildTraceInfo('transactionId', traceNode.transactionId)}
|
||||
${buildTraceInfo('transactionState', traceNode.transactionState)}
|
||||
@@ -221,12 +221,31 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout
|
||||
`;
|
||||
}
|
||||
|
||||
function calcGraphTimestamp(timestamp, relativeTimeStamp, duration, addDuration) {
|
||||
if (timestamp > 0) {
|
||||
return timestamp;
|
||||
}
|
||||
if (duration < 0) {
|
||||
return relativeTimeStamp;
|
||||
}
|
||||
return addDuration ? relativeTimeStamp + duration : relativeTimeStamp - duration;
|
||||
}
|
||||
|
||||
function addTraceData(traceNode, index) {
|
||||
if (traceNode.beginTimestamp < 0 && traceNode.endTimestamp < 0) {
|
||||
return;
|
||||
}
|
||||
let beginTimestamp = calcGraphTimestamp(traceNode.beginTimestamp, traceNode.endTimestamp, traceNode.costTime, false);
|
||||
let endTimestamp = calcGraphTimestamp(traceNode.endTimestamp, traceNode.beginTimestamp, traceNode.costTime, true);
|
||||
if (endTimestamp === beginTimestamp) {
|
||||
endTimestamp = beginTimestamp + 1;
|
||||
}
|
||||
console.log("beginTimestamp",beginTimestamp,'endTimestamp',endTimestamp);
|
||||
data.push({
|
||||
value: [
|
||||
index,
|
||||
traceNode.beginTimestamp,
|
||||
traceNode.endTimestamp === traceNode.beginTimestamp ? traceNode.beginTimestamp + 1 : traceNode.endTimestamp,
|
||||
beginTimestamp,
|
||||
endTimestamp,
|
||||
traceNode.costTime
|
||||
],
|
||||
itemStyle: {
|
||||
@@ -239,7 +258,8 @@ module.controller('messageTraceDetailViewDialogController', ['$scope', '$timeout
|
||||
traceNode: traceNode
|
||||
}
|
||||
});
|
||||
endTime = Math.max(traceNode.endTimestamp, endTime);
|
||||
startTime = Math.min(startTime, beginTimestamp);
|
||||
endTime = Math.max(endTime, endTimestamp);
|
||||
}
|
||||
|
||||
messageTraceGraph.subscriptionNodeList.forEach(item => {
|
||||
|
@@ -273,16 +273,20 @@
|
||||
</tr>
|
||||
<tr ng-repeat="consumeNode in subscriptionNode.consumeNodeList">
|
||||
<td class="text-center">
|
||||
{{consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss'}}
|
||||
{{consumeNode.beginTimestamp < 0 ? 'N/A' :
|
||||
(consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
|
||||
</td>
|
||||
<td class="text-center">
|
||||
{{consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss'}}
|
||||
{{consumeNode.endTimestamp < 0 ? 'N/A' :
|
||||
(consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
|
||||
</td>
|
||||
<td class="text-center">{{consumeNode.costTime < 0 ? '--' :
|
||||
<td class="text-center">{{consumeNode.costTime < 0 ? 'N/A' :
|
||||
((consumeNode.costTime === 0 ? '<1' : consumeNode.costTime) + 'ms')}}
|
||||
</td>
|
||||
<td class="text-center">{{consumeNode.status}}</td>
|
||||
<td class="text-center">{{consumeNode.retryTimes}}</td>
|
||||
<td class="text-center">
|
||||
{{consumeNode.retryTimes < 0 ? 'N/A' : consumeNode.retryTimes}}
|
||||
</td>
|
||||
<td class="text-center">{{consumeNode.clientHost}}</td>
|
||||
<td class="text-center">{{consumeNode.storeHost}}</td>
|
||||
</tr>
|
||||
|
Reference in New Issue
Block a user