From dcaea085f8d6cf35b1dfd624ec409396d8c9d88e Mon Sep 17 00:00:00 2001 From: NicholasChong Date: Tue, 7 Jun 2022 19:05:32 +0800 Subject: [PATCH] ## What is the purpose of the change When resending a message in a page operation, check whether the consumer's subscription tag is satisfied. ## Brief changelog XX ## Verifying this change XXXX Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`. - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body. - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test). - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration-test pass. - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas). --- .../dashboard/service/impl/MessageServiceImpl.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java index 6cb6432..d3b6479 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.exception.ServiceException; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; @@ -67,6 +68,7 @@ import java.util.ArrayList; import java.util.Set; import java.util.Collection; import java.util.Date; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -215,12 +217,19 @@ public class MessageServiceImpl implements MessageService { try { ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + MessageExt messageExt = mqAdminExt.viewMessage(topic, msgId); for (Connection connection : consumerConnection.getConnectionSet()) { if (StringUtils.isBlank(connection.getClientId())) { continue; } - logger.info("clientId={}", connection.getClientId()); - return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); + ConcurrentMap subscriptionTable = consumerConnection.getSubscriptionTable(); + SubscriptionData subscriptionData = subscriptionTable.get(topic); + if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL) + || subscriptionData.getTagsSet().contains(messageExt.getTags())) { + logger.info("clientId={}", connection.getClientId()); + return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); + } + throw new IllegalStateException("CONSUMER NOT SUBSCRIPT THIS TAG"); } } catch (Exception e) { throw Throwables.propagate(e);