Filter system topic while console dashboard was making topic statistics (#642)

Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
zhangjidi2016
2021-01-11 10:17:44 +08:00
committed by GitHub
parent 16a703b5f9
commit a615c8cc59

View File

@@ -23,6 +23,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod; import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -73,16 +74,19 @@ public class DashboardCollectTask {
if (!rmqConfigure.isEnableDashBoardCollect()) { if (!rmqConfigure.isEnableDashBoardCollect()) {
return; return;
} }
Date date = new Date(); Date date = new Date();
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
try { try {
TopicList topicList = mqAdminExt.fetchAllTopicList(); TopicList topicList = mqAdminExt.fetchAllTopicList();
Set<String> topicSet = topicList.getTopicList(); Set<String> topicSet = topicList.getTopicList();
this.addSystemTopic();
for (String topic : topicSet) { for (String topic : topicSet) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
|| TopicValidator.isSystemTopic(topic)) {
continue; continue;
} }
TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic); TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic); GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
@@ -320,4 +324,16 @@ public class DashboardCollectTask {
return newTpsList; return newTpsList;
} }
private void addSystemTopic() throws Exception {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterTable = clusterInfo.getClusterAddrTable();
for(Map.Entry<String, Set<String>> entry : clusterTable.entrySet()){
String clusterName = entry.getKey();
TopicValidator.addSystemTopic(clusterName);
Set<String> brokerNames = entry.getValue();
for (String brokerName : brokerNames) {
TopicValidator.addSystemTopic(brokerName);
}
}
}
} }