Compare commits

...

3 Commits

Author SHA1 Message Date
RongtongJin
cd262da8b1 Update Notice to 2025. 2025-07-31 11:34:24 +08:00
Crazylychee
37dbd7f327 [ISSUE #348] Fix Some interaction issues with the consumer interface (#349) 2025-07-31 11:24:42 +08:00
strangelookingnerd
79556420f5 Use provided scope for lombok (#350) 2025-07-31 11:24:01 +08:00
9 changed files with 51 additions and 20 deletions

2
NOTICE
View File

@@ -1,5 +1,5 @@
Apache RocketMQ Apache RocketMQ
Copyright 2016-2022 The Apache Software Foundation Copyright 2016-2025 The Apache Software Foundation
This product includes software developed at This product includes software developed at
The Apache Software Foundation (http://www.apache.org/). The Apache Software Foundation (http://www.apache.org/).

View File

@@ -355,6 +355,7 @@ const remoteApi = {
*/ */
resendMessageDirectly: async (msgId, consumerGroup, topic) => { resendMessageDirectly: async (msgId, consumerGroup, topic) => {
topic = encodeURIComponent(topic) topic = encodeURIComponent(topic)
consumerGroup = encodeURIComponent(consumerGroup)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl(`/message/consumeMessageDirectly.do?msgId=${msgId}&consumerGroup=${consumerGroup}&topic=${topic}`), { const response = await remoteApi._fetch(remoteApi.buildUrl(`/message/consumeMessageDirectly.do?msgId=${msgId}&consumerGroup=${consumerGroup}&topic=${topic}`), {
method: 'POST', method: 'POST',
@@ -392,6 +393,7 @@ const remoteApi = {
}, },
refreshConsumerGroup: async (consumerGroup) => { refreshConsumerGroup: async (consumerGroup) => {
consumerGroup = encodeURIComponent(consumerGroup)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/group.refresh?consumerGroup=${consumerGroup}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/group.refresh?consumerGroup=${consumerGroup}`));
const data = await response.json(); const data = await response.json();
@@ -443,6 +445,7 @@ const remoteApi = {
}, },
fetchBrokerNameList: async (consumerGroup) => { fetchBrokerNameList: async (consumerGroup) => {
consumerGroup = encodeURIComponent(consumerGroup)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/fetchBrokerNameList.query?consumerGroup=${consumerGroup}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/fetchBrokerNameList.query?consumerGroup=${consumerGroup}`));
const data = await response.json(); const data = await response.json();
@@ -454,6 +457,7 @@ const remoteApi = {
}, },
deleteConsumerGroup: async (groupName, brokerNameList) => { deleteConsumerGroup: async (groupName, brokerNameList) => {
groupName = encodeURIComponent(groupName)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl("/consumer/deleteSubGroup.do"), { const response = await remoteApi._fetch(remoteApi.buildUrl("/consumer/deleteSubGroup.do"), {
method: 'POST', method: 'POST',
@@ -471,6 +475,7 @@ const remoteApi = {
}, },
queryConsumerConfig: async (consumerGroup) => { queryConsumerConfig: async (consumerGroup) => {
consumerGroup = encodeURIComponent(consumerGroup)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/examineSubscriptionGroupConfig.query?consumerGroup=${consumerGroup}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/examineSubscriptionGroupConfig.query?consumerGroup=${consumerGroup}`));
const data = await response.json(); const data = await response.json();
@@ -499,6 +504,7 @@ const remoteApi = {
}, },
queryTopicByConsumer: async (consumerGroup, address) => { queryTopicByConsumer: async (consumerGroup, address) => {
consumerGroup = encodeURIComponent(consumerGroup)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/queryTopicByConsumer.query?consumerGroup=${consumerGroup}&address=${address}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/queryTopicByConsumer.query?consumerGroup=${consumerGroup}&address=${address}`));
const data = await response.json(); const data = await response.json();
@@ -510,6 +516,7 @@ const remoteApi = {
}, },
queryConsumerConnection: async (consumerGroup, address) => { queryConsumerConnection: async (consumerGroup, address) => {
consumerGroup = encodeURIComponent(consumerGroup)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerConnection.query?consumerGroup=${consumerGroup}&address=${address}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerConnection.query?consumerGroup=${consumerGroup}&address=${address}`));
const data = await response.json(); const data = await response.json();
@@ -521,6 +528,7 @@ const remoteApi = {
}, },
queryConsumerRunningInfo: async (consumerGroup, clientId, jstack = false) => { queryConsumerRunningInfo: async (consumerGroup, clientId, jstack = false) => {
consumerGroup = encodeURIComponent(consumerGroup)
try { try {
const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerRunningInfo.query?consumerGroup=${consumerGroup}&clientId=${clientId}&jstack=${jstack}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/consumer/consumerRunningInfo.query?consumerGroup=${consumerGroup}&clientId=${clientId}&jstack=${jstack}`));
const data = await response.json(); const data = await response.json();

View File

@@ -21,7 +21,7 @@ import {remoteApi} from '../../api/remoteApi/remoteApi';
import {useLanguage} from '../../i18n/LanguageContext'; import {useLanguage} from '../../i18n/LanguageContext';
const ClientInfoModal = ({visible, group, address, onCancel}) => { const ClientInfoModal = ({visible, group, address, onCancel, messageApi}) => {
const {t} = useLanguage(); const {t} = useLanguage();
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(false);
const [connectionData, setConnectionData] = useState(null); const [connectionData, setConnectionData] = useState(null);
@@ -34,7 +34,11 @@ const ClientInfoModal = ({visible, group, address, onCancel}) => {
try { try {
const connResponse = await remoteApi.queryConsumerConnection(group, address); const connResponse = await remoteApi.queryConsumerConnection(group, address);
if (connResponse.status === 0) setConnectionData(connResponse.data); if (connResponse.status === 0) {
setConnectionData(connResponse.data);
}else{
messageApi.error(connResponse.errMsg);
}
} finally { } finally {
setLoading(false); setLoading(false);
} }

View File

@@ -20,7 +20,7 @@ import {Modal, Spin, Table} from 'antd';
import {remoteApi} from '../../api/remoteApi/remoteApi'; import {remoteApi} from '../../api/remoteApi/remoteApi';
import {useLanguage} from '../../i18n/LanguageContext'; import {useLanguage} from '../../i18n/LanguageContext';
const ConsumerDetailModal = ({visible, group, address, onCancel}) => { const ConsumerDetailModal = ({visible, group, address, onCancel ,messageApi}) => {
const {t} = useLanguage(); const {t} = useLanguage();
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(false);
const [details, setDetails] = useState([]); const [details, setDetails] = useState([]);
@@ -34,6 +34,10 @@ const ConsumerDetailModal = ({visible, group, address, onCancel}) => {
const response = await remoteApi.queryTopicByConsumer(group, address); const response = await remoteApi.queryTopicByConsumer(group, address);
if (response.status === 0) { if (response.status === 0) {
setDetails(response.data); setDetails(response.data);
}else {
// Handle error case
messageApi.error(response.errMsg);
setDetails([]);
} }
} finally { } finally {
setLoading(false); setLoading(false);

View File

@@ -18,15 +18,12 @@
import React, {useEffect, useState} from 'react'; import React, {useEffect, useState} from 'react';
import {Button, Checkbox, Modal, notification, Spin} from 'antd'; import {Button, Checkbox, Modal, notification, Spin} from 'antd';
import {remoteApi} from '../../api/remoteApi/remoteApi'; import {remoteApi} from '../../api/remoteApi/remoteApi';
import {useLanguage} from '../../i18n/LanguageContext';
const DeleteConsumerModal = ({visible, group, onCancel, onSuccess}) => { const DeleteConsumerModal = ({visible, group, onCancel, onSuccess, t}) => {
const {t} = useLanguage();
const [brokerList, setBrokerList] = useState([]); const [brokerList, setBrokerList] = useState([]);
const [selectedBrokers, setSelectedBrokers] = useState([]); const [selectedBrokers, setSelectedBrokers] = useState([]);
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(false);
// 获取Broker列表
useEffect(() => { useEffect(() => {
const fetchBrokers = async () => { const fetchBrokers = async () => {
if (!visible) return; if (!visible) return;
@@ -45,7 +42,6 @@ const DeleteConsumerModal = ({visible, group, onCancel, onSuccess}) => {
fetchBrokers(); fetchBrokers();
}, [visible, group]); }, [visible, group]);
// 处理删除提交
const handleDelete = async () => { const handleDelete = async () => {
if (selectedBrokers.length === 0) { if (selectedBrokers.length === 0) {
notification.warning({message: t.PLEASE_SELECT_BROKER}); notification.warning({message: t.PLEASE_SELECT_BROKER});

View File

@@ -592,9 +592,10 @@ export const translations = {
"EXPRESSION_TYPE": "Expression Type", "EXPRESSION_TYPE": "Expression Type",
"SUB_VERSION": "Sub Version", "SUB_VERSION": "Sub Version",
"CODE_SET": "Code Set", "CODE_SET": "Code Set",
"TAGS_SET": "Tags Set" "TAGS_SET": "Tags Set",
"DELETE_CONSUMER_GROUP": "Delete Consumer Group",
"SELECT_DELETE_BROKERS": "Please select brokers to delete consumer group",
"CONFIRM_DELETE": "Confirm Delete",
} }
}; };

View File

@@ -288,7 +288,6 @@ const ConsumerGroupList = () => {
setShowConfig(true); setShowConfig(true);
}; };
// 修改操作按钮的点击处理函数
const handleClient = (group, address) => { const handleClient = (group, address) => {
setSelectedGroup(group); setSelectedGroup(group);
setSelectedAddress(address); setSelectedAddress(address);
@@ -550,12 +549,12 @@ const ConsumerGroupList = () => {
/> />
</Spin> </Spin>
{/* 模态框组件保持不变 */}
<ClientInfoModal <ClientInfoModal
visible={showClientInfo} visible={showClientInfo}
group={selectedGroup} group={selectedGroup}
address={selectedAddress} address={selectedAddress}
onCancel={() => setShowClientInfo(false)} onCancel={() => setShowClientInfo(false)}
messageApi={messageApi}
/> />
<ConsumerDetailModal <ConsumerDetailModal
@@ -563,6 +562,7 @@ const ConsumerGroupList = () => {
group={selectedGroup} group={selectedGroup}
address={selectedAddress} address={selectedAddress}
onCancel={() => setShowConsumeDetail(false)} onCancel={() => setShowConsumeDetail(false)}
messageApi={messageApi}
/> />
<ConsumerConfigModal <ConsumerConfigModal
@@ -579,6 +579,7 @@ const ConsumerGroupList = () => {
group={selectedGroup} group={selectedGroup}
onCancel={() => setShowDeleteModal(false)} onCancel={() => setShowDeleteModal(false)}
onSuccess={loadConsumerGroups} onSuccess={loadConsumerGroups}
t={t}
/> />
</div> </div>
</> </>

View File

@@ -250,6 +250,7 @@
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>${lombok.version}</version> <version>${lombok.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>

View File

@@ -283,6 +283,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) { public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
groupName = getConsumerGroup(groupName);
List<ConsumeStats> consumeStatses = new ArrayList<>(); List<ConsumeStats> consumeStatses = new ArrayList<>();
String topic = null; String topic = null;
try { try {
@@ -295,9 +296,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
throw new RuntimeException(e); throw new RuntimeException(e);
} }
List<TopicConsumerInfo> res = new ArrayList<>(); List<TopicConsumerInfo> res = new ArrayList<>();
String finalGroupName = groupName;
consumeStatses.forEach(consumeStats -> { consumeStatses.forEach(consumeStats -> {
if (consumeStats != null && consumeStats.getOffsetTable() != null && !consumeStats.getOffsetTable().isEmpty()) { if (consumeStats != null && consumeStats.getOffsetTable() != null && !consumeStats.getOffsetTable().isEmpty()) {
res.addAll(toTopicConsumerInfoList(topic, consumeStats, groupName)); res.addAll(toTopicConsumerInfoList(topic, consumeStats, finalGroupName));
} }
}); });
return res; return res;
@@ -305,6 +307,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) { public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
groupName = getConsumerGroup(groupName);
ConsumeStats consumeStats = null; ConsumeStats consumeStats = null;
try { try {
consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
@@ -316,6 +319,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) { private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) {
groupName = getConsumerGroup(groupName);
List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
@Override @Override
public boolean apply(MessageQueue o) { public boolean apply(MessageQueue o) {
@@ -339,6 +343,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
private Map<MessageQueue, String> getClientConnection(String groupName) { private Map<MessageQueue, String> getClientConnection(String groupName) {
groupName = getConsumerGroup(groupName);
Map<MessageQueue, String> results = Maps.newHashMap(); Map<MessageQueue, String> results = Maps.newHashMap();
try { try {
ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName); ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName);
@@ -417,7 +422,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) { public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String consumerGroup) {
consumerGroup = getConsumerGroup(consumerGroup);
List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList(); List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
try { try {
ClusterInfo clusterInfo = clusterInfoService.get(); ClusterInfo clusterInfo = clusterInfoService.get();
@@ -425,9 +431,9 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(); String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
SubscriptionGroupConfig subscriptionGroupConfig = null; SubscriptionGroupConfig subscriptionGroupConfig = null;
try { try {
subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group); subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, consumerGroup);
} catch (Exception e) { } catch (Exception e) {
logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, group); logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, consumerGroup);
} }
if (subscriptionGroupConfig == null) { if (subscriptionGroupConfig == null) {
continue; continue;
@@ -480,6 +486,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
consumerConfigInfo.getSubscriptionGroupConfig().setGroupName(getConsumerGroup(consumerConfigInfo.getSubscriptionGroupConfig().getGroupName()));
try { try {
ClusterInfo clusterInfo = clusterInfoService.get(); ClusterInfo clusterInfo = clusterInfoService.get();
for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
@@ -495,6 +502,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) { public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
group = getConsumerGroup(group);
Set<String> brokerNameSet = Sets.newHashSet(); Set<String> brokerNameSet = Sets.newHashSet();
try { try {
List<ConsumerConfigInfo> consumerConfigInfoList = examineSubscriptionGroupConfig(group); List<ConsumerConfigInfo> consumerConfigInfoList = examineSubscriptionGroupConfig(group);
@@ -511,6 +519,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public ConsumerConnection getConsumerConnection(String consumerGroup, String address) { public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
consumerGroup = getConsumerGroup(consumerGroup);
try { try {
String[] addresses = address.split(","); String[] addresses = address.split(",");
String addr = addresses[0]; String addr = addresses[0];
@@ -523,6 +532,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) { public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) {
consumerGroup = getConsumerGroup(consumerGroup);
try { try {
return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
} catch (Exception e) { } catch (Exception e) {
@@ -533,7 +543,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public GroupConsumeInfo refreshGroup(String address, String consumerGroup) { public GroupConsumeInfo refreshGroup(String address, String consumerGroup) {
if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) { if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) {
throw new RuntimeException("Cache is being built or empty, please try again later"); throw new RuntimeException("Cache is being built or empty, please try again later");
} }
@@ -541,7 +550,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
for (int i = 0; i < cacheConsumeInfoList.size(); i++) { for (int i = 0; i < cacheConsumeInfoList.size(); i++) {
GroupConsumeInfo groupConsumeInfo = cacheConsumeInfoList.get(i); GroupConsumeInfo groupConsumeInfo = cacheConsumeInfoList.get(i);
if (groupConsumeInfo.getGroup().equals(consumerGroup)) { if (groupConsumeInfo.getGroup().equals(consumerGroup)) {
GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, ""); GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, address);
updatedInfo.setUpdateTime(new Date()); updatedInfo.setUpdateTime(new Date());
updatedInfo.setGroup(consumerGroup); updatedInfo.setGroup(consumerGroup);
updatedInfo.setAddress(consumerGroupMap.get(consumerGroup)); updatedInfo.setAddress(consumerGroupMap.get(consumerGroup));
@@ -559,4 +568,11 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
consumerGroupMap.clear(); consumerGroupMap.clear();
return queryGroupList(false, address); return queryGroupList(false, address);
} }
public String getConsumerGroup(String consumerGroup) {
if (consumerGroup != null && consumerGroup.startsWith("%SYS%")) {
return consumerGroup.substring(5); // Remove "%SYS%" prefix
}
return consumerGroup;
}
} }