From a5138eb0d817687462299d15eb2e81f89c137a7e Mon Sep 17 00:00:00 2001 From: Crazylychee <110229037+Crazylychee@users.noreply.github.com> Date: Tue, 24 Jun 2025 15:21:57 +0800 Subject: [PATCH] [ISSUE #317] Removed useless topic cache --- frontend-new/src/api/remoteApi/remoteApi.js | 16 --------- .../topic/SendTopicMessageDialog.jsx | 7 ++-- frontend-new/src/pages/Acl/acl.jsx | 2 +- frontend-new/src/pages/Topic/topic.jsx | 30 +++------------- .../dashboard/controller/TopicController.java | 5 --- .../dashboard/service/TopicService.java | 1 - .../service/client/MQAdminExtImpl.java | 31 ++++++---------- .../service/impl/LoginServiceImpl.java | 1 + .../service/impl/TopicServiceImpl.java | 36 +++---------------- .../service/impl/UserServiceImpl.java | 5 ++- .../provider/UserInfoProviderImpl.java | 5 ++- 11 files changed, 31 insertions(+), 108 deletions(-) diff --git a/frontend-new/src/api/remoteApi/remoteApi.js b/frontend-new/src/api/remoteApi/remoteApi.js index 3a442c1..048ede9 100644 --- a/frontend-new/src/api/remoteApi/remoteApi.js +++ b/frontend-new/src/api/remoteApi/remoteApi.js @@ -536,22 +536,6 @@ const remoteApi = { } }, - refreshTopicList: async () => { - try { - const response = await remoteApi._fetch(remoteApi.buildUrl("/topic/refresh"), { - method: 'POST' - }); - const result = await response.json(); - if (result.status === 0 && result.data === true) { - return remoteApi.queryTopicList(); - } - return result; - } catch (error) { - console.error("Error refreshing topic list:", error); - return { status: 1, errMsg: "Failed to refresh topic list" }; - } - }, - deleteTopic: async (topic) => { try { const url = remoteApi.buildUrl(`/topic/deleteTopic.do?topic=${encodeURIComponent(topic)}`); diff --git a/frontend-new/src/components/topic/SendTopicMessageDialog.jsx b/frontend-new/src/components/topic/SendTopicMessageDialog.jsx index 7b9a010..002634d 100644 --- a/frontend-new/src/components/topic/SendTopicMessageDialog.jsx +++ b/frontend-new/src/components/topic/SendTopicMessageDialog.jsx @@ -15,7 +15,7 @@ * limitations under the License. */ -import {Button, Checkbox, Form, Input, message, Modal} from "antd"; +import {Button, Checkbox, Form, Input, Modal} from "antd"; import React, {useEffect} from "react"; import {remoteApi} from "../../api/remoteApi/remoteApi"; @@ -26,6 +26,7 @@ const SendTopicMessageDialog = ({ setSendResultData, setIsSendResultModalVisible, setIsSendTopicMessageModalVisible, + message, t, }) => { const [form] = Form.useForm(); @@ -46,8 +47,8 @@ const SendTopicMessageDialog = ({ const handleSendTopicMessage = async () => { try { - const values = await form.validateFields(); // ๐Ÿ‘ˆ ไปŽ่กจๅ•่Žทๅ–ๆœ€ๆ–ฐๅ€ผ - const result = await remoteApi.sendTopicMessage(values); // ๐Ÿ‘ˆ ็”จ่กจๅ•ๆ•ฐๆฎๅ‘้€ + const values = await form.validateFields(); + const result = await remoteApi.sendTopicMessage(values); if (result.status === 0) { setSendResultData(result.data); setIsSendResultModalVisible(true); diff --git a/frontend-new/src/pages/Acl/acl.jsx b/frontend-new/src/pages/Acl/acl.jsx index 00e89a2..dd8989e 100644 --- a/frontend-new/src/pages/Acl/acl.jsx +++ b/frontend-new/src/pages/Acl/acl.jsx @@ -488,7 +488,7 @@ const Acl = () => { dataIndex: 'userStatus', key: 'userStatus', render: (status) => ( - {status} + {status} ), }, { diff --git a/frontend-new/src/pages/Topic/topic.jsx b/frontend-new/src/pages/Topic/topic.jsx index 5363a2b..aa41d0d 100644 --- a/frontend-new/src/pages/Topic/topic.jsx +++ b/frontend-new/src/pages/Topic/topic.jsx @@ -172,28 +172,7 @@ const DeployHistoryList = () => { } }; - const refreshTopicList = async () => { - setLoading(true); - try { - const result = await remoteApi.refreshTopicList(); - if (result.status === 0) { - setAllTopicList(result.data.topicNameList); - setAllMessageTypeList(result.data.messageTypeList); - setPaginationConf(prev => ({ - ...prev, - total: result.data.topicNameList.length - })); - messageApi.success(t.REFRESHING_TOPIC_LIST); - } else { - messageApi.error(result.errMsg); - } - } catch (error) { - console.error("Error refreshing topic list:", error); - messageApi.error("Failed to refresh topic list"); - } finally { - setLoading(false); - } - }; + const filterList = (currentPage) => { const lowExceptStr = filterStr.toLowerCase(); @@ -298,7 +277,7 @@ const DeployHistoryList = () => { messageApi.success(t.TOPIC_OPERATION_SUCCESS); closeAddUpdateDialog(); if(!isUpdateMode) { - refreshTopicList(); + await getTopicList() } } else { messageApi.error(result.errMsg); @@ -316,7 +295,7 @@ const DeployHistoryList = () => { if (result.status === 0) { messageApi.success(`${t.TOPIC} [${topicToDelete}] ${t.DELETED_SUCCESSFULLY}`); setAllTopicList(allTopicList.filter(topic => topic !== topicToDelete)); - await refreshTopicList() + await getTopicList() } else { messageApi.error(result.errMsg); } @@ -614,7 +593,7 @@ const DeployHistoryList = () => { )} - @@ -714,6 +693,7 @@ const DeployHistoryList = () => { setIsSendResultModalVisible={setIsSendResultModalVisible} setIsSendTopicMessageModalVisible={setIsSendTopicMessageModalVisible} sendTopicMessageData={sendTopicMessageData} + message={messageApi} t={t} /> diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java index 039983a..7b9f4e1 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java @@ -55,11 +55,6 @@ public class TopicController { return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq); } - @RequestMapping(value = "/refresh", method = {RequestMethod.POST}) - @ResponseBody - public Object refresh() { - return topicService.refreshTopicList(); - } @RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET) @ResponseBody diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java index fe2c1ed..cae9e27 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java @@ -54,5 +54,4 @@ public interface TopicService { SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest); - boolean refreshTopicList(); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 15270bc..c1e5fd1 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -63,10 +63,12 @@ import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; @@ -84,8 +86,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; @@ -93,14 +93,9 @@ import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; public class MQAdminExtImpl implements MQAdminExt { private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class); - private static final ConcurrentMap TOPIC_CONFIG_CACHE = new ConcurrentHashMap<>(); public MQAdminExtImpl() {} - public static void clearTopicConfigCache() { - TOPIC_CONFIG_CACHE.clear(); - } - @Override public void updateBrokerConfig(String brokerAddr, Properties properties) @@ -157,15 +152,11 @@ public class MQAdminExtImpl implements MQAdminExt { @Override public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException { - TopicConfigSerializeWrapper cachedWrapper = TOPIC_CONFIG_CACHE.get(addr); - - if (cachedWrapper != null && cachedWrapper.getTopicConfigTable().containsKey(topic)) { - return cachedWrapper.getTopicConfigTable().get(topic); - } - RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); - RemotingCommand response = null; + GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); + header.setTopic(topic); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header); + RemotingCommand response; try { response = remotingClient.invokeSync(addr, request, 3000); } catch (Exception err) { @@ -174,11 +165,11 @@ public class MQAdminExtImpl implements MQAdminExt { } switch (response.getCode()) { case ResponseCode.SUCCESS: { - TopicConfigSerializeWrapper topicConfigSerializeWrapper = - decode(response.getBody(), TopicConfigSerializeWrapper.class); - - TOPIC_CONFIG_CACHE.put(addr, topicConfigSerializeWrapper); - return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); + TopicConfigAndQueueMapping topicConfigAndQueueMapping = decode(response.getBody(), TopicConfigAndQueueMapping.class); + if (topicConfigAndQueueMapping == null) { + throw new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist: " + topic); + } + return topicConfigAndQueueMapping; } default: throw new MQBrokerException(response.getCode(), response.getRemark()); diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/LoginServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/LoginServiceImpl.java index 7358382..31f1613 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/LoginServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/LoginServiceImpl.java @@ -56,6 +56,7 @@ public class LoginServiceImpl implements LoginService { if (username != null) { UserInfo userInfo = userInfoProvider.getUserInfoByUsername(username); if (userInfo == null) { + auth(request, response); return false; } UserInfoContext.set(WebUtil.USER_NAME, userInfo); diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index a135dff..39a9bf1 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -45,7 +45,6 @@ import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.ClusterInfoService; import org.apache.rocketmq.dashboard.service.TopicService; -import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl; import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; @@ -71,7 +70,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -85,9 +83,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Autowired private ClusterInfoService clusterInfoService; - private final ConcurrentMap routeCache = new ConcurrentHashMap<>(); - private final Object cacheLock = new Object(); - private transient DefaultMQProducer systemTopicProducer; private final Object producerLock = new Object(); @@ -195,24 +190,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public TopicRouteData route(String topic) { - TopicRouteData cachedData = routeCache.get(topic); - if (cachedData != null) { - return cachedData; - } - - synchronized (cacheLock) { - cachedData = routeCache.get(topic); - if (cachedData != null) { - return cachedData; - } - try { - TopicRouteData freshData = mqAdminExt.examineTopicRouteInfo(topic); - routeCache.put(topic, freshData); - return freshData; - } catch (Exception ex) { - Throwables.throwIfUnchecked(ex); - throw new RuntimeException(ex); - } + try { + return mqAdminExt.examineTopicRouteInfo(topic); + } catch (Exception ex) { + Throwables.throwIfUnchecked(ex); + throw new RuntimeException(ex); } } @@ -228,7 +210,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { - MQAdminExtImpl.clearTopicConfigCache(); TopicConfig topicConfig = new TopicConfig(); BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); String messageType = topicCreateOrUpdateRequest.getMessageType(); @@ -455,13 +436,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } - @Override - public boolean refreshTopicList() { - routeCache.clear(); - clusterInfoService.refresh(); - MQAdminExtImpl.clearTopicConfigCache(); - return true; - } private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { if (!traceEnabled) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/UserServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/UserServiceImpl.java index 117f1d6..4b8e191 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/UserServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/UserServiceImpl.java @@ -24,13 +24,12 @@ import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.User; import org.apache.rocketmq.dashboard.service.UserService; import org.apache.rocketmq.dashboard.service.provider.UserInfoProvider; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Service public class UserServiceImpl implements UserService { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/provider/UserInfoProviderImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/provider/UserInfoProviderImpl.java index 2e9d37a..a5aaf6b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/provider/UserInfoProviderImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/provider/UserInfoProviderImpl.java @@ -17,15 +17,14 @@ package org.apache.rocketmq.dashboard.service.provider; import org.apache.rocketmq.dashboard.service.ClusterInfoService; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Service public class UserInfoProviderImpl implements UserInfoProvider {