Update Rocketmq console 2.0.0 for support acl (#645)

* add accessKey and secretKey config support

* fix checkstyle

* Optimized code

* update acl for send messages

* update for check ACL when sending message

Co-authored-by: Demogorgon314 <wangkai744567028@gmail.com>
Co-authored-by: liwei5 <liwei5@vipkid.com.cn>
This commit is contained in:
francis lee
2020-11-19 10:59:03 +08:00
committed by GitHub
parent 4a67745bc6
commit 81e1292346
6 changed files with 93 additions and 20 deletions

View File

@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.console.aspect.admin;
import java.lang.reflect.Method;
import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.service.client.MQAdminInstance;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@@ -26,13 +26,19 @@ import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.Method;
@Aspect
@Service
public class MQAdminAspect {
private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
@Autowired
private RMQConfigure rmqConfigure;
public MQAdminAspect() {
}
@@ -55,10 +61,10 @@ public class MQAdminAspect {
Method method = signature.getMethod();
MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis());
MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
}
else {
MQAdminInstance.initMQAdminInstance(0);
MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
}
obj = joinPoint.proceed();
}

View File

@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.console.config;
import java.io.File;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.slf4j.Logger;
@@ -30,6 +28,8 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import java.io.File;
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
@Configuration
@@ -51,6 +51,26 @@ public class RMQConfigure {
private boolean loginRequired = false;
private String accessKey;
private String secretKey;
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
@@ -62,7 +82,10 @@ public class RMQConfigure {
logger.info("setNameSrvAddrByProperty nameSrvAddr={}", namesrvAddr);
}
}
public boolean isACLEnabled(){
return !(StringUtils.isAnyBlank(this.accessKey, this.secretKey)||
StringUtils.isAnyEmpty(this.accessKey, this.secretKey));
}
public String getRocketMqConsoleDataPath() {
return dataPath;
}

View File

@@ -16,9 +16,13 @@
*/
package org.apache.rocketmq.console.service.client;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
@@ -47,16 +51,20 @@ public class MQAdminInstance {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
public static void initMQAdminInstance(long timeoutMillis) throws MQClientException {
public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey) throws MQClientException {
Integer nowCount = INIT_COUNTER.get();
if (nowCount == null) {
RPCHook rpcHook = null;
boolean isEnableAcl = !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey);
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
DefaultMQAdminExt defaultMQAdminExt;
if (timeoutMillis > 0) {
defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook,timeoutMillis);
}
else {
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
}
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
defaultMQAdminExt.start();

View File

@@ -17,6 +17,7 @@
package org.apache.rocketmq.console.service.impl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
@@ -29,6 +30,8 @@ import java.util.List;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
@@ -38,19 +41,25 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.exception.ServiceException;
import org.apache.rocketmq.console.model.MessageView;
import org.apache.rocketmq.console.service.MessageService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageServiceImpl implements MessageService {
private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
@Autowired
private RMQConfigure configure;
/**
* @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
* @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
@@ -59,6 +68,7 @@ public class MessageServiceImpl implements MessageService {
@Resource
private MQAdminExt mqAdminExt;
@Override
public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId) {
try {
@@ -88,7 +98,12 @@ public class MessageServiceImpl implements MessageService {
@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP);
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
RPCHook rpcHook = null;
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook);
List<MessageView> messageViewList = Lists.newArrayList();
try {
String subExpression = "*";

View File

@@ -20,13 +20,9 @@ package org.apache.rocketmq.console.service.impl;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
@@ -43,11 +39,17 @@ import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
import org.apache.rocketmq.console.service.AbstractCommonService;
import org.apache.rocketmq.console.service.TopicService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService {
@@ -209,7 +211,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
}
private TopicList getSystemTopicList() {
DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
RPCHook rpcHook = null;
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(),rMQConfigure.getSecretKey()));
}
DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP,rpcHook);
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
@@ -228,7 +235,17 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
@Override
public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
DefaultMQProducer producer = null;
if(rMQConfigure.isACLEnabled()){
producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials(
rMQConfigure.getAccessKey(),
rMQConfigure.getSecretKey()
)));
producer.setProducerGroup(MixAll.SELF_TEST_PRODUCER_GROUP);
}else{
producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
}
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
try {

View File

@@ -27,4 +27,8 @@ rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=false
rocketmq.config.loginRequired=false
#set the accessKey and secretKey if you used acl
#rocketmq.config.accessKey=
#rocketmq.config.secretKey=