[ISSUE #773] add useTLS configure for rocketmq console

This commit is contained in:
yuz10
2021-08-05 08:45:26 +08:00
committed by GitHub
parent c9d7cb20e7
commit 05b7ad2101
13 changed files with 81 additions and 19 deletions

View File

@@ -61,10 +61,10 @@ public class MQAdminAspect {
Method method = signature.getMethod();
MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS());
}
else {
MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS());
}
obj = joinPoint.proceed();
}
@@ -74,4 +74,4 @@ public class MQAdminAspect {
}
return obj;
}
}
}

View File

@@ -55,6 +55,8 @@ public class RMQConfigure {
private String secretKey;
private boolean useTLS = false;
public String getAccessKey() {
return accessKey;
}
@@ -134,6 +136,14 @@ public class RMQConfigure {
this.loginRequired = loginRequired;
}
public boolean isUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
// Error Page process logic, move to a central configure later
@Bean
public ErrorPageRegistrar errorPageRegistrar() {

View File

@@ -57,4 +57,12 @@ public class OpsController {
public Object clusterStatus() {
return opsService.rocketMqStatusCheck();
}
@RequestMapping(value = "/updateUseTLS.do", method = RequestMethod.POST)
@ResponseBody
public Object updateUseTLS(@RequestParam String useTLS) {
opsService.updateUseTLS(Boolean.parseBoolean(useTLS));
return true;
}
}

View File

@@ -29,4 +29,6 @@ public interface OpsService {
Map<CheckerType,Object> rocketMqStatusCheck();
boolean updateIsVIPChannel(String useVIPChannel);
boolean updateUseTLS(boolean useTLS);
}

View File

@@ -51,7 +51,7 @@ public class MQAdminInstance {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey) throws MQClientException {
public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey, boolean useTLS) throws MQClientException {
Integer nowCount = INIT_COUNTER.get();
if (nowCount == null) {
RPCHook rpcHook = null;
@@ -66,6 +66,7 @@ public class MQAdminInstance {
else {
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
}
defaultMQAdminExt.setUseTLS(useTLS);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
defaultMQAdminExt.start();
MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);

View File

@@ -122,7 +122,7 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
List<MessageView> messageViewList = Lists.newArrayList();
try {
String subExpression = "*";
@@ -255,7 +255,7 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
long total = 0;
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
@@ -396,7 +396,7 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
List<MessageView> messageViews = new ArrayList<>();
long offset = query.getPageNum() * query.getPageSize();
@@ -536,7 +536,9 @@ public class MessageServiceImpl implements MessageService {
}
}
public DefaultMQPullConsumer buildDefaultMQPullConsumer(RPCHook rpcHook) {
return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
public DefaultMQPullConsumer buildDefaultMQPullConsumer(RPCHook rpcHook, boolean useTLS) {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
consumer.setUseTLS(useTLS);
return consumer;
}
}

View File

@@ -42,6 +42,7 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
Map<String, Object> homePageInfoMap = Maps.newHashMap();
homePageInfoMap.put("namesvrAddrList", Splitter.on(";").splitToList(configure.getNamesrvAddr()));
homePageInfoMap.put("useVIPChannel", Boolean.valueOf(configure.getIsVIPChannel()));
homePageInfoMap.put("useTLS", configure.isUseTLS());
return homePageInfoMap;
}
@@ -68,4 +69,10 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
configure.setIsVIPChannel(useVIPChannel);
return true;
}
@Override
public boolean updateUseTLS(boolean useTLS) {
configure.setUseTLS(useTLS);
return true;
}
}

View File

@@ -200,8 +200,10 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
return true;
}
public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook) {
return new DefaultMQProducer(producerGroup, rpcHook);
public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean useTLS) {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup, rpcHook);
producer.setUseTLS(useTLS);
return producer;
}
private TopicList getSystemTopicList() {
@@ -210,7 +212,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, configure.isUseTLS());
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr());
@@ -232,9 +234,9 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
configure.getAccessKey(),
configure.getSecretKey()
));
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, configure.isUseTLS());
} else {
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, null);
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, null, configure.isUseTLS());
}
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));

