mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2026-06-08 22:03:21 +08:00
Compare commits
1 Commits
optimze_qu
...
dcaea085f8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dcaea085f8 |
@@ -9,10 +9,3 @@ github:
|
|||||||
squash: true
|
squash: true
|
||||||
merge: false
|
merge: false
|
||||||
rebase: false
|
rebase: false
|
||||||
|
|
||||||
notifications:
|
|
||||||
commits: commits@rocketmq.apache.org
|
|
||||||
issues: commits@rocketmq.apache.org
|
|
||||||
pullrequests: commits@rocketmq.apache.org
|
|
||||||
jobs: commits@rocketmq.apache.org
|
|
||||||
discussions: dev@rocketmq.apache.org
|
|
||||||
|
|||||||
@@ -29,14 +29,6 @@ import java.util.HashSet;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.RejectedExecutionHandler;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
@@ -56,7 +48,6 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
|
|||||||
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
|
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
|
||||||
import org.apache.rocketmq.common.protocol.route.BrokerData;
|
import org.apache.rocketmq.common.protocol.route.BrokerData;
|
||||||
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
|
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
|
||||||
import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
||||||
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
||||||
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
||||||
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
||||||
@@ -69,14 +60,12 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
|||||||
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.DisposableBean;
|
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import static com.google.common.base.Throwables.propagate;
|
import static com.google.common.base.Throwables.propagate;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
|
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
|
||||||
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
|
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
@@ -84,31 +73,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
|
|
||||||
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
|
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
|
||||||
|
|
||||||
private ExecutorService executorService;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterPropertiesSet() {
|
|
||||||
Runtime runtime = Runtime.getRuntime();
|
|
||||||
int corePoolSize = Math.max(10, runtime.availableProcessors() * 2);
|
|
||||||
int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2);
|
|
||||||
ThreadFactory threadFactory = new ThreadFactory() {
|
|
||||||
private final AtomicLong threadIndex = new AtomicLong(0);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
|
|
||||||
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
|
|
||||||
new LinkedBlockingQueue<>(5000), threadFactory, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void destroy() {
|
|
||||||
ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
static {
|
static {
|
||||||
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
|
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
|
||||||
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
|
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
|
||||||
@@ -133,26 +97,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
catch (Exception err) {
|
catch (Exception err) {
|
||||||
throw Throwables.propagate(err);
|
throw Throwables.propagate(err);
|
||||||
}
|
}
|
||||||
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
|
List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
|
|
||||||
for (String consumerGroup : consumerGroupSet) {
|
for (String consumerGroup : consumerGroupSet) {
|
||||||
executorService.submit(() -> {
|
groupConsumeInfoList.add(queryGroup(consumerGroup));
|
||||||
try {
|
|
||||||
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
|
|
||||||
groupConsumeInfoList.add(consumeInfo);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
|
|
||||||
} finally {
|
|
||||||
countDownLatch.countDown();
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
countDownLatch.await(30, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
logger.error("query consumerGroup countDownLatch await Exception", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!skipSysGroup) {
|
if (!skipSysGroup) {
|
||||||
groupConsumeInfoList.stream().map(group -> {
|
groupConsumeInfoList.stream().map(group -> {
|
||||||
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
|
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
|
|||||||
import org.apache.rocketmq.common.protocol.body.Connection;
|
import org.apache.rocketmq.common.protocol.body.Connection;
|
||||||
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
|
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
|
||||||
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
|
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
|
||||||
|
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
|
||||||
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
||||||
import org.apache.rocketmq.dashboard.exception.ServiceException;
|
import org.apache.rocketmq.dashboard.exception.ServiceException;
|
||||||
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
|
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
|
||||||
@@ -67,6 +68,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -215,13 +217,20 @@ public class MessageServiceImpl implements MessageService {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
|
ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
|
||||||
|
MessageExt messageExt = mqAdminExt.viewMessage(topic, msgId);
|
||||||
for (Connection connection : consumerConnection.getConnectionSet()) {
|
for (Connection connection : consumerConnection.getConnectionSet()) {
|
||||||
if (StringUtils.isBlank(connection.getClientId())) {
|
if (StringUtils.isBlank(connection.getClientId())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
ConcurrentMap<String, SubscriptionData> subscriptionTable = consumerConnection.getSubscriptionTable();
|
||||||
|
SubscriptionData subscriptionData = subscriptionTable.get(topic);
|
||||||
|
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
|
||||||
|
|| subscriptionData.getTagsSet().contains(messageExt.getTags())) {
|
||||||
logger.info("clientId={}", connection.getClientId());
|
logger.info("clientId={}", connection.getClientId());
|
||||||
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
|
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
|
||||||
}
|
}
|
||||||
|
throw new IllegalStateException("CONSUMER NOT SUBSCRIPT THIS TAG");
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,10 +25,10 @@
|
|||||||
|
|
||||||
<appender name="FILE"
|
<appender name="FILE"
|
||||||
class="ch.qos.logback.core.rolling.RollingFileAppender">
|
class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||||
<file>${user.home}/logs/dashboardlogs/rocketmq-dashboard.log</file>
|
<file>${user.home}/logs/consolelogs/rocketmq-console.log</file>
|
||||||
<append>true</append>
|
<append>true</append>
|
||||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||||
<fileNamePattern>${user.home}/logs/dashboardlogs/rocketmq-dashboard-%d{yyyy-MM-dd}.%i.log
|
<fileNamePattern>${user.home}/logs/consolelogs/rocketmq-console-%d{yyyy-MM-dd}.%i.log
|
||||||
</fileNamePattern>
|
</fileNamePattern>
|
||||||
<timeBasedFileNamingAndTriggeringPolicy
|
<timeBasedFileNamingAndTriggeringPolicy
|
||||||
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
|
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
|
||||||
|
|||||||
@@ -67,7 +67,6 @@ public class ConsumerControllerTest extends BaseControllerTest {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws Exception {
|
public void init() throws Exception {
|
||||||
consumerService.afterPropertiesSet();
|
|
||||||
super.mockRmqConfigure();
|
super.mockRmqConfigure();
|
||||||
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
|
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
|
||||||
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
|
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
|
||||||
@@ -94,10 +93,9 @@ public class ConsumerControllerTest extends BaseControllerTest {
|
|||||||
perform = mockMvc.perform(requestBuilder);
|
perform = mockMvc.perform(requestBuilder);
|
||||||
perform.andExpect(status().isOk())
|
perform.andExpect(status().isOk())
|
||||||
.andExpect(jsonPath("$.data", hasSize(2)))
|
.andExpect(jsonPath("$.data", hasSize(2)))
|
||||||
|
.andExpect(jsonPath("$.data[0].group").value("group_test"))
|
||||||
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
|
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
|
||||||
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
|
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
|
||||||
// executorService shutdown
|
|
||||||
consumerService.destroy();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -54,7 +54,6 @@ import org.springframework.http.MediaType;
|
|||||||
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
@@ -136,9 +135,8 @@ public class TopicControllerTest extends BaseControllerTest {
|
|||||||
// 3、filter system topic
|
// 3、filter system topic
|
||||||
requestBuilder = MockMvcRequestBuilders.get(url);
|
requestBuilder = MockMvcRequestBuilders.get(url);
|
||||||
perform = mockMvc.perform(requestBuilder);
|
perform = mockMvc.perform(requestBuilder);
|
||||||
String[] topicString = {"%SYS%system_topic2","common_topic2","%SYS%system_topic1","common_topic1"};
|
|
||||||
perform.andExpect(status().isOk())
|
perform.andExpect(status().isOk())
|
||||||
.andExpect(jsonPath("$.data.topicList").value(containsInAnyOrder(topicString)));
|
.andExpect(jsonPath("$.data.topicList[2]").value("%SYS%system_topic1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user