[ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup. (#57)

* [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup.

* modify method name

* Optimize the delete logic

* optimize code comments
This commit is contained in:
zhangjidi2016
2022-01-06 08:32:45 +08:00
committed by GitHub
parent 560b56e9dc
commit 529501c007
2 changed files with 30 additions and 0 deletions

View File

@@ -23,12 +23,14 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
@@ -46,6 +48,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo; import org.apache.rocketmq.dashboard.model.QueueStatInfo;
@@ -65,6 +68,9 @@ import static com.google.common.base.Throwables.propagate;
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Resource
private RMQConfigure configure;
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>(); private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
static { static {
@@ -290,11 +296,21 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) { public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
Set<String> brokerSet = this.fetchBrokerNameSetBySubscriptionGroup(deleteSubGroupRequest.getGroupName());
List<String> brokerList = deleteSubGroupRequest.getBrokerNameList();
boolean deleteInNsFlag = false;
// If the list of brokers passed in by the request contains the list of brokers that the consumer is in, delete RETRY and DLQ topic in namesrv
if (brokerList.containsAll(brokerSet)) {
deleteInNsFlag = true;
}
try { try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) { for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true); mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true);
// Delete %RETRY%+Group and %DLQ%+Group in broker and namesrv
deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
} }
} }
catch (Exception e) { catch (Exception e) {
@@ -303,6 +319,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
return true; return true;
} }
private void deleteResources(String topic, String brokerName, ClusterInfo clusterInfo, boolean deleteInNsFlag) throws Exception {
mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
Set<String> nameServerSet = null;
if (StringUtils.isNotBlank(configure.getNamesrvAddr())) {
String[] ns = configure.getNamesrvAddr().split(";");
nameServerSet = new HashSet<>(Arrays.asList(ns));
}
if (deleteInNsFlag) {
mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
}
}
@Override @Override
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
try { try {

View File

@@ -182,6 +182,8 @@ public class ConsumerControllerTest extends BaseControllerTest {
final String url = "/consumer/deleteSubGroup.do"; final String url = "/consumer/deleteSubGroup.do";
{ {
doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString()); doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString());
} }
DeleteSubGroupRequest request = new DeleteSubGroupRequest(); DeleteSubGroupRequest request = new DeleteSubGroupRequest();
request.setBrokerNameList(Lists.newArrayList("broker-a")); request.setBrokerNameList(Lists.newArrayList("broker-a"));