View File

@@ -31,4 +31,5 @@ rocketmq.config.loginRequired=false
#set the accessKey and secretKey if you used acl
#rocketmq.config.accessKey=
#rocketmq.config.secretKey=
#rocketmq.config.secretKey=
rocketmq.config.useTLS=false

View File

@@ -18,13 +18,15 @@
app.controller('opsController', ['$scope','$location','$http','Notification','remoteApi','tools', function ($scope,$location,$http,Notification,remoteApi,tools) {
$scope.namesvrAddrList = "";
$scope.useVIPChannel = true;
$scope.useTLS = false;
$http({
method: "GET",
url: "ops/homePage.query"
}).success(function (resp) {
if (resp.status == 0) {
$scope.namesvrAddrList = resp.data.namesvrAddrList.join(";");
$scope.useVIPChannel = resp.data.useVIPChannel
$scope.useVIPChannel = resp.data.useVIPChannel;
$scope.useTLS = resp.data.useTLS;
}else{
Notification.error({message: resp.errMsg, delay: 2000});
}
@@ -55,5 +57,18 @@ app.controller('opsController', ['$scope','$location','$http','Notification','re
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
$scope.updateUseTLS = function () {
$http({
method: "POST",
url: "ops/updateUseTLS.do",
params:{useTLS:$scope.useTLS}
}).success(function (resp) {
if (resp.status == 0) {
Notification.info({message: "SUCCESS", delay: 2000});
}else{
Notification.error({message: resp.errMsg, delay: 2000});
}
});
}
}]);

View File

@@ -35,5 +35,18 @@
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-click="updateIsVIPChannel()">{{'UPDATE' | translate}}
</button>
</div>
<br/>
<br/>
<br/>
<h2 class="md-title">useTLS</h2>
<div class="pull-left">
<md-switch class="md-primary" md-no-ink aria-label="Switch No Ink" ng-model="useTLS">
</md-switch>
</div>
<div class="pull-left">
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-click="updateUseTLS()">{{'UPDATE' | translate}}
</button>
</div>
</div>
</div>

View File

@@ -50,6 +50,7 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -88,7 +89,7 @@ public class MessageControllerTest extends BaseControllerTest {
when(pullResult.getNextBeginOffset()).thenReturn(Long.MAX_VALUE);
when(pullResult.getPullStatus()).thenReturn(PullStatus.FOUND);
when(pullResult.getMsgFoundList()).thenReturn(wrappers);
when(messageService.buildDefaultMQPullConsumer(any())).thenReturn(defaultMQPullConsumer);
when(messageService.buildDefaultMQPullConsumer(any(), anyBoolean())).thenReturn(defaultMQPullConsumer);
}
}

View File

@@ -108,7 +108,7 @@ public class TopicControllerTest extends BaseControllerTest {
when(defaultMQProducer.getmQClientFactory()).thenReturn(mqClientInstance);
when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPIImpl);
when(mqClientAPIImpl.getSystemTopicList(anyLong())).thenReturn(sysTopicList);
when(topicService.buildDefaultMQProducer(anyString(), any())).thenReturn(producer);
when(topicService.buildDefaultMQProducer(anyString(), any(), anyBoolean())).thenReturn(producer);
}
final String url = "/topic/list.query";
@@ -246,7 +246,7 @@ public class TopicControllerTest extends BaseControllerTest {
SendResult result = new SendResult(SendStatus.SEND_OK, "7F000001E41A2E5D6D978B82C20F003D",
"0A8E83C300002A9F00000000000013D3", new MessageQueue(), 1000L);
when(producer.send(any(Message.class))).thenReturn(result);
when(topicService.buildDefaultMQProducer(anyString(), any())).thenReturn(producer);
when(topicService.buildDefaultMQProducer(anyString(), any(), anyBoolean())).thenReturn(producer);
}
SendTopicMessageRequest request = new SendTopicMessageRequest();