mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2026-05-29 22:03:45 +08:00
Compare commits
2 Commits
eb51da6ca4
...
refactor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
77de1a9946 | ||
|
|
de152dd6f3 |
@@ -51,19 +51,6 @@ public class ConsumerController {
|
|||||||
return consumerService.queryGroupList(skipSysGroup, address);
|
return consumerService.queryGroupList(skipSysGroup, address);
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(value = "/group.refresh")
|
|
||||||
@ResponseBody
|
|
||||||
public Object refresh(String address,
|
|
||||||
String consumerGroup) {
|
|
||||||
return consumerService.refreshGroup(address, consumerGroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
@RequestMapping(value = "group.refresh.all")
|
|
||||||
@ResponseBody
|
|
||||||
public Object refreshAll(String address) {
|
|
||||||
return consumerService.refreshAllGroup(address);
|
|
||||||
}
|
|
||||||
|
|
||||||
@RequestMapping(value = "/group.query")
|
@RequestMapping(value = "/group.query")
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
public Object groupQuery(@RequestParam String consumerGroup, String address) {
|
public Object groupQuery(@RequestParam String consumerGroup, String address) {
|
||||||
|
|||||||
@@ -56,12 +56,6 @@ public class TopicController {
|
|||||||
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
|
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(value = "/refresh", method = {RequestMethod.POST})
|
|
||||||
@ResponseBody
|
|
||||||
public Object refresh() {
|
|
||||||
return topicService.refreshTopicList();
|
|
||||||
}
|
|
||||||
|
|
||||||
@RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET)
|
@RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
public Object listTopicType() {
|
public Object listTopicType() {
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ package org.apache.rocketmq.dashboard.model;
|
|||||||
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
|
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
|
||||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||||
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
|
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
|
||||||
@@ -32,7 +31,6 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
|
|||||||
private int consumeTps;
|
private int consumeTps;
|
||||||
private long diffTotal = -1;
|
private long diffTotal = -1;
|
||||||
private String subGroupType = "NORMAL";
|
private String subGroupType = "NORMAL";
|
||||||
private Date updateTime;
|
|
||||||
|
|
||||||
|
|
||||||
public String getGroup() {
|
public String getGroup() {
|
||||||
@@ -114,12 +112,4 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
|
|||||||
public void setVersion(String version) {
|
public void setVersion(String version) {
|
||||||
this.version = version;
|
this.version = version;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Date getUpdateTime() {
|
|
||||||
return updateTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setUpdateTime(Date updateTime) {
|
|
||||||
this.updateTime = updateTime;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,72 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.rocketmq.dashboard.service;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
|
|
||||||
import org.apache.rocketmq.tools.admin.MQAdminExt;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Service
|
|
||||||
public class ClusterInfoService {
|
|
||||||
|
|
||||||
@Resource
|
|
||||||
private MQAdminExt mqAdminExt;
|
|
||||||
|
|
||||||
@Value("${rocketmq.cluster.cache.expire:60000}")
|
|
||||||
private long cacheExpireMs;
|
|
||||||
|
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
||||||
private final AtomicReference<ClusterInfo> cachedRef = new AtomicReference<>();
|
|
||||||
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
scheduler.scheduleAtFixedRate(this::refresh,
|
|
||||||
0, cacheExpireMs / 2, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ClusterInfo get() {
|
|
||||||
ClusterInfo info = cachedRef.get();
|
|
||||||
return info != null ? info : refresh();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized ClusterInfo refresh() {
|
|
||||||
try {
|
|
||||||
ClusterInfo fresh = mqAdminExt.examineBrokerClusterInfo();
|
|
||||||
cachedRef.set(fresh);
|
|
||||||
return fresh;
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("Refresh cluster info failed", e);
|
|
||||||
ClusterInfo old = cachedRef.get();
|
|
||||||
if (old != null) {
|
|
||||||
return old;
|
|
||||||
}
|
|
||||||
throw new IllegalStateException("No cluster info available", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -17,14 +17,14 @@
|
|||||||
|
|
||||||
package org.apache.rocketmq.dashboard.service;
|
package org.apache.rocketmq.dashboard.service;
|
||||||
|
|
||||||
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
|
|
||||||
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
|
|
||||||
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
|
|
||||||
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
|
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
|
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
|
||||||
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
||||||
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
||||||
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
|
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
|
||||||
|
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
|
||||||
|
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
|
||||||
|
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -55,8 +55,4 @@ public interface ConsumerService {
|
|||||||
ConsumerConnection getConsumerConnection(String consumerGroup, String address);
|
ConsumerConnection getConsumerConnection(String consumerGroup, String address);
|
||||||
|
|
||||||
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
|
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
|
||||||
|
|
||||||
Object refreshGroup(String address, String consumerGroup);
|
|
||||||
|
|
||||||
Object refreshAllGroup(String address);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -54,5 +54,4 @@ public interface TopicService {
|
|||||||
|
|
||||||
SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest);
|
SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest);
|
||||||
|
|
||||||
boolean refreshTopicList();
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,31 +17,29 @@
|
|||||||
package org.apache.rocketmq.dashboard.service.client;
|
package org.apache.rocketmq.dashboard.service.client;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
|
|
||||||
import java.io.UnsupportedEncodingException;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.client.QueryResult;
|
import org.apache.rocketmq.client.QueryResult;
|
||||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
import org.apache.rocketmq.client.impl.MQAdminImpl;
|
|
||||||
import org.apache.rocketmq.common.AclConfig;
|
import org.apache.rocketmq.common.AclConfig;
|
||||||
import org.apache.rocketmq.common.PlainAccessConfig;
|
import org.apache.rocketmq.common.PlainAccessConfig;
|
||||||
import org.apache.rocketmq.common.TopicConfig;
|
import org.apache.rocketmq.common.TopicConfig;
|
||||||
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
|
|
||||||
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
|
|
||||||
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
|
|
||||||
import org.apache.rocketmq.common.message.MessageClientIDSetter;
|
import org.apache.rocketmq.common.message.MessageClientIDSetter;
|
||||||
import org.apache.rocketmq.common.message.MessageExt;
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
import org.apache.rocketmq.common.message.MessageQueue;
|
import org.apache.rocketmq.common.message.MessageQueue;
|
||||||
import org.apache.rocketmq.common.message.MessageRequestMode;
|
import org.apache.rocketmq.common.message.MessageRequestMode;
|
||||||
|
import org.apache.rocketmq.dashboard.util.JsonUtil;
|
||||||
|
import org.apache.rocketmq.remoting.RemotingClient;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
|
||||||
|
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
||||||
import org.apache.rocketmq.remoting.protocol.RequestCode;
|
import org.apache.rocketmq.remoting.protocol.RequestCode;
|
||||||
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
||||||
|
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
|
||||||
|
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
|
||||||
|
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
|
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
|
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
|
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
|
||||||
@@ -68,38 +66,29 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
|||||||
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
|
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
|
||||||
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
|
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
|
||||||
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
|
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
|
||||||
import org.apache.rocketmq.dashboard.util.JsonUtil;
|
|
||||||
import org.apache.rocketmq.remoting.RemotingClient;
|
|
||||||
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
||||||
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
|
|
||||||
import org.apache.rocketmq.remoting.exception.RemotingException;
|
|
||||||
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
|
|
||||||
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
|
|
||||||
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
||||||
import org.apache.rocketmq.tools.admin.MQAdminExt;
|
import org.apache.rocketmq.tools.admin.MQAdminExt;
|
||||||
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
|
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
|
||||||
import org.apache.rocketmq.tools.admin.api.MessageTrack;
|
import org.apache.rocketmq.tools.admin.api.MessageTrack;
|
||||||
import org.apache.rocketmq.tools.admin.common.AdminToolResult;
|
import org.apache.rocketmq.tools.admin.common.AdminToolResult;
|
||||||
import org.joor.Reflect;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
|
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class MQAdminExtImpl implements MQAdminExt {
|
public class MQAdminExtImpl implements MQAdminExt {
|
||||||
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
|
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
|
||||||
|
|
||||||
private static final ConcurrentMap<String, TopicConfigSerializeWrapper> TOPIC_CONFIG_CACHE = new ConcurrentHashMap<>();
|
public MQAdminExtImpl() {
|
||||||
|
|
||||||
public MQAdminExtImpl() {}
|
|
||||||
|
|
||||||
public static void clearTopicConfigCache() {
|
|
||||||
TOPIC_CONFIG_CACHE.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateBrokerConfig(String brokerAddr, Properties properties)
|
public void updateBrokerConfig(String brokerAddr, Properties properties)
|
||||||
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
|
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
|
||||||
@@ -155,7 +144,7 @@ public class MQAdminExtImpl implements MQAdminExt {
|
|||||||
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
|
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
|
||||||
RemotingCommand response = null;
|
RemotingCommand response = null;
|
||||||
try {
|
try {
|
||||||
response = remotingClient.invokeSync(addr, request, 8000);
|
response = remotingClient.invokeSync(addr, request, 3000);
|
||||||
}
|
}
|
||||||
catch (Exception err) {
|
catch (Exception err) {
|
||||||
Throwables.throwIfUnchecked(err);
|
Throwables.throwIfUnchecked(err);
|
||||||
@@ -174,27 +163,19 @@ public class MQAdminExtImpl implements MQAdminExt {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException {
|
public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException {
|
||||||
TopicConfigSerializeWrapper cachedWrapper = TOPIC_CONFIG_CACHE.get(addr);
|
|
||||||
|
|
||||||
if (cachedWrapper != null && cachedWrapper.getTopicConfigTable().containsKey(topic)) {
|
|
||||||
return cachedWrapper.getTopicConfigTable().get(topic);
|
|
||||||
}
|
|
||||||
|
|
||||||
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
|
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
|
||||||
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
|
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
|
||||||
RemotingCommand response = null;
|
RemotingCommand response = null;
|
||||||
try {
|
try {
|
||||||
response = remotingClient.invokeSync(addr, request, 3000);
|
response = remotingClient.invokeSync(addr, request, 3000);
|
||||||
} catch (Exception err) {
|
}
|
||||||
|
catch (Exception err) {
|
||||||
Throwables.throwIfUnchecked(err);
|
Throwables.throwIfUnchecked(err);
|
||||||
throw new RuntimeException(err);
|
throw new RuntimeException(err);
|
||||||
}
|
}
|
||||||
switch (response.getCode()) {
|
switch (response.getCode()) {
|
||||||
case ResponseCode.SUCCESS: {
|
case ResponseCode.SUCCESS: {
|
||||||
TopicConfigSerializeWrapper topicConfigSerializeWrapper =
|
TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class);
|
||||||
decode(response.getBody(), TopicConfigSerializeWrapper.class);
|
|
||||||
|
|
||||||
TOPIC_CONFIG_CACHE.put(addr, topicConfigSerializeWrapper);
|
|
||||||
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
|
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@@ -480,23 +461,18 @@ public class MQAdminExtImpl implements MQAdminExt {
|
|||||||
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
|
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
|
||||||
try {
|
try {
|
||||||
return viewMessage(msgId);
|
return viewMessage(msgId);
|
||||||
} catch (Exception e) {
|
|
||||||
}
|
}
|
||||||
MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
|
catch (Exception e) {
|
||||||
|
}
|
||||||
|
|
||||||
Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
|
Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
|
||||||
if (clusterList == null || clusterList.isEmpty()) {
|
if (clusterList == null || clusterList.isEmpty()) {
|
||||||
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", topic, msgId, 32,
|
return MQAdminInstance.threadLocalMQAdminExt().queryMessage("", topic, msgId);
|
||||||
0L, Long.MAX_VALUE, true).get();
|
}
|
||||||
if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) {
|
for (String name : clusterList) {
|
||||||
return qr.getMessageList().get(0);
|
MessageExt messageExt = MQAdminInstance.threadLocalMQAdminExt().queryMessage(name, topic, msgId);
|
||||||
}
|
if (messageExt != null) {
|
||||||
} else {
|
return messageExt;
|
||||||
for (String name : clusterList) {
|
|
||||||
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", name, topic, msgId, 32,
|
|
||||||
0L, Long.MAX_VALUE, true).get();
|
|
||||||
if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) {
|
|
||||||
return qr.getMessageList().get(0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
@@ -23,18 +23,14 @@ import com.google.common.collect.Iterables;
|
|||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
@@ -45,16 +41,11 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
import org.apache.rocketmq.common.MQVersion;
|
import org.apache.rocketmq.common.MQVersion;
|
||||||
import org.apache.rocketmq.common.MixAll;
|
import org.apache.rocketmq.common.MixAll;
|
||||||
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
|
|
||||||
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
|
|
||||||
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
|
|
||||||
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
|
|
||||||
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
|
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
|
||||||
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
|
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
|
||||||
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
|
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
|
||||||
@@ -74,13 +65,15 @@ import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
|
|||||||
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
|
||||||
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
|
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
|
||||||
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
|
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
|
||||||
|
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
|
||||||
|
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
|
||||||
|
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
|
||||||
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
||||||
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.DisposableBean;
|
import org.springframework.beans.factory.DisposableBean;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@@ -92,19 +85,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
@Resource
|
@Resource
|
||||||
private RMQConfigure configure;
|
private RMQConfigure configure;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ClusterInfoService clusterInfoService;
|
|
||||||
|
|
||||||
private volatile boolean isCacheBeingBuilt = false;
|
|
||||||
|
|
||||||
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
|
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
|
||||||
|
|
||||||
private ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
|
|
||||||
private final List<GroupConsumeInfo> cacheConsumeInfoList = Collections.synchronizedList(new ArrayList<>());
|
|
||||||
|
|
||||||
private final HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterPropertiesSet() {
|
public void afterPropertiesSet() {
|
||||||
Runtime runtime = Runtime.getRuntime();
|
Runtime runtime = Runtime.getRuntime();
|
||||||
@@ -120,7 +104,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
};
|
};
|
||||||
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
|
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
|
||||||
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
|
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
|
||||||
new LinkedBlockingQueue<>(5000), threadFactory, handler);
|
new LinkedBlockingQueue<>(5000), threadFactory, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -141,46 +125,11 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String address) {
|
public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String address) {
|
||||||
if (isCacheBeingBuilt) {
|
HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
|
||||||
throw new RuntimeException("Cache is being built, please try again later");
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (this) {
|
|
||||||
if (cacheConsumeInfoList.isEmpty() && !isCacheBeingBuilt) {
|
|
||||||
isCacheBeingBuilt = true;
|
|
||||||
try {
|
|
||||||
makeGroupListCache();
|
|
||||||
} finally {
|
|
||||||
isCacheBeingBuilt = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cacheConsumeInfoList.isEmpty()) {
|
|
||||||
throw new RuntimeException("No consumer group information available");
|
|
||||||
}
|
|
||||||
|
|
||||||
List<GroupConsumeInfo> groupConsumeInfoList = new ArrayList<>(cacheConsumeInfoList);
|
|
||||||
|
|
||||||
if (!skipSysGroup) {
|
|
||||||
groupConsumeInfoList.stream().map(group -> {
|
|
||||||
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
|
|
||||||
group.setGroup(String.format("%s%s", "%SYS%", group.getGroup()));
|
|
||||||
}
|
|
||||||
return group;
|
|
||||||
}).collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
Collections.sort(groupConsumeInfoList);
|
|
||||||
return groupConsumeInfoList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void makeGroupListCache() {
|
|
||||||
SubscriptionGroupWrapper subscriptionGroupWrapper = null;
|
|
||||||
try {
|
try {
|
||||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||||
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
|
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
|
||||||
subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
|
SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
|
||||||
for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
|
for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
|
||||||
if (!consumerGroupMap.containsKey(groupName)) {
|
if (!consumerGroupMap.containsKey(groupName)) {
|
||||||
consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());
|
consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());
|
||||||
@@ -194,28 +143,14 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
Throwables.throwIfUnchecked(err);
|
Throwables.throwIfUnchecked(err);
|
||||||
throw new RuntimeException(err);
|
throw new RuntimeException(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subscriptionGroupWrapper != null && subscriptionGroupWrapper.getSubscriptionGroupTable().isEmpty()) {
|
|
||||||
logger.warn("No subscription group information available");
|
|
||||||
isCacheBeingBuilt = false;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = subscriptionGroupWrapper.getSubscriptionGroupTable();
|
|
||||||
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
|
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
|
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
|
||||||
for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
|
for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
|
||||||
String consumerGroup = entry.getKey();
|
String consumerGroup = entry.getKey();
|
||||||
executorService.submit(() -> {
|
executorService.submit(() -> {
|
||||||
try {
|
try {
|
||||||
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, "");
|
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address);
|
||||||
consumeInfo.setAddress(entry.getValue());
|
consumeInfo.setAddress(entry.getValue());
|
||||||
if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
|
|
||||||
consumeInfo.setSubGroupType("SYSTEM");
|
|
||||||
} else {
|
|
||||||
consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly() ? "FIFO" : "NORMAL");
|
|
||||||
}
|
|
||||||
consumeInfo.setGroup(consumerGroup);
|
|
||||||
consumeInfo.setUpdateTime(new Date());
|
|
||||||
groupConsumeInfoList.add(consumeInfo);
|
groupConsumeInfoList.add(consumeInfo);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
|
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
|
||||||
@@ -225,17 +160,21 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
countDownLatch.await();
|
countDownLatch.await(30, TimeUnit.SECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
logger.error("query consumerGroup countDownLatch await Exception", e);
|
||||||
logger.error("Interruption occurred while waiting for task completion", e);
|
|
||||||
}
|
}
|
||||||
logger.info("All consumer group query tasks have been completed");
|
|
||||||
isCacheBeingBuilt = false;
|
|
||||||
Collections.sort(groupConsumeInfoList);
|
|
||||||
|
|
||||||
cacheConsumeInfoList.clear();
|
if (!skipSysGroup) {
|
||||||
cacheConsumeInfoList.addAll(groupConsumeInfoList);
|
groupConsumeInfoList.stream().map(group -> {
|
||||||
|
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
|
||||||
|
group.setGroup(String.format("%s%s", "%SYS%", group.getGroup()));
|
||||||
|
}
|
||||||
|
return group;
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
Collections.sort(groupConsumeInfoList);
|
||||||
|
return groupConsumeInfoList;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -245,14 +184,16 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
ConsumeStats consumeStats = null;
|
ConsumeStats consumeStats = null;
|
||||||
try {
|
try {
|
||||||
consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
|
consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
|
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
|
||||||
}
|
}
|
||||||
if (consumeStats != null) {
|
|
||||||
groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
|
|
||||||
groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
|
|
||||||
}
|
|
||||||
ConsumerConnection consumerConnection = null;
|
ConsumerConnection consumerConnection = null;
|
||||||
|
boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup)
|
||||||
|
.stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig)
|
||||||
|
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (StringUtils.isNotEmpty(address)) {
|
if (StringUtils.isNotEmpty(address)) {
|
||||||
consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
|
consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
|
||||||
@@ -262,15 +203,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
|
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
groupConsumeInfo.setGroup(consumerGroup);
|
||||||
|
if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
|
||||||
|
groupConsumeInfo.setSubGroupType("SYSTEM");
|
||||||
|
} else if (isFifoType) {
|
||||||
|
groupConsumeInfo.setSubGroupType("FIFO");
|
||||||
|
} else {
|
||||||
|
groupConsumeInfo.setSubGroupType("NORMAL");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (consumeStats != null) {
|
||||||
|
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
|
||||||
|
groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
|
||||||
|
}
|
||||||
|
|
||||||
if (consumerConnection != null) {
|
if (consumerConnection != null) {
|
||||||
groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
|
groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
|
||||||
groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
|
groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
|
||||||
groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
|
groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
|
||||||
groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
|
groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, "
|
logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, "
|
||||||
+ consumerGroup, e);
|
+ consumerGroup, e);
|
||||||
}
|
}
|
||||||
return groupConsumeInfo;
|
return groupConsumeInfo;
|
||||||
}
|
}
|
||||||
@@ -295,7 +252,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
ConsumeStats consumeStats = null;
|
ConsumeStats consumeStats = null;
|
||||||
try {
|
try {
|
||||||
consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
|
consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
@@ -337,7 +295,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
results.put(messageQueue, clinetId);
|
results.put(messageQueue, clinetId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception err) {
|
}
|
||||||
|
catch (Exception err) {
|
||||||
logger.error("op=getClientConnection_error", err);
|
logger.error("op=getClientConnection_error", err);
|
||||||
}
|
}
|
||||||
return results;
|
return results;
|
||||||
@@ -352,12 +311,14 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
List<TopicConsumerInfo> topicConsumerInfoList = null;
|
List<TopicConsumerInfo> topicConsumerInfoList = null;
|
||||||
try {
|
try {
|
||||||
topicConsumerInfoList = queryConsumeStatsList(topic, group);
|
topicConsumerInfoList = queryConsumeStatsList(topic, group);
|
||||||
} catch (Exception ignore) {
|
}
|
||||||
|
catch (Exception ignore) {
|
||||||
}
|
}
|
||||||
group2ConsumerInfoMap.put(group, CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : topicConsumerInfoList.get(0));
|
group2ConsumerInfoMap.put(group, CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : topicConsumerInfoList.get(0));
|
||||||
}
|
}
|
||||||
return group2ConsumerInfoMap;
|
return group2ConsumerInfoMap;
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
@@ -369,7 +330,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) {
|
for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) {
|
||||||
try {
|
try {
|
||||||
Map<MessageQueue, Long> rollbackStatsMap =
|
Map<MessageQueue, Long> rollbackStatsMap =
|
||||||
mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
|
mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
|
||||||
ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true);
|
ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true);
|
||||||
List<RollbackStats> rollbackStatsList = consumerGroupRollBackStat.getRollbackStatsList();
|
List<RollbackStats> rollbackStatsList = consumerGroupRollBackStat.getRollbackStatsList();
|
||||||
for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : rollbackStatsMap.entrySet()) {
|
for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : rollbackStatsMap.entrySet()) {
|
||||||
@@ -380,7 +341,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
rollbackStatsList.add(rollbackStats);
|
rollbackStatsList.add(rollbackStats);
|
||||||
}
|
}
|
||||||
groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat);
|
groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat);
|
||||||
} catch (MQClientException e) {
|
}
|
||||||
|
catch (MQClientException e) {
|
||||||
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
|
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
|
||||||
try {
|
try {
|
||||||
ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true);
|
ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true);
|
||||||
@@ -388,14 +350,17 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList);
|
consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList);
|
||||||
groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat);
|
groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat);
|
||||||
continue;
|
continue;
|
||||||
} catch (Exception err) {
|
}
|
||||||
|
catch (Exception err) {
|
||||||
logger.error("op=resetOffset_which_not_online_error", err);
|
logger.error("op=resetOffset_which_not_online_error", err);
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
|
else {
|
||||||
logger.error("op=resetOffset_error", e);
|
logger.error("op=resetOffset_error", e);
|
||||||
}
|
}
|
||||||
groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
|
groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
logger.error("op=resetOffset_error", e);
|
logger.error("op=resetOffset_error", e);
|
||||||
groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
|
groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
|
||||||
}
|
}
|
||||||
@@ -407,21 +372,17 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
|
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
|
||||||
List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
|
List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
|
||||||
try {
|
try {
|
||||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||||
for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName
|
for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName
|
||||||
String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
|
String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
|
||||||
SubscriptionGroupConfig subscriptionGroupConfig = null;
|
SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
|
||||||
try {
|
|
||||||
subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("op=examineSubscriptionGroupConfig_error brokerName={} group={}", brokerName, group);
|
|
||||||
}
|
|
||||||
if (subscriptionGroupConfig == null) {
|
if (subscriptionGroupConfig == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
consumerConfigInfoList.add(new ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig));
|
consumerConfigInfoList.add(new ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
@@ -438,7 +399,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
deleteInNsFlag = true;
|
deleteInNsFlag = true;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||||
for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
|
for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
|
||||||
logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
|
logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
|
||||||
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true);
|
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true);
|
||||||
@@ -446,7 +407,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
|
deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
|
||||||
deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
|
deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
@@ -468,12 +430,13 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
@Override
|
@Override
|
||||||
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
|
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
|
||||||
try {
|
try {
|
||||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||||
for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
|
for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
|
||||||
consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) {
|
consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) {
|
||||||
mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), consumerConfigInfo.getSubscriptionGroupConfig());
|
mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), consumerConfigInfo.getSubscriptionGroupConfig());
|
||||||
}
|
}
|
||||||
} catch (Exception err) {
|
}
|
||||||
|
catch (Exception err) {
|
||||||
Throwables.throwIfUnchecked(err);
|
Throwables.throwIfUnchecked(err);
|
||||||
throw new RuntimeException(err);
|
throw new RuntimeException(err);
|
||||||
}
|
}
|
||||||
@@ -488,7 +451,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
for (ConsumerConfigInfo consumerConfigInfo : consumerConfigInfoList) {
|
for (ConsumerConfigInfo consumerConfigInfo : consumerConfigInfoList) {
|
||||||
brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList());
|
brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
@@ -512,38 +476,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
|
|||||||
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) {
|
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) {
|
||||||
try {
|
try {
|
||||||
return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
|
return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public GroupConsumeInfo refreshGroup(String address, String consumerGroup) {
|
|
||||||
|
|
||||||
if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) {
|
|
||||||
throw new RuntimeException("Cache is being built or empty, please try again later");
|
|
||||||
}
|
|
||||||
synchronized (cacheConsumeInfoList) {
|
|
||||||
for (int i = 0; i < cacheConsumeInfoList.size(); i++) {
|
|
||||||
GroupConsumeInfo groupConsumeInfo = cacheConsumeInfoList.get(i);
|
|
||||||
if (groupConsumeInfo.getGroup().equals(consumerGroup)) {
|
|
||||||
GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, "");
|
|
||||||
updatedInfo.setUpdateTime(new Date());
|
|
||||||
updatedInfo.setGroup(consumerGroup);
|
|
||||||
updatedInfo.setAddress(consumerGroupMap.get(consumerGroup));
|
|
||||||
cacheConsumeInfoList.set(i, updatedInfo);
|
|
||||||
return updatedInfo;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new RuntimeException("No consumer group information available");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<GroupConsumeInfo> refreshAllGroup(String address) {
|
|
||||||
cacheConsumeInfoList.clear();
|
|
||||||
consumerGroupMap.clear();
|
|
||||||
return queryGroupList(false, address);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,35 +43,28 @@ import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
|
|||||||
import org.apache.rocketmq.dashboard.model.request.TopicTypeList;
|
import org.apache.rocketmq.dashboard.model.request.TopicTypeList;
|
||||||
import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta;
|
import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta;
|
||||||
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
|
||||||
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
|
|
||||||
import org.apache.rocketmq.dashboard.service.TopicService;
|
import org.apache.rocketmq.dashboard.service.TopicService;
|
||||||
import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl;
|
|
||||||
import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler;
|
|
||||||
import org.apache.rocketmq.remoting.RPCHook;
|
import org.apache.rocketmq.remoting.RPCHook;
|
||||||
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
|
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
|
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.GroupList;
|
import org.apache.rocketmq.remoting.protocol.body.GroupList;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
|
|
||||||
import org.apache.rocketmq.remoting.protocol.body.TopicList;
|
import org.apache.rocketmq.remoting.protocol.body.TopicList;
|
||||||
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
|
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
|
||||||
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
||||||
import org.apache.rocketmq.tools.command.CommandUtil;
|
import org.apache.rocketmq.tools.command.CommandUtil;
|
||||||
import org.joor.Reflect;
|
import org.joor.Reflect;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -84,14 +77,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
|||||||
|
|
||||||
private final Object producerLock = new Object();
|
private final Object producerLock = new Object();
|
||||||
|
|
||||||
private Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ClusterInfoService clusterInfoService;
|
|
||||||
|
|
||||||
private final ConcurrentMap<String, TopicRouteData> routeCache = new ConcurrentHashMap<>();
|
|
||||||
private final Object cacheLock = new Object();
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private RMQConfigure configure;
|
private RMQConfigure configure;
|
||||||
|
|
||||||
@@ -124,63 +109,37 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TopicTypeList examineAllTopicType() {
|
public TopicTypeList examineAllTopicType() {
|
||||||
List<String> messageTypes = new ArrayList<>();
|
ArrayList<TopicTypeMeta> topicTypes = new ArrayList<>();
|
||||||
List<String> names = new ArrayList<>();
|
ArrayList<String> names = new ArrayList<>();
|
||||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
ArrayList<String> messageTypes = new ArrayList<>();
|
||||||
TopicList sysTopics = getSystemTopicList();
|
TopicList topicList = fetchAllTopicList(false, false);
|
||||||
clusterInfo.getBrokerAddrTable().values().forEach(brokerAddr -> {
|
checkTopicType(topicList, topicTypes);
|
||||||
try {
|
topicTypes.sort((t1, t2) -> t1.getTopicName().compareTo(t2.getTopicName()));
|
||||||
TopicConfigSerializeWrapper topicConfigSerializeWrapper = mqAdminExt.getAllTopicConfig(brokerAddr.getBrokerAddrs().get(0L), 10000L);
|
for (TopicTypeMeta topicTypeMeta : topicTypes) {
|
||||||
for (TopicConfig topicConfig : topicConfigSerializeWrapper.getTopicConfigTable().values()) {
|
names.add(topicTypeMeta.getTopicName());
|
||||||
TopicTypeMeta topicType = classifyTopicType(topicConfig.getTopicName(), topicConfigSerializeWrapper.getTopicConfigTable().get(topicConfig.getTopicName()).getAttributes(),sysTopics.getTopicList());
|
messageTypes.add(topicTypeMeta.getMessageType());
|
||||||
if (names.contains(topicType.getTopicName())) {
|
}
|
||||||
continue;
|
|
||||||
}
|
|
||||||
names.add(topicType.getTopicName());
|
|
||||||
messageTypes.add(topicType.getMessageType());
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("Failed to classify topic type for broker: " + brokerAddr, e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
sysTopics.getTopicList().forEach(topicName -> {
|
|
||||||
String sysTopicName = String.format("%s%s", "%SYS%", topicName);
|
|
||||||
if (!names.contains(sysTopicName)) {
|
|
||||||
names.add(sysTopicName);
|
|
||||||
messageTypes.add("SYSTEM");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return new TopicTypeList(names, messageTypes);
|
return new TopicTypeList(names, messageTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TopicTypeMeta classifyTopicType(String topicName, Map<String,String> attributes, Set<String> sysTopics) {
|
private void checkTopicType(TopicList topicList, ArrayList<TopicTypeMeta> topicTypes) {
|
||||||
TopicTypeMeta topicType = new TopicTypeMeta();
|
for (String topicName : topicList.getTopicList()) {
|
||||||
topicType.setTopicName(topicName);
|
TopicTypeMeta topicType = new TopicTypeMeta();
|
||||||
|
topicType.setTopicName(topicName);
|
||||||
if (topicName.startsWith("%R")) {
|
if (topicName.startsWith("%R")) {
|
||||||
topicType.setMessageType("RETRY");
|
topicType.setMessageType("RETRY");
|
||||||
return topicType;
|
} else if (topicName.startsWith("%D")) {
|
||||||
} else if (topicName.startsWith("%D")) {
|
topicType.setMessageType("DELAY");
|
||||||
topicType.setMessageType("DLQ");
|
} else if (topicName.startsWith("%S")) {
|
||||||
return topicType;
|
topicType.setMessageType("SYSTEM");
|
||||||
} else if (sysTopics.contains(topicName) || topicName.startsWith("rmq_sys") || topicName.equals("DefaultHeartBeatSyncerTopic")) {
|
} else {
|
||||||
topicType.setMessageType("SYSTEM");
|
List<TopicConfigInfo> topicConfigInfos = examineTopicConfig(topicName);
|
||||||
topicType.setTopicName(String.format("%s%s", "%SYS%", topicName));
|
if (!CollectionUtils.isEmpty(topicConfigInfos)) {
|
||||||
return topicType;
|
topicType.setMessageType(topicConfigInfos.get(0).getMessageType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
topicTypes.add(topicType);
|
||||||
}
|
}
|
||||||
if (attributes == null || attributes.isEmpty()) {
|
|
||||||
topicType.setMessageType("UNSPECIFIED");
|
|
||||||
return topicType;
|
|
||||||
}
|
|
||||||
|
|
||||||
String messageType = attributes.get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
|
|
||||||
if (StringUtils.isBlank(messageType)) {
|
|
||||||
messageType = TopicMessageType.UNSPECIFIED.name();
|
|
||||||
}
|
|
||||||
topicType.setMessageType(messageType);
|
|
||||||
|
|
||||||
return topicType;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -195,24 +154,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TopicRouteData route(String topic) {
|
public TopicRouteData route(String topic) {
|
||||||
TopicRouteData cachedData = routeCache.get(topic);
|
try {
|
||||||
if (cachedData != null) {
|
return mqAdminExt.examineTopicRouteInfo(topic);
|
||||||
return cachedData;
|
} catch (Exception ex) {
|
||||||
}
|
Throwables.throwIfUnchecked(ex);
|
||||||
|
throw new RuntimeException(ex);
|
||||||
synchronized (cacheLock) {
|
|
||||||
cachedData = routeCache.get(topic);
|
|
||||||
if (cachedData != null) {
|
|
||||||
return cachedData;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
TopicRouteData freshData = mqAdminExt.examineTopicRouteInfo(topic);
|
|
||||||
routeCache.put(topic, freshData);
|
|
||||||
return freshData;
|
|
||||||
} catch (Exception ex) {
|
|
||||||
Throwables.throwIfUnchecked(ex);
|
|
||||||
throw new RuntimeException(ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,7 +174,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
|
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
|
||||||
MQAdminExtImpl.clearTopicConfigCache();
|
|
||||||
TopicConfig topicConfig = new TopicConfig();
|
TopicConfig topicConfig = new TopicConfig();
|
||||||
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
|
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
|
||||||
String messageType = topicCreateOrUpdateRequest.getMessageType();
|
String messageType = topicCreateOrUpdateRequest.getMessageType();
|
||||||
@@ -248,15 +193,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public TopicConfig examineTopicConfig(String topic, String brokerName) {
|
public TopicConfig examineTopicConfig(String topic, String brokerName) {
|
||||||
|
ClusterInfo clusterInfo = null;
|
||||||
try {
|
try {
|
||||||
ClusterInfo clusterInfo = clusterInfoService.get();
|
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
|
||||||
|
return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic);
|
||||||
BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
|
|
||||||
if (brokerData == null) {
|
|
||||||
throw new RuntimeException("Broker not found: " + brokerName);
|
|
||||||
}
|
|
||||||
return mqAdminExt.examineTopicConfig(brokerData.selectBrokerAddr(), topic);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Throwables.throwIfUnchecked(e);
|
Throwables.throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
@@ -455,14 +397,6 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean refreshTopicList() {
|
|
||||||
routeCache.clear();
|
|
||||||
clusterInfoService.refresh();
|
|
||||||
MQAdminExtImpl.clearTopicConfigCache();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
|
private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
|
||||||
if (!traceEnabled) {
|
if (!traceEnabled) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -75,8 +75,9 @@ public class AutoCloseConsumerWrapper {
|
|||||||
|
|
||||||
|
|
||||||
protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) {
|
protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) {
|
||||||
return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook) {
|
return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook) {{
|
||||||
{ setUseTLS(useTLS); } };
|
setUseTLS(useTLS);
|
||||||
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startIdleCheckTask() {
|
private void startIdleCheckTask() {
|
||||||
|
|||||||
@@ -34,7 +34,6 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import org.apache.rocketmq.common.MixAll;
|
import org.apache.rocketmq.common.MixAll;
|
||||||
import org.apache.rocketmq.dashboard.service.ConsumerService;
|
|
||||||
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
|
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.KVTable;
|
import org.apache.rocketmq.remoting.protocol.body.KVTable;
|
||||||
import org.apache.rocketmq.remoting.protocol.body.TopicList;
|
import org.apache.rocketmq.remoting.protocol.body.TopicList;
|
||||||
@@ -60,9 +59,6 @@ public class DashboardCollectTask {
|
|||||||
@Resource
|
@Resource
|
||||||
private DashboardCollectService dashboardCollectService;
|
private DashboardCollectService dashboardCollectService;
|
||||||
|
|
||||||
@Resource
|
|
||||||
private ConsumerService consumerService;
|
|
||||||
|
|
||||||
private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class);
|
private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class);
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
@@ -93,13 +89,6 @@ public class DashboardCollectTask {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Scheduled(cron = "0 0 2 * * ?")
|
|
||||||
public void collectConsumer() {
|
|
||||||
consumerService.queryGroupList(false, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Scheduled(cron = "0 0/1 * * * ?")
|
@Scheduled(cron = "0 0/1 * * * ?")
|
||||||
public void collectBroker() {
|
public void collectBroker() {
|
||||||
if (!rmqConfigure.isEnableDashBoardCollect()) {
|
if (!rmqConfigure.isEnableDashBoardCollect()) {
|
||||||
|
|||||||
@@ -70,35 +70,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
|
|||||||
}
|
}
|
||||||
$scope.filterList($scope.paginationConf.currentPage)
|
$scope.filterList($scope.paginationConf.currentPage)
|
||||||
};
|
};
|
||||||
$scope.refreshConsumerGroup = function (groupName) {
|
$scope.refreshConsumerData = function () {
|
||||||
//Show loader
|
|
||||||
$('#loaderConsumer').removeClass("hide-myloader");
|
|
||||||
|
|
||||||
$http({
|
|
||||||
method: "GET",
|
|
||||||
url: "/consumer/group.refresh",
|
|
||||||
params: {
|
|
||||||
address: $scope.isRmqVersionV5() ? localStorage.getItem('proxyAddr') : null,
|
|
||||||
consumerGroup: groupName
|
|
||||||
}
|
|
||||||
}).success(function (resp) {
|
|
||||||
if (resp.status == 0) {
|
|
||||||
for (var i = 0; i < $scope.allConsumerGrouopList.length; i++) {
|
|
||||||
if ($scope.allConsumerGrouopList[i].group === groupName) {
|
|
||||||
$scope.allConsumerGrouopList[i] = resp.data;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$scope.showConsumerGroupList($scope.paginationConf.currentPage, $scope.allConsumerGrouopList.length);
|
|
||||||
//Hide loader
|
|
||||||
$('#loaderConsumer').addClass("hide-myloader");
|
|
||||||
} else {
|
|
||||||
Notification.error({message: resp.errMsg, delay: 2000});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
$scope.queryConsumerData = function () {
|
|
||||||
//Show loader
|
//Show loader
|
||||||
$('#loaderConsumer').removeClass("hide-myloader");
|
$('#loaderConsumer').removeClass("hide-myloader");
|
||||||
|
|
||||||
@@ -123,30 +95,6 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
$scope.refreshConsumerData = function () {
|
|
||||||
//Show loader
|
|
||||||
$('#loaderConsumer').removeClass("hide-myloader");
|
|
||||||
|
|
||||||
$http({
|
|
||||||
method: "GET",
|
|
||||||
url: "consumer/group.refresh.all",
|
|
||||||
params: {
|
|
||||||
skipSysGroup: false
|
|
||||||
}
|
|
||||||
}).success(function (resp) {
|
|
||||||
if (resp.status == 0) {
|
|
||||||
$scope.allConsumerGrouopList = resp.data;
|
|
||||||
console.log($scope.allConsumerGrouopList);
|
|
||||||
console.log(JSON.stringify(resp));
|
|
||||||
$scope.showConsumerGroupList($scope.paginationConf.currentPage, $scope.allConsumerGrouopList.length);
|
|
||||||
|
|
||||||
//Hide loader
|
|
||||||
$('#loaderConsumer').addClass("hide-myloader");
|
|
||||||
} else {
|
|
||||||
Notification.error({message: resp.errMsg, delay: 2000});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
$scope.monitor = function (consumerGroupName) {
|
$scope.monitor = function (consumerGroupName) {
|
||||||
$http({
|
$http({
|
||||||
method: "GET",
|
method: "GET",
|
||||||
@@ -172,12 +120,12 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
|
|||||||
$scope.intervalProcess = null;
|
$scope.intervalProcess = null;
|
||||||
}
|
}
|
||||||
if ($scope.intervalProcessSwitch) {
|
if ($scope.intervalProcessSwitch) {
|
||||||
$scope.intervalProcess = setInterval($scope.queryConsumerData, 10000);
|
$scope.intervalProcess = setInterval($scope.refreshConsumerData, 10000);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
$scope.queryConsumerData();
|
$scope.refreshConsumerData();
|
||||||
$scope.filterStr = "";
|
$scope.filterStr = "";
|
||||||
$scope.$watch('filterStr', function () {
|
$scope.$watch('filterStr', function () {
|
||||||
$scope.paginationConf.currentPage = 1;
|
$scope.paginationConf.currentPage = 1;
|
||||||
@@ -279,7 +227,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
|
|||||||
ngDialog.open({
|
ngDialog.open({
|
||||||
preCloseCallback: function (value) {
|
preCloseCallback: function (value) {
|
||||||
// Refresh topic list
|
// Refresh topic list
|
||||||
$scope.queryConsumerData();
|
$scope.refreshConsumerData();
|
||||||
},
|
},
|
||||||
template: $scope.rmqVersion ? 'consumerModifyDialogForV5' : 'consumerModifyDialog',
|
template: $scope.rmqVersion ? 'consumerModifyDialogForV5' : 'consumerModifyDialog',
|
||||||
controller: 'consumerModifyDialogController',
|
controller: 'consumerModifyDialogController',
|
||||||
@@ -363,7 +311,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
|
|||||||
ngDialog.open({
|
ngDialog.open({
|
||||||
preCloseCallback: function (value) {
|
preCloseCallback: function (value) {
|
||||||
// Refresh topic list
|
// Refresh topic list
|
||||||
$scope.queryConsumerData();
|
$scope.refreshConsumerData();
|
||||||
},
|
},
|
||||||
template: 'deleteConsumerDialog',
|
template: 'deleteConsumerDialog',
|
||||||
controller: 'deleteConsumerDialogController',
|
controller: 'deleteConsumerDialogController',
|
||||||
@@ -473,4 +421,4 @@ module.controller('consumerTopicViewDialogController', ['$scope', 'ngDialog', '$
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
}]
|
}]
|
||||||
);
|
);
|
||||||
@@ -135,5 +135,4 @@ var en = {
|
|||||||
"MESSAGE_TYPE_FIFO": "FIFO",
|
"MESSAGE_TYPE_FIFO": "FIFO",
|
||||||
"MESSAGE_TYPE_DELAY": "DELAY",
|
"MESSAGE_TYPE_DELAY": "DELAY",
|
||||||
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
|
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
|
||||||
"UPDATE_TIME": "Update Time",
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -136,5 +136,4 @@ var zh = {
|
|||||||
"MESSAGE_TYPE_FIFO": "顺序消息",
|
"MESSAGE_TYPE_FIFO": "顺序消息",
|
||||||
"MESSAGE_TYPE_DELAY": "定时/延时消息",
|
"MESSAGE_TYPE_DELAY": "定时/延时消息",
|
||||||
"MESSAGE_TYPE_TRANSACTION": "事务消息",
|
"MESSAGE_TYPE_TRANSACTION": "事务消息",
|
||||||
"UPDATE_TIME": "更新时间",
|
}
|
||||||
}
|
|
||||||
@@ -277,4 +277,4 @@ module.controller('messageDetailViewDialogController', ['$scope', 'ngDialog', '$
|
|||||||
$scope.messageTrackShowList = canShowList;
|
$scope.messageTrackShowList = canShowList;
|
||||||
});
|
});
|
||||||
}]
|
}]
|
||||||
);
|
);
|
||||||
@@ -59,7 +59,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
|
|||||||
$scope.userRole = $window.sessionStorage.getItem("userrole");
|
$scope.userRole = $window.sessionStorage.getItem("userrole");
|
||||||
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
|
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
|
||||||
|
|
||||||
$scope.getTopicList = function () {
|
$scope.refreshTopicList = function () {
|
||||||
$http({
|
$http({
|
||||||
method: "GET",
|
method: "GET",
|
||||||
url: "topic/list.queryTopicType"
|
url: "topic/list.queryTopicType"
|
||||||
@@ -77,34 +77,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
$scope.refreshTopicList = function () {
|
$scope.refreshTopicList();
|
||||||
$http({
|
|
||||||
method: "POST",
|
|
||||||
url: "topic/refresh"
|
|
||||||
}).success(function (resp) {
|
|
||||||
if (resp.status == 0 && resp.data == true) {
|
|
||||||
$http({
|
|
||||||
method: "GET",
|
|
||||||
url: "topic/list.queryTopicType"
|
|
||||||
}).success(function (resp1) {
|
|
||||||
if (resp1.status == 0) {
|
|
||||||
$scope.allTopicNameList = resp1.data.topicNameList;
|
|
||||||
$scope.allMessageTypeList = resp1.data.messageTypeList;
|
|
||||||
console.log($scope.allTopicNameList);
|
|
||||||
console.log(JSON.stringify(resp1));
|
|
||||||
$scope.showTopicList(1, $scope.allTopicNameList.length);
|
|
||||||
} else {
|
|
||||||
Notification.error({message: resp1.errMsg, delay: 5000});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
} else {
|
|
||||||
Notification.error({message: resp.errMsg, delay: 5000});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
$scope.getTopicList();
|
|
||||||
|
|
||||||
$scope.filterStr = "";
|
$scope.filterStr = "";
|
||||||
$scope.$watch('filterStr', function () {
|
$scope.$watch('filterStr', function () {
|
||||||
@@ -154,17 +127,17 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
|
|||||||
|
|
||||||
$scope.filterByType = function (str, type) {
|
$scope.filterByType = function (str, type) {
|
||||||
if ($scope.filterRetry) {
|
if ($scope.filterRetry) {
|
||||||
if (type.includes("RETRY")) {
|
if (str.startsWith("%R")) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ($scope.filterDLQ) {
|
if ($scope.filterDLQ) {
|
||||||
if (type.includes("DLQ")) {
|
if (str.startsWith("%D")) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ($scope.filterSystem) {
|
if ($scope.filterSystem) {
|
||||||
if (type.includes("SYSTEM")) {
|
if (str.startsWith("%S")) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -413,6 +386,10 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
|
|||||||
if (resp.status == 0) {
|
if (resp.status == 0) {
|
||||||
console.log(resp);
|
console.log(resp);
|
||||||
ngDialog.open({
|
ngDialog.open({
|
||||||
|
preCloseCallback: function (value) {
|
||||||
|
// Refresh topic list
|
||||||
|
$scope.refreshTopicList();
|
||||||
|
},
|
||||||
template: 'topicModifyDialog',
|
template: 'topicModifyDialog',
|
||||||
controller: 'topicModifyDialogController',
|
controller: 'topicModifyDialogController',
|
||||||
data: {
|
data: {
|
||||||
@@ -563,4 +540,4 @@ module.controller('routerViewDialogController', ['$scope', 'ngDialog', '$http',
|
|||||||
})
|
})
|
||||||
};
|
};
|
||||||
}]
|
}]
|
||||||
);
|
);
|
||||||
@@ -300,7 +300,4 @@
|
|||||||
background-color: rgba(0, 0, 0, 0);
|
background-color: rgba(0, 0, 0, 0);
|
||||||
cursor: text !important;
|
cursor: text !important;
|
||||||
width: 60%;
|
width: 60%;
|
||||||
}
|
}
|
||||||
.navbar .navbar-nav .dropdown-menu li {
|
|
||||||
margin: 0 !important;
|
|
||||||
}
|
|
||||||
@@ -27,9 +27,6 @@
|
|||||||
</div>
|
</div>
|
||||||
<div class="navbar-collapse collapse navbar-warning-collapse">
|
<div class="navbar-collapse collapse navbar-warning-collapse">
|
||||||
<ul class="nav navbar-nav">
|
<ul class="nav navbar-nav">
|
||||||
<li class="nav-divider disabled">
|
|
||||||
<span class="divider-bar"></span>
|
|
||||||
</li>
|
|
||||||
<li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li>
|
<li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li>
|
||||||
<li ng-show="rmqVersion" ng-class="path =='proxy' ? 'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
|
<li ng-show="rmqVersion" ng-class="path =='proxy' ? 'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
|
||||||
<li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
|
<li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
|
||||||
@@ -43,9 +40,6 @@
|
|||||||
<li ng-show="{{ show }}" ng-class="path =='acl' ? 'active':''"><a ng-href="#/acl">Acl</a></li>
|
<li ng-show="{{ show }}" ng-class="path =='acl' ? 'active':''"><a ng-href="#/acl">Acl</a></li>
|
||||||
</ul>
|
</ul>
|
||||||
<ul class="nav navbar-nav navbar-right">
|
<ul class="nav navbar-nav navbar-right">
|
||||||
<li class="nav-divider disabled">
|
|
||||||
<span class="divider-bar"></span>
|
|
||||||
</li>
|
|
||||||
<li class="dropdown">
|
<li class="dropdown">
|
||||||
<a href="bootstrap-elements.html" data-target="#" class="dropdown-toggle" data-toggle="dropdown">{{'CHANGE_LANG' | translate}}
|
<a href="bootstrap-elements.html" data-target="#" class="dropdown-toggle" data-toggle="dropdown">{{'CHANGE_LANG' | translate}}
|
||||||
<b class="caret"></b></a>
|
<b class="caret"></b></a>
|
||||||
@@ -85,4 +79,4 @@
|
|||||||
$scope.IsVisible = $scope.IsVisible = true;
|
$scope.IsVisible = $scope.IsVisible = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
</script>
|
</script>
|
||||||
@@ -53,7 +53,6 @@
|
|||||||
<th class="text-center">{{ 'MODE' | translate}}</th>
|
<th class="text-center">{{ 'MODE' | translate}}</th>
|
||||||
<th class="text-center"><a ng-click="sortByKey('consumeTps')">TPS</a></th>
|
<th class="text-center"><a ng-click="sortByKey('consumeTps')">TPS</a></th>
|
||||||
<th class="text-center"><a ng-click="sortByKey('diffTotal')">{{ 'DELAY' | translate}}</a></th>
|
<th class="text-center"><a ng-click="sortByKey('diffTotal')">{{ 'DELAY' | translate}}</a></th>
|
||||||
<th class="text-center">{{ 'UPDATE_TIME' | translate}}</th>
|
|
||||||
<th class="text-center">{{ 'OPERATION' | translate}}</th>
|
<th class="text-center">{{ 'OPERATION' | translate}}</th>
|
||||||
</tr>
|
</tr>
|
||||||
<tr ng-repeat="consumerGroup in consumerGroupShowList"
|
<tr ng-repeat="consumerGroup in consumerGroupShowList"
|
||||||
@@ -66,7 +65,6 @@
|
|||||||
<td class="text-center">{{consumerGroup.messageModel}}</td>
|
<td class="text-center">{{consumerGroup.messageModel}}</td>
|
||||||
<td class="text-center">{{consumerGroup.consumeTps}}</td>
|
<td class="text-center">{{consumerGroup.consumeTps}}</td>
|
||||||
<td class="text-center">{{consumerGroup.diffTotal}}</td>
|
<td class="text-center">{{consumerGroup.diffTotal}}</td>
|
||||||
<td class="text-center">{{consumerGroup.updateTime}}</td>
|
|
||||||
<td class="text-left">
|
<td class="text-left">
|
||||||
<button name="client" ng-click="client(consumerGroup.group, consumerGroup.address)"
|
<button name="client" ng-click="client(consumerGroup.group, consumerGroup.address)"
|
||||||
class="btn btn-raised btn-sm btn-primary"
|
class="btn btn-raised btn-sm btn-primary"
|
||||||
@@ -87,9 +85,6 @@
|
|||||||
ng-show="{{!sysFlag && writeOperationEnabled}}"
|
ng-show="{{!sysFlag && writeOperationEnabled}}"
|
||||||
type="button">{{'DELETE' | translate}}
|
type="button">{{'DELETE' | translate}}
|
||||||
</button>
|
</button>
|
||||||
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-click="refreshConsumerGroup(consumerGroup.group)">
|
|
||||||
{{'REFRESH' | translate}}
|
|
||||||
</button>
|
|
||||||
|
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
@@ -573,4 +568,4 @@
|
|||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</script>
|
</script>
|
||||||
Reference in New Issue
Block a user