mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2025-09-11 12:05:56 +08:00
Compare commits
4 Commits
dependabot
...
optimze_qu
Author | SHA1 | Date | |
---|---|---|---|
|
e3b769db6a | ||
|
7a54427d9c | ||
|
fc9781e6fc | ||
|
ea8834bacd |
@@ -9,3 +9,10 @@ github:
|
||||
squash: true
|
||||
merge: false
|
||||
rebase: false
|
||||
|
||||
notifications:
|
||||
commits: commits@rocketmq.apache.org
|
||||
issues: commits@rocketmq.apache.org
|
||||
pullrequests: commits@rocketmq.apache.org
|
||||
jobs: commits@rocketmq.apache.org
|
||||
discussions: dev@rocketmq.apache.org
|
||||
|
@@ -3587,11 +3587,11 @@ create-hmac@^1.1.0, create-hmac@^1.1.4, create-hmac@^1.1.7:
|
||||
sha.js "^2.4.8"
|
||||
|
||||
cross-fetch@^3.0.4:
|
||||
version "3.1.5"
|
||||
resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.5.tgz#e1389f44d9e7ba767907f7af8454787952ab534f"
|
||||
integrity sha512-lvb1SBsI0Z7GDwmuid+mU3kWVBwTVUbe7S0H52yaaAdQOXq2YktTCZdlAcNKFzE6QtRz0snpw9bNiPeOIkkQvw==
|
||||
version "3.1.4"
|
||||
resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.4.tgz#9723f3a3a247bf8b89039f3a380a9244e8fa2f39"
|
||||
integrity sha512-1eAtFWdIubi6T4XPy6ei9iUFoKpUkIF971QLN8lIvvvwueI65+Nw5haMNKUwfJxabqlIIDODJKGrQ66gxC0PbQ==
|
||||
dependencies:
|
||||
node-fetch "2.6.7"
|
||||
node-fetch "2.6.1"
|
||||
|
||||
cross-spawn@7.0.3, cross-spawn@^7.0.0, cross-spawn@^7.0.2:
|
||||
version "7.0.3"
|
||||
@@ -7440,12 +7440,10 @@ no-case@^3.0.4:
|
||||
lower-case "^2.0.2"
|
||||
tslib "^2.0.3"
|
||||
|
||||
node-fetch@2.6.7:
|
||||
version "2.6.7"
|
||||
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.7.tgz#24de9fba827e3b4ae44dc8b20256a379160052ad"
|
||||
integrity sha512-ZjMPFEfVx5j+y2yF35Kzx5sF7kDzxuDj6ziH4FFbOp87zKDZNx8yExJIb05OGF4Nlt9IHFIMBkRl41VdvcNdbQ==
|
||||
dependencies:
|
||||
whatwg-url "^5.0.0"
|
||||
node-fetch@2.6.1:
|
||||
version "2.6.1"
|
||||
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.1.tgz#045bd323631f76ed2e2b55573394416b639a0052"
|
||||
integrity sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw==
|
||||
|
||||
node-forge@^0.10.0:
|
||||
version "0.10.0"
|
||||
@@ -10671,11 +10669,6 @@ tr46@^2.0.2:
|
||||
dependencies:
|
||||
punycode "^2.1.1"
|
||||
|
||||
tr46@~0.0.3:
|
||||
version "0.0.3"
|
||||
resolved "https://registry.yarnpkg.com/tr46/-/tr46-0.0.3.tgz#8184fd347dac9cdc185992f3a6622e14b9d9ab6a"
|
||||
integrity sha1-gYT9NH2snNwYWZLzpmIuFLnZq2o=
|
||||
|
||||
tryer@^1.0.1:
|
||||
version "1.0.1"
|
||||
resolved "https://registry.yarnpkg.com/tryer/-/tryer-1.0.1.tgz#f2c85406800b9b0f74c9f7465b81eaad241252f8"
|
||||
@@ -11121,11 +11114,6 @@ web-vitals@^1.0.1:
|
||||
resolved "https://registry.yarnpkg.com/web-vitals/-/web-vitals-1.1.2.tgz#06535308168986096239aa84716e68b4c6ae6d1c"
|
||||
integrity sha512-PFMKIY+bRSXlMxVAQ+m2aw9c/ioUYfDgrYot0YUa+/xa0sakubWhSDyxAKwzymvXVdF4CZI71g06W+mqhzu6ig==
|
||||
|
||||
webidl-conversions@^3.0.0:
|
||||
version "3.0.1"
|
||||
resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-3.0.1.tgz#24534275e2a7bc6be7bc86611cc16ae0a5654871"
|
||||
integrity sha1-JFNCdeKnvGvnvIZhHMFq4KVlSHE=
|
||||
|
||||
webidl-conversions@^5.0.0:
|
||||
version "5.0.0"
|
||||
resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-5.0.0.tgz#ae59c8a00b121543a2acc65c0434f57b0fc11aff"
|
||||
@@ -11272,14 +11260,6 @@ whatwg-mimetype@^2.3.0:
|
||||
resolved "https://registry.yarnpkg.com/whatwg-mimetype/-/whatwg-mimetype-2.3.0.tgz#3d4b1e0312d2079879f826aff18dbeeca5960fbf"
|
||||
integrity sha512-M4yMwr6mAnQz76TbJm914+gPpB/nCwvZbJU28cUD6dR004SAxDLOOSUaB1JDRqLtaOV/vi0IC5lEAGFgrjGv/g==
|
||||
|
||||
whatwg-url@^5.0.0:
|
||||
version "5.0.0"
|
||||
resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-5.0.0.tgz#966454e8765462e37644d3626f6742ce8b70965d"
|
||||
integrity sha1-lmRU6HZUYuN2RNNib2dCzotwll0=
|
||||
dependencies:
|
||||
tr46 "~0.0.3"
|
||||
webidl-conversions "^3.0.0"
|
||||
|
||||
whatwg-url@^8.0.0:
|
||||
version "8.4.0"
|
||||
resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-8.4.0.tgz#50fb9615b05469591d2b2bd6dfaed2942ed72837"
|
||||
|
@@ -29,6 +29,14 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
@@ -48,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
|
||||
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
|
||||
import org.apache.rocketmq.common.protocol.route.BrokerData;
|
||||
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
|
||||
import org.apache.rocketmq.common.utils.ThreadUtils;
|
||||
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
||||
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
||||
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
||||
@@ -60,12 +69,14 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
||||
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import static com.google.common.base.Throwables.propagate;
|
||||
|
||||
@Service
|
||||
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
|
||||
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
|
||||
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
|
||||
|
||||
@Resource
|
||||
@@ -73,6 +84,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
int corePoolSize = Math.max(10, runtime.availableProcessors() * 2);
|
||||
int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2);
|
||||
ThreadFactory threadFactory = new ThreadFactory() {
|
||||
private final AtomicLong threadIndex = new AtomicLong(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
|
||||
}
|
||||
};
|
||||
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
|
||||
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<>(5000), threadFactory, handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
static {
|
||||
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
|
||||
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
|
||||
@@ -97,10 +133,26 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
catch (Exception err) {
|
||||
throw Throwables.propagate(err);
|
||||
}
|
||||
List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
|
||||
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
|
||||
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
|
||||
for (String consumerGroup : consumerGroupSet) {
|
||||
groupConsumeInfoList.add(queryGroup(consumerGroup));
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
|
||||
groupConsumeInfoList.add(consumeInfo);
|
||||
} catch (Exception e) {
|
||||
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
|
||||
} finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
try {
|
||||
countDownLatch.await(30, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("query consumerGroup countDownLatch await Exception", e);
|
||||
}
|
||||
|
||||
if (!skipSysGroup) {
|
||||
groupConsumeInfoList.stream().map(group -> {
|
||||
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
|
||||
|
@@ -25,10 +25,10 @@
|
||||
|
||||
<appender name="FILE"
|
||||
class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${user.home}/logs/consolelogs/rocketmq-console.log</file>
|
||||
<file>${user.home}/logs/dashboardlogs/rocketmq-dashboard.log</file>
|
||||
<append>true</append>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>${user.home}/logs/consolelogs/rocketmq-console-%d{yyyy-MM-dd}.%i.log
|
||||
<fileNamePattern>${user.home}/logs/dashboardlogs/rocketmq-dashboard-%d{yyyy-MM-dd}.%i.log
|
||||
</fileNamePattern>
|
||||
<timeBasedFileNamingAndTriggeringPolicy
|
||||
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
|
||||
|
@@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest {
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
consumerService.afterPropertiesSet();
|
||||
super.mockRmqConfigure();
|
||||
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
|
||||
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
|
||||
@@ -93,9 +94,10 @@ public class ConsumerControllerTest extends BaseControllerTest {
|
||||
perform = mockMvc.perform(requestBuilder);
|
||||
perform.andExpect(status().isOk())
|
||||
.andExpect(jsonPath("$.data", hasSize(2)))
|
||||
.andExpect(jsonPath("$.data[0].group").value("group_test"))
|
||||
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
|
||||
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
|
||||
// executorService shutdown
|
||||
consumerService.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@@ -54,6 +54,7 @@ import org.springframework.http.MediaType;
|
||||
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
||||
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
@@ -135,8 +136,9 @@ public class TopicControllerTest extends BaseControllerTest {
|
||||
// 3、filter system topic
|
||||
requestBuilder = MockMvcRequestBuilders.get(url);
|
||||
perform = mockMvc.perform(requestBuilder);
|
||||
String[] topicString = {"%SYS%system_topic2","common_topic2","%SYS%system_topic1","common_topic1"};
|
||||
perform.andExpect(status().isOk())
|
||||
.andExpect(jsonPath("$.data.topicList[2]").value("%SYS%system_topic1"));
|
||||
.andExpect(jsonPath("$.data.topicList").value(containsInAnyOrder(topicString)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Reference in New Issue
Block a user