mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2026-02-22 13:55:43 +08:00
* Add Acl menu, support config acl. * Optimize one line code. * Add some unit tests for acl. * Add permission control by role and optimize some code. * The secret keys are hidden by asterisks. * Search acl data will exclude secretKey info if the login role is not admin in the background. * Optimize some code again. * recover default application.yml config
This commit is contained in:
@@ -0,0 +1,146 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.rocketmq.dashboard.controller;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.common.AclConfig;
|
||||
import org.apache.rocketmq.common.PlainAccessConfig;
|
||||
import org.apache.rocketmq.dashboard.config.RMQConfigure;
|
||||
import org.apache.rocketmq.dashboard.model.User;
|
||||
import org.apache.rocketmq.dashboard.model.UserInfo;
|
||||
import org.apache.rocketmq.dashboard.model.request.AclRequest;
|
||||
import org.apache.rocketmq.dashboard.permisssion.Permission;
|
||||
import org.apache.rocketmq.dashboard.service.AclService;
|
||||
import org.apache.rocketmq.dashboard.support.JsonResult;
|
||||
import org.apache.rocketmq.dashboard.util.WebUtil;
|
||||
import org.springframework.web.bind.annotation.DeleteMapping;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/acl")
|
||||
@Permission
|
||||
public class AclController {
|
||||
|
||||
@Resource
|
||||
private AclService aclService;
|
||||
|
||||
@Resource
|
||||
private RMQConfigure configure;
|
||||
|
||||
@GetMapping("/enable.query")
|
||||
public Object isEnableAcl() {
|
||||
return new JsonResult<>(configure.isACLEnabled());
|
||||
}
|
||||
|
||||
@GetMapping("/config.query")
|
||||
public AclConfig getAclConfig(HttpServletRequest request) {
|
||||
if (!configure.isLoginRequired()) {
|
||||
return aclService.getAclConfig(false);
|
||||
}
|
||||
UserInfo userInfo = (UserInfo) WebUtil.getValueFromSession(request, WebUtil.USER_INFO);
|
||||
// if user info is null but reach here, must exclude secret key for safety.
|
||||
return aclService.getAclConfig(userInfo == null || userInfo.getUser().getType() != User.ADMIN);
|
||||
}
|
||||
|
||||
@PostMapping("/add.do")
|
||||
public Object addAclConfig(@RequestBody PlainAccessConfig config) {
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getAccessKey()), "accessKey is null");
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getSecretKey()), "secretKey is null");
|
||||
aclService.addAclConfig(config);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/delete.do")
|
||||
public Object deleteAclConfig(@RequestBody PlainAccessConfig config) {
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getAccessKey()), "accessKey is null");
|
||||
aclService.deleteAclConfig(config);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/update.do")
|
||||
public Object updateAclConfig(@RequestBody PlainAccessConfig config) {
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getSecretKey()), "secretKey is null");
|
||||
aclService.updateAclConfig(config);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/topic/add.do")
|
||||
public Object addAclTopicConfig(@RequestBody AclRequest request) {
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getAccessKey()), "accessKey is null");
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getSecretKey()), "secretKey is null");
|
||||
Preconditions.checkArgument(CollectionUtils.isNotEmpty(request.getConfig().getTopicPerms()), "topic perms is null");
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getTopicPerm()), "topic perm is null");
|
||||
aclService.addOrUpdateAclTopicConfig(request);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/group/add.do")
|
||||
public Object addAclGroupConfig(@RequestBody AclRequest request) {
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getAccessKey()), "accessKey is null");
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getSecretKey()), "secretKey is null");
|
||||
Preconditions.checkArgument(CollectionUtils.isNotEmpty(request.getConfig().getGroupPerms()), "group perms is null");
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getGroupPerm()), "group perm is null");
|
||||
aclService.addOrUpdateAclGroupConfig(request);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/perm/delete.do")
|
||||
public Object deletePermConfig(@RequestBody AclRequest request) {
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getAccessKey()), "accessKey is null");
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getSecretKey()), "secretKey is null");
|
||||
aclService.deletePermConfig(request);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/sync.do")
|
||||
public Object syncConfig(@RequestBody PlainAccessConfig config) {
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getAccessKey()), "accessKey is null");
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getSecretKey()), "secretKey is null");
|
||||
aclService.syncData(config);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/white/list/add.do")
|
||||
public Object addWhiteList(@RequestBody List<String> whiteList) {
|
||||
Preconditions.checkArgument(CollectionUtils.isNotEmpty(whiteList), "white list is null");
|
||||
aclService.addWhiteList(whiteList);
|
||||
return true;
|
||||
}
|
||||
|
||||
@DeleteMapping("/white/list/delete.do")
|
||||
public Object deleteWhiteAddr(@RequestParam String request) {
|
||||
aclService.deleteWhiteAddr(request);
|
||||
return true;
|
||||
}
|
||||
|
||||
@PostMapping("/white/list/sync.do")
|
||||
public Object synchronizeWhiteList(@RequestBody List<String> whiteList) {
|
||||
Preconditions.checkArgument(CollectionUtils.isNotEmpty(whiteList), "white list is null");
|
||||
aclService.synchronizeWhiteList(whiteList);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.rocketmq.dashboard.model.request;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.rocketmq.common.PlainAccessConfig;
|
||||
|
||||
@Data
|
||||
public class AclRequest {
|
||||
|
||||
private PlainAccessConfig config;
|
||||
|
||||
private String topicPerm;
|
||||
|
||||
private String groupPerm;
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.rocketmq.dashboard.service;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.rocketmq.common.AclConfig;
|
||||
import org.apache.rocketmq.common.PlainAccessConfig;
|
||||
import org.apache.rocketmq.dashboard.model.request.AclRequest;
|
||||
|
||||
public interface AclService {
|
||||
|
||||
AclConfig getAclConfig(boolean excludeSecretKey);
|
||||
|
||||
void addAclConfig(PlainAccessConfig config);
|
||||
|
||||
void deleteAclConfig(PlainAccessConfig config);
|
||||
|
||||
void updateAclConfig(PlainAccessConfig config);
|
||||
|
||||
void addOrUpdateAclTopicConfig(AclRequest request);
|
||||
|
||||
void addOrUpdateAclGroupConfig(AclRequest request);
|
||||
|
||||
void deletePermConfig(AclRequest request);
|
||||
|
||||
void syncData(PlainAccessConfig config);
|
||||
|
||||
void addWhiteList(List<String> whiteList);
|
||||
|
||||
void deleteWhiteAddr(String addr);
|
||||
|
||||
void synchronizeWhiteList(List<String> whiteList);
|
||||
}
|
||||
@@ -91,29 +91,34 @@ public class MQAdminExtImpl implements MQAdminExt {
|
||||
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
|
||||
}
|
||||
|
||||
@Override public void createAndUpdatePlainAccessConfig(String addr,
|
||||
@Override
|
||||
public void createAndUpdatePlainAccessConfig(String addr,
|
||||
PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
|
||||
|
||||
MQAdminInstance.threadLocalMQAdminExt().createAndUpdatePlainAccessConfig(addr, plainAccessConfig);
|
||||
}
|
||||
|
||||
@Override public void deletePlainAccessConfig(String addr,
|
||||
@Override
|
||||
public void deletePlainAccessConfig(String addr,
|
||||
String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
|
||||
|
||||
MQAdminInstance.threadLocalMQAdminExt().deletePlainAccessConfig(addr, accessKey);
|
||||
}
|
||||
|
||||
@Override public void updateGlobalWhiteAddrConfig(String addr,
|
||||
@Override
|
||||
public void updateGlobalWhiteAddrConfig(String addr,
|
||||
String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
|
||||
|
||||
MQAdminInstance.threadLocalMQAdminExt().updateGlobalWhiteAddrConfig(addr, globalWhiteAddrs);
|
||||
}
|
||||
|
||||
@Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
|
||||
@Override
|
||||
public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
|
||||
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override public AclConfig examineBrokerClusterAclConfig(
|
||||
@Override
|
||||
public AclConfig examineBrokerClusterAclConfig(
|
||||
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
|
||||
return null;
|
||||
return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterAclConfig(addr);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,359 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.rocketmq.dashboard.service.impl;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.AclConfig;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.PlainAccessConfig;
|
||||
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
|
||||
import org.apache.rocketmq.common.protocol.route.BrokerData;
|
||||
import org.apache.rocketmq.dashboard.model.request.AclRequest;
|
||||
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
||||
import org.apache.rocketmq.dashboard.service.AclService;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class AclServiceImpl extends AbstractCommonService implements AclService {
|
||||
|
||||
@Override
|
||||
public AclConfig getAclConfig(boolean excludeSecretKey) {
|
||||
try {
|
||||
Optional<String> addr = getMasterSet().stream().findFirst();
|
||||
if (addr.isPresent()) {
|
||||
if (!excludeSecretKey) {
|
||||
return mqAdminExt.examineBrokerClusterAclConfig(addr.get());
|
||||
} else {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr.get());
|
||||
if (CollectionUtils.isNotEmpty(aclConfig.getPlainAccessConfigs())) {
|
||||
aclConfig.getPlainAccessConfigs().forEach(pac -> pac.setSecretKey(null));
|
||||
}
|
||||
return aclConfig;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("getAclConfig error.", e);
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
AclConfig aclConfig = new AclConfig();
|
||||
aclConfig.setGlobalWhiteAddrs(Collections.emptyList());
|
||||
aclConfig.setPlainAccessConfigs(Collections.emptyList());
|
||||
return aclConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAclConfig(PlainAccessConfig config) {
|
||||
try {
|
||||
Set<String> masterSet = getMasterSet();
|
||||
|
||||
if (masterSet.isEmpty()) {
|
||||
throw new IllegalStateException("broker addr list is empty");
|
||||
}
|
||||
// check to see if account is exists
|
||||
for (String addr : masterSet) {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
List<PlainAccessConfig> plainAccessConfigs = aclConfig.getPlainAccessConfigs();
|
||||
for (PlainAccessConfig pac : plainAccessConfigs) {
|
||||
if (pac.getAccessKey().equals(config.getAccessKey())) {
|
||||
throw new IllegalArgumentException(String.format("broker: %s, exist accessKey: %s", addr, config.getAccessKey()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// all broker
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAclConfig(PlainAccessConfig config) {
|
||||
try {
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
log.info("Start to delete acl [{}] from broker [{}]", config.getAccessKey(), addr);
|
||||
if (isExistAccessKey(config.getAccessKey(), addr)) {
|
||||
mqAdminExt.deletePlainAccessConfig(addr, config.getAccessKey());
|
||||
}
|
||||
log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateAclConfig(PlainAccessConfig config) {
|
||||
try {
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
if (aclConfig.getPlainAccessConfigs() != null) {
|
||||
PlainAccessConfig remoteConfig = null;
|
||||
for (PlainAccessConfig pac : aclConfig.getPlainAccessConfigs()) {
|
||||
if (pac.getAccessKey().equals(config.getAccessKey())) {
|
||||
remoteConfig = pac;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (remoteConfig != null) {
|
||||
remoteConfig.setSecretKey(config.getSecretKey());
|
||||
remoteConfig.setAdmin(config.isAdmin());
|
||||
config = remoteConfig;
|
||||
}
|
||||
}
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addOrUpdateAclTopicConfig(AclRequest request) {
|
||||
try {
|
||||
PlainAccessConfig addConfig = request.getConfig();
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
PlainAccessConfig remoteConfig = null;
|
||||
if (aclConfig.getPlainAccessConfigs() != null) {
|
||||
for (PlainAccessConfig config : aclConfig.getPlainAccessConfigs()) {
|
||||
if (config.getAccessKey().equals(addConfig.getAccessKey())) {
|
||||
remoteConfig = config;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (remoteConfig == null) {
|
||||
// Maybe the broker no acl config of the access key, therefore add it;
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, addConfig);
|
||||
} else {
|
||||
if (remoteConfig.getTopicPerms() == null) {
|
||||
remoteConfig.setTopicPerms(new ArrayList<>());
|
||||
}
|
||||
removeExist(remoteConfig.getTopicPerms(), request.getTopicPerm().split("=")[0]);
|
||||
remoteConfig.getTopicPerms().add(request.getTopicPerm());
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, remoteConfig);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addOrUpdateAclGroupConfig(AclRequest request) {
|
||||
try {
|
||||
PlainAccessConfig addConfig = request.getConfig();
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
PlainAccessConfig remoteConfig = null;
|
||||
if (aclConfig.getPlainAccessConfigs() != null) {
|
||||
for (PlainAccessConfig config : aclConfig.getPlainAccessConfigs()) {
|
||||
if (config.getAccessKey().equals(addConfig.getAccessKey())) {
|
||||
remoteConfig = config;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (remoteConfig == null) {
|
||||
// May be the broker no acl config of the access key, therefore add it;
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, addConfig);
|
||||
} else {
|
||||
if (remoteConfig.getGroupPerms() == null) {
|
||||
remoteConfig.setGroupPerms(new ArrayList<>());
|
||||
}
|
||||
removeExist(remoteConfig.getGroupPerms(), request.getGroupPerm().split("=")[0]);
|
||||
remoteConfig.getGroupPerms().add(request.getGroupPerm());
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, remoteConfig);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deletePermConfig(AclRequest request) {
|
||||
try {
|
||||
PlainAccessConfig deleteConfig = request.getConfig();
|
||||
|
||||
String topic = StringUtils.isNotEmpty(request.getTopicPerm()) ? request.getTopicPerm().split("=")[0] : null;
|
||||
String group = StringUtils.isNotEmpty(request.getGroupPerm()) ? request.getGroupPerm().split("=")[0] : null;
|
||||
if (deleteConfig.getTopicPerms() != null && topic != null) {
|
||||
removeExist(deleteConfig.getTopicPerms(), topic);
|
||||
}
|
||||
if (deleteConfig.getGroupPerms() != null && group != null) {
|
||||
removeExist(deleteConfig.getGroupPerms(), group);
|
||||
}
|
||||
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
PlainAccessConfig remoteConfig = null;
|
||||
if (aclConfig.getPlainAccessConfigs() != null) {
|
||||
for (PlainAccessConfig config : aclConfig.getPlainAccessConfigs()) {
|
||||
if (config.getAccessKey().equals(deleteConfig.getAccessKey())) {
|
||||
remoteConfig = config;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (remoteConfig == null) {
|
||||
// Maybe the broker no acl config of the access key, therefore add it;
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, deleteConfig);
|
||||
} else {
|
||||
if (remoteConfig.getTopicPerms() != null && topic != null) {
|
||||
removeExist(remoteConfig.getTopicPerms(), topic);
|
||||
}
|
||||
if (remoteConfig.getGroupPerms() != null && group != null) {
|
||||
removeExist(remoteConfig.getGroupPerms(), group);
|
||||
}
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, remoteConfig);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void syncData(PlainAccessConfig config) {
|
||||
try {
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addWhiteList(List<String> whiteList) {
|
||||
if (whiteList == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
if (aclConfig.getGlobalWhiteAddrs() != null) {
|
||||
aclConfig.setGlobalWhiteAddrs(Stream.of(whiteList, aclConfig.getGlobalWhiteAddrs()).flatMap(Collection::stream).distinct().collect(Collectors.toList()));
|
||||
} else {
|
||||
aclConfig.setGlobalWhiteAddrs(whiteList);
|
||||
}
|
||||
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteWhiteAddr(String deleteAddr) {
|
||||
try {
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
if (aclConfig.getGlobalWhiteAddrs() == null || aclConfig.getGlobalWhiteAddrs().isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
aclConfig.getGlobalWhiteAddrs().remove(deleteAddr);
|
||||
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void synchronizeWhiteList(List<String> whiteList) {
|
||||
if (whiteList == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
for (String addr : getBrokerAddrs()) {
|
||||
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ","));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeExist(List<String> list, String name) {
|
||||
Iterator<String> iterator = list.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
String v = iterator.next();
|
||||
String cmp = v.split("=")[0];
|
||||
if (cmp.equals(name)) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isExistAccessKey(String accessKey,
|
||||
String addr) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
|
||||
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
|
||||
List<PlainAccessConfig> plainAccessConfigs = aclConfig.getPlainAccessConfigs();
|
||||
if (plainAccessConfigs == null || plainAccessConfigs.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
for (PlainAccessConfig config : plainAccessConfigs) {
|
||||
if (accessKey.equals(config.getAccessKey())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private Set<BrokerData> getBrokerDataSet() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
|
||||
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||
Map<String, BrokerData> brokerDataMap = clusterInfo.getBrokerAddrTable();
|
||||
return new HashSet<>(brokerDataMap.values());
|
||||
}
|
||||
|
||||
private Set<String> getMasterSet() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
|
||||
return getBrokerDataSet().stream().map(data -> data.getBrokerAddrs().get(MixAll.MASTER_ID)).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private Set<String> getBrokerAddrs() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
|
||||
Set<String> brokerAddrs = new HashSet<>();
|
||||
getBrokerDataSet().forEach(data -> brokerAddrs.addAll(data.getBrokerAddrs().values()));
|
||||
return brokerAddrs;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user