From bbabd1cd0df299c8c59d44344ffe4c369c6eef13 Mon Sep 17 00:00:00 2001 From: Xu Yichi <110229037+Crazylychee@users.noreply.github.com> Date: Sun, 13 Apr 2025 19:41:24 +0800 Subject: [PATCH] [ISSUES #281 #274 #285 #287] Speeds up topic and consumer queries, adds caching, and fixes delay/dead-letter topic mix-up (#286) * fix: Resolved issue of query failure under a large number of topics and consumers apache#281 * fix: Expand the message ID query time range to avoid query failure * fix: Remove duplicates from topic queries, increase system topic recognition #287 --- .../controller/ConsumerController.java | 7 + .../dashboard/controller/TopicController.java | 6 + .../dashboard/model/GroupConsumeInfo.java | 10 + .../dashboard/service/ClusterInfoService.java | 72 +++++ .../dashboard/service/ConsumerService.java | 8 +- .../dashboard/service/TopicService.java | 1 + .../service/client/MQAdminExtImpl.java | 32 ++- .../service/impl/ConsumerServiceImpl.java | 246 +++++++++++------- .../service/impl/TopicServiceImpl.java | 142 +++++++--- .../dashboard/task/DashboardCollectTask.java | 11 + src/main/resources/static/src/consumer.js | 30 ++- src/main/resources/static/src/i18n/en.js | 1 + src/main/resources/static/src/i18n/zh.js | 3 +- src/main/resources/static/src/message.js | 2 +- src/main/resources/static/src/topic.js | 43 ++- .../resources/static/view/pages/consumer.html | 10 +- 16 files changed, 464 insertions(+), 160 deletions(-) create mode 100644 src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java index 96fc056..cf4a210 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java @@ -51,6 +51,13 @@ public class ConsumerController { return consumerService.queryGroupList(skipSysGroup, address); } + @RequestMapping(value = "/group.refresh") + @ResponseBody + public Object refresh(String address, + String consumerGroup) { + return consumerService.refreshGroup(address, consumerGroup); + } + @RequestMapping(value = "/group.query") @ResponseBody public Object groupQuery(@RequestParam String consumerGroup, String address) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java index 467c18e..665a80a 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java @@ -56,6 +56,12 @@ public class TopicController { return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq); } + @RequestMapping(value = "/refresh", method = {RequestMethod.POST}) + @ResponseBody + public Object refresh() { + return topicService.refreshTopicList(); + } + @RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET) @ResponseBody public Object listTopicType() { diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java index db11c41..358d02e 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.dashboard.model; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import java.util.Date; import java.util.List; public class GroupConsumeInfo implements Comparable { @@ -31,6 +32,7 @@ public class GroupConsumeInfo implements Comparable { private int consumeTps; private long diffTotal = -1; private String subGroupType = "NORMAL"; + private Date updateTime; public String getGroup() { @@ -112,4 +114,12 @@ public class GroupConsumeInfo implements Comparable { public void setVersion(String version) { this.version = version; } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java new file mode 100644 index 0000000..3dc12b3 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +@Slf4j +@Service +public class ClusterInfoService { + + @Resource + private MQAdminExt mqAdminExt; + + @Value("${rocketmq.cluster.cache.expire:60000}") + private long cacheExpireMs; + + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final AtomicReference cachedRef = new AtomicReference<>(); + + + @PostConstruct + public void init() { + scheduler.scheduleAtFixedRate(this::refresh, + 0, cacheExpireMs / 2, TimeUnit.MILLISECONDS); + } + + public ClusterInfo get() { + ClusterInfo info = cachedRef.get(); + return info != null ? info : refresh(); + } + + public synchronized ClusterInfo refresh() { + try { + ClusterInfo fresh = mqAdminExt.examineBrokerClusterInfo(); + cachedRef.set(fresh); + return fresh; + } catch (Exception e) { + log.warn("Refresh cluster info failed", e); + ClusterInfo old = cachedRef.get(); + if (old != null) { + return old; + } + throw new IllegalStateException("No cluster info available", e); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java index e284c44..001a184 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java @@ -17,14 +17,14 @@ package org.apache.rocketmq.dashboard.service; +import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; +import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; +import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.TopicConsumerInfo; -import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; -import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; -import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest; import java.util.List; import java.util.Map; @@ -55,4 +55,6 @@ public interface ConsumerService { ConsumerConnection getConsumerConnection(String consumerGroup, String address); ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack); + + Object refreshGroup(String address, String consumerGroup); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java index 9ff0bf0..b0f4814 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java @@ -54,4 +54,5 @@ public interface TopicService { SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest); + boolean refreshTopicList(); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 83143c3..146f9e5 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -17,11 +17,15 @@ package org.apache.rocketmq.dashboard.service.client; import com.google.common.base.Throwables; + import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -87,9 +91,15 @@ import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; public class MQAdminExtImpl implements MQAdminExt { private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class); - public MQAdminExtImpl() { + private static final ConcurrentMap TOPIC_CONFIG_CACHE = new ConcurrentHashMap<>(); + + public MQAdminExtImpl() {} + + public static void clearTopicConfigCache() { + TOPIC_CONFIG_CACHE.clear(); } + @Override public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, @@ -145,7 +155,7 @@ public class MQAdminExtImpl implements MQAdminExt { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = null; try { - response = remotingClient.invokeSync(addr, request, 3000); + response = remotingClient.invokeSync(addr, request, 8000); } catch (Exception err) { Throwables.throwIfUnchecked(err); @@ -164,19 +174,27 @@ public class MQAdminExtImpl implements MQAdminExt { @Override public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException { + TopicConfigSerializeWrapper cachedWrapper = TOPIC_CONFIG_CACHE.get(addr); + + if (cachedWrapper != null && cachedWrapper.getTopicConfigTable().containsKey(topic)) { + return cachedWrapper.getTopicConfigTable().get(topic); + } + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand response = null; try { response = remotingClient.invokeSync(addr, request, 3000); - } - catch (Exception err) { + } catch (Exception err) { Throwables.throwIfUnchecked(err); throw new RuntimeException(err); } switch (response.getCode()) { case ResponseCode.SUCCESS: { - TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class); + TopicConfigSerializeWrapper topicConfigSerializeWrapper = + decode(response.getBody(), TopicConfigSerializeWrapper.class); + + TOPIC_CONFIG_CACHE.put(addr, topicConfigSerializeWrapper); return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); } default: @@ -468,14 +486,14 @@ public class MQAdminExtImpl implements MQAdminExt { Set clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic); if (clusterList == null || clusterList.isEmpty()) { QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", topic, msgId, 32, - MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get(); + 0L, Long.MAX_VALUE, true).get(); if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) { return qr.getMessageList().get(0); } } else { for (String name : clusterList) { QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", name, topic, msgId, 32, - MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get(); + 0L, Long.MAX_VALUE, true).get(); if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) { return qr.getMessageList().get(0); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index 9bc37ab..2f81582 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -23,14 +23,18 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -41,11 +45,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.annotation.Resource; + import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; +import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; +import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest; +import org.apache.rocketmq.dashboard.service.ClusterInfoService; import org.apache.rocketmq.dashboard.service.client.ProxyAdmin; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; @@ -65,15 +74,13 @@ import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.QueueStatInfo; import org.apache.rocketmq.dashboard.model.TopicConsumerInfo; -import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; -import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; -import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.ConsumerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @@ -85,10 +92,17 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Resource private RMQConfigure configure; + @Autowired + private ClusterInfoService clusterInfoService; + + private volatile boolean isCacheBeingBuilt = false; + private static final Set SYSTEM_GROUP_SET = new HashSet<>(); private ExecutorService executorService; + private final List cacheConsumeInfoList = Collections.synchronizedList(new ArrayList<>()); + @Override public void afterPropertiesSet() { Runtime runtime = Runtime.getRuntime(); @@ -104,7 +118,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum }; RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(5000), threadFactory, handler); + new LinkedBlockingQueue<>(5000), threadFactory, handler); } @Override @@ -125,46 +139,27 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public List queryGroupList(boolean skipSysGroup, String address) { - HashMap> consumerGroupMap = Maps.newHashMap(); - try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { - SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); - for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) { - if (!consumerGroupMap.containsKey(groupName)) { - consumerGroupMap.putIfAbsent(groupName, new ArrayList<>()); - } - List addresses = consumerGroupMap.get(groupName); - addresses.add(brokerData.selectBrokerAddr()); - consumerGroupMap.put(groupName, addresses); + if (isCacheBeingBuilt) { + throw new RuntimeException("Cache is being built, please try again later"); + } + + synchronized (this) { + if (cacheConsumeInfoList.isEmpty() && !isCacheBeingBuilt) { + isCacheBeingBuilt = true; + try { + makeGroupListCache(); + } finally { + isCacheBeingBuilt = false; } } - } catch (Exception err) { - Throwables.throwIfUnchecked(err); - throw new RuntimeException(err); } - List groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); - CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size()); - for (Map.Entry> entry : consumerGroupMap.entrySet()) { - String consumerGroup = entry.getKey(); - executorService.submit(() -> { - try { - GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address); - consumeInfo.setAddress(entry.getValue()); - groupConsumeInfoList.add(consumeInfo); - } catch (Exception e) { - logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); - } finally { - countDownLatch.countDown(); - } - }); - } - try { - countDownLatch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.error("query consumerGroup countDownLatch await Exception", e); + + if (cacheConsumeInfoList.isEmpty()) { + throw new RuntimeException("No consumer group information available"); } + List groupConsumeInfoList = new ArrayList<>(cacheConsumeInfoList); + if (!skipSysGroup) { groupConsumeInfoList.stream().map(group -> { if (SYSTEM_GROUP_SET.contains(group.getGroup())) { @@ -177,6 +172,71 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum return groupConsumeInfoList; } + + public void makeGroupListCache() { + HashMap> consumerGroupMap = Maps.newHashMap(); + SubscriptionGroupWrapper subscriptionGroupWrapper = null; + try { + ClusterInfo clusterInfo = clusterInfoService.get(); + for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { + subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); + for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) { + if (!consumerGroupMap.containsKey(groupName)) { + consumerGroupMap.putIfAbsent(groupName, new ArrayList<>()); + } + List addresses = consumerGroupMap.get(groupName); + addresses.add(brokerData.selectBrokerAddr()); + consumerGroupMap.put(groupName, addresses); + } + } + } catch (Exception err) { + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); + } + + if (subscriptionGroupWrapper != null && subscriptionGroupWrapper.getSubscriptionGroupTable().isEmpty()) { + logger.warn("No subscription group information available"); + isCacheBeingBuilt = false; + return; + } + final ConcurrentMap subscriptionGroupTable = subscriptionGroupWrapper.getSubscriptionGroupTable(); + List groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); + CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size()); + for (Map.Entry> entry : consumerGroupMap.entrySet()) { + String consumerGroup = entry.getKey(); + executorService.submit(() -> { + try { + GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, ""); + consumeInfo.setAddress(entry.getValue()); + if (SYSTEM_GROUP_SET.contains(consumerGroup)) { + consumeInfo.setSubGroupType("SYSTEM"); + } else { + consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly() ? "FIFO" : "NORMAL"); + } + consumeInfo.setGroup(consumerGroup); + consumeInfo.setUpdateTime(new Date()); + groupConsumeInfoList.add(consumeInfo); + } catch (Exception e) { + logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); + } finally { + countDownLatch.countDown(); + } + }); + } + try { + countDownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interruption occurred while waiting for task completion", e); + } + logger.info("All consumer group query tasks have been completed"); + isCacheBeingBuilt = false; + Collections.sort(groupConsumeInfoList); + + cacheConsumeInfoList.clear(); + cacheConsumeInfoList.addAll(groupConsumeInfoList); + } + @Override public GroupConsumeInfo queryGroup(String consumerGroup, String address) { GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); @@ -184,16 +244,14 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum ConsumeStats consumeStats = null; try { consumeStats = mqAdminExt.examineConsumeStats(consumerGroup); - } - catch (Exception e) { + } catch (Exception e) { logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); } - + if (consumeStats != null) { + groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps()); + groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); + } ConsumerConnection consumerConnection = null; - boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup) - .stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig) - .allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly); - try { if (StringUtils.isNotEmpty(address)) { consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup); @@ -203,31 +261,15 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } catch (Exception e) { logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); } - - groupConsumeInfo.setGroup(consumerGroup); - if (SYSTEM_GROUP_SET.contains(consumerGroup)) { - groupConsumeInfo.setSubGroupType("SYSTEM"); - } else if (isFifoType) { - groupConsumeInfo.setSubGroupType("FIFO"); - } else { - groupConsumeInfo.setSubGroupType("NORMAL"); - } - - if (consumeStats != null) { - groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); - groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); - } - if (consumerConnection != null) { groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size()); groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel()); groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType()); groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion())); } - } - catch (Exception e) { + } catch (Exception e) { logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " - + consumerGroup, e); + + consumerGroup, e); } return groupConsumeInfo; } @@ -252,8 +294,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum ConsumeStats consumeStats = null; try { consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); - } - catch (Exception e) { + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } @@ -295,8 +336,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum results.put(messageQueue, clinetId); } } - } - catch (Exception err) { + } catch (Exception err) { logger.error("op=getClientConnection_error", err); } return results; @@ -311,14 +351,12 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum List topicConsumerInfoList = null; try { topicConsumerInfoList = queryConsumeStatsList(topic, group); - } - catch (Exception ignore) { + } catch (Exception ignore) { } group2ConsumerInfoMap.put(group, CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : topicConsumerInfoList.get(0)); } return group2ConsumerInfoMap; - } - catch (Exception e) { + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } @@ -330,7 +368,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) { try { Map rollbackStatsMap = - mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce()); + mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce()); ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); List rollbackStatsList = consumerGroupRollBackStat.getRollbackStatsList(); for (Map.Entry rollbackStatsEntty : rollbackStatsMap.entrySet()) { @@ -341,8 +379,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum rollbackStatsList.add(rollbackStats); } groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); - } - catch (MQClientException e) { + } catch (MQClientException e) { if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { try { ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); @@ -350,17 +387,14 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList); groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); continue; - } - catch (Exception err) { + } catch (Exception err) { logger.error("op=resetOffset_which_not_online_error", err); } - } - else { + } else { logger.error("op=resetOffset_error", e); } groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); - } - catch (Exception e) { + } catch (Exception e) { logger.error("op=resetOffset_error", e); groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); } @@ -372,17 +406,21 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum public List examineSubscriptionGroupConfig(String group) { List consumerConfigInfoList = Lists.newArrayList(); try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfo = clusterInfoService.get(); for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(); - SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group); + SubscriptionGroupConfig subscriptionGroupConfig = null; + try { + subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group); + } catch (Exception e) { + logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, group); + } if (subscriptionGroupConfig == null) { continue; } consumerConfigInfoList.add(new ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig)); } - } - catch (Exception e) { + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } @@ -399,7 +437,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum deleteInNsFlag = true; } try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfo = clusterInfoService.get(); for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) { logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true); @@ -407,8 +445,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag); deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag); } - } - catch (Exception e) { + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } @@ -430,13 +467,12 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfo = clusterInfoService.get(); for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), - consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) { + consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) { mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), consumerConfigInfo.getSubscriptionGroupConfig()); } - } - catch (Exception err) { + } catch (Exception err) { Throwables.throwIfUnchecked(err); throw new RuntimeException(err); } @@ -451,8 +487,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum for (ConsumerConfigInfo consumerConfigInfo : consumerConfigInfoList) { brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList()); } - } - catch (Exception e) { + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } @@ -476,10 +511,29 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) { try { return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); - } - catch (Exception e) { + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } } + + @Override + public GroupConsumeInfo refreshGroup(String address, String consumerGroup) { + + if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) { + throw new RuntimeException("Cache is being built or empty, please try again later"); + } + synchronized (cacheConsumeInfoList) { + for (int i = 0; i < cacheConsumeInfoList.size(); i++) { + GroupConsumeInfo groupConsumeInfo = cacheConsumeInfoList.get(i); + if (groupConsumeInfo.getGroup().equals(consumerGroup)) { + GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, ""); + updatedInfo.setUpdateTime(new Date()); + cacheConsumeInfoList.set(i, updatedInfo); + return updatedInfo; + } + } + } + throw new RuntimeException("No consumer group information available"); + } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index 4f34fc6..14c00a2 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -43,28 +43,35 @@ import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicTypeList; import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta; import org.apache.rocketmq.dashboard.service.AbstractCommonService; +import org.apache.rocketmq.dashboard.service.ClusterInfoService; import org.apache.rocketmq.dashboard.service.TopicService; +import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl; +import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.tools.command.CommandUtil; import org.joor.Reflect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -73,9 +80,19 @@ import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR @Service public class TopicServiceImpl extends AbstractCommonService implements TopicService { + private Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class); + + @Autowired + private ClusterInfoService clusterInfoService; + + private final ConcurrentMap routeCache = new ConcurrentHashMap<>(); + private final Object cacheLock = new Object(); + @Autowired private RMQConfigure configure; + private final ConcurrentMap topicConfigCache = new ConcurrentHashMap<>(); + @Override public TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq) { try { @@ -105,37 +122,63 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public TopicTypeList examineAllTopicType() { - ArrayList topicTypes = new ArrayList<>(); - ArrayList names = new ArrayList<>(); - ArrayList messageTypes = new ArrayList<>(); - TopicList topicList = fetchAllTopicList(false, false); - checkTopicType(topicList, topicTypes); - topicTypes.sort((t1, t2) -> t1.getTopicName().compareTo(t2.getTopicName())); - for (TopicTypeMeta topicTypeMeta : topicTypes) { - names.add(topicTypeMeta.getTopicName()); - messageTypes.add(topicTypeMeta.getMessageType()); - } + List messageTypes = new ArrayList<>(); + List names = new ArrayList<>(); + ClusterInfo clusterInfo = clusterInfoService.get(); + TopicList sysTopics = getSystemTopicList(); + clusterInfo.getBrokerAddrTable().values().forEach(brokerAddr -> { + try { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = mqAdminExt.getAllTopicConfig(brokerAddr.getBrokerAddrs().get(0L), 10000L); + for (TopicConfig topicConfig : topicConfigSerializeWrapper.getTopicConfigTable().values()) { + TopicTypeMeta topicType = classifyTopicType(topicConfig.getTopicName(), topicConfigSerializeWrapper.getTopicConfigTable().get(topicConfig.getTopicName()).getAttributes(),sysTopics.getTopicList()); + if (names.contains(topicType.getTopicName())) { + continue; + } + names.add(topicType.getTopicName()); + messageTypes.add(topicType.getMessageType()); + } + } catch (Exception e) { + logger.warn("Failed to classify topic type for broker: " + brokerAddr, e); + } + }); + sysTopics.getTopicList().forEach(topicName -> { + String sysTopicName = String.format("%s%s", "%SYS%", topicName); + if (!names.contains(sysTopicName)) { + names.add(sysTopicName); + messageTypes.add("SYSTEM"); + } + }); + return new TopicTypeList(names, messageTypes); } - private void checkTopicType(TopicList topicList, ArrayList topicTypes) { - for (String topicName : topicList.getTopicList()) { - TopicTypeMeta topicType = new TopicTypeMeta(); - topicType.setTopicName(topicName); - if (topicName.startsWith("%R")) { - topicType.setMessageType("RETRY"); - } else if (topicName.startsWith("%D")) { - topicType.setMessageType("DELAY"); - } else if (topicName.startsWith("%S")) { - topicType.setMessageType("SYSTEM"); - } else { - List topicConfigInfos = examineTopicConfig(topicName); - if (!CollectionUtils.isEmpty(topicConfigInfos)) { - topicType.setMessageType(topicConfigInfos.get(0).getMessageType()); - } - } - topicTypes.add(topicType); + private TopicTypeMeta classifyTopicType(String topicName, Map attributes, Set sysTopics) { + TopicTypeMeta topicType = new TopicTypeMeta(); + topicType.setTopicName(topicName); + + if (topicName.startsWith("%R")) { + topicType.setMessageType("RETRY"); + return topicType; + } else if (topicName.startsWith("%D")) { + topicType.setMessageType("DLQ"); + return topicType; + } else if (sysTopics.contains(topicName) || topicName.startsWith("rmq_sys") || topicName.equals("DefaultHeartBeatSyncerTopic")) { + topicType.setMessageType("SYSTEM"); + topicType.setTopicName(String.format("%s%s", "%SYS%", topicName)); + return topicType; } + if (attributes == null || attributes.isEmpty()) { + topicType.setMessageType("UNSPECIFIED"); + return topicType; + } + + String messageType = attributes.get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()); + if (StringUtils.isBlank(messageType)) { + messageType = TopicMessageType.UNSPECIFIED.name(); + } + topicType.setMessageType(messageType); + + return topicType; } @Override @@ -150,11 +193,24 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public TopicRouteData route(String topic) { - try { - return mqAdminExt.examineTopicRouteInfo(topic); - } catch (Exception ex) { - Throwables.throwIfUnchecked(ex); - throw new RuntimeException(ex); + TopicRouteData cachedData = routeCache.get(topic); + if (cachedData != null) { + return cachedData; + } + + synchronized (cacheLock) { + cachedData = routeCache.get(topic); + if (cachedData != null) { + return cachedData; + } + try { + TopicRouteData freshData = mqAdminExt.examineTopicRouteInfo(topic); + routeCache.put(topic, freshData); + return freshData; + } catch (Exception ex) { + Throwables.throwIfUnchecked(ex); + throw new RuntimeException(ex); + } } } @@ -170,6 +226,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { + MQAdminExtImpl.clearTopicConfigCache(); TopicConfig topicConfig = new TopicConfig(); BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); String messageType = topicCreateOrUpdateRequest.getMessageType(); @@ -189,12 +246,15 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } } - @Override public TopicConfig examineTopicConfig(String topic, String brokerName) { - ClusterInfo clusterInfo = null; try { - clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic); + ClusterInfo clusterInfo = clusterInfoService.get(); + + BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName); + if (brokerData == null) { + throw new RuntimeException("Broker not found: " + brokerName); + } + return mqAdminExt.examineTopicConfig(brokerData.selectBrokerAddr(), topic); } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); @@ -371,6 +431,14 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } + @Override + public boolean refreshTopicList() { + routeCache.clear(); + clusterInfoService.refresh(); + MQAdminExtImpl.clearTopicConfigCache(); + return true; + } + private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { if (!traceEnabled) { return; diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java index d58668b..c9a870c 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import javax.annotation.Resource; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.dashboard.service.ConsumerService; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.TopicList; @@ -59,6 +60,9 @@ public class DashboardCollectTask { @Resource private DashboardCollectService dashboardCollectService; + @Resource + private ConsumerService consumerService; + private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class); @Resource @@ -89,6 +93,13 @@ public class DashboardCollectTask { } } + @Scheduled(cron = "0 0 2 * * ?") + public void collectConsumer() { + consumerService.queryGroupList(false, null); + } + + + @Scheduled(cron = "0 0/1 * * * ?") public void collectBroker() { if (!rmqConfigure.isEnableDashBoardCollect()) { diff --git a/src/main/resources/static/src/consumer.js b/src/main/resources/static/src/consumer.js index 8cf845b..8d5f2c7 100644 --- a/src/main/resources/static/src/consumer.js +++ b/src/main/resources/static/src/consumer.js @@ -70,6 +70,34 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific } $scope.filterList($scope.paginationConf.currentPage) }; + $scope.refreshConsumerGroup = function (groupName) { + //Show loader + $('#loaderConsumer').removeClass("hide-myloader"); + + $http({ + method: "GET", + url: "/consumer/group.refresh", + params: { + address: $scope.isRmqVersionV5() ? localStorage.getItem('proxyAddr') : null, + consumerGroup: groupName + } + }).success(function (resp) { + if (resp.status == 0) { + for (var i = 0; i < $scope.allConsumerGrouopList.length; i++) { + if ($scope.allConsumerGrouopList[i].group === groupName) { + $scope.allConsumerGrouopList[i] = resp.data; + break; + } + } + $scope.showConsumerGroupList($scope.paginationConf.currentPage, $scope.allConsumerGrouopList.length); + //Hide loader + $('#loaderConsumer').addClass("hide-myloader"); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + } + $scope.refreshConsumerData = function () { //Show loader $('#loaderConsumer').removeClass("hide-myloader"); @@ -421,4 +449,4 @@ module.controller('consumerTopicViewDialogController', ['$scope', 'ngDialog', '$ }); }; }] -); \ No newline at end of file +); diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index 2c1450d..7fbb042 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -135,4 +135,5 @@ var en = { "MESSAGE_TYPE_FIFO": "FIFO", "MESSAGE_TYPE_DELAY": "DELAY", "MESSAGE_TYPE_TRANSACTION": "TRANSACTION", + "UPDATE_TIME": "Update Time", } diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index 2f0e6f3..ec2ebdd 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -136,4 +136,5 @@ var zh = { "MESSAGE_TYPE_FIFO": "顺序消息", "MESSAGE_TYPE_DELAY": "定时/延时消息", "MESSAGE_TYPE_TRANSACTION": "事务消息", -} \ No newline at end of file + "UPDATE_TIME": "更新时间", +} diff --git a/src/main/resources/static/src/message.js b/src/main/resources/static/src/message.js index a496285..c980d9e 100644 --- a/src/main/resources/static/src/message.js +++ b/src/main/resources/static/src/message.js @@ -277,4 +277,4 @@ module.controller('messageDetailViewDialogController', ['$scope', 'ngDialog', '$ $scope.messageTrackShowList = canShowList; }); }] -); \ No newline at end of file +); diff --git a/src/main/resources/static/src/topic.js b/src/main/resources/static/src/topic.js index 7ad997c..13c3dbb 100644 --- a/src/main/resources/static/src/topic.js +++ b/src/main/resources/static/src/topic.js @@ -59,7 +59,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati $scope.userRole = $window.sessionStorage.getItem("userrole"); $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); - $scope.refreshTopicList = function () { + $scope.getTopicList = function () { $http({ method: "GET", url: "topic/list.queryTopicType" @@ -77,7 +77,34 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati }); }; - $scope.refreshTopicList(); + $scope.refreshTopicList = function () { + $http({ + method: "POST", + url: "topic/refresh" + }).success(function (resp) { + if (resp.status == 0 && resp.data == true) { + $http({ + method: "GET", + url: "topic/list.queryTopicType" + }).success(function (resp1) { + if (resp1.status == 0) { + $scope.allTopicNameList = resp1.data.topicNameList; + $scope.allMessageTypeList = resp1.data.messageTypeList; + console.log($scope.allTopicNameList); + console.log(JSON.stringify(resp1)); + $scope.showTopicList(1, $scope.allTopicNameList.length); + } else { + Notification.error({message: resp1.errMsg, delay: 5000}); + } + }); + + } else { + Notification.error({message: resp.errMsg, delay: 5000}); + } + }); + }; + + $scope.getTopicList(); $scope.filterStr = ""; $scope.$watch('filterStr', function () { @@ -127,17 +154,17 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati $scope.filterByType = function (str, type) { if ($scope.filterRetry) { - if (str.startsWith("%R")) { + if (type.includes("RETRY")) { return true } } if ($scope.filterDLQ) { - if (str.startsWith("%D")) { + if (type.includes("DLQ")) { return true } } if ($scope.filterSystem) { - if (str.startsWith("%S")) { + if (type.includes("SYSTEM")) { return true } } @@ -386,10 +413,6 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati if (resp.status == 0) { console.log(resp); ngDialog.open({ - preCloseCallback: function (value) { - // Refresh topic list - $scope.refreshTopicList(); - }, template: 'topicModifyDialog', controller: 'topicModifyDialogController', data: { @@ -540,4 +563,4 @@ module.controller('routerViewDialogController', ['$scope', 'ngDialog', '$http', }) }; }] -); \ No newline at end of file +); diff --git a/src/main/resources/static/view/pages/consumer.html b/src/main/resources/static/view/pages/consumer.html index 68c7786..c187f72 100644 --- a/src/main/resources/static/view/pages/consumer.html +++ b/src/main/resources/static/view/pages/consumer.html @@ -33,9 +33,6 @@ - {{'AUTO_REFRESH' | translate}} @@ -53,6 +50,7 @@ {{ 'MODE' | translate}} TPS {{ 'DELAY' | translate}} + {{ 'UPDATE_TIME' | translate}} {{ 'OPERATION' | translate}} {{consumerGroup.messageModel}} {{consumerGroup.consumeTps}} {{consumerGroup.diffTotal}} + {{consumerGroup.updateTime}} + @@ -568,4 +570,4 @@ - \ No newline at end of file +