Compare commits

..

2 Commits

Author SHA1 Message Date
为了吃方便面
de0fb99e0b Merge branch 'master' into new 2024-08-06 23:15:26 +08:00
NicholasChong
dcaea085f8 ## What is the purpose of the change
When resending a message in a page operation, check whether the consumer's subscription tag is satisfied.

## Brief changelog

XX

## Verifying this change

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`.

- [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
- [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
- [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
- [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
- [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration-test pass.
- [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
2022-06-07 19:05:32 +08:00
4 changed files with 51 additions and 51 deletions

View File

@@ -28,14 +28,14 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-dashboard</artifactId>
<packaging>jar</packaging>
<version>2.0.0</version>
<version>1.0.1-SNAPSHOT</version>
<name>rocketmq-dashboard</name>
<scm>
<url>git@github.com:apache/rocketmq-dashboard.git</url>
<connection>scm:git:git@github.com:apache/rocketmq-dashboard.git</connection>
<developerConnection>scm:git:git@github.com:apache/rocketmq-dashboard.git</developerConnection>
<tag>rocketmq-dashboard-2.0.0</tag>
<tag>1.0.0</tag>
</scm>
<mailingLists>
@@ -459,7 +459,7 @@
<configuration>
<target>
<copy todir="${project.build.directory}/classes/public">
<fileset dir="${project.basedir}/frontend/build" />
<fileset dir="${project.basedir}/frontend/build"/>
</copy>
</target>
</configuration>

View File

@@ -17,29 +17,27 @@
package org.apache.rocketmq.dashboard.service.client;
import com.google.common.base.Throwables;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
@@ -66,20 +64,23 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.common.AdminToolResult;
import org.joor.Reflect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@Service
@@ -464,18 +465,15 @@ public class MQAdminExtImpl implements MQAdminExt {
}
catch (Exception e) {
}
Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
if (clusterList == null || clusterList.isEmpty()) {
return MQAdminInstance.threadLocalMQAdminExt().queryMessage("", topic, msgId);
MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32,
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
}
for (String name : clusterList) {
MessageExt messageExt = MQAdminInstance.threadLocalMQAdminExt().queryMessage(name, topic, msgId);
if (messageExt != null) {
return messageExt;
}
else {
return null;
}
return null;
}
@Override

View File

@@ -37,9 +37,10 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
@@ -67,6 +68,7 @@ import java.util.ArrayList;
import java.util.Set;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -218,12 +220,19 @@ public class MessageServiceImpl implements MessageService {
try {
ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
MessageExt messageExt = mqAdminExt.viewMessage(topic, msgId);
for (Connection connection : consumerConnection.getConnectionSet()) {
if (StringUtils.isBlank(connection.getClientId())) {
continue;
}
logger.info("clientId={}", connection.getClientId());
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
ConcurrentMap<String, SubscriptionData> subscriptionTable = consumerConnection.getSubscriptionTable();
SubscriptionData subscriptionData = subscriptionTable.get(topic);
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
|| subscriptionData.getTagsSet().contains(messageExt.getTags())) {
logger.info("clientId={}", connection.getClientId());
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
}
throw new IllegalStateException("CONSUMER NOT SUBSCRIPT THIS TAG");
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);

View File

@@ -276,6 +276,15 @@
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<!--<div class="form-group">-->
<!--<label class="control-label col-sm-4">retryMaxTimes:</label>-->
<!--<div class="col-sm-8">-->
<!--<input class="form-control" ng-model="item.subscriptionGroupConfig.retryMaxTimes"-->
<!--type="text"-->
<!--required/>-->
<!--<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>-->
<!--</div>-->
<!--</div>-->
<div class="form-group">
<label class="control-label col-sm-3">brokerId:</label>
<div class="col-sm-9">
@@ -284,14 +293,6 @@
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">retryMaxTimes:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.retryMaxTimes" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">whichBrokerWhenConsumeSlowly:</label>
<div class="col-sm-9">
@@ -402,14 +403,6 @@
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">retryMaxTimes:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.retryMaxTimes" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">whichBrokerWhenConsumeSlowly:</label>
<div class="col-sm-9">