diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java index 6cb6432..d3b6479 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.exception.ServiceException; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; @@ -67,6 +68,7 @@ import java.util.ArrayList; import java.util.Set; import java.util.Collection; import java.util.Date; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -215,12 +217,19 @@ public class MessageServiceImpl implements MessageService { try { ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + MessageExt messageExt = mqAdminExt.viewMessage(topic, msgId); for (Connection connection : consumerConnection.getConnectionSet()) { if (StringUtils.isBlank(connection.getClientId())) { continue; } - logger.info("clientId={}", connection.getClientId()); - return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); + ConcurrentMap subscriptionTable = consumerConnection.getSubscriptionTable(); + SubscriptionData subscriptionData = subscriptionTable.get(topic); + if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL) + || subscriptionData.getTagsSet().contains(messageExt.getTags())) { + logger.info("clientId={}", connection.getClientId()); + return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); + } + throw new IllegalStateException("CONSUMER NOT SUBSCRIPT THIS TAG"); } } catch (Exception e) { throw Throwables.propagate(e);