diff --git a/.asf.yaml b/.asf.yaml index 685f086..02bc9f1 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -9,3 +9,10 @@ github: squash: true merge: false rebase: false + +notifications: + commits: commits@rocketmq.apache.org + issues: commits@rocketmq.apache.org + pullrequests: commits@rocketmq.apache.org + jobs: commits@rocketmq.apache.org + discussions: dev@rocketmq.apache.org diff --git a/.gitignore b/.gitignore index 8f1890a..f5d6d52 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ .project .factorypath .settings/ +.vscode \ No newline at end of file diff --git a/docs/1_0_0/UserGuide_CN.md b/docs/1_0_0/UserGuide_CN.md index 0bf62a4..291b1c1 100755 --- a/docs/1_0_0/UserGuide_CN.md +++ b/docs/1_0_0/UserGuide_CN.md @@ -63,6 +63,18 @@ * 根据消息主题和消息Id进行消息的查询 * 消息详情可以展示这条消息的详细信息,查看消息对应到具体消费组的消费情况(如果异常,可以查看具体的异常信息)。可以向指定的消费组重发消息。 +## RocketMQ-V5.0 仪表盘 +* 版本切换 + * RocketMQ右上角可切换不同版本,用户可以自主选择 RocketMQ-5.x 或 RocketMQ-4.x 版本 +* 主题页面 + * 支持延迟/顺序/事务消息的筛选 + * 支持延迟/顺序/事物/普通等多种消息类型主题的新增与更新 +* 消费页面 + * 支持顺序消费类型订阅组的过滤 + * 提供顺序消费类型订阅组的新增与更新,如果需要开启顺序消费,FIFO类型的订阅组一定需要打开consumeOrderlyEnable选项 +* 代理页面(RocketMQ 5.0新增) + * 支持代理节点的新增与查询 + * 支持代理节点地址配置:在application.yml中可对proxyAddr和proxyAddrs属性进行预配置 ## HTTPS 方式访问Dashboard * HTTPS功能实际上是使用SpringBoot提供的配置功能即可完成,首先,需要有一个SSL KeyStore来存放服务端证书,可以使用本工程所提供的测试密钥库: diff --git a/docs/1_0_0/UserGuide_EN.md b/docs/1_0_0/UserGuide_EN.md index fd469d2..b896dbc 100644 --- a/docs/1_0_0/UserGuide_EN.md +++ b/docs/1_0_0/UserGuide_EN.md @@ -64,6 +64,18 @@ * look over this message's detail info.you can see the message's consume state(each group has one line),show the exception message if has exception. you can send this message to the group you selected +## RocketMQ-V5.0 dashboard +* Version switching + * RocketMQ can switch between different versions in the upper right corner, and users can freely choose between RocketMQ-5.X or RocketMQ-4.X versions +* Theme page + * Support filtering of delayed/sequential/transaction messages + * Support the addition and update of multiple message types such as delay, sequence, object, and ordinary themes +* Consumption page + * Support filtering of subscription groups for fifo consumption types + * Provide the addition and update of subscription groups for sequential consumption types. If fifo consumption needs to be enabled, FIFO type subscription groups must have the consumeOrderlyEnable option enabled +* Proxy page (Added in RocketMQ 5.0) + * Support for adding and querying proxy nodes + * Support proxy node address configuration: ProxyAddr and proxyAddrs properties can be pre configured in application.yml ## Access Dashboard with HTTPS * SpringBoot itself has provided the SSL configuration. You can use the project test Keystore:resources/rmqcngkeystore.jks. The store is generated with the following unix keytool commands: diff --git a/pom.xml b/pom.xml index 31d4968..2b61bfc 100644 --- a/pom.xml +++ b/pom.xml @@ -91,10 +91,10 @@ 2.4 1.2 3.2.2 - 4.9.3 + 5.1.0 2.19.1 1.9.6 - 1.18.12 + 1.18.22 ${basedir}/../.. apacherocketmq 2.6.0 diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java index b68f931..0c495be 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java +++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java @@ -21,7 +21,7 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.tools.admin.MQAdminExt; @Slf4j diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java index 991a2d8..5ce21ff 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java +++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java @@ -43,6 +43,8 @@ public class RMQConfigure { //use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + private volatile String proxyAddr; + private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"); @@ -62,6 +64,8 @@ public class RMQConfigure { private List namesrvAddrs = new ArrayList<>(); + private List proxyAddrs = new ArrayList<>(); + public String getAccessKey() { return accessKey; } @@ -86,6 +90,25 @@ public class RMQConfigure { return namesrvAddrs; } + public List getProxyAddrs() { + return this.proxyAddrs; + } + + public void setProxyAddrs(List proxyAddrs) { + this.proxyAddrs = proxyAddrs; + if (CollectionUtils.isNotEmpty(proxyAddrs)) { + this.setProxyAddr(proxyAddrs.get(0)); + } + } + + public String getProxyAddr() { + return proxyAddr; + } + + public void setProxyAddr(String proxyAddr) { + this.proxyAddr = proxyAddr; + } + public void setNamesrvAddrs(List namesrvAddrs) { this.namesrvAddrs = namesrvAddrs; if (CollectionUtils.isNotEmpty(namesrvAddrs)) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java index 68becd1..96fc056 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.controller; import com.google.common.base.Preconditions; import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; @@ -47,14 +47,14 @@ public class ConsumerController { @RequestMapping(value = "/groupList.query") @ResponseBody - public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) { - return consumerService.queryGroupList(skipSysGroup); + public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) { + return consumerService.queryGroupList(skipSysGroup, address); } @RequestMapping(value = "/group.query") @ResponseBody - public Object groupQuery(@RequestParam String consumerGroup) { - return consumerService.queryGroup(consumerGroup); + public Object groupQuery(@RequestParam String consumerGroup, String address) { + return consumerService.queryGroup(consumerGroup, address); } @RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST}) @@ -99,14 +99,14 @@ public class ConsumerController { @RequestMapping(value = "/queryTopicByConsumer.query") @ResponseBody - public Object queryConsumerByTopic(@RequestParam String consumerGroup) { - return consumerService.queryConsumeStatsListByGroupName(consumerGroup); + public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) { + return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address); } @RequestMapping(value = "/consumerConnection.query") @ResponseBody - public Object consumerConnection(@RequestParam(required = false) String consumerGroup) { - ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup); + public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) { + ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address); consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet())); return consumerConnection; } diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java index e4dfcd9..9eb08f6 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/MessageController.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.dashboard.controller; import com.google.common.collect.Maps; import org.apache.rocketmq.common.Pair; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.request.MessageQuery; diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java index 9c1d79d..389506e 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ProducerController.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.dashboard.controller; import javax.annotation.Resource; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.permisssion.Permission; import org.apache.rocketmq.dashboard.service.ProducerService; diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java new file mode 100644 index 0000000..27aa59d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.controller; + +import org.apache.rocketmq.dashboard.permisssion.Permission; +import org.apache.rocketmq.dashboard.service.ProxyService; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.annotation.Resource; + +@Controller +@RequestMapping("/proxy") +@Permission +public class ProxyController { + @Resource + private ProxyService proxyService; + @RequestMapping(value = "/homePage.query", method = RequestMethod.GET) + @ResponseBody + public Object homePage() { + return proxyService.getProxyHomePage(); + } + + @RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST) + @ResponseBody + public Object addProxyAddr(@RequestParam String newProxyAddr) { + proxyService.addProxyAddrList(newProxyAddr); + return true; + } + + @RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST) + @ResponseBody + public Object updateProxyAddr(@RequestParam String proxyAddr) { + proxyService.updateProxyAddrList(proxyAddr); + return true; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java index ebed69e..467c18e 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java @@ -56,6 +56,12 @@ public class TopicController { return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq); } + @RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET) + @ResponseBody + public Object listTopicType() { + return topicService.examineAllTopicType(); + } + @RequestMapping(value = "/stats.query", method = RequestMethod.GET) @ResponseBody public Object stats(@RequestParam String topic) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java index 9070542..a100f92 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/ConnectionInfo.java @@ -20,7 +20,7 @@ import com.google.common.collect.Sets; import java.util.Collection; import java.util.HashSet; import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.Connection; public class ConnectionInfo extends Connection { private String versionDesc; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java b/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java index f7e4a4e..a42037b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/ConsumerGroupRollBackStat.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.dashboard.model; -import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import com.google.common.collect.Lists; import java.util.List; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java index 44bf55f..b93978d 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.dashboard.model; import lombok.Data; -import org.apache.rocketmq.common.protocol.body.CMResult; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.CMResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; @Data public class DlqMessageResendResult { diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java index 6429ba7..db11c41 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java @@ -16,8 +16,10 @@ */ package org.apache.rocketmq.dashboard.model; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; + +import java.util.List; public class GroupConsumeInfo implements Comparable { private String group; @@ -25,8 +27,11 @@ public class GroupConsumeInfo implements Comparable { private int count; private ConsumeType consumeType; private MessageModel messageModel; + private List address; private int consumeTps; private long diffTotal = -1; + private String subGroupType = "NORMAL"; + public String getGroup() { return group; @@ -68,6 +73,22 @@ public class GroupConsumeInfo implements Comparable { this.diffTotal = diffTotal; } + public List getAddress() { + return address; + } + + public void setAddress(List address) { + this.address = address; + } + + public String getSubGroupType() { + return subGroupType; + } + + public void setSubGroupType(String subGroupType) { + this.subGroupType = subGroupType; + } + @Override public int compareTo(GroupConsumeInfo o) { if (this.count != o.count) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java index 38daddd..29dc542 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/QueueStatInfo.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.dashboard.model; -import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper; import org.apache.rocketmq.common.message.MessageQueue; import org.springframework.beans.BeanUtils; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java index acebafc..2a7e9c0 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/ConsumerConfigInfo.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.dashboard.model.request; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import java.util.List; diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java index 32572fe..6b9eb67 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java @@ -15,7 +15,6 @@ * limitations under the License. */ package org.apache.rocketmq.dashboard.model.request; - import com.google.common.base.Objects; import java.util.List; @@ -32,6 +31,7 @@ public class TopicConfigInfo { private int perm; private boolean order; + private String messageType; public List getClusterNameList() { return clusterNameList; } @@ -92,6 +92,18 @@ public class TopicConfigInfo { this.order = order; } + + public String getMessageType() { + return messageType; + } + + public void setMessageType(String messageType) { + this.messageType = messageType; + } + + + + @Override public boolean equals(Object o) { if (this == o) @@ -103,12 +115,13 @@ public class TopicConfigInfo { readQueueNums == that.readQueueNums && perm == that.perm && order == that.order && - Objects.equal(topicName, that.topicName); + Objects.equal(topicName, that.topicName) && + Objects.equal(messageType, that.messageType); } @Override public int hashCode() { - return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order); + return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order,messageType); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicTypeList.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicTypeList.java new file mode 100644 index 0000000..e7b8148 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicTypeList.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.model.request; + +import java.util.List; + +public class TopicTypeList { + private List topicNameList; + private List messageTypeList; + + public List getTopicNameList() { + return topicNameList; + } + + public void setTopicNameList(List topicNameList) { + this.topicNameList = topicNameList; + } + + public List getMessageTypeList() { + return messageTypeList; + } + + public void setMessageTypeList(List messageTypeList) { + this.messageTypeList = messageTypeList; + } + + public TopicTypeList(List topicNameList, List messageTypeList) { + this.topicNameList = topicNameList; + this.messageTypeList = messageTypeList; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicTypeMeta.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicTypeMeta.java new file mode 100644 index 0000000..a1fe935 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicTypeMeta.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.model.request; + +public class TopicTypeMeta { + private String topicName; + private String messageType; + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public String getMessageType() { + return messageType; + } + + public void setMessageType(String messageType) { + this.messageType = messageType; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java b/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java index 1f85796..a546fbf 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/AbstractCommonService.java @@ -19,8 +19,8 @@ package org.apache.rocketmq.dashboard.service; import org.apache.rocketmq.tools.admin.MQAdminExt; import com.google.common.base.Throwables; import com.google.common.collect.Sets; -import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; @@ -28,7 +28,7 @@ import org.apache.commons.collections.CollectionUtils; public abstract class AbstractCommonService { @Resource protected MQAdminExt mqAdminExt; - protected final Set changeToBrokerNameSet(HashMap> clusterAddrTable, + protected final Set changeToBrokerNameSet(Map> clusterAddrTable, List clusterNameList, List brokerNameList) { Set finalBrokerNameList = Sets.newHashSet(); if (CollectionUtils.isNotEmpty(clusterNameList)) { @@ -38,7 +38,8 @@ public abstract class AbstractCommonService { } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } if (CollectionUtils.isNotEmpty(brokerNameList)) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java index 6f4965c..e284c44 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.dashboard.service; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.TopicConsumerInfo; @@ -31,12 +31,12 @@ import java.util.Map; import java.util.Set; public interface ConsumerService { - List queryGroupList(boolean skipSysGroup); + List queryGroupList(boolean skipSysGroup,String address); - GroupConsumeInfo queryGroup(String consumerGroup); + GroupConsumeInfo queryGroup(String consumerGroup, String address); - List queryConsumeStatsListByGroupName(String groupName); + List queryConsumeStatsListByGroupName(String groupName, String address); List queryConsumeStatsList(String topic, String groupName); @@ -52,7 +52,7 @@ public interface ConsumerService { Set fetchBrokerNameSetBySubscriptionGroup(String group); - ConsumerConnection getConsumerConnection(String consumerGroup); + ConsumerConnection getConsumerConnection(String consumerGroup, String address); ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java b/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java index 802ca45..36fb5cd 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/MessageService.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.tools.admin.api.MessageTrack; diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java index cd9f582..ac0e731 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProducerService.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.dashboard.service; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; public interface ProducerService { ProducerConnection getProducerConnection(String producerGroup, String topic); diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java new file mode 100644 index 0000000..2a64680 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service; + +import java.util.Map; + +public interface ProxyService { + + void addProxyAddrList(String proxyAddr); + + void updateProxyAddrList(String proxyAddr); + + Map getProxyHomePage(); +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java index 57f0dea..9ff0bf0 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java @@ -19,10 +19,11 @@ package org.apache.rocketmq.dashboard.service; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.TopicStatsTable; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.dashboard.model.request.TopicTypeList; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; @@ -31,6 +32,8 @@ import java.util.List; public interface TopicService { TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq); + TopicTypeList examineAllTopicType(); + TopicStatsTable stats(String topic); TopicRouteData route(String topic); diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 6788522..0281c5c 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -29,31 +29,41 @@ import org.apache.rocketmq.client.impl.MQAdminImpl; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; -import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; -import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; -import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.message.MessageRequestMode; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; +import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo; +import org.apache.rocketmq.remoting.protocol.body.QueryConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.dashboard.util.JsonUtil; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -63,7 +73,9 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult; import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.apache.rocketmq.tools.admin.common.AdminToolResult; import org.joor.Reflect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +93,7 @@ public class MQAdminExtImpl implements MQAdminExt { @Override public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - UnsupportedEncodingException, InterruptedException, MQBrokerException { + UnsupportedEncodingException, InterruptedException, MQBrokerException, MQClientException { MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties); } @@ -128,7 +140,7 @@ public class MQAdminExtImpl implements MQAdminExt { } @Override - public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) throws MQBrokerException { RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = null; @@ -136,7 +148,8 @@ public class MQAdminExtImpl implements MQAdminExt { response = remotingClient.invokeSync(addr, request, 3000); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } assert response != null; switch (response.getCode()) { @@ -145,12 +158,12 @@ public class MQAdminExtImpl implements MQAdminExt { return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group); } default: - throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + throw new MQBrokerException(response.getCode(), response.getRemark()); } } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { + public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException { RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand response = null; @@ -158,7 +171,8 @@ public class MQAdminExtImpl implements MQAdminExt { response = remotingClient.invokeSync(addr, request, 3000); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -166,7 +180,7 @@ public class MQAdminExtImpl implements MQAdminExt { return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); } default: - throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + throw new MQBrokerException(response.getCode(), response.getRemark()); } } @@ -376,14 +390,14 @@ public class MQAdminExtImpl implements MQAdminExt { } @Override - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum); + public void createTopic(String key, String newTopic, int queueNum, Map attributes) throws MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, attributes); } @Override - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map attributes) throws MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag); + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag, attributes); } @Override @@ -572,4 +586,256 @@ public class MQAdminExtImpl implements MQAdminExt { String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return false; } + + @Override + public void addBrokerToContainer(String brokerContainerAddr, String brokerConfig) throws InterruptedException, + MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'addBrokerToContainer'"); + } + + @Override + public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName, + long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'removeBrokerFromContainer'"); + } + + @Override + public void updateGlobalWhiteAddrConfig(String addr, String globalWhiteAddrs, String aclFileFullPath) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'updateGlobalWhiteAddrConfig'"); + } + + @Override + public TopicStatsTable examineTopicStats(String brokerAddr, String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineTopicStats'"); + } + + @Override + public AdminToolResult examineTopicStatsConcurrent(String topic) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineTopicStatsConcurrent'"); + } + + @Override + public ConsumeStats examineConsumeStats(String brokerAddr, String consumerGroup, String topicName, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + // TODO Auto-generated method stub + return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis); + } + + @Override + public AdminToolResult examineConsumeStatsConcurrent(String consumerGroup, String topic) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStatsConcurrent'"); + } + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr) + throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr); + } + + @Override + public ProducerTableInfo getAllProducerInfo(String brokerAddr) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getAllProducerInfo'"); + } + + @Override + public void deleteTopic(String topicName, String clusterName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteTopic'"); + } + + @Override + public AdminToolResult deleteTopicInBrokerConcurrent(Set addrs, String topic) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInBrokerConcurrent'"); + } + + @Override + public void deleteTopicInNameServer(Set addrs, String clusterName, String topic) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInNameServer'"); + } + + @Override + public AdminToolResult resetOffsetNewConcurrent(String group, String topic, long timestamp) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'resetOffsetNewConcurrent'"); + } + + @Override + public TopicList queryTopicsByConsumer(String group) + throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumer'"); + } + + @Override + public AdminToolResult queryTopicsByConsumerConcurrent(String group) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumerConcurrent'"); + } + + @Override + public SubscriptionData querySubscription(String group, String topic) + throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'querySubscription'"); + } + + @Override + public AdminToolResult> queryConsumeTimeSpanConcurrent(String topic, String group) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryConsumeTimeSpanConcurrent'"); + } + + @Override + public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLog'"); + } + + @Override + public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLogByAddr'"); + } + + @Override + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack, + boolean metrics) throws RemotingException, MQClientException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getConsumerRunningInfo'"); + } + + @Override + public List messageTrackDetailConcurrent(MessageExt msg) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'messageTrackDetailConcurrent'"); + } + + @Override + public void setMessageRequestMode(String brokerAddr, String topic, String consumerGroup, MessageRequestMode mode, + int popWorkGroupSize, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'setMessageRequestMode'"); + } + + @Override + public long searchOffset(String brokerAddr, String topicName, int queueId, long timestamp, long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'searchOffset'"); + } + + @Override + public void resetOffsetByQueueId(String brokerAddr, String consumerGroup, String topicName, int queueId, + long resetOffset) throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'resetOffsetByQueueId'"); + } + + @Override + public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, + TopicQueueMappingDetail mappingDetail, boolean force) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'createStaticTopic'"); + } + + @Override + public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String groupName, String topicName, + Boolean readable) throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'updateAndGetGroupReadForbidden'"); + } + + @Override + public MessageExt queryMessage(String clusterName, String topic, String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'queryMessage'"); + } + + @Override + public HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getBrokerHAStatus'"); + } + + @Override + public BrokerReplicasInfo getInSyncStateData(String controllerAddress, List brokers) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getInSyncStateData'"); + } + + @Override + public EpochEntryCache getBrokerEpochCache(String brokerAddr) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getBrokerEpochCache'"); + } + + @Override + public GetMetaDataResponseHeader getControllerMetaData(String controllerAddr) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getControllerMetaData'"); + } + + @Override + public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException, + MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'resetMasterFlushOffset'"); + } + + @Override + public Map getControllerConfig(List controllerServers) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQClientException, UnsupportedEncodingException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'getControllerConfig'"); + } + + @Override + public void updateControllerConfig(Properties properties, List controllers) + throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'updateControllerConfig'"); + } + + @Override + public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName, + String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'electMaster'"); + } + + @Override + public void cleanControllerBrokerData(String controllerAddr, String clusterName, String brokerName, + String brokerAddr, boolean isCleanLivingBroker) + throws RemotingException, InterruptedException, MQBrokerException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'cleanControllerBrokerData'"); + } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java new file mode 100644 index 0000000..4344c7c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.client; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; + +public interface ProxyAdmin { + + ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException; +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java new file mode 100644 index 0000000..eadae12 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.client; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST; + +@Slf4j +@Service +public class ProxyAdminImpl implements ProxyAdmin { + @Autowired + private GenericObjectPool mqAdminExtPool; + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + try { + MQAdminInstance.createMQAdmin(mqAdminExtPool); + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); + GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader); + RemotingCommand response = remotingClient.invokeSync(addr, request, 3000); + switch (response.getCode()) { + case 0: + return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); + default: + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); + } + } finally { + MQAdminInstance.returnMQAdmin(mqAdminExtPool); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java index 1e7e294..0c0177f 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/AclServiceImpl.java @@ -36,8 +36,8 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.dashboard.model.request.AclRequest; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.AclService; @@ -68,7 +68,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } catch (Exception e) { log.error("getAclConfig error.", e); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } AclConfig aclConfig = new AclConfig(); aclConfig.setGlobalWhiteAddrs(Collections.emptyList()); @@ -100,7 +101,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -116,7 +118,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -142,7 +145,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -174,7 +178,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -206,7 +211,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -249,7 +255,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService } } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -261,7 +268,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -281,7 +289,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -297,7 +306,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -311,7 +321,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ",")); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java index c57f7e1..12e7f71 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java @@ -17,9 +17,10 @@ package org.apache.rocketmq.dashboard.service.impl; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.dashboard.service.ClusterService; import org.apache.rocketmq.dashboard.util.JsonUtil; @@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.Arrays; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; @Service public class ClusterServiceImpl implements ClusterService { @@ -56,10 +59,14 @@ public class ClusterServiceImpl implements ClusterService { } resultMap.put("clusterInfo", clusterInfo); resultMap.put("brokerServer", brokerServer); + // add messageType + resultMap.put("messageTypes", Arrays.stream(TopicMessageType.values()).sorted() + .collect(Collectors.toMap(TopicMessageType::getValue, messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue())))); return resultMap; } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -70,7 +77,8 @@ public class ClusterServiceImpl implements ClusterService { return mqAdminExt.getBrokerConfig(brokerAddr); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index 3ad85d4..9bc37ab 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -23,12 +23,22 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.annotation.Resource; import org.apache.commons.collections.CollectionUtils; @@ -36,18 +46,20 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.dashboard.service.client.ProxyAdmin; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; @@ -60,19 +72,46 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.ConsumerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; -import static com.google.common.base.Throwables.propagate; - @Service -public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { +public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); + @Resource + protected ProxyAdmin proxyAdmin; @Resource private RMQConfigure configure; private static final Set SYSTEM_GROUP_SET = new HashSet<>(); + private ExecutorService executorService; + + @Override + public void afterPropertiesSet() { + Runtime runtime = Runtime.getRuntime(); + int corePoolSize = Math.max(10, runtime.availableProcessors() * 2); + int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2); + ThreadFactory threadFactory = new ThreadFactory() { + private final AtomicLong threadIndex = new AtomicLong(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet()); + } + }; + RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); + this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(5000), threadFactory, handler); + } + + @Override + public void destroy() { + ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS); + } + static { SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP); SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP); @@ -85,22 +124,47 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public List queryGroupList(boolean skipSysGroup) { - Set consumerGroupSet = Sets.newHashSet(); + public List queryGroupList(boolean skipSysGroup, String address) { + HashMap> consumerGroupMap = Maps.newHashMap(); try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); - consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); + for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) { + if (!consumerGroupMap.containsKey(groupName)) { + consumerGroupMap.putIfAbsent(groupName, new ArrayList<>()); + } + List addresses = consumerGroupMap.get(groupName); + addresses.add(brokerData.selectBrokerAddr()); + consumerGroupMap.put(groupName, addresses); + } } + } catch (Exception err) { + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } - catch (Exception err) { - throw Throwables.propagate(err); + List groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); + CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size()); + for (Map.Entry> entry : consumerGroupMap.entrySet()) { + String consumerGroup = entry.getKey(); + executorService.submit(() -> { + try { + GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address); + consumeInfo.setAddress(entry.getValue()); + groupConsumeInfoList.add(consumeInfo); + } catch (Exception e) { + logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); + } finally { + countDownLatch.countDown(); + } + }); } - List groupConsumeInfoList = Lists.newArrayList(); - for (String consumerGroup : consumerGroupSet) { - groupConsumeInfoList.add(queryGroup(consumerGroup)); + try { + countDownLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("query consumerGroup countDownLatch await Exception", e); } + if (!skipSysGroup) { groupConsumeInfoList.stream().map(group -> { if (SYSTEM_GROUP_SET.contains(group.getGroup())) { @@ -114,7 +178,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public GroupConsumeInfo queryGroup(String consumerGroup) { + public GroupConsumeInfo queryGroup(String consumerGroup, String address) { GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); try { ConsumeStats consumeStats = null; @@ -126,14 +190,28 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } ConsumerConnection consumerConnection = null; + boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup) + .stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig) + .allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly); + try { - consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { + if (StringUtils.isNotEmpty(address)) { + consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup); + } else { + consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + } + } catch (Exception e) { logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); } groupConsumeInfo.setGroup(consumerGroup); + if (SYSTEM_GROUP_SET.contains(consumerGroup)) { + groupConsumeInfo.setSubGroupType("SYSTEM"); + } else if (isFifoType) { + groupConsumeInfo.setSubGroupType("FIFO"); + } else { + groupConsumeInfo.setSubGroupType("NORMAL"); + } if (consumeStats != null) { groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); @@ -155,8 +233,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public List queryConsumeStatsListByGroupName(String groupName) { - return queryConsumeStatsList(null, groupName); + public List queryConsumeStatsListByGroupName(String groupName, String address) { + ConsumeStats consumeStats; + String topic = null; + try { + String[] addresses = address.split(","); + String addr = addresses[0]; + consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + return toTopicConsumerInfoList(topic, consumeStats, groupName); } @Override @@ -166,8 +254,13 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } + return toTopicConsumerInfoList(topic, consumeStats, groupName); + } + + private List toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) { List mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate() { @Override public boolean apply(MessageQueue o) { @@ -226,7 +319,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum return group2ConsumerInfoMap; } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -289,7 +383,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return consumerConfigInfoList; } @@ -314,7 +409,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception e) { - throw propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return true; } @@ -341,7 +437,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } return true; } @@ -356,19 +453,22 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return brokerNameSet; } @Override - public ConsumerConnection getConsumerConnection(String consumerGroup) { + public ConsumerConnection getConsumerConnection(String consumerGroup, String address) { try { - return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { - throw Throwables.propagate(e); + String[] addresses = address.split(","); + String addr = addresses[0]; + return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -378,7 +478,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java index fa8f073..75cebd4 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DashboardCollectServiceImpl.java @@ -107,7 +107,8 @@ public class DashboardCollectServiceImpl implements DashboardCollectService { strings = Files.readLines(file, Charsets.UTF_8); } catch (IOException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } StringBuffer sb = new StringBuffer(); for (String string : strings) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java index 006f1c2..4d3c3f7 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java @@ -25,8 +25,8 @@ import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.dashboard.model.DlqMessageResendResult; import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.MessagePage; @@ -62,10 +62,12 @@ public class DlqMessageServiceImpl implements DlqMessageService { && e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) { return new MessagePage(new PageImpl<>(messageViews, page, 0), query.getTaskId()); } else { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return messageService.queryMessageByPage(query); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java index d3b6479..18fd810 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java @@ -117,7 +117,8 @@ public class MessageServiceImpl implements MessageService { if (err instanceof MQClientException) { throw new ServiceException(-1, ((MQClientException) err).getErrorMessage()); } - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -187,7 +188,8 @@ public class MessageServiceImpl implements MessageService { }); return messageViewList; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { consumer.shutdown(); } @@ -211,7 +213,8 @@ public class MessageServiceImpl implements MessageService { try { return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -232,7 +235,8 @@ public class MessageServiceImpl implements MessageService { throw new IllegalStateException("CONSUMER NOT SUBSCRIPT THIS TAG"); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } throw new IllegalStateException("NO CONSUMER"); @@ -397,7 +401,8 @@ public class MessageServiceImpl implements MessageService { PageImpl page = new PageImpl<>(messageViews, query.page(), total); return new MessagePageTask(page, queueOffsetInfos); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { consumer.shutdown(); } @@ -464,7 +469,8 @@ public class MessageServiceImpl implements MessageService { } return new PageImpl<>(messageViews, query.page(), total); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { consumer.shutdown(); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java index ea4dd58..d0e44c3 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MonitorServiceImpl.java @@ -82,7 +82,8 @@ public class MonitorServiceImpl implements MonitorService { MixAll.string2File(dataStr, path); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java index 3ce408b..5f5e491 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProducerServiceImpl.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service.impl; import com.google.common.base.Throwables; import javax.annotation.Resource; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.protocol.body.ProducerConnection; import org.apache.rocketmq.dashboard.service.ProducerService; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.springframework.stereotype.Service; @@ -35,7 +35,8 @@ public class ProducerServiceImpl implements ProducerService { return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java new file mode 100644 index 0000000..07e63b3 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.impl; + +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.dashboard.config.RMQConfigure; +import org.apache.rocketmq.dashboard.service.ProxyService; +import org.apache.rocketmq.dashboard.service.client.ProxyAdmin; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +@Slf4j +@Service +public class ProxyServiceImpl implements ProxyService { + @Resource + protected ProxyAdmin proxyAdmin; + @Resource + private RMQConfigure configure; + + @Override + public void addProxyAddrList(String proxyAddr) { + List proxyAddrs = configure.getProxyAddrs(); + if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) { + proxyAddrs.add(proxyAddr); + } + configure.setProxyAddrs(proxyAddrs); + } + + @Override + public void updateProxyAddrList(String proxyAddr) { + configure.setProxyAddr(proxyAddr); + } + + @Override + public Map getProxyHomePage() { + Map homePageInfoMap = Maps.newHashMap(); + homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr()); + homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs()); + return homePageInfoMap; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index 9dfde72..4f34fc6 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -18,43 +18,57 @@ package org.apache.rocketmq.dashboard.service.impl; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; +import org.apache.rocketmq.dashboard.model.request.TopicTypeList; +import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.TopicService; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.tools.command.CommandUtil; import org.joor.Reflect; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE; @Service public class TopicServiceImpl extends AbstractCommonService implements TopicService { @@ -68,23 +82,59 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ TopicList allTopics = mqAdminExt.fetchAllTopicList(); TopicList sysTopics = getSystemTopicList(); Set topics = - allTopics.getTopicList().stream().map(topic -> { - if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) { - topic = String.format("%s%s", "%SYS%", topic); - } - return topic; - }).filter(topic -> { - if (skipRetryAndDlq) { - return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) - || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)); - } - return true; - }).collect(Collectors.toSet()); + allTopics.getTopicList().stream().map(topic -> { + if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) { + topic = String.format("%s%s", "%SYS%", topic); + } + return topic; + }).filter(topic -> { + if (skipRetryAndDlq) { + return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) + || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)); + } + return true; + }).collect(Collectors.toSet()); allTopics.getTopicList().clear(); allTopics.getTopicList().addAll(topics); return allTopics; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + @Override + public TopicTypeList examineAllTopicType() { + ArrayList topicTypes = new ArrayList<>(); + ArrayList names = new ArrayList<>(); + ArrayList messageTypes = new ArrayList<>(); + TopicList topicList = fetchAllTopicList(false, false); + checkTopicType(topicList, topicTypes); + topicTypes.sort((t1, t2) -> t1.getTopicName().compareTo(t2.getTopicName())); + for (TopicTypeMeta topicTypeMeta : topicTypes) { + names.add(topicTypeMeta.getTopicName()); + messageTypes.add(topicTypeMeta.getMessageType()); + } + return new TopicTypeList(names, messageTypes); + } + + private void checkTopicType(TopicList topicList, ArrayList topicTypes) { + for (String topicName : topicList.getTopicList()) { + TopicTypeMeta topicType = new TopicTypeMeta(); + topicType.setTopicName(topicName); + if (topicName.startsWith("%R")) { + topicType.setMessageType("RETRY"); + } else if (topicName.startsWith("%D")) { + topicType.setMessageType("DELAY"); + } else if (topicName.startsWith("%S")) { + topicType.setMessageType("SYSTEM"); + } else { + List topicConfigInfos = examineTopicConfig(topicName); + if (!CollectionUtils.isEmpty(topicConfigInfos)) { + topicType.setMessageType(topicConfigInfos.get(0).getMessageType()); + } + } + topicTypes.add(topicType); } } @@ -93,7 +143,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { return mqAdminExt.examineTopicStats(topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -102,7 +153,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { return mqAdminExt.examineTopicRouteInfo(topic); } catch (Exception ex) { - throw Throwables.propagate(ex); + Throwables.throwIfUnchecked(ex); + throw new RuntimeException(ex); } } @@ -111,7 +163,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { return mqAdminExt.queryTopicConsumeByWho(topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -119,14 +172,20 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { TopicConfig topicConfig = new TopicConfig(); BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); + String messageType = topicCreateOrUpdateRequest.getMessageType(); + if (StringUtils.isBlank(messageType)) { + messageType = TopicMessageType.NORMAL.name(); + } + topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()), messageType)); try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), - topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { + topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); } } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -137,7 +196,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ clusterInfo = mqAdminExt.examineBrokerClusterInfo(); return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -150,6 +210,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName()); BeanUtils.copyProperties(topicConfig, topicConfigInfo); topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName())); + String messageType = topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()); + if (StringUtils.isBlank(messageType)) { + messageType = TopicMessageType.UNSPECIFIED.name(); + } + topicConfigInfo.setMessageType(messageType); topicConfigInfoList.add(topicConfigInfo); } return topicConfigInfoList; @@ -170,7 +235,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } mqAdminExt.deleteTopicInNameServer(nameServerSet, topic); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } return true; } @@ -181,7 +247,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { clusterInfo = mqAdminExt.examineBrokerClusterInfo(); } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) { deleteTopic(topic, clusterName); @@ -197,11 +264,13 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ try { clusterInfo = mqAdminExt.examineBrokerClusterInfo(); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return true; } @@ -216,6 +285,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ return defaultMQProducer; } + public TransactionMQProducer buildTransactionMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) { + TransactionMQProducer defaultMQProducer = new TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC); + defaultMQProducer.setUseTLS(configure.isUseTLS()); + return defaultMQProducer; + } + private TopicList getSystemTopicList() { RPCHook rpcHook = null; boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); @@ -230,7 +305,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ producer.start(); return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { producer.shutdown(); } @@ -238,31 +314,61 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { - DefaultMQProducer producer = null; + List topicConfigInfos = examineTopicConfig(sendTopicMessageRequest.getTopic()); + String messageType = topicConfigInfos.get(0).getMessageType(); AclClientRPCHook rpcHook = null; if (configure.isACLEnabled()) { rpcHook = new AclClientRPCHook(new SessionCredentials( - configure.getAccessKey(), - configure.getSecretKey() + configure.getAccessKey(), + configure.getSecretKey() )); } - producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); - producer.setInstanceName(String.valueOf(System.currentTimeMillis())); - producer.setNamesrvAddr(configure.getNamesrvAddr()); - try { - producer.start(); - Message msg = new Message(sendTopicMessageRequest.getTopic(), - sendTopicMessageRequest.getTag(), - sendTopicMessageRequest.getKey(), - sendTopicMessageRequest.getMessageBody().getBytes() - ); - return producer.send(msg); - } catch (Exception e) { - throw Throwables.propagate(e); - } finally { - waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); - producer.shutdown(); + if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) { + // transaction message + TransactionListener transactionListener = new TransactionListenerImpl(); + + TransactionMQProducer producer = buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); + producer.setNamesrvAddr(configure.getNamesrvAddr()); + producer.setTransactionListener(transactionListener); + try { + producer.start(); + Message msg = new Message(sendTopicMessageRequest.getTopic(), + sendTopicMessageRequest.getTag(), + sendTopicMessageRequest.getKey(), + sendTopicMessageRequest.getMessageBody().getBytes() + ); + return producer.sendMessageInTransaction(msg, null); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } finally { + waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); + producer.shutdown(); + } + } else { + // no transaction message + DefaultMQProducer producer = null; + producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); + producer.setNamesrvAddr(configure.getNamesrvAddr()); + try { + producer.start(); + Message msg = new Message(sendTopicMessageRequest.getTopic(), + sendTopicMessageRequest.getTag(), + sendTopicMessageRequest.getKey(), + sendTopicMessageRequest.getMessageBody().getBytes() + ); + return producer.send(msg); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } finally { + waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); + producer.shutdown(); + } } + } private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { @@ -284,4 +390,20 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } catch (Exception ignore) { } } + + static class TransactionListenerImpl implements TransactionListener { + private AtomicInteger transactionIndex = new AtomicInteger(0); + + private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); + + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java index 2e96566..28fd7a5 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java @@ -24,10 +24,10 @@ import java.util.List; import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.service.DashboardCollectService; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.tools.admin.MQAdminExt; @@ -93,7 +93,8 @@ public class CollectTaskRunnble implements Runnable { try { list = dashboardCollectService.getTopicMap().get(topic); } catch (ExecutionException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } if (null == list) { list = Lists.newArrayList(); diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java index cbc08da..d58668b 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java @@ -34,10 +34,10 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import javax.annotation.Resource; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.service.DashboardCollectService; @@ -84,7 +84,8 @@ public class DashboardCollectTask { } } catch (Exception err) { - throw Throwables.propagate(err); + Throwables.throwIfUnchecked(err); + throw new RuntimeException(err); } } @@ -128,7 +129,8 @@ public class DashboardCollectTask { log.debug("Broker Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap())); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -144,10 +146,12 @@ public class DashboardCollectTask { Thread.sleep(1000); } catch (InterruptedException e1) { - throw Throwables.propagate(e1); + Throwables.throwIfUnchecked(e1); + throw new RuntimeException(e1); } fetchBrokerRuntimeStats(brokerAddr, retryTime - 1); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -197,7 +201,8 @@ public class DashboardCollectTask { } catch (IOException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -250,7 +255,7 @@ public class DashboardCollectTask { private void addSystemTopic() throws Exception { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - HashMap> clusterTable = clusterInfo.getClusterAddrTable(); + Map> clusterTable = clusterInfo.getClusterAddrTable(); for (Map.Entry> entry : clusterTable.entrySet()) { String clusterName = entry.getKey(); TopicValidator.addSystemTopic(clusterName); diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java index 710929b..3c8a77e 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java @@ -40,7 +40,7 @@ public class MonitorTask { // @Scheduled(cron = "* * * * * ?") public void scanProblemConsumeGroup() { for (Map.Entry configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) { - GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey()); + GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey(), null); if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) { logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0ab405e..fe4d283 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -42,7 +42,9 @@ rocketmq: # configure multiple namesrv addresses to manage multiple different clusters namesrvAddrs: - 127.0.0.1:9876 - - 127.0.0.2:9876 + # - 127.0.0.2:9876 + # - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876 + # - 10.151.47.30:9876 # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true isVIPChannel: # timeout for mqadminExt, default 5000ms @@ -57,9 +59,12 @@ rocketmq: # must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required loginRequired: false useTLS: false + proxyAddr: 127.0.0.1:8080 + proxyAddrs: + - 127.0.0.1:8080 # set the accessKey and secretKey if you used acl - accessKey: # if version > 4.4.0 - secretKey: # if version > 4.4.0 +# accessKey: rocketmq2 +# secretKey: 12345678 threadpool: config: diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 76823e8..87ffc3b 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -25,10 +25,10 @@ - ${user.home}/logs/consolelogs/rocketmq-console.log + ${user.home}/logs/dashboardlogs/rocketmq-dashboard.log true - ${user.home}/logs/consolelogs/rocketmq-console-%d{yyyy-MM-dd}.%i.log + ${user.home}/logs/dashboardlogs/rocketmq-dashboard-%d{yyyy-MM-dd}.%i.log diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index c2bf349..ee3c3fe 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -104,6 +104,7 @@ + diff --git a/src/main/resources/static/src/app.js b/src/main/resources/static/src/app.js index a7ca1be..1bbb650 100644 --- a/src/main/resources/static/src/app.js +++ b/src/main/resources/static/src/app.js @@ -213,6 +213,9 @@ app.config(['$routeProvider', '$httpProvider','$cookiesProvider','getDictNamePro }).when('/ops', { templateUrl: 'view/pages/ops.html', controller:'opsController' + }).when('/proxy', { + templateUrl: 'view/pages/proxy.html', + controller:'proxyController' }).when('/acl', { templateUrl: 'view/pages/acl.html', controller: 'aclController' diff --git a/src/main/resources/static/src/consumer.js b/src/main/resources/static/src/consumer.js index 6641dd8..d192334 100644 --- a/src/main/resources/static/src/consumer.js +++ b/src/main/resources/static/src/consumer.js @@ -45,6 +45,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); $scope.filterNormal = true; $scope.filterSystem = false; + $scope.filterFIFO = false; $scope.doSort = function () {// todo how to change this fe's code ? (it's dirty) if ($scope.sortKey == 'diffTotal') { @@ -75,7 +76,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific $http({ method: "GET", - url: "consumer/groupList.query" + url: "consumer/groupList.query", + params: { + skipSysGroup: false, + address: localStorage.getItem('isV5') ? localStorage.getItem('proxyAddr') : null + } }).success(function (resp) { if (resp.status == 0) { $scope.allConsumerGrouopList = resp.data; @@ -135,16 +140,28 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific $scope.filterList(1); }); - $scope.filterByType = function (str) { + $scope.$watch('filterFIFO', function () { + $scope.filterList(1); + }); + + $scope.filterByType = function (str, type,version) { if ($scope.filterSystem) { - if (str.startsWith("%S")) { + if (type === "SYSTEM") { return true } } if ($scope.filterNormal) { - if (str.startsWith("%") == false) { + if (type === "NORMAL") { return true } + if(!version && type === "FIFO"){ + return true; + } + } + if ($scope.filterFIFO) { + if (type === "FIFO") { + return true; + } } return false; }; @@ -154,7 +171,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific var canShowList = []; $scope.allConsumerGrouopList.forEach(function (element) { console.log(element) - if ($scope.filterByType(element.group)) { + if ($scope.filterByType(element.group, element.subGroupType, $scope.rmqVersion)) { if (element.group.toLowerCase().indexOf(lowExceptStr) != -1) { canShowList.push(element); } @@ -189,6 +206,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific subscriptionGroupConfig: { groupName: "", consumeEnable: true, + consumeMessageOrderly: false, consumeFromMinEnable: true, consumeBroadcastEnable: true, retryQueueNums: 1, @@ -211,7 +229,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific // Refresh topic list $scope.refreshConsumerData(); }, - template: 'consumerModifyDialog', + template: $scope.rmqVersion ? 'consumerModifyDialogForV5' : 'consumerModifyDialog', controller: 'consumerModifyDialogController', data: { consumerRequestList: request, @@ -226,11 +244,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific } }); }; - $scope.detail = function (consumerGroupName) { + $scope.detail = function (consumerGroupName, address) { $http({ method: "GET", url: "consumer/queryTopicByConsumer.query", - params: {consumerGroup: consumerGroupName} + params: {consumerGroup: consumerGroupName, address: address} }).success(function (resp) { if (resp.status == 0) { console.log(resp); @@ -245,11 +263,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific }); }; - $scope.client = function (consumerGroupName) { + $scope.client = function (consumerGroupName, address) { $http({ method: "GET", url: "consumer/consumerConnection.query", - params: {consumerGroup: consumerGroupName} + params: {consumerGroup: consumerGroupName, address: address} }).success(function (resp) { if (resp.status == 0) { console.log(resp); diff --git a/src/main/resources/static/src/controller.js b/src/main/resources/static/src/controller.js index cdcced5..1d71c67 100644 --- a/src/main/resources/static/src/controller.js +++ b/src/main/resources/static/src/controller.js @@ -15,10 +15,18 @@ * limitations under the License. */ app.controller('AppCtrl', ['$scope','$window','$translate','$http','Notification', function ($scope,$window,$translate, $http, Notification) { + $scope.rmqVersion = localStorage.getItem("isV5"); + $scope.changeTranslate = function(langKey){ $translate.use(langKey); } + $scope.changeRMQVersion = function (version) { + $scope.rmqVersion = version === 5; + var v = version === 5; + localStorage.setItem("isV5", v); + } + $scope.logout = function(){ $http({ method: "POST", diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index f9a4e3c..2c1450d 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -54,9 +54,13 @@ var en = { "RESET_CUS_OFFSET": "Reset Consumer Offset", "DELETE": "Delete", "CHANGE_LANG": "ChangeLanguage", + "CHANGE_VERSION": "ChangeVersion", "BROKER": "Broker", "NORMAL": "NORMAL", "RETRY": "RETRY", + "FIFO": "FIFO", + "TRANSACTION": "TRANSACTION", + "UNSPECIFIED": "UNSPECIFIED", "DLQ": "DLQ", "QUANTITY":"Quantity", "TYPE":"Type", @@ -97,6 +101,7 @@ var en = { "RESET_OFFSET":"resetOffset", "CLUSTER_NAME":"clusterName", "OPS":"OPS", + "PROXY":"Proxy", "AUTO_REFRESH":"AUTO_REFRESH", "REFRESH":"REFRESH", "LOGOUT":"Logout", @@ -123,5 +128,11 @@ var en = { "GROUP_PERM":"Group Permission", "SYNCHRONIZE":"Synchronize Data", "SHOW":"Show", - "HIDE":"Hide" + "HIDE":"Hide", + "MESSAGE_TYPE":"messageType", + "MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL", + "MESSAGE_TYPE_NORMAL": "NORMAL", + "MESSAGE_TYPE_FIFO": "FIFO", + "MESSAGE_TYPE_DELAY": "DELAY", + "MESSAGE_TYPE_TRANSACTION": "TRANSACTION", } diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index b6fa589..2f0e6f3 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -55,9 +55,13 @@ var zh = { "SKIP_MESSAGE_ACCUMULATE":"跳过堆积", "DELETE": "删除", "CHANGE_LANG": "更换语言", + "CHANGE_VERSION": "更换版本", "BROKER": "Broker", "NORMAL": "普通", "RETRY": "重试", + "FIFO": "顺序", + "TRANSACTION": "事务", + "UNSPECIFIED": "未指定", "DLQ": "死信", "QUANTITY":"数量", "TYPE":"类型", @@ -98,6 +102,7 @@ var zh = { "RESET_OFFSET":"重置位点", "CLUSTER_NAME":"集群名", "OPS":"运维", + "PROXY":"代理", "AUTO_REFRESH":"自动刷新", "REFRESH":"刷新", "LOGOUT":"退出", @@ -124,5 +129,11 @@ var zh = { "GROUP_PERM":"消费组权限", "SYNCHRONIZE":"同步", "SHOW":"显示", - "HIDE":"隐藏" + "HIDE":"隐藏", + "MESSAGE_TYPE":"消息类型", + "MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息", + "MESSAGE_TYPE_NORMAL": "普通消息", + "MESSAGE_TYPE_FIFO": "顺序消息", + "MESSAGE_TYPE_DELAY": "定时/延时消息", + "MESSAGE_TYPE_TRANSACTION": "事务消息", } \ No newline at end of file diff --git a/src/main/resources/static/src/proxy.js b/src/main/resources/static/src/proxy.js new file mode 100644 index 0000000..4461b09 --- /dev/null +++ b/src/main/resources/static/src/proxy.js @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +var module = app; +module.controller('proxyController', ['$scope', '$location', '$http', 'Notification', 'remoteApi', 'tools', '$window', + function ($scope, $location, $http, Notification, remoteApi, tools, $window) { + $scope.proxyAddrList = []; + $scope.userRole = $window.sessionStorage.getItem("userrole"); + $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); + $scope.inputReadonly = !$scope.writeOperationEnabled; + $scope.newProxyAddr = ""; + $scope.allProxyConfig = {}; + + $http({ + method: "GET", + url: "proxy/homePage.query" + }).success(function (resp) { + if (resp.status == 0) { + $scope.proxyAddrList = resp.data.proxyAddrList; + $scope.selectedProxy = resp.data.currentProxyAddr; + $scope.showProxyDetailConfig($scope.selectedProxy); + localStorage.setItem('proxyAddr',$scope.selectedProxy); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + + $scope.eleChange = function (data) { + $scope.proxyAddrList = data; + } + $scope.showDetailConf = function () { + $(".proxyModal").modal(); + } + + + $scope.showProxyDetailConfig = function (proxyAddr) { + $http({ + method: "GET", + url: "proxy/proxyDetailConfig.query", + params: {proxyAddress: proxyAddr} + }).success(function (resp) { + if (resp.status == 0) { + $scope.allProxyConfig = resp.data; + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + + $scope.updateProxyAddr = function () { + $http({ + method: "POST", + url: "proxy/updateProxyAddr.do", + params: {proxyAddr: $scope.selectedProxy} + }).success(function (resp) { + if (resp.status == 0) { + localStorage.setItem('proxyAddr', $scope.selectedProxy); + Notification.info({message: "SUCCESS", delay: 2000}); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + $scope.showProxyDetailConfig($scope.selectedProxy); + }; + + $scope.addProxyAddr = function () { + $http({ + method: "POST", + url: "proxy/addProxyAddr.do", + params: {newProxyAddr: $scope.newProxyAddr} + }).success(function (resp) { + if (resp.status == 0) { + if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == -1) { + $scope.proxyAddrList.push($scope.newProxyAddr); + } + $("#proxyAddr").val(""); + $scope.newProxyAddr = ""; + Notification.info({message: "SUCCESS", delay: 2000}); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + }]) diff --git a/src/main/resources/static/src/topic.js b/src/main/resources/static/src/topic.js index 998f219..81383b3 100644 --- a/src/main/resources/static/src/topic.js +++ b/src/main/resources/static/src/topic.js @@ -45,24 +45,31 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati } }; $scope.filterNormal = true + $scope.filterDelay = false + $scope.filterFifo = false + $scope.filterTransaction = false + $scope.filterUnspecified = false $scope.filterRetry = false $scope.filterDLQ = false $scope.filterSystem = false $scope.allTopicList = []; + $scope.allTopicNameList = []; + $scope.allMessageTypeList = []; $scope.topicShowList = []; $scope.userRole = $window.sessionStorage.getItem("userrole"); - $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); + $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); $scope.refreshTopicList = function () { $http({ method: "GET", - url: "topic/list.query" + url: "topic/list.queryTopicType" }).success(function (resp) { if (resp.status == 0) { - $scope.allTopicList = resp.data.topicList.sort(); - console.log($scope.allTopicList); + $scope.allTopicNameList = resp.data.topicNameList; + $scope.allMessageTypeList = resp.data.messageTypeList; + console.log($scope.allTopicNameList); console.log(JSON.stringify(resp)); - $scope.showTopicList(1, $scope.allTopicList.length); + $scope.showTopicList(1, $scope.allTopicNameList.length); } else { Notification.error({message: resp.errMsg, delay: 5000}); @@ -79,6 +86,18 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati $scope.$watch('filterNormal', function () { $scope.filterList(1); }); + $scope.$watch('filterFifo', function () { + $scope.filterList(1); + }); + $scope.$watch('filterTransaction', function () { + $scope.filterList(1); + }); + $scope.$watch('filterUnspecified', function () { + $scope.filterList(1); + }); + $scope.$watch('filterDelay', function () { + $scope.filterList(1); + }); $scope.$watch('filterRetry', function () { $scope.filterList(1); }); @@ -92,13 +111,13 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati var lowExceptStr = $scope.filterStr.toLowerCase(); var canShowList = []; - $scope.allTopicList.forEach(function (element) { - if ($scope.filterByType(element)) { - if (element.toLowerCase().indexOf(lowExceptStr) != -1) { - canShowList.push(element); + for (let i = 0; i < $scope.allTopicNameList.length; ++i) { + if ($scope.filterByType($scope.allTopicNameList[i], $scope.allMessageTypeList[i])) { + if ($scope.allTopicNameList[i].toLowerCase().indexOf(lowExceptStr) != -1) { + canShowList.push($scope.allTopicNameList[i]); } } - }); + } $scope.paginationConf.totalItems = canShowList.length; var perPage = $scope.paginationConf.itemsPerPage; var from = (currentPage - 1) * perPage; @@ -106,7 +125,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati $scope.topicShowList = canShowList.slice(from, to); }; - $scope.filterByType = function (str) { + $scope.filterByType = function (str, type) { if ($scope.filterRetry) { if (str.startsWith("%R")) { return true @@ -122,8 +141,31 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati return true } } + if (localStorage.getItem('isV5') && $scope.filterUnspecified) { + if (type.includes("UNSPECIFIED")) { + return true + } + } if ($scope.filterNormal) { - if (str.startsWith("%") == false) { + if (type.includes("NORMAL")) { + return true + } + if (!localStorage.getItem('isV5') && type.includes("UNSPECIFIED")) { + return true + } + } + if (localStorage.getItem('isV5') && $scope.filterDelay) { + if (type.includes("DELAY")) { + return true + } + } + if (localStorage.getItem('isV5') && $scope.filterFifo) { + if (type.includes("FIFO")) { + return true + } + } + if (localStorage.getItem('isV5') && $scope.filterTransaction) { + if (type.includes("TRANSACTION")) { return true } } @@ -138,10 +180,10 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati var perPage = $scope.paginationConf.itemsPerPage; var from = (currentPage - 1) * perPage; var to = (from + perPage) > totalItem ? totalItem : from + perPage; - console.log($scope.allTopicList); + console.log($scope.allTopicNameList); console.log(from) console.log(to) - $scope.topicShowList = $scope.allTopicList.slice(from, to); + $scope.topicShowList = $scope.allTopicNameList.slice(from, to); $scope.paginationConf.totalItems = totalItem; console.log($scope.topicShowList) console.log($scope.paginationConf.totalItems) @@ -328,8 +370,8 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati var bIsUpdate = true; if (request == null) { request = [{ - writeQueueNums: 16, - readQueueNums: 16, + writeQueueNums: 8, + readQueueNums: 8, perm: 6, order: false, topicName: "", @@ -355,6 +397,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati topicRequestList: request, allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable), allBrokerNameList: Object.keys(resp.data.brokerServer), + allMessageTypeList: resp.data.messageTypes, bIsUpdate: bIsUpdate, writeOperationEnabled: $scope.writeOperationEnabled } diff --git a/src/main/resources/static/view/layout/_header.html b/src/main/resources/static/view/layout/_header.html index f448541..8159138 100644 --- a/src/main/resources/static/view/layout/_header.html +++ b/src/main/resources/static/view/layout/_header.html @@ -28,6 +28,7 @@ {{'NORMAL' | translate}} + {{'FIFO' | translate}} + {{'SYSTEM' | translate}} - @@ -320,6 +322,116 @@ + +