[ISSUE #317] Removed useless topic cache

This commit is contained in:
Crazylychee
2025-06-24 15:21:57 +08:00
committed by GitHub
parent b43c7abe52
commit a5138eb0d8
11 changed files with 31 additions and 108 deletions

View File

@@ -55,11 +55,6 @@ public class TopicController {
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
}
@RequestMapping(value = "/refresh", method = {RequestMethod.POST})
@ResponseBody
public Object refresh() {
return topicService.refreshTopicList();
}
@RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET)
@ResponseBody

View File

@@ -54,5 +54,4 @@ public interface TopicService {
SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest);
boolean refreshTopicList();
}

View File

@@ -63,10 +63,12 @@ import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -84,8 +86,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@@ -93,14 +93,9 @@ import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
public class MQAdminExtImpl implements MQAdminExt {
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
private static final ConcurrentMap<String, TopicConfigSerializeWrapper> TOPIC_CONFIG_CACHE = new ConcurrentHashMap<>();
public MQAdminExtImpl() {}
public static void clearTopicConfigCache() {
TOPIC_CONFIG_CACHE.clear();
}
@Override
public void updateBrokerConfig(String brokerAddr, Properties properties)
@@ -157,15 +152,11 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override
public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException {
TopicConfigSerializeWrapper cachedWrapper = TOPIC_CONFIG_CACHE.get(addr);
if (cachedWrapper != null && cachedWrapper.getTopicConfigTable().containsKey(topic)) {
return cachedWrapper.getTopicConfigTable().get(topic);
}
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null;
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header);
RemotingCommand response;
try {
response = remotingClient.invokeSync(addr, request, 3000);
} catch (Exception err) {
@@ -174,11 +165,11 @@ public class MQAdminExtImpl implements MQAdminExt {
}
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
TopicConfigSerializeWrapper topicConfigSerializeWrapper =
decode(response.getBody(), TopicConfigSerializeWrapper.class);
TOPIC_CONFIG_CACHE.put(addr, topicConfigSerializeWrapper);
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
TopicConfigAndQueueMapping topicConfigAndQueueMapping = decode(response.getBody(), TopicConfigAndQueueMapping.class);
if (topicConfigAndQueueMapping == null) {
throw new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist: " + topic);
}
return topicConfigAndQueueMapping;
}
default:
throw new MQBrokerException(response.getCode(), response.getRemark());

View File

@@ -56,6 +56,7 @@ public class LoginServiceImpl implements LoginService {
if (username != null) {
UserInfo userInfo = userInfoProvider.getUserInfoByUsername(username);
if (userInfo == null) {
auth(request, response);
return false;
}
UserInfoContext.set(WebUtil.USER_NAME, userInfo);

View File

@@ -45,7 +45,6 @@ import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.dashboard.service.TopicService;
import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl;
import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
@@ -71,7 +70,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -85,9 +83,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
@Autowired
private ClusterInfoService clusterInfoService;
private final ConcurrentMap<String, TopicRouteData> routeCache = new ConcurrentHashMap<>();
private final Object cacheLock = new Object();
private transient DefaultMQProducer systemTopicProducer;
private final Object producerLock = new Object();
@@ -195,24 +190,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
@Override
public TopicRouteData route(String topic) {
TopicRouteData cachedData = routeCache.get(topic);
if (cachedData != null) {
return cachedData;
}
synchronized (cacheLock) {
cachedData = routeCache.get(topic);
if (cachedData != null) {
return cachedData;
}
try {
TopicRouteData freshData = mqAdminExt.examineTopicRouteInfo(topic);
routeCache.put(topic, freshData);
return freshData;
} catch (Exception ex) {
Throwables.throwIfUnchecked(ex);
throw new RuntimeException(ex);
}
try {
return mqAdminExt.examineTopicRouteInfo(topic);
} catch (Exception ex) {
Throwables.throwIfUnchecked(ex);
throw new RuntimeException(ex);
}
}
@@ -228,7 +210,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
@Override
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
MQAdminExtImpl.clearTopicConfigCache();
TopicConfig topicConfig = new TopicConfig();
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
String messageType = topicCreateOrUpdateRequest.getMessageType();
@@ -455,13 +436,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
}
@Override
public boolean refreshTopicList() {
routeCache.clear();
clusterInfoService.refresh();
MQAdminExtImpl.clearTopicConfigCache();
return true;
}
private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
if (!traceEnabled) {

View File

@@ -24,13 +24,12 @@ import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.User;
import org.apache.rocketmq.dashboard.service.UserService;
import org.apache.rocketmq.dashboard.service.provider.UserInfoProvider;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class UserServiceImpl implements UserService {

View File

@@ -17,15 +17,14 @@
package org.apache.rocketmq.dashboard.service.provider;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class UserInfoProviderImpl implements UserInfoProvider {