diff --git a/pom.xml b/pom.xml index 171169c..f547b7f 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ 2.2.2.RELEASE 3.3.3 2.3.1 + 2.4.3 @@ -218,6 +219,11 @@ ${mockito-inline.version} test + + org.apache.commons + commons-pool2 + ${commons-pool2.version} + @@ -251,7 +257,7 @@ - + com.spotify docker-maven-plugin @@ -331,7 +337,7 @@ - + org.apache.rat apache-rat-plugin 0.12 diff --git a/src/main/java/org/apache/rocketmq/dashboard/App.java b/src/main/java/org/apache/rocketmq/dashboard/App.java index 7761ce3..9e88277 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/App.java +++ b/src/main/java/org/apache/rocketmq/dashboard/App.java @@ -19,11 +19,14 @@ package org.apache.rocketmq.dashboard; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; +import org.springframework.context.annotation.EnableMBeanExport; +import org.springframework.jmx.support.RegistrationPolicy; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling @ServletComponentScan +@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING) public class App { public static void main(String[] args) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java new file mode 100644 index 0000000..8bbf19d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.admin; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.dashboard.config.RMQConfigure; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminExt; + +@Slf4j +public class MQAdminFactory { + private RMQConfigure rmqConfigure; + + public MQAdminFactory(RMQConfigure rmqConfigure) { + this.rmqConfigure = rmqConfigure; + } + + public MQAdminExt getInstance() throws Exception { + RPCHook rpcHook = null; + final String accessKey = rmqConfigure.getAccessKey(); + final String secretKey = rmqConfigure.getSecretKey(); + boolean isEnableAcl = StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey); + if (isEnableAcl) { + rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + } + DefaultMQAdminExt mqAdminExt = null; + if (rmqConfigure.getTimeoutMillis() == null) { + mqAdminExt = new DefaultMQAdminExt(rpcHook); + } else { + mqAdminExt = new DefaultMQAdminExt(rpcHook, rmqConfigure.getTimeoutMillis()); + } + mqAdminExt.setVipChannelEnabled(Boolean.parseBoolean(rmqConfigure.getIsVIPChannel())); + mqAdminExt.setUseTLS(rmqConfigure.isUseTLS()); + mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + mqAdminExt.start(); + log.info("create MQAdmin instance {} success.", mqAdminExt); + return mqAdminExt; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java new file mode 100644 index 0000000..b68f931 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.admin; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.PooledObjectFactory; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.tools.admin.MQAdminExt; + +@Slf4j +public class MQAdminPooledObjectFactory implements PooledObjectFactory { + + private MQAdminFactory mqAdminFactory; + + @Override + public PooledObject makeObject() throws Exception { + DefaultPooledObject pooledObject = new DefaultPooledObject<>( + mqAdminFactory.getInstance()); + return pooledObject; + } + + @Override + public void destroyObject(PooledObject p) { + MQAdminExt mqAdmin = p.getObject(); + if (mqAdmin != null) { + try { + mqAdmin.shutdown(); + } catch (Exception e) { + log.warn("MQAdminExt shutdown err", e); + } + } + log.info("destroy object {}", p.getObject()); + } + + @Override + public boolean validateObject(PooledObject p) { + MQAdminExt mqAdmin = p.getObject(); + ClusterInfo clusterInfo = null; + try { + clusterInfo = mqAdmin.examineBrokerClusterInfo(); + } catch (Exception e) { + log.warn("validate object {} err", p.getObject(), e); + } + if (clusterInfo == null || MapUtils.isEmpty(clusterInfo.getBrokerAddrTable())) { + log.warn("validateObject failed, clusterInfo = {}", clusterInfo); + return false; + } + return true; + } + + @Override + public void activateObject(PooledObject p) { + } + + @Override + public void passivateObject(PooledObject p) { + } + + public void setMqAdminFactory(MQAdminFactory mqAdminFactory) { + this.mqAdminFactory = mqAdminFactory; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MqAdminExtObjectPool.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MqAdminExtObjectPool.java new file mode 100644 index 0000000..976c009 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MqAdminExtObjectPool.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.admin; + +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.rocketmq.dashboard.config.RMQConfigure; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MqAdminExtObjectPool { + + @Autowired + private RMQConfigure rmqConfigure; + + @Bean + public GenericObjectPool mqAdminExtPool() { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setTestWhileIdle(true); + genericObjectPoolConfig.setMaxWaitMillis(10000); + genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(20000); + MQAdminPooledObjectFactory mqAdminPooledObjectFactory = new MQAdminPooledObjectFactory(); + MQAdminFactory mqAdminFactory = new MQAdminFactory(rmqConfigure); + mqAdminPooledObjectFactory.setMqAdminFactory(mqAdminFactory); + GenericObjectPool genericObjectPool = new GenericObjectPool( + mqAdminPooledObjectFactory, + genericObjectPoolConfig); + return genericObjectPool; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java b/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java index f1d7b18..56f37f6 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java +++ b/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java @@ -16,28 +16,24 @@ */ package org.apache.rocketmq.dashboard.aspect.admin; -import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod; -import org.apache.rocketmq.dashboard.config.RMQConfigure; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.rocketmq.dashboard.service.client.MQAdminInstance; +import org.apache.rocketmq.tools.admin.MQAdminExt; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; -import org.aspectj.lang.reflect.MethodSignature; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.lang.reflect.Method; - @Aspect @Service +@Slf4j public class MQAdminAspect { - private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class); @Autowired - private RMQConfigure rmqConfigure; + private GenericObjectPool mqAdminExtPool; public MQAdminAspect() { } @@ -47,30 +43,16 @@ public class MQAdminAspect { } - @Pointcut("@annotation(org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod)") - public void multiMQAdminMethodPointCut() { - - } - - @Around(value = "mQAdminMethodPointCut() || multiMQAdminMethodPointCut()") + @Around(value = "mQAdminMethodPointCut()") public Object aroundMQAdminMethod(ProceedingJoinPoint joinPoint) throws Throwable { long start = System.currentTimeMillis(); Object obj = null; try { - MethodSignature signature = (MethodSignature)joinPoint.getSignature(); - Method method = signature.getMethod(); - MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class); - if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) { - MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS()); - } - else { - MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS()); - } + MQAdminInstance.createMQAdmin(mqAdminExtPool); obj = joinPoint.proceed(); - } - finally { - MQAdminInstance.destroyMQAdminInstance(); - logger.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start); + } finally { + MQAdminInstance.returnMQAdmin(mqAdminExtPool); + log.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start); } return obj; } diff --git a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/annotation/MultiMQAdminCmdMethod.java b/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/annotation/MultiMQAdminCmdMethod.java deleted file mode 100644 index d62f167..0000000 --- a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/annotation/MultiMQAdminCmdMethod.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.dashboard.aspect.admin.annotation; - -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Target({ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -public @interface MultiMQAdminCmdMethod { - long timeoutMillis() default 0; -} diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java index 09e3c71..43a7a2b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java +++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java @@ -58,6 +58,8 @@ public class RMQConfigure { private boolean useTLS = false; + private Long timeoutMillis; + public String getAccessKey() { return accessKey; } @@ -148,6 +150,14 @@ public class RMQConfigure { this.useTLS = useTLS; } + public Long getTimeoutMillis() { + return timeoutMillis; + } + + public void setTimeoutMillis(Long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + // Error Page process logic, move to a central configure later @Bean public ErrorPageRegistrar errorPageRegistrar() { diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java index 373db5b..fbcb7c6 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java @@ -51,8 +51,7 @@ public class TopicController { @RequestMapping(value = "/list.query", method = RequestMethod.GET) @ResponseBody - public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess) - throws MQClientException, RemotingException, InterruptedException { + public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess) { boolean flag = false; if ("true".equals(skipSysProcess)) { flag = true; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java index 6f40db0..8d9ae4f 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java @@ -16,29 +16,27 @@ */ package org.apache.rocketmq.dashboard.service.client; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.acl.common.AclClientRPCHook; -import org.apache.rocketmq.acl.common.SessionCredentials; -import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.joor.Reflect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MQAdminInstance { - private static final ThreadLocal MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal(); - private static final ThreadLocal INIT_COUNTER = new ThreadLocal(); + + private static final Logger LOGGER = LoggerFactory.getLogger(MQAdminInstance.class); + private static final ThreadLocal MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<>(); public static MQAdminExt threadLocalMQAdminExt() { - DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); - if (defaultMQAdminExt == null) { + MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); + if (mqAdminExt == null) { throw new IllegalStateException("defaultMQAdminExt should be init before you get this"); } - return defaultMQAdminExt; + return mqAdminExt; } public static RemotingClient threadLocalRemotingClient() { @@ -51,44 +49,27 @@ public class MQAdminInstance { DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl"); return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance"); } - public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey, boolean useTLS) throws MQClientException { - Integer nowCount = INIT_COUNTER.get(); - if (nowCount == null) { - RPCHook rpcHook = null; - boolean isEnableAcl = !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey); - if (isEnableAcl) { - rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); - } - DefaultMQAdminExt defaultMQAdminExt; - if (timeoutMillis > 0) { - defaultMQAdminExt = new DefaultMQAdminExt(rpcHook,timeoutMillis); - } - else { - defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); - } - defaultMQAdminExt.setUseTLS(useTLS); - defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - defaultMQAdminExt.start(); - MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt); - INIT_COUNTER.set(1); - } - else { - INIT_COUNTER.set(nowCount + 1); - } + public static void createMQAdmin(GenericObjectPool mqAdminExtPool) { + try { + // Get the mqAdmin instance from the object pool + MQAdminExt mqAdminExt = mqAdminExtPool.borrowObject(); + MQ_ADMIN_EXT_THREAD_LOCAL.set(mqAdminExt); + } catch (Exception e) { + LOGGER.error("get mqAdmin from pool error", e); + } } - public static void destroyMQAdminInstance() { - Integer nowCount = INIT_COUNTER.get() - 1; - if (nowCount > 0) { - INIT_COUNTER.set(nowCount); - return; - } + public static void returnMQAdmin(GenericObjectPool mqAdminExtPool) { MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); if (mqAdminExt != null) { - mqAdminExt.shutdown(); - MQ_ADMIN_EXT_THREAD_LOCAL.remove(); - INIT_COUNTER.remove(); + try { + // After execution, return the mqAdmin instance to the object pool + mqAdminExtPool.returnObject(mqAdminExt); + } catch (Exception e) { + LOGGER.error("return mqAdmin to pool error", e); + } } + MQ_ADMIN_EXT_THREAD_LOCAL.remove(); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index 4e69e83..37073d7 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -43,7 +43,6 @@ import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.QueueStatInfo; @@ -64,7 +63,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); @Override - @MultiMQAdminCmdMethod public List queryGroupList() { Set consumerGroupSet = Sets.newHashSet(); try { @@ -86,7 +84,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - @MultiMQAdminCmdMethod public GroupConsumeInfo queryGroup(String consumerGroup) { GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); try { @@ -133,7 +130,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - @MultiMQAdminCmdMethod public List queryConsumeStatsList(final String topic, String groupName) { ConsumeStats consumeStats = null; try { @@ -184,7 +180,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - @MultiMQAdminCmdMethod public Map queryConsumeStatsListByTopicName(String topic) { Map group2ConsumerInfoMap = Maps.newHashMap(); try { @@ -206,7 +201,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - @MultiMQAdminCmdMethod public Map resetOffset(ResetOffsetRequest resetOffsetRequest) { Map groupRollbackStats = Maps.newHashMap(); for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) { @@ -251,7 +245,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - @MultiMQAdminCmdMethod public List examineSubscriptionGroupConfig(String group) { List consumerConfigInfoList = Lists.newArrayList(); try { @@ -272,7 +265,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - @MultiMQAdminCmdMethod public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) { try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); @@ -303,7 +295,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - @MultiMQAdminCmdMethod public Set fetchBrokerNameSetBySubscriptionGroup(String group) { Set brokerNameSet = Sets.newHashSet(); try { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java index 7b56220..2d98e12 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java @@ -21,11 +21,14 @@ import com.google.common.collect.Maps; import java.util.List; import java.util.Map; import javax.annotation.Resource; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.OpsService; import org.apache.rocketmq.dashboard.service.checker.CheckerType; import org.apache.rocketmq.dashboard.service.checker.RocketMqChecker; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @@ -34,6 +37,9 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService @Resource private RMQConfigure configure; + @Autowired + private GenericObjectPool mqAdminExtPool; + @Resource private List rocketMqCheckerList; @@ -49,6 +55,8 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService @Override public void updateNameSvrAddrList(String nameSvrAddrList) { configure.setNamesrvAddr(nameSvrAddrList); + // when update namesrvAddr, clean the mqAdminExt objects pool. + mqAdminExtPool.clear(); } @Override @@ -67,12 +75,14 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService @Override public boolean updateIsVIPChannel(String useVIPChannel) { configure.setIsVIPChannel(useVIPChannel); + mqAdminExtPool.clear(); return true; } @Override public boolean updateUseTLS(boolean useTLS) { configure.setUseTLS(useTLS); + mqAdminExtPool.clear(); return true; } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java index 2e00223..c09568d 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java @@ -24,7 +24,6 @@ import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; @@ -69,12 +68,11 @@ public class DashboardCollectTask { private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class); @Scheduled(cron = "30 0/1 * * * ?") - @MultiMQAdminCmdMethod(timeoutMillis = 5000) public void collectTopic() { if (!rmqConfigure.isEnableDashBoardCollect()) { return; } - + Date date = new Date(); Stopwatch stopwatch = Stopwatch.createUnstarted(); try { @@ -83,8 +81,8 @@ public class DashboardCollectTask { this.addSystemTopic(); for (String topic : topicSet) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) - || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX) - || TopicValidator.isSystemTopic(topic)) { + || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX) + || TopicValidator.isSystemTopic(topic)) { continue; } TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic); @@ -323,7 +321,7 @@ public class DashboardCollectTask { } return newTpsList; } - + private void addSystemTopic() throws Exception { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); HashMap> clusterTable = clusterInfo.getClusterAddrTable(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 0ad1228..335ac69 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -35,6 +35,8 @@ logging.config=classpath:logback.xml rocketmq.config.namesrvAddr= #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= +#timeout for mqadminExt, default 5000ms +rocketmq.config.timeoutMillis= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java index 9874a09..a76dc6c 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java @@ -19,13 +19,16 @@ package org.apache.rocketmq.dashboard.admin; import java.lang.reflect.Field; import java.lang.reflect.Method; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.rocketmq.dashboard.aspect.admin.MQAdminAspect; -import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod; -import org.apache.rocketmq.dashboard.config.RMQConfigure; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminExt; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.reflect.MethodSignature; import org.junit.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,19 +40,20 @@ public class MQAdminAspectTest { ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class); MethodSignature signature = mock(MethodSignature.class); Method method = mock(Method.class); - MultiMQAdminCmdMethod annotationValue = mock(MultiMQAdminCmdMethod.class); - when(annotationValue.timeoutMillis()).thenReturn(0L).thenReturn(3000L); - when(method.getAnnotation(MultiMQAdminCmdMethod.class)).thenReturn(annotationValue); when(signature.getMethod()).thenReturn(method); when(joinPoint.getSignature()).thenReturn(signature); - RMQConfigure rmqConfigure = mock(RMQConfigure.class); - when(rmqConfigure.getAccessKey()).thenReturn("rocketmq"); - when(rmqConfigure.getSecretKey()).thenReturn("12345678"); - Field field = mqAdminAspect.getClass().getDeclaredField("rmqConfigure"); + GenericObjectPool mqAdminExtPool = mock(GenericObjectPool.class); + when(mqAdminExtPool.borrowObject()) + .thenThrow(new RuntimeException("borrowObject exception")) + .thenReturn(new DefaultMQAdminExt()); + doNothing().doThrow(new RuntimeException("returnObject exception")) + .when(mqAdminExtPool).returnObject(any()); + Field field = mqAdminAspect.getClass().getDeclaredField("mqAdminExtPool"); field.setAccessible(true); - field.set(mqAdminAspect, rmqConfigure); - + field.set(mqAdminAspect, mqAdminExtPool); + // exception + mqAdminAspect.aroundMQAdminMethod(joinPoint); mqAdminAspect.aroundMQAdminMethod(joinPoint); } } diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java index 32a82bd..28a898d 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java @@ -65,7 +65,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; import org.apache.rocketmq.tools.admin.api.MessageTrack; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -121,24 +120,12 @@ public class MQAdminExtImplTest { defaultMQAdminExt = mock(DefaultMQAdminExt.class); threadLocal.set(defaultMQAdminExt); - field = MQAdminInstance.class.getDeclaredField("INIT_COUNTER"); - field.setAccessible(true); - object = field.get(mqAdminExtImpl); - assertNotNull(object); - ThreadLocal threadLocal1 = (ThreadLocal) object; - threadLocal1.set(1); - ReflectionTestUtils.setField(defaultMQAdminExt, "defaultMQAdminExtImpl", defaultMQAdminExtImpl); ReflectionTestUtils.setField(defaultMQAdminExtImpl, "mqClientInstance", mqClientInstance); ReflectionTestUtils.setField(mqClientInstance, "mQClientAPIImpl", mQClientAPIImpl); ReflectionTestUtils.setField(mQClientAPIImpl, "remotingClient", remotingClient); } - @After - public void destroy() { - MQAdminInstance.destroyMQAdminInstance(); - } - @Test public void testUpdateBrokerConfig() throws Exception { assertNotNull(mqAdminExtImpl); diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java new file mode 100644 index 0000000..6859927 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.admin; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.dashboard.config.RMQConfigure; +import org.apache.rocketmq.dashboard.util.MockObjectUtil; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.util.ReflectionTestUtils; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MQAdminPoolTest { + + private MqAdminExtObjectPool mqAdminExtObjectPool; + + private GenericObjectPool pool; + + private MQAdminPooledObjectFactory mqAdminPooledObjectFactory; + + @Before + public void init() { + mqAdminExtObjectPool = new MqAdminExtObjectPool(); + RMQConfigure rmqConfigure = mock(RMQConfigure.class); + when(rmqConfigure.getNamesrvAddr()).thenReturn("127.0.0.1:9876"); + when(rmqConfigure.getAccessKey()).thenReturn("rocketmq"); + when(rmqConfigure.getSecretKey()).thenReturn("12345678"); + ReflectionTestUtils.setField(mqAdminExtObjectPool, "rmqConfigure", rmqConfigure); + pool = mqAdminExtObjectPool.mqAdminExtPool(); + mqAdminPooledObjectFactory = (MQAdminPooledObjectFactory) pool.getFactory(); + } + + @Test + public void testMakeObject() throws Exception { + PooledObject mqAdmin = mqAdminPooledObjectFactory.makeObject(); + Assert.assertNotNull(mqAdmin); + } + + @Test + public void testDestroyObject() { + PooledObject mqAdmin = mock(PooledObject.class); + Assert.assertNotNull(mqAdmin); + MQAdminExt mqAdminExt = mock(MQAdminExt.class); + doNothing().doThrow(new RuntimeException("shutdown exception")).when(mqAdminExt).shutdown(); + when(mqAdmin.getObject()).thenReturn(mqAdminExt); + // shutdown + mqAdminPooledObjectFactory.destroyObject(mqAdmin); + // exception + mqAdminPooledObjectFactory.destroyObject(mqAdmin); + } + + @Test + public void testValidateObject() throws Exception { + PooledObject mqAdmin = mock(PooledObject.class); + Assert.assertNotNull(mqAdmin); + MQAdminExt mqAdminExt = mock(MQAdminExt.class); + ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); + clusterInfo.getBrokerAddrTable().clear(); + when(mqAdminExt.examineBrokerClusterInfo()) + .thenThrow(new RuntimeException("examineBrokerClusterInfo exception")) + .thenReturn(null) + .thenReturn(new ClusterInfo()) + .thenReturn(clusterInfo) + .thenReturn(MockObjectUtil.createClusterInfo()); + when(mqAdmin.getObject()).thenReturn(mqAdminExt); + // exception + Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin)); + // clusterInfo == null + Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin)); + // clusterInfo.getBrokerAddrTable() == null + Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin)); + // clusterInfo.getBrokerAddrTable().size() <= 0 + Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin)); + // pass validate + Assert.assertTrue(mqAdminPooledObjectFactory.validateObject(mqAdmin)); + + } +} diff --git a/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java b/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java index 462e648..56c48e3 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java @@ -40,6 +40,7 @@ public class RMQConfigureTest { rmqConfigure.setLoginRequired(true); rmqConfigure.setMsgTrackTopicName(null); rmqConfigure.setNamesrvAddr("127.0.0.1:9876"); + rmqConfigure.setTimeoutMillis(3000L); } @Test @@ -56,6 +57,7 @@ public class RMQConfigureTest { Assert.assertTrue(rmqConfigure.isLoginRequired()); Assert.assertEquals(rmqConfigure.getMsgTrackTopicNameOrDefault(), TopicValidator.RMQ_SYS_TRACE_TOPIC); Assert.assertEquals(rmqConfigure.getNamesrvAddr(), "127.0.0.1:9876"); + Assert.assertEquals(rmqConfigure.getTimeoutMillis().longValue(), 3000L); ErrorPageRegistrar registrar = rmqConfigure.errorPageRegistrar(); registrar.registerErrorPages(new ErrorPageRegistry() { @Override diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java index 37edbfb..f864755 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java @@ -19,14 +19,17 @@ package org.apache.rocketmq.dashboard.controller; import java.util.ArrayList; import java.util.List; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.rocketmq.dashboard.service.checker.RocketMqChecker; import org.apache.rocketmq.dashboard.service.checker.impl.ClusterHealthCheckerImpl; import org.apache.rocketmq.dashboard.service.checker.impl.TopicOnlyOneBrokerCheckerImpl; import org.apache.rocketmq.dashboard.service.impl.OpsServiceImpl; +import org.apache.rocketmq.tools.admin.MQAdminExt; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.Spy; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; @@ -44,6 +47,9 @@ public class OpsControllerTest extends BaseControllerTest { @Spy private OpsServiceImpl opsService; + @Mock + private GenericObjectPool mqAdminExtPool; + @Before public void init() { super.mockRmqConfigure(); @@ -67,6 +73,7 @@ public class OpsControllerTest extends BaseControllerTest { final String url = "/ops/updateNameSvrAddr.do"; { doNothing().when(configure).setNamesrvAddr(anyString()); + doNothing().when(mqAdminExtPool).clear(); } requestBuilder = MockMvcRequestBuilders.post(url); requestBuilder.param("nameSvrAddrList", "127.0.0.1:9876"); @@ -89,6 +96,20 @@ public class OpsControllerTest extends BaseControllerTest { .andExpect(jsonPath("$.data").value(true)); } + + @Test + public void testUpdateUseTLS() throws Exception { + final String url = "/ops/updateUseTLS.do"; + { + doNothing().when(configure).setUseTLS(true); + } + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.param("useTLS", "true"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data").value(true)); + } + @Test public void testClusterStatus() throws Exception { final String url = "/ops/rocketMqStatus.query";