This commit is contained in:
loongs-zhang
2024-11-14 11:13:33 +08:00
committed by GitHub

View File

@@ -28,8 +28,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -234,17 +236,19 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
ConsumeStats consumeStats;
List<TopicConsumerInfo> result = new LinkedList<>();
String topic = null;
try {
String[] addresses = address.split(",");
String addr = addresses[0];
consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
for (String addr : addresses) {
ConsumeStats consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
result.addAll(toTopicConsumerInfoList(topic, consumeStats, groupName));
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
return toTopicConsumerInfoList(topic, consumeStats, groupName);
return result;
}
@Override
@@ -463,9 +467,16 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override
public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
try {
ConsumerConnection result = new ConsumerConnection();
String[] addresses = address.split(",");
String addr = addresses[0];
return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
for (String addr : addresses) {
final ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
if (Objects.isNull(consumerConnection)) {
continue;
}
result.getConnectionSet().addAll(consumerConnection.getConnectionSet());
}
return result;
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);