From e3b769db6aea68279ebbc136910ce304201c1a85 Mon Sep 17 00:00:00 2001 From: zhangjidi Date: Thu, 24 Nov 2022 10:43:07 +0800 Subject: [PATCH] [ISSUE #123]Optimize groupList.query --- .../service/impl/ConsumerServiceImpl.java | 58 ++++++++++++++++++- .../controller/ConsumerControllerTest.java | 4 +- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index 3ad85d4..b1011b7 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -29,6 +29,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; @@ -48,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; @@ -60,12 +69,14 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.ConsumerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; import static com.google.common.base.Throwables.propagate; @Service -public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { +public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); @Resource @@ -73,6 +84,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum private static final Set SYSTEM_GROUP_SET = new HashSet<>(); + private ExecutorService executorService; + + @Override + public void afterPropertiesSet() { + Runtime runtime = Runtime.getRuntime(); + int corePoolSize = Math.max(10, runtime.availableProcessors() * 2); + int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2); + ThreadFactory threadFactory = new ThreadFactory() { + private final AtomicLong threadIndex = new AtomicLong(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet()); + } + }; + RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); + this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(5000), threadFactory, handler); + } + + @Override + public void destroy() { + ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS); + } + static { SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP); SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP); @@ -97,10 +133,26 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum catch (Exception err) { throw Throwables.propagate(err); } - List groupConsumeInfoList = Lists.newArrayList(); + List groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); + CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size()); for (String consumerGroup : consumerGroupSet) { - groupConsumeInfoList.add(queryGroup(consumerGroup)); + executorService.submit(() -> { + try { + GroupConsumeInfo consumeInfo = queryGroup(consumerGroup); + groupConsumeInfoList.add(consumeInfo); + } catch (Exception e) { + logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); + } finally { + countDownLatch.countDown(); + } + }); } + try { + countDownLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("query consumerGroup countDownLatch await Exception", e); + } + if (!skipSysGroup) { groupConsumeInfoList.stream().map(group -> { if (SYSTEM_GROUP_SET.contains(group.getGroup())) { diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java index dfc3c22..b95e80a 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java @@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest { @Before public void init() throws Exception { + consumerService.afterPropertiesSet(); super.mockRmqConfigure(); ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); @@ -93,9 +94,10 @@ public class ConsumerControllerTest extends BaseControllerTest { perform = mockMvc.perform(requestBuilder); perform.andExpect(status().isOk()) .andExpect(jsonPath("$.data", hasSize(2))) - .andExpect(jsonPath("$.data[0].group").value("group_test")) .andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) .andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name())); + // executorService shutdown + consumerService.destroy(); } @Test