[ISSUE #16]Use an object pool to manage DefaultMQAdminExt objects. (#17)

* [ISSUE #16]Use an object pool to manage DefaultMQAdminExt objects.

* Modify the code

* Optimize the code

Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
zhangjidi2016
2021-09-09 00:13:44 +08:00
committed by GitHub
parent 5b2a027cd8
commit 975449e268
19 changed files with 393 additions and 146 deletions

View File

@@ -19,11 +19,14 @@ package org.apache.rocketmq.dashboard;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.EnableMBeanExport;
import org.springframework.jmx.support.RegistrationPolicy;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@ServletComponentScan
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
public class App {
public static void main(String[] args) {

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
@Slf4j
public class MQAdminFactory {
private RMQConfigure rmqConfigure;
public MQAdminFactory(RMQConfigure rmqConfigure) {
this.rmqConfigure = rmqConfigure;
}
public MQAdminExt getInstance() throws Exception {
RPCHook rpcHook = null;
final String accessKey = rmqConfigure.getAccessKey();
final String secretKey = rmqConfigure.getSecretKey();
boolean isEnableAcl = StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey);
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
DefaultMQAdminExt mqAdminExt = null;
if (rmqConfigure.getTimeoutMillis() == null) {
mqAdminExt = new DefaultMQAdminExt(rpcHook);
} else {
mqAdminExt = new DefaultMQAdminExt(rpcHook, rmqConfigure.getTimeoutMillis());
}
mqAdminExt.setVipChannelEnabled(Boolean.parseBoolean(rmqConfigure.getIsVIPChannel()));
mqAdminExt.setUseTLS(rmqConfigure.isUseTLS());
mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
mqAdminExt.start();
log.info("create MQAdmin instance {} success.", mqAdminExt);
return mqAdminExt;
}
}

View File

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt;
@Slf4j
public class MQAdminPooledObjectFactory implements PooledObjectFactory<MQAdminExt> {
private MQAdminFactory mqAdminFactory;
@Override
public PooledObject<MQAdminExt> makeObject() throws Exception {
DefaultPooledObject<MQAdminExt> pooledObject = new DefaultPooledObject<>(
mqAdminFactory.getInstance());
return pooledObject;
}
@Override
public void destroyObject(PooledObject<MQAdminExt> p) {
MQAdminExt mqAdmin = p.getObject();
if (mqAdmin != null) {
try {
mqAdmin.shutdown();
} catch (Exception e) {
log.warn("MQAdminExt shutdown err", e);
}
}
log.info("destroy object {}", p.getObject());
}
@Override
public boolean validateObject(PooledObject<MQAdminExt> p) {
MQAdminExt mqAdmin = p.getObject();
ClusterInfo clusterInfo = null;
try {
clusterInfo = mqAdmin.examineBrokerClusterInfo();
} catch (Exception e) {
log.warn("validate object {} err", p.getObject(), e);
}
if (clusterInfo == null || MapUtils.isEmpty(clusterInfo.getBrokerAddrTable())) {
log.warn("validateObject failed, clusterInfo = {}", clusterInfo);
return false;
}
return true;
}
@Override
public void activateObject(PooledObject<MQAdminExt> p) {
}
@Override
public void passivateObject(PooledObject<MQAdminExt> p) {
}
public void setMqAdminFactory(MQAdminFactory mqAdminFactory) {
this.mqAdminFactory = mqAdminFactory;
}
}

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqAdminExtObjectPool {
@Autowired
private RMQConfigure rmqConfigure;
@Bean
public GenericObjectPool<MQAdminExt> mqAdminExtPool() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setTestWhileIdle(true);
genericObjectPoolConfig.setMaxWaitMillis(10000);
genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(20000);
MQAdminPooledObjectFactory mqAdminPooledObjectFactory = new MQAdminPooledObjectFactory();
MQAdminFactory mqAdminFactory = new MQAdminFactory(rmqConfigure);
mqAdminPooledObjectFactory.setMqAdminFactory(mqAdminFactory);
GenericObjectPool<MQAdminExt> genericObjectPool = new GenericObjectPool<MQAdminExt>(
mqAdminPooledObjectFactory,
genericObjectPoolConfig);
return genericObjectPool;
}
}

View File

@@ -16,28 +16,24 @@
*/
package org.apache.rocketmq.dashboard.aspect.admin;
import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.service.client.MQAdminInstance;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.Method;
@Aspect
@Service
@Slf4j
public class MQAdminAspect {
private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
@Autowired
private RMQConfigure rmqConfigure;
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
public MQAdminAspect() {
}
@@ -47,30 +43,16 @@ public class MQAdminAspect {
}
@Pointcut("@annotation(org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod)")
public void multiMQAdminMethodPointCut() {
}
@Around(value = "mQAdminMethodPointCut() || multiMQAdminMethodPointCut()")
@Around(value = "mQAdminMethodPointCut()")
public Object aroundMQAdminMethod(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
Object obj = null;
try {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS());
}
else {
MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS());
}
MQAdminInstance.createMQAdmin(mqAdminExtPool);
obj = joinPoint.proceed();
}
finally {
MQAdminInstance.destroyMQAdminInstance();
logger.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
} finally {
MQAdminInstance.returnMQAdmin(mqAdminExtPool);
log.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
}
return obj;
}

