fix:Failed to find messages older than 3 days using message ID #274 (#275)

This commit is contained in:
Xu Yichi
2025-03-31 10:45:24 +08:00
committed by GitHub
parent 1aad0cda25
commit 3d13e4e2b8

View File

@@ -17,29 +17,27 @@
package org.apache.rocketmq.dashboard.service.client; package org.apache.rocketmq.dashboard.service.client;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
@@ -66,20 +64,23 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult; import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.common.AdminToolResult; import org.apache.rocketmq.tools.admin.common.AdminToolResult;
import org.joor.Reflect;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@Service @Service
@@ -461,18 +462,23 @@ public class MQAdminExtImpl implements MQAdminExt {
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId); logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
try { try {
return viewMessage(msgId); return viewMessage(msgId);
} catch (Exception e) {
} }
catch (Exception e) { MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
}
Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic); Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
if (clusterList == null || clusterList.isEmpty()) { if (clusterList == null || clusterList.isEmpty()) {
return MQAdminInstance.threadLocalMQAdminExt().queryMessage("", topic, msgId); QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", topic, msgId, 32,
} MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get();
for (String name : clusterList) { if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) {
MessageExt messageExt = MQAdminInstance.threadLocalMQAdminExt().queryMessage(name, topic, msgId); return qr.getMessageList().get(0);
if (messageExt != null) { }
return messageExt; } else {
for (String name : clusterList) {
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", name, topic, msgId, 32,
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get();
if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) {
return qr.getMessageList().get(0);
}
} }
} }
return null; return null;