mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2025-09-11 03:49:06 +08:00
Compare commits
1 Commits
refactor
...
optimze_qu
Author | SHA1 | Date | |
---|---|---|---|
|
e3b769db6a |
@@ -29,6 +29,14 @@ import java.util.HashSet;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.RejectedExecutionHandler;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
@@ -48,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
|
|||||||
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
|
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
|
||||||
import org.apache.rocketmq.common.protocol.route.BrokerData;
|
import org.apache.rocketmq.common.protocol.route.BrokerData;
|
||||||
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
|
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
|
||||||
|
import org.apache.rocketmq.common.utils.ThreadUtils;
|
||||||
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
||||||
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
||||||
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
||||||
@@ -60,12 +69,14 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
|||||||
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.DisposableBean;
|
||||||
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import static com.google.common.base.Throwables.propagate;
|
import static com.google.common.base.Throwables.propagate;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
|
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
|
||||||
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
|
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
@@ -73,6 +84,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
|
|
||||||
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
|
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
|
||||||
|
|
||||||
|
private ExecutorService executorService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterPropertiesSet() {
|
||||||
|
Runtime runtime = Runtime.getRuntime();
|
||||||
|
int corePoolSize = Math.max(10, runtime.availableProcessors() * 2);
|
||||||
|
int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2);
|
||||||
|
ThreadFactory threadFactory = new ThreadFactory() {
|
||||||
|
private final AtomicLong threadIndex = new AtomicLong(0);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
|
||||||
|
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
|
||||||
|
new LinkedBlockingQueue<>(5000), threadFactory, handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
static {
|
static {
|
||||||
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
|
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
|
||||||
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
|
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
|
||||||
@@ -97,10 +133,26 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
catch (Exception err) {
|
catch (Exception err) {
|
||||||
throw Throwables.propagate(err);
|
throw Throwables.propagate(err);
|
||||||
}
|
}
|
||||||
List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
|
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
|
||||||
|
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
|
||||||
for (String consumerGroup : consumerGroupSet) {
|
for (String consumerGroup : consumerGroupSet) {
|
||||||
groupConsumeInfoList.add(queryGroup(consumerGroup));
|
executorService.submit(() -> {
|
||||||
|
try {
|
||||||
|
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
|
||||||
|
groupConsumeInfoList.add(consumeInfo);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
|
||||||
|
} finally {
|
||||||
|
countDownLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
countDownLatch.await(30, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.error("query consumerGroup countDownLatch await Exception", e);
|
||||||
|
}
|
||||||
|
|
||||||
if (!skipSysGroup) {
|
if (!skipSysGroup) {
|
||||||
groupConsumeInfoList.stream().map(group -> {
|
groupConsumeInfoList.stream().map(group -> {
|
||||||
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
|
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
|
||||||
|
@@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws Exception {
|
public void init() throws Exception {
|
||||||
|
consumerService.afterPropertiesSet();
|
||||||
super.mockRmqConfigure();
|
super.mockRmqConfigure();
|
||||||
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
|
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
|
||||||
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
|
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
|
||||||
@@ -93,9 +94,10 @@ public class ConsumerControllerTest extends BaseControllerTest {
|
|||||||
perform = mockMvc.perform(requestBuilder);
|
perform = mockMvc.perform(requestBuilder);
|
||||||
perform.andExpect(status().isOk())
|
perform.andExpect(status().isOk())
|
||||||
.andExpect(jsonPath("$.data", hasSize(2)))
|
.andExpect(jsonPath("$.data", hasSize(2)))
|
||||||
.andExpect(jsonPath("$.data[0].group").value("group_test"))
|
|
||||||
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
|
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
|
||||||
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
|
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
|
||||||
|
// executorService shutdown
|
||||||
|
consumerService.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Reference in New Issue
Block a user