mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2025-09-10 11:40:01 +08:00
fix queryConsumerByTopic when address like "a,b"
This commit is contained in:
@@ -28,8 +28,10 @@ import java.util.Arrays;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
@@ -234,17 +236,19 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
|
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
|
||||||
ConsumeStats consumeStats;
|
List<TopicConsumerInfo> result = new LinkedList<>();
|
||||||
String topic = null;
|
String topic = null;
|
||||||
try {
|
try {
|
||||||
String[] addresses = address.split(",");
|
String[] addresses = address.split(",");
|
||||||
String addr = addresses[0];
|
for (String addr : addresses) {
|
||||||
consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
|
ConsumeStats consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
|
||||||
|
result.addAll(toTopicConsumerInfoList(topic, consumeStats, groupName));
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
return toTopicConsumerInfoList(topic, consumeStats, groupName);
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -463,9 +467,16 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
@Override
|
@Override
|
||||||
public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
|
public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
|
||||||
try {
|
try {
|
||||||
|
ConsumerConnection result = new ConsumerConnection();
|
||||||
String[] addresses = address.split(",");
|
String[] addresses = address.split(",");
|
||||||
String addr = addresses[0];
|
for (String addr : addresses) {
|
||||||
return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
|
final ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
|
||||||
|
if (Objects.isNull(consumerConnection)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
result.getConnectionSet().addAll(consumerConnection.getConnectionSet());
|
||||||
|
}
|
||||||
|
return result;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
Reference in New Issue
Block a user