diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index 2d60501..3ad85d4 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -23,12 +23,14 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; @@ -46,6 +48,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.QueueStatInfo; @@ -65,6 +68,9 @@ import static com.google.common.base.Throwables.propagate; public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); + @Resource + private RMQConfigure configure; + private static final Set SYSTEM_GROUP_SET = new HashSet<>(); static { @@ -290,11 +296,21 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) { + Set brokerSet = this.fetchBrokerNameSetBySubscriptionGroup(deleteSubGroupRequest.getGroupName()); + List brokerList = deleteSubGroupRequest.getBrokerNameList(); + boolean deleteInNsFlag = false; + // If the list of brokers passed in by the request contains the list of brokers that the consumer is in, delete RETRY and DLQ topic in namesrv + if (brokerList.containsAll(brokerSet)) { + deleteInNsFlag = true; + } try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) { logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true); + // Delete %RETRY%+Group and %DLQ%+Group in broker and namesrv + deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag); + deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag); } } catch (Exception e) { @@ -303,6 +319,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum return true; } + private void deleteResources(String topic, String brokerName, ClusterInfo clusterInfo, boolean deleteInNsFlag) throws Exception { + mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic); + Set nameServerSet = null; + if (StringUtils.isNotBlank(configure.getNamesrvAddr())) { + String[] ns = configure.getNamesrvAddr().split(";"); + nameServerSet = new HashSet<>(Arrays.asList(ns)); + } + if (deleteInNsFlag) { + mqAdminExt.deleteTopicInNameServer(nameServerSet, topic); + } + } + @Override public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { try { diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java index 87c3cc0..dfc3c22 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java @@ -182,6 +182,8 @@ public class ConsumerControllerTest extends BaseControllerTest { final String url = "/consumer/deleteSubGroup.do"; { doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString()); + doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString()); + doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString()); } DeleteSubGroupRequest request = new DeleteSubGroupRequest(); request.setBrokerNameList(Lists.newArrayList("broker-a"));