mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2025-09-10 03:29:59 +08:00
This commit is contained in:
@@ -355,6 +355,7 @@ const remoteApi = {
|
||||
*/
|
||||
resendMessageDirectly: async (msgId, consumerGroup, topic) => {
|
||||
topic = encodeURIComponent(topic)
|
||||
consumerGroup = encodeURIComponent(consumerGroup)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl(`/message/consumeMessageDirectly.do?msgId=${msgId}&consumerGroup=${consumerGroup}&topic=${topic}`), {
|
||||
method: 'POST',
|
||||
@@ -392,6 +393,7 @@ const remoteApi = {
|
||||
},
|
||||
|
||||
refreshConsumerGroup: async (consumerGroup) => {
|
||||
consumerGroup = encodeURIComponent(consumerGroup)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/group.refresh?consumerGroup=${consumerGroup}`));
|
||||
const data = await response.json();
|
||||
@@ -443,6 +445,7 @@ const remoteApi = {
|
||||
},
|
||||
|
||||
fetchBrokerNameList: async (consumerGroup) => {
|
||||
consumerGroup = encodeURIComponent(consumerGroup)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/fetchBrokerNameList.query?consumerGroup=${consumerGroup}`));
|
||||
const data = await response.json();
|
||||
@@ -454,6 +457,7 @@ const remoteApi = {
|
||||
},
|
||||
|
||||
deleteConsumerGroup: async (groupName, brokerNameList) => {
|
||||
groupName = encodeURIComponent(groupName)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl("/consumer/deleteSubGroup.do"), {
|
||||
method: 'POST',
|
||||
@@ -471,6 +475,7 @@ const remoteApi = {
|
||||
},
|
||||
|
||||
queryConsumerConfig: async (consumerGroup) => {
|
||||
consumerGroup = encodeURIComponent(consumerGroup)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/examineSubscriptionGroupConfig.query?consumerGroup=${consumerGroup}`));
|
||||
const data = await response.json();
|
||||
@@ -499,6 +504,7 @@ const remoteApi = {
|
||||
},
|
||||
|
||||
queryTopicByConsumer: async (consumerGroup, address) => {
|
||||
consumerGroup = encodeURIComponent(consumerGroup)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/queryTopicByConsumer.query?consumerGroup=${consumerGroup}&address=${address}`));
|
||||
const data = await response.json();
|
||||
@@ -510,6 +516,7 @@ const remoteApi = {
|
||||
},
|
||||
|
||||
queryConsumerConnection: async (consumerGroup, address) => {
|
||||
consumerGroup = encodeURIComponent(consumerGroup)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerConnection.query?consumerGroup=${consumerGroup}&address=${address}`));
|
||||
const data = await response.json();
|
||||
@@ -521,6 +528,7 @@ const remoteApi = {
|
||||
},
|
||||
|
||||
queryConsumerRunningInfo: async (consumerGroup, clientId, jstack = false) => {
|
||||
consumerGroup = encodeURIComponent(consumerGroup)
|
||||
try {
|
||||
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerRunningInfo.query?consumerGroup=${consumerGroup}&clientId=${clientId}&jstack=${jstack}`));
|
||||
const data = await response.json();
|
||||
|
@@ -21,7 +21,7 @@ import {remoteApi} from '../../api/remoteApi/remoteApi';
|
||||
import {useLanguage} from '../../i18n/LanguageContext';
|
||||
|
||||
|
||||
const ClientInfoModal = ({visible, group, address, onCancel}) => {
|
||||
const ClientInfoModal = ({visible, group, address, onCancel, messageApi}) => {
|
||||
const {t} = useLanguage();
|
||||
const [loading, setLoading] = useState(false);
|
||||
const [connectionData, setConnectionData] = useState(null);
|
||||
@@ -34,7 +34,11 @@ const ClientInfoModal = ({visible, group, address, onCancel}) => {
|
||||
try {
|
||||
const connResponse = await remoteApi.queryConsumerConnection(group, address);
|
||||
|
||||
if (connResponse.status === 0) setConnectionData(connResponse.data);
|
||||
if (connResponse.status === 0) {
|
||||
setConnectionData(connResponse.data);
|
||||
}else{
|
||||
messageApi.error(connResponse.errMsg);
|
||||
}
|
||||
} finally {
|
||||
setLoading(false);
|
||||
}
|
||||
|
@@ -20,7 +20,7 @@ import {Modal, Spin, Table} from 'antd';
|
||||
import {remoteApi} from '../../api/remoteApi/remoteApi';
|
||||
import {useLanguage} from '../../i18n/LanguageContext';
|
||||
|
||||
const ConsumerDetailModal = ({visible, group, address, onCancel}) => {
|
||||
const ConsumerDetailModal = ({visible, group, address, onCancel ,messageApi}) => {
|
||||
const {t} = useLanguage();
|
||||
const [loading, setLoading] = useState(false);
|
||||
const [details, setDetails] = useState([]);
|
||||
@@ -34,6 +34,10 @@ const ConsumerDetailModal = ({visible, group, address, onCancel}) => {
|
||||
const response = await remoteApi.queryTopicByConsumer(group, address);
|
||||
if (response.status === 0) {
|
||||
setDetails(response.data);
|
||||
}else {
|
||||
// Handle error case
|
||||
messageApi.error(response.errMsg);
|
||||
setDetails([]);
|
||||
}
|
||||
} finally {
|
||||
setLoading(false);
|
||||
|
@@ -18,15 +18,12 @@
|
||||
import React, {useEffect, useState} from 'react';
|
||||
import {Button, Checkbox, Modal, notification, Spin} from 'antd';
|
||||
import {remoteApi} from '../../api/remoteApi/remoteApi';
|
||||
import {useLanguage} from '../../i18n/LanguageContext';
|
||||
|
||||
const DeleteConsumerModal = ({visible, group, onCancel, onSuccess}) => {
|
||||
const {t} = useLanguage();
|
||||
const DeleteConsumerModal = ({visible, group, onCancel, onSuccess, t}) => {
|
||||
const [brokerList, setBrokerList] = useState([]);
|
||||
const [selectedBrokers, setSelectedBrokers] = useState([]);
|
||||
const [loading, setLoading] = useState(false);
|
||||
|
||||
// 获取Broker列表
|
||||
useEffect(() => {
|
||||
const fetchBrokers = async () => {
|
||||
if (!visible) return;
|
||||
@@ -45,7 +42,6 @@ const DeleteConsumerModal = ({visible, group, onCancel, onSuccess}) => {
|
||||
fetchBrokers();
|
||||
}, [visible, group]);
|
||||
|
||||
// 处理删除提交
|
||||
const handleDelete = async () => {
|
||||
if (selectedBrokers.length === 0) {
|
||||
notification.warning({message: t.PLEASE_SELECT_BROKER});
|
||||
|
@@ -592,9 +592,10 @@ export const translations = {
|
||||
"EXPRESSION_TYPE": "Expression Type",
|
||||
"SUB_VERSION": "Sub Version",
|
||||
"CODE_SET": "Code Set",
|
||||
"TAGS_SET": "Tags Set"
|
||||
|
||||
|
||||
"TAGS_SET": "Tags Set",
|
||||
"DELETE_CONSUMER_GROUP": "Delete Consumer Group",
|
||||
"SELECT_DELETE_BROKERS": "Please select brokers to delete consumer group",
|
||||
"CONFIRM_DELETE": "Confirm Delete",
|
||||
}
|
||||
|
||||
};
|
||||
|
@@ -288,7 +288,6 @@ const ConsumerGroupList = () => {
|
||||
setShowConfig(true);
|
||||
};
|
||||
|
||||
// 修改操作按钮的点击处理函数
|
||||
const handleClient = (group, address) => {
|
||||
setSelectedGroup(group);
|
||||
setSelectedAddress(address);
|
||||
@@ -550,12 +549,12 @@ const ConsumerGroupList = () => {
|
||||
/>
|
||||
</Spin>
|
||||
|
||||
{/* 模态框组件保持不变 */}
|
||||
<ClientInfoModal
|
||||
visible={showClientInfo}
|
||||
group={selectedGroup}
|
||||
address={selectedAddress}
|
||||
onCancel={() => setShowClientInfo(false)}
|
||||
messageApi={messageApi}
|
||||
/>
|
||||
|
||||
<ConsumerDetailModal
|
||||
@@ -563,6 +562,7 @@ const ConsumerGroupList = () => {
|
||||
group={selectedGroup}
|
||||
address={selectedAddress}
|
||||
onCancel={() => setShowConsumeDetail(false)}
|
||||
messageApi={messageApi}
|
||||
/>
|
||||
|
||||
<ConsumerConfigModal
|
||||
@@ -579,6 +579,7 @@ const ConsumerGroupList = () => {
|
||||
group={selectedGroup}
|
||||
onCancel={() => setShowDeleteModal(false)}
|
||||
onSuccess={loadConsumerGroups}
|
||||
t={t}
|
||||
/>
|
||||
</div>
|
||||
</>
|
||||
|
@@ -283,6 +283,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
@Override
|
||||
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
|
||||
groupName = getConsumerGroup(groupName);
|
||||
List<ConsumeStats> consumeStatses = new ArrayList<>();
|
||||
String topic = null;
|
||||
try {
|
||||
@@ -295,9 +296,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
List<TopicConsumerInfo> res = new ArrayList<>();
|
||||
String finalGroupName = groupName;
|
||||
consumeStatses.forEach(consumeStats -> {
|
||||
if (consumeStats != null && consumeStats.getOffsetTable() != null && !consumeStats.getOffsetTable().isEmpty()) {
|
||||
res.addAll(toTopicConsumerInfoList(topic, consumeStats, groupName));
|
||||
res.addAll(toTopicConsumerInfoList(topic, consumeStats, finalGroupName));
|
||||
}
|
||||
});
|
||||
return res;
|
||||
@@ -305,6 +307,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
@Override
|
||||
public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
|
||||
groupName = getConsumerGroup(groupName);
|
||||
ConsumeStats consumeStats = null;
|
||||
try {
|
||||
consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
|
||||
@@ -316,6 +319,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
}
|
||||
|
||||
private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) {
|
||||
groupName = getConsumerGroup(groupName);
|
||||
List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
|
||||
@Override
|
||||
public boolean apply(MessageQueue o) {
|
||||
@@ -339,6 +343,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
}
|
||||
|
||||
private Map<MessageQueue, String> getClientConnection(String groupName) {
|
||||
groupName = getConsumerGroup(groupName);
|
||||
Map<MessageQueue, String> results = Maps.newHashMap();
|
||||
try {
|
||||
ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName);
|
||||
@@ -417,7 +422,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
|
||||
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String consumerGroup) {
|
||||
consumerGroup = getConsumerGroup(consumerGroup);
|
||||
List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
|
||||
try {
|
||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
||||
@@ -425,9 +431,9 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
|
||||
SubscriptionGroupConfig subscriptionGroupConfig = null;
|
||||
try {
|
||||
subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
|
||||
subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, consumerGroup);
|
||||
} catch (Exception e) {
|
||||
logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, group);
|
||||
logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, consumerGroup);
|
||||
}
|
||||
if (subscriptionGroupConfig == null) {
|
||||
continue;
|
||||
@@ -480,6 +486,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
@Override
|
||||
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
|
||||
consumerConfigInfo.getSubscriptionGroupConfig().setGroupName(getConsumerGroup(consumerConfigInfo.getSubscriptionGroupConfig().getGroupName()));
|
||||
try {
|
||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
||||
for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
|
||||
@@ -495,6 +502,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
@Override
|
||||
public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
|
||||
group = getConsumerGroup(group);
|
||||
Set<String> brokerNameSet = Sets.newHashSet();
|
||||
try {
|
||||
List<ConsumerConfigInfo> consumerConfigInfoList = examineSubscriptionGroupConfig(group);
|
||||
@@ -511,6 +519,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
@Override
|
||||
public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
|
||||
consumerGroup = getConsumerGroup(consumerGroup);
|
||||
try {
|
||||
String[] addresses = address.split(",");
|
||||
String addr = addresses[0];
|
||||
@@ -523,6 +532,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
@Override
|
||||
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) {
|
||||
consumerGroup = getConsumerGroup(consumerGroup);
|
||||
try {
|
||||
return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
|
||||
} catch (Exception e) {
|
||||
@@ -533,7 +543,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
|
||||
@Override
|
||||
public GroupConsumeInfo refreshGroup(String address, String consumerGroup) {
|
||||
|
||||
if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) {
|
||||
throw new RuntimeException("Cache is being built or empty, please try again later");
|
||||
}
|
||||
@@ -541,7 +550,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
for (int i = 0; i < cacheConsumeInfoList.size(); i++) {
|
||||
GroupConsumeInfo groupConsumeInfo = cacheConsumeInfoList.get(i);
|
||||
if (groupConsumeInfo.getGroup().equals(consumerGroup)) {
|
||||
GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, "");
|
||||
GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, address);
|
||||
updatedInfo.setUpdateTime(new Date());
|
||||
updatedInfo.setGroup(consumerGroup);
|
||||
updatedInfo.setAddress(consumerGroupMap.get(consumerGroup));
|
||||
@@ -559,4 +568,11 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
||||
consumerGroupMap.clear();
|
||||
return queryGroupList(false, address);
|
||||
}
|
||||
|
||||
public String getConsumerGroup(String consumerGroup) {
|
||||
if (consumerGroup != null && consumerGroup.startsWith("%SYS%")) {
|
||||
return consumerGroup.substring(5); // Remove "%SYS%" prefix
|
||||
}
|
||||
return consumerGroup;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user