From 81e12923465e25f0bedd4a349ec990132bac8e57 Mon Sep 17 00:00:00 2001 From: francis lee Date: Thu, 19 Nov 2020 10:59:03 +0800 Subject: [PATCH] Update Rocketmq console 2.0.0 for support acl (#645) * add accessKey and secretKey config support * fix checkstyle * Optimized code * update acl for send messages * update for check ACL when sending message Co-authored-by: Demogorgon314 Co-authored-by: liwei5 --- .../console/aspect/admin/MQAdminAspect.java | 12 +++++-- .../rocketmq/console/config/RMQConfigure.java | 29 ++++++++++++++-- .../service/client/MQAdminInstance.java | 16 ++++++--- .../service/impl/MessageServiceImpl.java | 17 +++++++++- .../service/impl/TopicServiceImpl.java | 33 ++++++++++++++----- src/main/resources/application.properties | 6 +++- 6 files changed, 93 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java b/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java index 8c7cf06..e8f2f4c 100644 --- a/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java +++ b/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java @@ -16,8 +16,8 @@ */ package org.apache.rocketmq.console.aspect.admin; -import java.lang.reflect.Method; import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod; +import org.apache.rocketmq.console.config.RMQConfigure; import org.apache.rocketmq.console.service.client.MQAdminInstance; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; @@ -26,13 +26,19 @@ import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.lang.reflect.Method; + @Aspect @Service public class MQAdminAspect { private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class); + @Autowired + private RMQConfigure rmqConfigure; + public MQAdminAspect() { } @@ -55,10 +61,10 @@ public class MQAdminAspect { Method method = signature.getMethod(); MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class); if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) { - MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis()); + MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey()); } else { - MQAdminInstance.initMQAdminInstance(0); + MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey()); } obj = joinPoint.proceed(); } diff --git a/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java index f25ce2c..4fbffa2 100644 --- a/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java +++ b/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.console.config; -import java.io.File; - import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; import org.slf4j.Logger; @@ -30,6 +28,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; +import java.io.File; + import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY; @Configuration @@ -51,6 +51,26 @@ public class RMQConfigure { private boolean loginRequired = false; + private String accessKey; + + private String secretKey; + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + public String getNamesrvAddr() { return namesrvAddr; } @@ -62,7 +82,10 @@ public class RMQConfigure { logger.info("setNameSrvAddrByProperty nameSrvAddr={}", namesrvAddr); } } - + public boolean isACLEnabled(){ + return !(StringUtils.isAnyBlank(this.accessKey, this.secretKey)|| + StringUtils.isAnyEmpty(this.accessKey, this.secretKey)); + } public String getRocketMqConsoleDataPath() { return dataPath; } diff --git a/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java b/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java index e914e6c..3fb57d0 100644 --- a/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java +++ b/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java @@ -16,9 +16,13 @@ */ package org.apache.rocketmq.console.service.client; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; @@ -47,16 +51,20 @@ public class MQAdminInstance { DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl"); return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance"); } - - public static void initMQAdminInstance(long timeoutMillis) throws MQClientException { + public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey) throws MQClientException { Integer nowCount = INIT_COUNTER.get(); if (nowCount == null) { + RPCHook rpcHook = null; + boolean isEnableAcl = !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey); + if (isEnableAcl) { + rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + } DefaultMQAdminExt defaultMQAdminExt; if (timeoutMillis > 0) { - defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis); + defaultMQAdminExt = new DefaultMQAdminExt(rpcHook,timeoutMillis); } else { - defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); } defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.start(); diff --git a/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java index fb41634..34d3994 100644 --- a/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.console.service.impl; + import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Throwables; @@ -29,6 +30,8 @@ import java.util.List; import java.util.Set; import javax.annotation.Resource; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.MixAll; @@ -38,19 +41,25 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.console.config.RMQConfigure; import org.apache.rocketmq.console.exception.ServiceException; import org.apache.rocketmq.console.model.MessageView; import org.apache.rocketmq.console.service.MessageService; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MessageServiceImpl implements MessageService { private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class); + + @Autowired + private RMQConfigure configure; /** * @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64; * @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); @@ -59,6 +68,7 @@ public class MessageServiceImpl implements MessageService { @Resource private MQAdminExt mqAdminExt; + @Override public Pair> viewMessage(String subject, final String msgId) { try { @@ -88,7 +98,12 @@ public class MessageServiceImpl implements MessageService { @Override public List queryMessageByTopic(String topic, final long begin, final long end) { - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP); + boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); + RPCHook rpcHook = null; + if (isEnableAcl) { + rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey())); + } + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook); List messageViewList = Lists.newArrayList(); try { String subExpression = "*"; diff --git a/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java index 7660eaa..f6211c8 100644 --- a/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java @@ -20,13 +20,9 @@ package org.apache.rocketmq.console.service.impl; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.MixAll; @@ -43,11 +39,17 @@ import org.apache.rocketmq.console.model.request.SendTopicMessageRequest; import org.apache.rocketmq.console.model.request.TopicConfigInfo; import org.apache.rocketmq.console.service.AbstractCommonService; import org.apache.rocketmq.console.service.TopicService; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.CommandUtil; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + @Service public class TopicServiceImpl extends AbstractCommonService implements TopicService { @@ -209,7 +211,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } private TopicList getSystemTopicList() { - DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP); + RPCHook rpcHook = null; + boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey()); + if (isEnableAcl) { + rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(),rMQConfigure.getSecretKey())); + } + DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP,rpcHook); producer.setInstanceName(String.valueOf(System.currentTimeMillis())); producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr()); @@ -228,7 +235,17 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { - DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP); + DefaultMQProducer producer = null; + if(rMQConfigure.isACLEnabled()){ + producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials( + rMQConfigure.getAccessKey(), + rMQConfigure.getSecretKey() + ))); + producer.setProducerGroup(MixAll.SELF_TEST_PRODUCER_GROUP); + }else{ + producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP); + } + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr()); try { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index a5d233c..6b1d2df 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -27,4 +27,8 @@ rocketmq.config.msgTrackTopicName= rocketmq.config.ticketKey=ticket #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required -rocketmq.config.loginRequired=false \ No newline at end of file +rocketmq.config.loginRequired=false + +#set the accessKey and secretKey if you used acl +#rocketmq.config.accessKey= +#rocketmq.config.secretKey= \ No newline at end of file