diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 6f5bc25..83143c3 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -17,29 +17,27 @@ package org.apache.rocketmq.dashboard.service.client; import com.google.common.base.Throwables; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQAdminImpl; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageRequestMode; -import org.apache.rocketmq.dashboard.util.JsonUtil; -import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; -import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; -import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; @@ -66,20 +64,23 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.dashboard.util.JsonUtil; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult; import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.common.AdminToolResult; +import org.joor.Reflect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; @Service @@ -461,18 +462,23 @@ public class MQAdminExtImpl implements MQAdminExt { logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId); try { return viewMessage(msgId); + } catch (Exception e) { } - catch (Exception e) { - } - + MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl(); Set clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic); if (clusterList == null || clusterList.isEmpty()) { - return MQAdminInstance.threadLocalMQAdminExt().queryMessage("", topic, msgId); - } - for (String name : clusterList) { - MessageExt messageExt = MQAdminInstance.threadLocalMQAdminExt().queryMessage(name, topic, msgId); - if (messageExt != null) { - return messageExt; + QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", topic, msgId, 32, + MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get(); + if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) { + return qr.getMessageList().get(0); + } + } else { + for (String name : clusterList) { + QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", name, topic, msgId, 32, + MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get(); + if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) { + return qr.getMessageList().get(0); + } } } return null;