diff --git a/src/main/java/org/apache/rocketmq/console/model/MessageTraceView.java b/src/main/java/org/apache/rocketmq/console/model/MessageTraceView.java index 730b201..6febc24 100644 --- a/src/main/java/org/apache/rocketmq/console/model/MessageTraceView.java +++ b/src/main/java/org/apache/rocketmq/console/model/MessageTraceView.java @@ -29,6 +29,7 @@ public class MessageTraceView { private String tags; private String keys; private String storeHost; + private String clientHost; private int costTime; private String msgType; private String offSetMsgId; @@ -37,6 +38,9 @@ public class MessageTraceView { private String groupName; private String status; + public MessageTraceView() { + } + public static List decodeFromTraceTransData(String key,String messageBody) { List messageTraceViewList = new ArrayList(); if (messageBody == null || messageBody.length() <= 0) { @@ -67,6 +71,7 @@ public class MessageTraceView { messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId()); messageTraceView.setTimeStamp(context.getTimeStamp()); messageTraceView.setStoreHost(traceBean.getStoreHost()); + messageTraceView.setClientHost(traceBean.getClientHost()); messageTraceViewList.add(messageTraceView); } return messageTraceViewList; @@ -159,4 +164,12 @@ public class MessageTraceView { public void setStatus(String status) { this.status = status; } + + public String getClientHost() { + return clientHost; + } + + public void setClientHost(String clientHost) { + this.clientHost = clientHost; + } } diff --git a/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java b/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java index e07d1b5..d681f01 100644 --- a/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java +++ b/src/main/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtil.java @@ -25,10 +25,22 @@ import org.apache.rocketmq.client.trace.TraceConstants; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.common.message.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.rocketmq.client.trace.TraceType.Pub; public class MsgTraceDecodeUtil { + private final static Logger log = LoggerFactory.getLogger(MsgTraceDecodeUtil.class); + + private static final int TRACE_MSG_PUB_V1_LEN = 12; + private static final int TRACE_MSG_PUB_V2_LEN = 13; + private static final int TRACE_MSG_PUB_V3_LEN = 14; + private static final int TRACE_MSG_PUB_V4_LEN = 15; + + private static final int TRACE_MSG_SUBAFTER_V1_LEN = 6; + private static final int TRACE_MSG_SUBAFTER_V2_LEN = 7; + private static final int TRACE_MSG_SUBAFTER_V3_LEN = 9; public static List decoderFromTraceDataString(String traceData) { List resList = new ArrayList(); @@ -53,13 +65,30 @@ public class MsgTraceDecodeUtil { bean.setBodyLength(Integer.parseInt(line[9])); pubContext.setCostTime(Integer.parseInt(line[10])); bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]); - - if (line.length == 13) { - pubContext.setSuccess(Boolean.parseBoolean(line[12])); - } else if (line.length == 14) { - bean.setOffsetMsgId(line[12]); - pubContext.setSuccess(Boolean.parseBoolean(line[13])); + // compatible with different version + switch (line.length) { + case TRACE_MSG_PUB_V1_LEN: + break; + case TRACE_MSG_PUB_V2_LEN: + pubContext.setSuccess(Boolean.parseBoolean(line[12])); + break; + case TRACE_MSG_PUB_V3_LEN: + bean.setOffsetMsgId(line[12]); + pubContext.setSuccess(Boolean.parseBoolean(line[13])); + break; + case TRACE_MSG_PUB_V4_LEN: + bean.setOffsetMsgId(line[12]); + pubContext.setSuccess(Boolean.parseBoolean(line[13])); + bean.setClientHost(line[14]); + break; + default: + bean.setOffsetMsgId(line[12]); + pubContext.setSuccess(Boolean.parseBoolean(line[13])); + bean.setClientHost(line[14]); + log.warn("Detect new version trace msg of {} type", Pub.name()); + break; } + pubContext.setTraceBeans(new ArrayList(1)); pubContext.getTraceBeans().add(bean); resList.add(pubContext); @@ -74,6 +103,7 @@ public class MsgTraceDecodeUtil { bean.setMsgId(line[5]); bean.setRetryTimes(Integer.parseInt(line[6])); bean.setKeys(line[7]); + bean.setClientHost(line[8]); subBeforeContext.setTraceBeans(new ArrayList(1)); subBeforeContext.getTraceBeans().add(bean); resList.add(subBeforeContext); @@ -88,9 +118,24 @@ public class MsgTraceDecodeUtil { subAfterContext.getTraceBeans().add(bean); subAfterContext.setCostTime(Integer.parseInt(line[3])); subAfterContext.setSuccess(Boolean.parseBoolean(line[4])); - if (line.length >= 7) { - // add the context type - subAfterContext.setContextCode(Integer.parseInt(line[6])); + // compatible with different version + switch (line.length) { + case TRACE_MSG_SUBAFTER_V1_LEN: + break; + case TRACE_MSG_SUBAFTER_V2_LEN: + subAfterContext.setContextCode(Integer.parseInt(line[6])); + break; + case TRACE_MSG_SUBAFTER_V3_LEN: + subAfterContext.setContextCode(Integer.parseInt(line[6])); + subAfterContext.setTimeStamp(Long.parseLong(line[7])); + subAfterContext.setGroupName(line[8]); + break; + default: + subAfterContext.setContextCode(Integer.parseInt(line[6])); + subAfterContext.setTimeStamp(Long.parseLong(line[7])); + subAfterContext.setGroupName(line[8]); + log.warn("Detect new version trace msg of {} type", TraceType.SubAfter.name()); + break; } resList.add(subAfterContext); } diff --git a/src/main/resources/static/view/pages/messageTrace.html b/src/main/resources/static/view/pages/messageTrace.html index e3f4dc6..d98e080 100644 --- a/src/main/resources/static/view/pages/messageTrace.html +++ b/src/main/resources/static/view/pages/messageTrace.html @@ -139,6 +139,7 @@ Message Key StoreTime StoreHost + ClientHost costTime status traceType @@ -149,6 +150,7 @@ {{item.keys}} {{item.timeStamp | date:'yyyy-MM-dd HH:mm:ss'}} {{item.storeHost}} + {{item.clientHost}} {{item.costTime}}ms {{item.status}} {{item.msgType}} diff --git a/src/test/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtilTest.java b/src/test/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtilTest.java new file mode 100644 index 0000000..bcdf230 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtilTest.java @@ -0,0 +1,137 @@ +package org.apache.rocketmq.console.util; + +import java.util.List; +import org.apache.rocketmq.client.trace.TraceConstants; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.common.UtilAll; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MsgTraceDecodeUtilTest { + private StringBuilder pubTraceDataBase; + private StringBuilder subTraceDataBase; + + @Before + public void init() { + pubTraceDataBase = new StringBuilder() + .append("Pub").append(TraceConstants.CONTENT_SPLITOR) + .append("1614663055253").append(TraceConstants.CONTENT_SPLITOR) + .append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR) + .append("DEFAULT_GROUP").append(TraceConstants.CONTENT_SPLITOR) + .append("Trace_test").append(TraceConstants.CONTENT_SPLITOR) + .append("0A741C02622500000000080cc6980189").append(TraceConstants.CONTENT_SPLITOR) + .append(TraceConstants.CONTENT_SPLITOR) + .append("123 456").append(TraceConstants.CONTENT_SPLITOR) + .append("10.10.10.10:30911").append(TraceConstants.CONTENT_SPLITOR) + .append("25").append(TraceConstants.CONTENT_SPLITOR) + .append("1").append(TraceConstants.CONTENT_SPLITOR) + .append("0").append(TraceConstants.CONTENT_SPLITOR); + subTraceDataBase = new StringBuilder() + .append("SubBefore").append(TraceConstants.CONTENT_SPLITOR) + .append("1614666740499").append(TraceConstants.CONTENT_SPLITOR) + .append(TraceConstants.CONTENT_SPLITOR) + .append("test_consumer_group").append(TraceConstants.CONTENT_SPLITOR) + .append("0A741C029C1800000000084501200121").append(TraceConstants.CONTENT_SPLITOR) + .append("0A741C02622500000000080cc698003f").append(TraceConstants.CONTENT_SPLITOR) + .append("2").append(TraceConstants.CONTENT_SPLITOR) + .append("789 ").append(TraceConstants.CONTENT_SPLITOR) + .append("10.10.10.11@39960").append(TraceConstants.CONTENT_SPLITOR) + .append(TraceConstants.FIELD_SPLITOR) + .append("SubAfter").append(TraceConstants.CONTENT_SPLITOR) + .append("0A741C029C1800000000084501200121").append(TraceConstants.CONTENT_SPLITOR) + .append("0A741C02622500000000080cc698003f").append(TraceConstants.CONTENT_SPLITOR) + .append("0").append(TraceConstants.CONTENT_SPLITOR) + .append("false").append(TraceConstants.CONTENT_SPLITOR) + .append("789 ").append(TraceConstants.CONTENT_SPLITOR); + } + + @Test + public void testDecodePubTraceMessage() { + String pubTraceData_V1 = new String(pubTraceDataBase); + List traceContextListV1 = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_V1); + Assert.assertEquals(traceContextListV1.size(), 1); + Assert.assertEquals(traceContextListV1.get(0).getTraceType().toString(), "Pub"); + Assert.assertEquals(traceContextListV1.get(0).isSuccess(), true); + Assert.assertEquals(traceContextListV1.get(0).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc6980189"); + Assert.assertEquals(traceContextListV1.get(0).getTraceBeans().get(0).getOffsetMsgId(), ""); + Assert.assertEquals(traceContextListV1.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911"); + Assert.assertEquals(traceContextListV1.get(0).getTraceBeans().get(0).getClientHost(), UtilAll.ipToIPv4Str(UtilAll.getIP())); + + String pubTraceData_V2 = new StringBuilder(pubTraceDataBase) + .append("false").append(TraceConstants.CONTENT_SPLITOR) + .toString(); + List traceContextListV2 = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_V2); + Assert.assertEquals(traceContextListV2.size(), 1); + Assert.assertEquals(traceContextListV2.get(0).getTraceType().toString(), "Pub"); + Assert.assertEquals(traceContextListV2.get(0).isSuccess(), false); + Assert.assertEquals(traceContextListV2.get(0).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc6980189"); + Assert.assertEquals(traceContextListV2.get(0).getTraceBeans().get(0).getOffsetMsgId(), ""); + Assert.assertEquals(traceContextListV2.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911"); + Assert.assertEquals(traceContextListV2.get(0).getTraceBeans().get(0).getClientHost(), UtilAll.ipToIPv4Str(UtilAll.getIP())); + + String pubTraceData_V3 = new StringBuilder(pubTraceDataBase) + .append("0A741D02000078BF000000000132F7C9").append(TraceConstants.CONTENT_SPLITOR) + .append("true").append(TraceConstants.CONTENT_SPLITOR) + .toString(); + List traceContextListV3 = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_V3); + Assert.assertEquals(traceContextListV3.size(), 1); + Assert.assertEquals(traceContextListV3.get(0).getTraceType().toString(), "Pub"); + Assert.assertEquals(traceContextListV3.get(0).isSuccess(), true); + Assert.assertEquals(traceContextListV3.get(0).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc6980189"); + Assert.assertEquals(traceContextListV3.get(0).getTraceBeans().get(0).getOffsetMsgId(), "0A741D02000078BF000000000132F7C9"); + Assert.assertEquals(traceContextListV3.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911"); + Assert.assertEquals(traceContextListV3.get(0).getTraceBeans().get(0).getClientHost(), UtilAll.ipToIPv4Str(UtilAll.getIP())); + + String pubTraceData_V4 = new StringBuilder(pubTraceDataBase) + .append("0A741D02000078BF000000000132F7C9").append(TraceConstants.CONTENT_SPLITOR) + .append("true").append(TraceConstants.CONTENT_SPLITOR) + .append("10.10.10.11").append(TraceConstants.CONTENT_SPLITOR) + .toString(); + List traceContextListV4 = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_V4); + Assert.assertEquals(traceContextListV4.size(), 1); + Assert.assertEquals(traceContextListV4.get(0).getTraceType().toString(), "Pub"); + Assert.assertEquals(traceContextListV4.get(0).isSuccess(), true); + Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc6980189"); + Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getOffsetMsgId(), "0A741D02000078BF000000000132F7C9"); + Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911"); + Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getClientHost(), "10.10.10.11"); + } + + @Test + public void testDecodeSubTraceMessage() { + String subTraceData_V1 = new String(subTraceDataBase); + List traceContextListV1 = MsgTraceDecodeUtil.decoderFromTraceDataString(subTraceData_V1); + Assert.assertEquals(traceContextListV1.size(), 2); + Assert.assertEquals(traceContextListV1.get(0).getTraceType().toString(), "SubBefore"); + Assert.assertEquals(traceContextListV1.get(0).isSuccess(), true); + Assert.assertEquals(traceContextListV1.get(0).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f"); + Assert.assertEquals(traceContextListV1.get(0).getTraceBeans().get(0).getRetryTimes(), 2); + Assert.assertEquals(traceContextListV1.get(0).getTraceBeans().get(0).getClientHost(), "10.10.10.11@39960"); + Assert.assertEquals(traceContextListV1.get(1).getTraceType().toString(), "SubAfter"); + Assert.assertEquals(traceContextListV1.get(1).isSuccess(), false); + Assert.assertEquals(traceContextListV1.get(1).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f"); + + String subTraceData_V2 = new StringBuilder(subTraceDataBase) + .append("4").append(TraceConstants.CONTENT_SPLITOR) + .toString(); + List traceContextListV2 = MsgTraceDecodeUtil.decoderFromTraceDataString(subTraceData_V2); + Assert.assertEquals(traceContextListV2.size(), 2); + Assert.assertEquals(traceContextListV2.get(1).getTraceType().toString(), "SubAfter"); + Assert.assertEquals(traceContextListV2.get(1).isSuccess(), false); + Assert.assertEquals(traceContextListV2.get(1).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f"); + Assert.assertEquals(traceContextListV2.get(1).getContextCode(), 4); + + String subTraceData_V3 = new StringBuilder(subTraceDataBase) + .append("4").append(TraceConstants.CONTENT_SPLITOR) + .append("1614666740499").append(TraceConstants.CONTENT_SPLITOR) + .append("test_consumer_group").append(TraceConstants.CONTENT_SPLITOR) + .toString(); + List traceContextListV3 = MsgTraceDecodeUtil.decoderFromTraceDataString(subTraceData_V3); + Assert.assertEquals(traceContextListV3.size(), 2); + Assert.assertEquals(traceContextListV3.get(1).getTraceType().toString(), "SubAfter"); + Assert.assertEquals(traceContextListV3.get(1).isSuccess(), false); + Assert.assertEquals(traceContextListV3.get(1).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f"); + Assert.assertEquals(traceContextListV3.get(1).getGroupName(), "test_consumer_group"); + } +}