diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index 9bc37ab..ff0ce35 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -28,8 +28,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -234,17 +236,19 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public List queryConsumeStatsListByGroupName(String groupName, String address) { - ConsumeStats consumeStats; + List result = new LinkedList<>(); String topic = null; try { String[] addresses = address.split(","); - String addr = addresses[0]; - consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000); + for (String addr : addresses) { + ConsumeStats consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000); + result.addAll(toTopicConsumerInfoList(topic, consumeStats, groupName)); + } } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - return toTopicConsumerInfoList(topic, consumeStats, groupName); + return result; } @Override @@ -463,9 +467,16 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public ConsumerConnection getConsumerConnection(String consumerGroup, String address) { try { + ConsumerConnection result = new ConsumerConnection(); String[] addresses = address.split(","); - String addr = addresses[0]; - return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr); + for (String addr : addresses) { + final ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr); + if (Objects.isNull(consumerConnection)) { + continue; + } + result.getConnectionSet().addAll(consumerConnection.getConnectionSet()); + } + return result; } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e);