mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2026-06-08 22:03:21 +08:00
@@ -45,7 +45,7 @@ import org.springframework.stereotype.Service;
|
||||
public class DashboardCollectServiceImpl implements DashboardCollectService {
|
||||
|
||||
@Resource
|
||||
private RMQConfigure rmqConfigure;
|
||||
private RMQConfigure configure;
|
||||
|
||||
private final static Logger log = LoggerFactory.getLogger(DashboardCollectServiceImpl.class);
|
||||
|
||||
@@ -133,7 +133,7 @@ public class DashboardCollectServiceImpl implements DashboardCollectService {
|
||||
|
||||
@Override
|
||||
public Map<String, List<String>> getBrokerCache(String date) {
|
||||
String dataLocationPath = rmqConfigure.getConsoleCollectData();
|
||||
String dataLocationPath = configure.getConsoleCollectData();
|
||||
File file = new File(dataLocationPath + date + ".json");
|
||||
if (!file.exists()) {
|
||||
log.info(String.format("No dashboard data for broker cache data: %s", date));
|
||||
@@ -144,7 +144,7 @@ public class DashboardCollectServiceImpl implements DashboardCollectService {
|
||||
|
||||
@Override
|
||||
public Map<String, List<String>> getTopicCache(String date) {
|
||||
String dataLocationPath = rmqConfigure.getConsoleCollectData();
|
||||
String dataLocationPath = configure.getConsoleCollectData();
|
||||
File file = new File(dataLocationPath + date + "_topic" + ".json");
|
||||
if (!file.exists()) {
|
||||
log.info(String.format("No dashboard data for data: %s", date));
|
||||
|
||||
@@ -80,7 +80,7 @@ public class MessageServiceImpl implements MessageService {
|
||||
.build();
|
||||
|
||||
@Autowired
|
||||
private RMQConfigure rMQConfigure;
|
||||
private RMQConfigure configure;
|
||||
/**
|
||||
* @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
|
||||
* @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
|
||||
@@ -117,10 +117,10 @@ public class MessageServiceImpl implements MessageService {
|
||||
|
||||
@Override
|
||||
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
|
||||
RPCHook rpcHook = null;
|
||||
if (isEnableAcl) {
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey()));
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
|
||||
}
|
||||
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
|
||||
List<MessageView> messageViewList = Lists.newArrayList();
|
||||
@@ -250,10 +250,10 @@ public class MessageServiceImpl implements MessageService {
|
||||
}
|
||||
|
||||
private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
|
||||
RPCHook rpcHook = null;
|
||||
if (isEnableAcl) {
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey()));
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
|
||||
}
|
||||
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
|
||||
|
||||
@@ -391,10 +391,10 @@ public class MessageServiceImpl implements MessageService {
|
||||
}
|
||||
|
||||
private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
|
||||
RPCHook rpcHook = null;
|
||||
if (isEnableAcl) {
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey()));
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
|
||||
}
|
||||
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
|
||||
List<MessageView> messageViews = new ArrayList<>();
|
||||
|
||||
@@ -59,11 +59,11 @@ public class MessageTraceServiceImpl implements MessageTraceService {
|
||||
private MQAdminExt mqAdminExt;
|
||||
|
||||
@Resource
|
||||
private RMQConfigure rmqConfigure;
|
||||
private RMQConfigure configure;
|
||||
|
||||
@Override
|
||||
public List<MessageTraceView> queryMessageTraceKey(String key) {
|
||||
String queryTopic = rmqConfigure.getMsgTrackTopicName();
|
||||
String queryTopic = configure.getMsgTrackTopicName();
|
||||
if (StringUtils.isEmpty(queryTopic)) {
|
||||
queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ public class MonitorServiceImpl implements MonitorService {
|
||||
|
||||
|
||||
@Resource
|
||||
private RMQConfigure rmqConfigure;
|
||||
private RMQConfigure configure;
|
||||
|
||||
private Map<String, ConsumerMonitorConfig> configMap = new ConcurrentHashMap<>();
|
||||
|
||||
@@ -66,7 +66,7 @@ public class MonitorServiceImpl implements MonitorService {
|
||||
|
||||
//rocketmq.console.data.path/monitor/consumerMonitorConfig.json
|
||||
private String getConsumerMonitorConfigDataPath() {
|
||||
return rmqConfigure.getRocketMqConsoleDataPath() + File.separatorChar + "monitor" + File.separatorChar + "consumerMonitorConfig.json";
|
||||
return configure.getRocketMqConsoleDataPath() + File.separatorChar + "monitor" + File.separatorChar + "consumerMonitorConfig.json";
|
||||
}
|
||||
|
||||
private String getConsumerMonitorConfigDataPathBackUp() {
|
||||
|
||||
@@ -32,7 +32,7 @@ import org.springframework.stereotype.Service;
|
||||
public class OpsServiceImpl extends AbstractCommonService implements OpsService {
|
||||
|
||||
@Resource
|
||||
private RMQConfigure rMQConfigure;
|
||||
private RMQConfigure configure;
|
||||
|
||||
@Resource
|
||||
private List<RocketMqChecker> rocketMqCheckerList;
|
||||
@@ -40,19 +40,19 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
|
||||
@Override
|
||||
public Map<String, Object> homePageInfo() {
|
||||
Map<String, Object> homePageInfoMap = Maps.newHashMap();
|
||||
homePageInfoMap.put("namesvrAddrList", Splitter.on(";").splitToList(rMQConfigure.getNamesrvAddr()));
|
||||
homePageInfoMap.put("useVIPChannel", Boolean.valueOf(rMQConfigure.getIsVIPChannel()));
|
||||
homePageInfoMap.put("namesvrAddrList", Splitter.on(";").splitToList(configure.getNamesrvAddr()));
|
||||
homePageInfoMap.put("useVIPChannel", Boolean.valueOf(configure.getIsVIPChannel()));
|
||||
return homePageInfoMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateNameSvrAddrList(String nameSvrAddrList) {
|
||||
rMQConfigure.setNamesrvAddr(nameSvrAddrList);
|
||||
configure.setNamesrvAddr(nameSvrAddrList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNameSvrList() {
|
||||
return rMQConfigure.getNamesrvAddr();
|
||||
return configure.getNamesrvAddr();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -65,7 +65,7 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
|
||||
}
|
||||
|
||||
@Override public boolean updateIsVIPChannel(String useVIPChannel) {
|
||||
rMQConfigure.setIsVIPChannel(useVIPChannel);
|
||||
configure.setIsVIPChannel(useVIPChannel);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,12 +54,12 @@ import java.util.Set;
|
||||
public class TopicServiceImpl extends AbstractCommonService implements TopicService {
|
||||
|
||||
@Autowired
|
||||
private RMQConfigure rMQConfigure;
|
||||
private RMQConfigure configure;
|
||||
|
||||
@Override
|
||||
public TopicList fetchAllTopicList(boolean skipSysProcess) {
|
||||
try {
|
||||
TopicList allTopics = mqAdminExt.fetchAllTopicList();
|
||||
TopicList allTopics = mqAdminExt.fetchAllTopicList();
|
||||
if (skipSysProcess) {
|
||||
return allTopics;
|
||||
}
|
||||
@@ -67,7 +67,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
TopicList sysTopics = getSystemTopicList();
|
||||
Set<String> topics = new HashSet<>();
|
||||
|
||||
for (String topic: allTopics.getTopicList()) {
|
||||
for (String topic : allTopics.getTopicList()) {
|
||||
if (sysTopics.getTopicList().contains(topic)) {
|
||||
topics.add(String.format("%s%s", "%SYS%", topic));
|
||||
} else {
|
||||
@@ -77,8 +77,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
allTopics.getTopicList().clear();
|
||||
allTopics.getTopicList().addAll(topics);
|
||||
return allTopics;
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
@@ -87,8 +86,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
public TopicStatsTable stats(String topic) {
|
||||
try {
|
||||
return mqAdminExt.examineTopicStats(topic);
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
@@ -97,8 +95,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
public TopicRouteData route(String topic) {
|
||||
try {
|
||||
return mqAdminExt.examineTopicRouteInfo(topic);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
} catch (Exception ex) {
|
||||
throw Throwables.propagate(ex);
|
||||
}
|
||||
}
|
||||
@@ -107,8 +104,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
public GroupList queryTopicConsumerInfo(String topic) {
|
||||
try {
|
||||
return mqAdminExt.queryTopicConsumeByWho(topic);
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
@@ -123,8 +119,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
|
||||
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
|
||||
}
|
||||
}
|
||||
catch (Exception err) {
|
||||
} catch (Exception err) {
|
||||
throw Throwables.propagate(err);
|
||||
}
|
||||
}
|
||||
@@ -134,8 +129,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
ClusterInfo clusterInfo = null;
|
||||
try {
|
||||
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic);
|
||||
@@ -164,13 +158,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName);
|
||||
mqAdminExt.deleteTopicInBroker(masterSet, topic);
|
||||
Set<String> nameServerSet = null;
|
||||
if (StringUtils.isNotBlank(rMQConfigure.getNamesrvAddr())) {
|
||||
String[] ns = rMQConfigure.getNamesrvAddr().split(";");
|
||||
if (StringUtils.isNotBlank(configure.getNamesrvAddr())) {
|
||||
String[] ns = configure.getNamesrvAddr().split(";");
|
||||
nameServerSet = new HashSet<String>(Arrays.asList(ns));
|
||||
}
|
||||
mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
|
||||
}
|
||||
catch (Exception err) {
|
||||
} catch (Exception err) {
|
||||
throw Throwables.propagate(err);
|
||||
}
|
||||
return true;
|
||||
@@ -181,8 +174,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
ClusterInfo clusterInfo = null;
|
||||
try {
|
||||
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||
}
|
||||
catch (Exception err) {
|
||||
} catch (Exception err) {
|
||||
throw Throwables.propagate(err);
|
||||
}
|
||||
for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
|
||||
@@ -198,13 +190,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
ClusterInfo clusterInfo = null;
|
||||
try {
|
||||
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return true;
|
||||
@@ -214,36 +204,33 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
return new DefaultMQProducer(producerGroup, rpcHook);
|
||||
}
|
||||
|
||||
private TopicList getSystemTopicList() {
|
||||
private TopicList getSystemTopicList() {
|
||||
RPCHook rpcHook = null;
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
|
||||
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
|
||||
if (isEnableAcl) {
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(),rMQConfigure.getSecretKey()));
|
||||
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
|
||||
}
|
||||
DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
|
||||
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
|
||||
producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
|
||||
producer.setNamesrvAddr(configure.getNamesrvAddr());
|
||||
|
||||
try {
|
||||
producer.start();
|
||||
return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
finally {
|
||||
} finally {
|
||||
producer.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
|
||||
DefaultMQProducer producer = null;
|
||||
if (rMQConfigure.isACLEnabled()) {
|
||||
if (configure.isACLEnabled()) {
|
||||
AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(
|
||||
rMQConfigure.getAccessKey(),
|
||||
rMQConfigure.getSecretKey()
|
||||
configure.getAccessKey(),
|
||||
configure.getSecretKey()
|
||||
));
|
||||
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
|
||||
} else {
|
||||
@@ -251,7 +238,7 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
}
|
||||
|
||||
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
|
||||
producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
|
||||
producer.setNamesrvAddr(configure.getNamesrvAddr());
|
||||
try {
|
||||
producer.start();
|
||||
Message msg = new Message(sendTopicMessageRequest.getTopic(),
|
||||
@@ -260,11 +247,9 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
||||
sendTopicMessageRequest.getMessageBody().getBytes()
|
||||
);
|
||||
return producer.send(msg);
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
finally {
|
||||
} finally {
|
||||
producer.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ public class UserServiceImpl implements UserService, InitializingBean {
|
||||
}
|
||||
}
|
||||
|
||||
/*packaged*/ static class FileBasedUserInfoStore {
|
||||
public static class FileBasedUserInfoStore {
|
||||
private final Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
private static final String FILE_NAME = "users.properties";
|
||||
|
||||
|
||||
Reference in New Issue
Block a user