diff --git a/frontend-new/src/api/remoteApi/remoteApi.js b/frontend-new/src/api/remoteApi/remoteApi.js index fd6c7fe..329628d 100644 --- a/frontend-new/src/api/remoteApi/remoteApi.js +++ b/frontend-new/src/api/remoteApi/remoteApi.js @@ -355,6 +355,7 @@ const remoteApi = { */ resendMessageDirectly: async (msgId, consumerGroup, topic) => { topic = encodeURIComponent(topic) + consumerGroup = encodeURIComponent(consumerGroup) try { const response = await remoteApi._fetch(remoteApi.buildUrl(`/message/consumeMessageDirectly.do?msgId=${msgId}&consumerGroup=${consumerGroup}&topic=${topic}`), { method: 'POST', @@ -392,6 +393,7 @@ const remoteApi = { }, refreshConsumerGroup: async (consumerGroup) => { + consumerGroup = encodeURIComponent(consumerGroup) try { const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/group.refresh?consumerGroup=${consumerGroup}`)); const data = await response.json(); @@ -443,6 +445,7 @@ const remoteApi = { }, fetchBrokerNameList: async (consumerGroup) => { + consumerGroup = encodeURIComponent(consumerGroup) try { const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/fetchBrokerNameList.query?consumerGroup=${consumerGroup}`)); const data = await response.json(); @@ -454,6 +457,7 @@ const remoteApi = { }, deleteConsumerGroup: async (groupName, brokerNameList) => { + groupName = encodeURIComponent(groupName) try { const response = await remoteApi._fetch(remoteApi.buildUrl("/consumer/deleteSubGroup.do"), { method: 'POST', @@ -471,6 +475,7 @@ const remoteApi = { }, queryConsumerConfig: async (consumerGroup) => { + consumerGroup = encodeURIComponent(consumerGroup) try { const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/examineSubscriptionGroupConfig.query?consumerGroup=${consumerGroup}`)); const data = await response.json(); @@ -499,6 +504,7 @@ const remoteApi = { }, queryTopicByConsumer: async (consumerGroup, address) => { + consumerGroup = encodeURIComponent(consumerGroup) try { const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/queryTopicByConsumer.query?consumerGroup=${consumerGroup}&address=${address}`)); const data = await response.json(); @@ -510,6 +516,7 @@ const remoteApi = { }, queryConsumerConnection: async (consumerGroup, address) => { + consumerGroup = encodeURIComponent(consumerGroup) try { const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerConnection.query?consumerGroup=${consumerGroup}&address=${address}`)); const data = await response.json(); @@ -521,6 +528,7 @@ const remoteApi = { }, queryConsumerRunningInfo: async (consumerGroup, clientId, jstack = false) => { + consumerGroup = encodeURIComponent(consumerGroup) try { const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerRunningInfo.query?consumerGroup=${consumerGroup}&clientId=${clientId}&jstack=${jstack}`)); const data = await response.json(); diff --git a/frontend-new/src/components/consumer/ClientInfoModal.jsx b/frontend-new/src/components/consumer/ClientInfoModal.jsx index 4cf5de5..21d21a5 100644 --- a/frontend-new/src/components/consumer/ClientInfoModal.jsx +++ b/frontend-new/src/components/consumer/ClientInfoModal.jsx @@ -21,7 +21,7 @@ import {remoteApi} from '../../api/remoteApi/remoteApi'; import {useLanguage} from '../../i18n/LanguageContext'; -const ClientInfoModal = ({visible, group, address, onCancel}) => { +const ClientInfoModal = ({visible, group, address, onCancel, messageApi}) => { const {t} = useLanguage(); const [loading, setLoading] = useState(false); const [connectionData, setConnectionData] = useState(null); @@ -34,7 +34,11 @@ const ClientInfoModal = ({visible, group, address, onCancel}) => { try { const connResponse = await remoteApi.queryConsumerConnection(group, address); - if (connResponse.status === 0) setConnectionData(connResponse.data); + if (connResponse.status === 0) { + setConnectionData(connResponse.data); + }else{ + messageApi.error(connResponse.errMsg); + } } finally { setLoading(false); } diff --git a/frontend-new/src/components/consumer/ConsumerDetailModal.jsx b/frontend-new/src/components/consumer/ConsumerDetailModal.jsx index b46ffe7..7d70f2e 100644 --- a/frontend-new/src/components/consumer/ConsumerDetailModal.jsx +++ b/frontend-new/src/components/consumer/ConsumerDetailModal.jsx @@ -20,7 +20,7 @@ import {Modal, Spin, Table} from 'antd'; import {remoteApi} from '../../api/remoteApi/remoteApi'; import {useLanguage} from '../../i18n/LanguageContext'; -const ConsumerDetailModal = ({visible, group, address, onCancel}) => { +const ConsumerDetailModal = ({visible, group, address, onCancel ,messageApi}) => { const {t} = useLanguage(); const [loading, setLoading] = useState(false); const [details, setDetails] = useState([]); @@ -34,6 +34,10 @@ const ConsumerDetailModal = ({visible, group, address, onCancel}) => { const response = await remoteApi.queryTopicByConsumer(group, address); if (response.status === 0) { setDetails(response.data); + }else { + // Handle error case + messageApi.error(response.errMsg); + setDetails([]); } } finally { setLoading(false); diff --git a/frontend-new/src/components/consumer/DeleteConsumerModal.jsx b/frontend-new/src/components/consumer/DeleteConsumerModal.jsx index 504459e..7381869 100644 --- a/frontend-new/src/components/consumer/DeleteConsumerModal.jsx +++ b/frontend-new/src/components/consumer/DeleteConsumerModal.jsx @@ -18,15 +18,12 @@ import React, {useEffect, useState} from 'react'; import {Button, Checkbox, Modal, notification, Spin} from 'antd'; import {remoteApi} from '../../api/remoteApi/remoteApi'; -import {useLanguage} from '../../i18n/LanguageContext'; -const DeleteConsumerModal = ({visible, group, onCancel, onSuccess}) => { - const {t} = useLanguage(); +const DeleteConsumerModal = ({visible, group, onCancel, onSuccess, t}) => { const [brokerList, setBrokerList] = useState([]); const [selectedBrokers, setSelectedBrokers] = useState([]); const [loading, setLoading] = useState(false); - // 获取Broker列表 useEffect(() => { const fetchBrokers = async () => { if (!visible) return; @@ -45,7 +42,6 @@ const DeleteConsumerModal = ({visible, group, onCancel, onSuccess}) => { fetchBrokers(); }, [visible, group]); - // 处理删除提交 const handleDelete = async () => { if (selectedBrokers.length === 0) { notification.warning({message: t.PLEASE_SELECT_BROKER}); diff --git a/frontend-new/src/i18n/index.js b/frontend-new/src/i18n/index.js index 1486991..28a882e 100644 --- a/frontend-new/src/i18n/index.js +++ b/frontend-new/src/i18n/index.js @@ -592,9 +592,10 @@ export const translations = { "EXPRESSION_TYPE": "Expression Type", "SUB_VERSION": "Sub Version", "CODE_SET": "Code Set", - "TAGS_SET": "Tags Set" - - + "TAGS_SET": "Tags Set", + "DELETE_CONSUMER_GROUP": "Delete Consumer Group", + "SELECT_DELETE_BROKERS": "Please select brokers to delete consumer group", + "CONFIRM_DELETE": "Confirm Delete", } }; diff --git a/frontend-new/src/pages/Consumer/consumer.jsx b/frontend-new/src/pages/Consumer/consumer.jsx index 650bafa..3377e7a 100644 --- a/frontend-new/src/pages/Consumer/consumer.jsx +++ b/frontend-new/src/pages/Consumer/consumer.jsx @@ -288,7 +288,6 @@ const ConsumerGroupList = () => { setShowConfig(true); }; - // 修改操作按钮的点击处理函数 const handleClient = (group, address) => { setSelectedGroup(group); setSelectedAddress(address); @@ -550,12 +549,12 @@ const ConsumerGroupList = () => { /> - {/* 模态框组件保持不变 */} setShowClientInfo(false)} + messageApi={messageApi} /> { group={selectedGroup} address={selectedAddress} onCancel={() => setShowConsumeDetail(false)} + messageApi={messageApi} /> { group={selectedGroup} onCancel={() => setShowDeleteModal(false)} onSuccess={loadConsumerGroups} + t={t} /> diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index e307966..160da1a 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -283,6 +283,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public List queryConsumeStatsListByGroupName(String groupName, String address) { + groupName = getConsumerGroup(groupName); List consumeStatses = new ArrayList<>(); String topic = null; try { @@ -295,9 +296,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum throw new RuntimeException(e); } List res = new ArrayList<>(); + String finalGroupName = groupName; consumeStatses.forEach(consumeStats -> { if (consumeStats != null && consumeStats.getOffsetTable() != null && !consumeStats.getOffsetTable().isEmpty()) { - res.addAll(toTopicConsumerInfoList(topic, consumeStats, groupName)); + res.addAll(toTopicConsumerInfoList(topic, consumeStats, finalGroupName)); } }); return res; @@ -305,6 +307,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public List queryConsumeStatsList(final String topic, String groupName) { + groupName = getConsumerGroup(groupName); ConsumeStats consumeStats = null; try { consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); @@ -316,6 +319,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } private List toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) { + groupName = getConsumerGroup(groupName); List mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate() { @Override public boolean apply(MessageQueue o) { @@ -339,6 +343,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } private Map getClientConnection(String groupName) { + groupName = getConsumerGroup(groupName); Map results = Maps.newHashMap(); try { ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName); @@ -417,7 +422,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public List examineSubscriptionGroupConfig(String group) { + public List examineSubscriptionGroupConfig(String consumerGroup) { + consumerGroup = getConsumerGroup(consumerGroup); List consumerConfigInfoList = Lists.newArrayList(); try { ClusterInfo clusterInfo = clusterInfoService.get(); @@ -425,9 +431,9 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(); SubscriptionGroupConfig subscriptionGroupConfig = null; try { - subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group); + subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, consumerGroup); } catch (Exception e) { - logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, group); + logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, consumerGroup); } if (subscriptionGroupConfig == null) { continue; @@ -480,6 +486,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { + consumerConfigInfo.getSubscriptionGroupConfig().setGroupName(getConsumerGroup(consumerConfigInfo.getSubscriptionGroupConfig().getGroupName())); try { ClusterInfo clusterInfo = clusterInfoService.get(); for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), @@ -495,6 +502,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public Set fetchBrokerNameSetBySubscriptionGroup(String group) { + group = getConsumerGroup(group); Set brokerNameSet = Sets.newHashSet(); try { List consumerConfigInfoList = examineSubscriptionGroupConfig(group); @@ -511,6 +519,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public ConsumerConnection getConsumerConnection(String consumerGroup, String address) { + consumerGroup = getConsumerGroup(consumerGroup); try { String[] addresses = address.split(","); String addr = addresses[0]; @@ -523,6 +532,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) { + consumerGroup = getConsumerGroup(consumerGroup); try { return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); } catch (Exception e) { @@ -533,7 +543,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum @Override public GroupConsumeInfo refreshGroup(String address, String consumerGroup) { - if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) { throw new RuntimeException("Cache is being built or empty, please try again later"); } @@ -541,7 +550,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum for (int i = 0; i < cacheConsumeInfoList.size(); i++) { GroupConsumeInfo groupConsumeInfo = cacheConsumeInfoList.get(i); if (groupConsumeInfo.getGroup().equals(consumerGroup)) { - GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, ""); + GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, address); updatedInfo.setUpdateTime(new Date()); updatedInfo.setGroup(consumerGroup); updatedInfo.setAddress(consumerGroupMap.get(consumerGroup)); @@ -559,4 +568,11 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum consumerGroupMap.clear(); return queryGroupList(false, address); } + + public String getConsumerGroup(String consumerGroup) { + if (consumerGroup != null && consumerGroup.startsWith("%SYS%")) { + return consumerGroup.substring(5); // Remove "%SYS%" prefix + } + return consumerGroup; + } }