View File

@@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.aspect.admin.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiMQAdminCmdMethod {
long timeoutMillis() default 0;
}

View File

@@ -58,6 +58,8 @@ public class RMQConfigure {
private boolean useTLS = false;
private Long timeoutMillis;
public String getAccessKey() {
return accessKey;
}
@@ -148,6 +150,14 @@ public class RMQConfigure {
this.useTLS = useTLS;
}
public Long getTimeoutMillis() {
return timeoutMillis;
}
public void setTimeoutMillis(Long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
// Error Page process logic, move to a central configure later
@Bean
public ErrorPageRegistrar errorPageRegistrar() {

View File

@@ -51,8 +51,7 @@ public class TopicController {
@RequestMapping(value = "/list.query", method = RequestMethod.GET)
@ResponseBody
public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess)
throws MQClientException, RemotingException, InterruptedException {
public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess) {
boolean flag = false;
if ("true".equals(skipSysProcess)) {
flag = true;

View File

@@ -16,29 +16,27 @@
*/
package org.apache.rocketmq.dashboard.service.client;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.joor.Reflect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQAdminInstance {
private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>();
private static final Logger LOGGER = LoggerFactory.getLogger(MQAdminInstance.class);
private static final ThreadLocal<MQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<>();
public static MQAdminExt threadLocalMQAdminExt() {
DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
if (defaultMQAdminExt == null) {
MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
if (mqAdminExt == null) {
throw new IllegalStateException("defaultMQAdminExt should be init before you get this");
}
return defaultMQAdminExt;
return mqAdminExt;
}
public static RemotingClient threadLocalRemotingClient() {
@@ -51,44 +49,27 @@ public class MQAdminInstance {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey, boolean useTLS) throws MQClientException {
Integer nowCount = INIT_COUNTER.get();
if (nowCount == null) {
RPCHook rpcHook = null;
boolean isEnableAcl = !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey);
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
DefaultMQAdminExt defaultMQAdminExt;
if (timeoutMillis > 0) {
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook,timeoutMillis);
}
else {
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
}
defaultMQAdminExt.setUseTLS(useTLS);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
defaultMQAdminExt.start();
MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
INIT_COUNTER.set(1);
}
else {
INIT_COUNTER.set(nowCount + 1);
}
public static void createMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
try {
// Get the mqAdmin instance from the object pool
MQAdminExt mqAdminExt = mqAdminExtPool.borrowObject();
MQ_ADMIN_EXT_THREAD_LOCAL.set(mqAdminExt);
} catch (Exception e) {
LOGGER.error("get mqAdmin from pool error", e);
}
}
public static void destroyMQAdminInstance() {
Integer nowCount = INIT_COUNTER.get() - 1;
if (nowCount > 0) {
INIT_COUNTER.set(nowCount);
return;
}
public static void returnMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
if (mqAdminExt != null) {
mqAdminExt.shutdown();
MQ_ADMIN_EXT_THREAD_LOCAL.remove();
INIT_COUNTER.remove();
try {
// After execution, return the mqAdmin instance to the object pool
mqAdminExtPool.returnObject(mqAdminExt);
} catch (Exception e) {
LOGGER.error("return mqAdmin to pool error", e);
}
}
MQ_ADMIN_EXT_THREAD_LOCAL.remove();
}
}

View File

@@ -43,7 +43,6 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
@@ -64,7 +63,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Override
@MultiMQAdminCmdMethod
public List<GroupConsumeInfo> queryGroupList() {
Set<String> consumerGroupSet = Sets.newHashSet();
try {
@@ -86,7 +84,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
@MultiMQAdminCmdMethod
public GroupConsumeInfo queryGroup(String consumerGroup) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try {
@@ -133,7 +130,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
@MultiMQAdminCmdMethod
public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
ConsumeStats consumeStats = null;
try {
@@ -184,7 +180,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
@MultiMQAdminCmdMethod
public Map<String /*groupName*/, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) {
Map<String, TopicConsumerInfo> group2ConsumerInfoMap = Maps.newHashMap();
try {
@@ -206,7 +201,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
@MultiMQAdminCmdMethod
public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) {
Map<String, ConsumerGroupRollBackStat> groupRollbackStats = Maps.newHashMap();
for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) {
@@ -251,7 +245,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
@MultiMQAdminCmdMethod
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
try {
@@ -272,7 +265,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
@MultiMQAdminCmdMethod
public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
@@ -303,7 +295,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
@MultiMQAdminCmdMethod
public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
Set<String> brokerNameSet = Sets.newHashSet();
try {

View File

@@ -21,11 +21,14 @@ import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.OpsService;
import org.apache.rocketmq.dashboard.service.checker.CheckerType;
import org.apache.rocketmq.dashboard.service.checker.RocketMqChecker;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@@ -34,6 +37,9 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
@Resource
private RMQConfigure configure;
@Autowired
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
@Resource
private List<RocketMqChecker> rocketMqCheckerList;
@@ -49,6 +55,8 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
@Override
public void updateNameSvrAddrList(String nameSvrAddrList) {
configure.setNamesrvAddr(nameSvrAddrList);
// when update namesrvAddr, clean the mqAdminExt objects pool.
mqAdminExtPool.clear();
}
@Override
@@ -67,12 +75,14 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
@Override public boolean updateIsVIPChannel(String useVIPChannel) {
configure.setIsVIPChannel(useVIPChannel);
mqAdminExtPool.clear();
return true;
}
@Override
public boolean updateUseTLS(boolean useTLS) {
configure.setUseTLS(useTLS);
mqAdminExtPool.clear();
return true;
}
}

View File

@@ -24,7 +24,6 @@ import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
@@ -69,12 +68,11 @@ public class DashboardCollectTask {
private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class);
@Scheduled(cron = "30 0/1 * * * ?")
@MultiMQAdminCmdMethod(timeoutMillis = 5000)
public void collectTopic() {
if (!rmqConfigure.isEnableDashBoardCollect()) {
return;
}
Date date = new Date();
Stopwatch stopwatch = Stopwatch.createUnstarted();
try {
@@ -83,8 +81,8 @@ public class DashboardCollectTask {
this.addSystemTopic();
for (String topic : topicSet) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
|| TopicValidator.isSystemTopic(topic)) {
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
|| TopicValidator.isSystemTopic(topic)) {
continue;
}
TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
@@ -323,7 +321,7 @@ public class DashboardCollectTask {
}
return newTpsList;
}
private void addSystemTopic() throws Exception {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterTable = clusterInfo.getClusterAddrTable();

View File

@@ -35,6 +35,8 @@ logging.config=classpath:logback.xml
rocketmq.config.namesrvAddr=
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#timeout for mqadminExt, default 5000ms
rocketmq.config.timeoutMillis=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true

View File

@@ -19,13 +19,16 @@ package org.apache.rocketmq.dashboard.admin;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.aspect.admin.MQAdminAspect;
import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,19 +40,20 @@ public class MQAdminAspectTest {
ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
MethodSignature signature = mock(MethodSignature.class);
Method method = mock(Method.class);
MultiMQAdminCmdMethod annotationValue = mock(MultiMQAdminCmdMethod.class);
when(annotationValue.timeoutMillis()).thenReturn(0L).thenReturn(3000L);
when(method.getAnnotation(MultiMQAdminCmdMethod.class)).thenReturn(annotationValue);
when(signature.getMethod()).thenReturn(method);
when(joinPoint.getSignature()).thenReturn(signature);
RMQConfigure rmqConfigure = mock(RMQConfigure.class);
when(rmqConfigure.getAccessKey()).thenReturn("rocketmq");
when(rmqConfigure.getSecretKey()).thenReturn("12345678");
Field field = mqAdminAspect.getClass().getDeclaredField("rmqConfigure");
GenericObjectPool<MQAdminExt> mqAdminExtPool = mock(GenericObjectPool.class);
when(mqAdminExtPool.borrowObject())
.thenThrow(new RuntimeException("borrowObject exception"))
.thenReturn(new DefaultMQAdminExt());
doNothing().doThrow(new RuntimeException("returnObject exception"))
.when(mqAdminExtPool).returnObject(any());
Field field = mqAdminAspect.getClass().getDeclaredField("mqAdminExtPool");
field.setAccessible(true);
field.set(mqAdminAspect, rmqConfigure);
field.set(mqAdminAspect, mqAdminExtPool);
// exception
mqAdminAspect.aroundMQAdminMethod(joinPoint);
mqAdminAspect.aroundMQAdminMethod(joinPoint);
}
}

View File

@@ -65,7 +65,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -121,24 +120,12 @@ public class MQAdminExtImplTest {
defaultMQAdminExt = mock(DefaultMQAdminExt.class);
threadLocal.set(defaultMQAdminExt);
field = MQAdminInstance.class.getDeclaredField("INIT_COUNTER");
field.setAccessible(true);
object = field.get(mqAdminExtImpl);
assertNotNull(object);
ThreadLocal<Integer> threadLocal1 = (ThreadLocal<Integer>) object;
threadLocal1.set(1);
ReflectionTestUtils.setField(defaultMQAdminExt, "defaultMQAdminExtImpl", defaultMQAdminExtImpl);
ReflectionTestUtils.setField(defaultMQAdminExtImpl, "mqClientInstance", mqClientInstance);
ReflectionTestUtils.setField(mqClientInstance, "mQClientAPIImpl", mQClientAPIImpl);
ReflectionTestUtils.setField(mQClientAPIImpl, "remotingClient", remotingClient);
}
@After
public void destroy() {
MQAdminInstance.destroyMQAdminInstance();
}
@Test
public void testUpdateBrokerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);

View File

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.util.ReflectionTestUtils;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MQAdminPoolTest {
private MqAdminExtObjectPool mqAdminExtObjectPool;
private GenericObjectPool<MQAdminExt> pool;
private MQAdminPooledObjectFactory mqAdminPooledObjectFactory;
@Before
public void init() {
mqAdminExtObjectPool = new MqAdminExtObjectPool();
RMQConfigure rmqConfigure = mock(RMQConfigure.class);
when(rmqConfigure.getNamesrvAddr()).thenReturn("127.0.0.1:9876");
when(rmqConfigure.getAccessKey()).thenReturn("rocketmq");
when(rmqConfigure.getSecretKey()).thenReturn("12345678");
ReflectionTestUtils.setField(mqAdminExtObjectPool, "rmqConfigure", rmqConfigure);
pool = mqAdminExtObjectPool.mqAdminExtPool();
mqAdminPooledObjectFactory = (MQAdminPooledObjectFactory) pool.getFactory();
}
@Test
public void testMakeObject() throws Exception {
PooledObject<MQAdminExt> mqAdmin = mqAdminPooledObjectFactory.makeObject();
Assert.assertNotNull(mqAdmin);
}
@Test
public void testDestroyObject() {
PooledObject<MQAdminExt> mqAdmin = mock(PooledObject.class);
Assert.assertNotNull(mqAdmin);
MQAdminExt mqAdminExt = mock(MQAdminExt.class);
doNothing().doThrow(new RuntimeException("shutdown exception")).when(mqAdminExt).shutdown();
when(mqAdmin.getObject()).thenReturn(mqAdminExt);
// shutdown
mqAdminPooledObjectFactory.destroyObject(mqAdmin);
// exception
mqAdminPooledObjectFactory.destroyObject(mqAdmin);
}
@Test
public void testValidateObject() throws Exception {
PooledObject<MQAdminExt> mqAdmin = mock(PooledObject.class);
Assert.assertNotNull(mqAdmin);
MQAdminExt mqAdminExt = mock(MQAdminExt.class);
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
clusterInfo.getBrokerAddrTable().clear();
when(mqAdminExt.examineBrokerClusterInfo())
.thenThrow(new RuntimeException("examineBrokerClusterInfo exception"))
.thenReturn(null)
.thenReturn(new ClusterInfo())
.thenReturn(clusterInfo)
.thenReturn(MockObjectUtil.createClusterInfo());
when(mqAdmin.getObject()).thenReturn(mqAdminExt);
// exception
Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
// clusterInfo == null
Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
// clusterInfo.getBrokerAddrTable() == null
Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
// clusterInfo.getBrokerAddrTable().size() <= 0
Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
// pass validate
Assert.assertTrue(mqAdminPooledObjectFactory.validateObject(mqAdmin));
}
}

View File

@@ -40,6 +40,7 @@ public class RMQConfigureTest {
rmqConfigure.setLoginRequired(true);
rmqConfigure.setMsgTrackTopicName(null);
rmqConfigure.setNamesrvAddr("127.0.0.1:9876");
rmqConfigure.setTimeoutMillis(3000L);
}
@Test
@@ -56,6 +57,7 @@ public class RMQConfigureTest {
Assert.assertTrue(rmqConfigure.isLoginRequired());
Assert.assertEquals(rmqConfigure.getMsgTrackTopicNameOrDefault(), TopicValidator.RMQ_SYS_TRACE_TOPIC);
Assert.assertEquals(rmqConfigure.getNamesrvAddr(), "127.0.0.1:9876");
Assert.assertEquals(rmqConfigure.getTimeoutMillis().longValue(), 3000L);
ErrorPageRegistrar registrar = rmqConfigure.errorPageRegistrar();
registrar.registerErrorPages(new ErrorPageRegistry() {
@Override

View File

@@ -19,14 +19,17 @@ package org.apache.rocketmq.dashboard.controller;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.service.checker.RocketMqChecker;
import org.apache.rocketmq.dashboard.service.checker.impl.ClusterHealthCheckerImpl;
import org.apache.rocketmq.dashboard.service.checker.impl.TopicOnlyOneBrokerCheckerImpl;
import org.apache.rocketmq.dashboard.service.impl.OpsServiceImpl;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
@@ -44,6 +47,9 @@ public class OpsControllerTest extends BaseControllerTest {
@Spy
private OpsServiceImpl opsService;
@Mock
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
@Before
public void init() {
super.mockRmqConfigure();
@@ -67,6 +73,7 @@ public class OpsControllerTest extends BaseControllerTest {
final String url = "/ops/updateNameSvrAddr.do";
{
doNothing().when(configure).setNamesrvAddr(anyString());
doNothing().when(mqAdminExtPool).clear();
}
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.param("nameSvrAddrList", "127.0.0.1:9876");
@@ -89,6 +96,20 @@ public class OpsControllerTest extends BaseControllerTest {
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testUpdateUseTLS() throws Exception {
final String url = "/ops/updateUseTLS.do";
{
doNothing().when(configure).setUseTLS(true);
}
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.param("useTLS", "true");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testClusterStatus() throws Exception {
final String url = "/ops/rocketMqStatus.query";