Compare commits

..

22 Commits

Author SHA1 Message Date
dependabot[bot]
9458874743 Merge 4f6b3d2e36 into e57d423268 2024-11-26 11:08:49 +08:00
yuz10
e57d423268 remove rocketmq-namesrv dependency (#254) 2024-11-04 15:49:02 +08:00
dependabot[bot]
4f6b3d2e36 Bump cookie and express in /frontend
Bumps [cookie](https://github.com/jshttp/cookie) and [express](https://github.com/expressjs/express). These dependencies needed to be updated together.

Updates `cookie` from 0.4.0 to 0.7.1
- [Release notes](https://github.com/jshttp/cookie/releases)
- [Commits](https://github.com/jshttp/cookie/compare/v0.4.0...v0.7.1)

Updates `express` from 4.17.1 to 4.21.1
- [Release notes](https://github.com/expressjs/express/releases)
- [Changelog](https://github.com/expressjs/express/blob/4.21.1/History.md)
- [Commits](https://github.com/expressjs/express/compare/4.17.1...4.21.1)

---
updated-dependencies:
- dependency-name: cookie
  dependency-type: indirect
- dependency-name: express
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-11-04 03:38:30 +00:00
dependabot[bot]
0d87486d7a Bump snakeyaml from 1.30 to 1.32 (#130)
---
updated-dependencies:
- dependency-name: org.yaml:snakeyaml
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-11-04 11:36:30 +08:00
yuz10
94d7a4e418 remove rocketmq-broker dependency (#249) 2024-11-04 11:35:39 +08:00
xx
3fbaa3ab92 fix: Duplicate message in the topic tab message list in the message menu. (#202)
Co-authored-by: xx <xx@123.com>
2024-11-04 11:35:17 +08:00
RongtongJin
e6d454301f [maven-release-plugin] prepare for next development iteration 2024-09-18 09:57:37 +08:00
RongtongJin
f5c09ac287 [maven-release-plugin] prepare release rocketmq-dashboard-2.0.0 2024-09-18 09:57:04 +08:00
Guyu
e97072a3b1 style: Remove unused imports for the checkstyle. (#232)
* fix: 5.x query message detail throw: Failed to query message by Id: xxx

* style: Remove unused imports for the checkstyle.

---------

Co-authored-by: yangzengc <yangzengc@ewan.cn>
2024-09-12 16:51:34 +08:00
Evan
6d360509c0 fix: 5.x query message detail throw: Failed to query message by Id: xxx (#231)
Co-authored-by: yangzengc <yangzengc@ewan.cn>
2024-09-07 19:59:38 +08:00
Akai
464f57adf8 Support retryMaxTimes filed set for consumer group (#229)
* fix:Fixed the issue that normal messages in version v4 are not showed

* feat:support retryMaxTimes args set

---------

Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
Co-authored-by: yuanziwei.akai <yuanziwei.akai@bytedance.com>
2024-08-27 20:30:00 +08:00
Akai
5d08d3b122 Support Unspecified Topic Add & Update & Query (#223)
* fix:Fixed the issue that normal messages in version v4 are not showed

* feat:support unspecified topic

---------

Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
2024-07-24 10:57:04 +08:00
Akai
d9fc76d3a3 fix:Fixed the issue that normal messages in version v4 are not showed (#222)
Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
2024-07-24 10:56:47 +08:00
Akai
2bc59db340 Supplement UserGuide for RocketMQ 5.0 (#208)
* Support UserGuide for new feature

* update userGuide md

* update userGuide md

---------

Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
2024-06-13 15:22:38 +08:00
Akai
d58e13da95 Proxy Support And ConsumerGroup Enhancement (#207)
* Support dashboard v4-v5 switch And query for v5 topic

* Modify tag name

* Support proxy-module And Fix the problem of showing wrong consumerGroup-info

---------

Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
2024-06-12 09:12:19 +08:00
Akai
e7cb315050 Support FIFO-Type SubGroup Add、Update and Query For V5 (#204)
* Support dashboard v4-v5 switch And query for v5 topic

* Modify tag name

* Support subGroup FIFO Type Query and Update

---------

Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
2024-06-11 10:53:36 +08:00
Akai
21dc2acfdc Support dashboard v4-v5 switch And query for v5 topic (#198)
* Support dashboard v4-v5 switch And query for v5 topic

* Modify tag name

---------

Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
2024-06-05 15:17:23 +08:00
guangdashao
823bce2b8b feat: add topic message type
add message type
2024-06-04 11:40:46 +08:00
Javen
2fb0fce0b1 perf: The new metrics of getTransferredTps for rocketmq5.x and the old metrics of getTransferedTps for rocketmq4.x (#197)
Co-authored-by: jinwei2 <jinwei2@enmonster.com>
2024-03-26 17:02:16 +08:00
Abhijeet Mishra
6456630324 [#148] Throwables.propagate in deprecated for making runtime exception more verbose (#160) 2023-04-19 20:33:17 +08:00
Abhijeet Mishra
a25ccd6337 5.1.0 rocketmq version update (#155)
* update rocketmq version to 5.1.0
2023-04-07 08:30:18 +08:00
Abhijeet Mishra
538d1c1c45 [ISSUE apache#149] updated lombok version in pom.xml because of this compilation was failing (#151) 2023-03-20 15:33:45 +08:00
71 changed files with 19016 additions and 11922 deletions

1
.gitignore vendored
View File

@@ -5,3 +5,4 @@
.project .project
.factorypath .factorypath
.settings/ .settings/
.vscode

View File

@@ -63,6 +63,18 @@
* 根据消息主题和消息Id进行消息的查询 * 根据消息主题和消息Id进行消息的查询
* 消息详情可以展示这条消息的详细信息,查看消息对应到具体消费组的消费情况(如果异常,可以查看具体的异常信息)。可以向指定的消费组重发消息。 * 消息详情可以展示这条消息的详细信息,查看消息对应到具体消费组的消费情况(如果异常,可以查看具体的异常信息)。可以向指定的消费组重发消息。
## RocketMQ-V5.0 仪表盘
* 版本切换
* RocketMQ右上角可切换不同版本用户可以自主选择 RocketMQ-5.x 或 RocketMQ-4.x 版本
* 主题页面
* 支持延迟/顺序/事务消息的筛选
* 支持延迟/顺序/事物/普通等多种消息类型主题的新增与更新
* 消费页面
* 支持顺序消费类型订阅组的过滤
* 提供顺序消费类型订阅组的新增与更新如果需要开启顺序消费FIFO类型的订阅组一定需要打开consumeOrderlyEnable选项
* 代理页面RocketMQ 5.0新增)
* 支持代理节点的新增与查询
* 支持代理节点地址配置在application.yml中可对proxyAddr和proxyAddrs属性进行预配置
## HTTPS 方式访问Dashboard ## HTTPS 方式访问Dashboard
* HTTPS功能实际上是使用SpringBoot提供的配置功能即可完成首先需要有一个SSL KeyStore来存放服务端证书可以使用本工程所提供的测试密钥库: * HTTPS功能实际上是使用SpringBoot提供的配置功能即可完成首先需要有一个SSL KeyStore来存放服务端证书可以使用本工程所提供的测试密钥库:

View File

@@ -64,6 +64,18 @@
* look over this message's detail info.you can see the message's consume state(each group has one line),show the exception message if has exception. * look over this message's detail info.you can see the message's consume state(each group has one line),show the exception message if has exception.
you can send this message to the group you selected you can send this message to the group you selected
## RocketMQ-V5.0 dashboard
* Version switching
* RocketMQ can switch between different versions in the upper right corner, and users can freely choose between RocketMQ-5.X or RocketMQ-4.X versions
* Theme page
* Support filtering of delayed/sequential/transaction messages
* Support the addition and update of multiple message types such as delay, sequence, object, and ordinary themes
* Consumption page
* Support filtering of subscription groups for fifo consumption types
* Provide the addition and update of subscription groups for sequential consumption types. If fifo consumption needs to be enabled, FIFO type subscription groups must have the consumeOrderlyEnable option enabled
* Proxy page (Added in RocketMQ 5.0)
* Support for adding and querying proxy nodes
* Support proxy node address configuration: ProxyAddr and proxyAddrs properties can be pre configured in application.yml
## Access Dashboard with HTTPS ## Access Dashboard with HTTPS
* SpringBoot itself has provided the SSL configuration. You can use the project test Keystore:resources/rmqcngkeystore.jks. The store is generated with the following unix keytool commands: * SpringBoot itself has provided the SSL configuration. You can use the project test Keystore:resources/rmqcngkeystore.jks. The store is generated with the following unix keytool commands:

17393
frontend/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,7 @@
"react": "^17.0.2", "react": "^17.0.2",
"react-dom": "^17.0.2", "react-dom": "^17.0.2",
"react-json-view": "^1.21.3", "react-json-view": "^1.21.3",
"react-scripts": "5.0.1", "react-scripts": "4.0.3",
"web-vitals": "^1.0.1" "web-vitals": "^1.0.1"
}, },
"proxy": "http://localhost:8080", "proxy": "http://localhost:8080",

File diff suppressed because it is too large Load Diff

12
pom.xml
View File

@@ -28,7 +28,7 @@
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-dashboard</artifactId> <artifactId>rocketmq-dashboard</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<version>1.0.1-SNAPSHOT</version> <version>2.0.1-SNAPSHOT</version>
<name>rocketmq-dashboard</name> <name>rocketmq-dashboard</name>
<scm> <scm>
@@ -91,10 +91,10 @@
<commons-io.version>2.4</commons-io.version> <commons-io.version>2.4</commons-io.version>
<commons-cli.version>1.2</commons-cli.version> <commons-cli.version>1.2</commons-cli.version>
<commons-collections.version>3.2.2</commons-collections.version> <commons-collections.version>3.2.2</commons-collections.version>
<rocketmq.version>4.9.3</rocketmq.version> <rocketmq.version>5.1.0</rocketmq.version>
<surefire.version>2.19.1</surefire.version> <surefire.version>2.19.1</surefire.version>
<aspectj.version>1.9.6</aspectj.version> <aspectj.version>1.9.6</aspectj.version>
<lombok.version>1.18.12</lombok.version> <lombok.version>1.18.22</lombok.version>
<main.basedir>${basedir}/../..</main.basedir> <main.basedir>${basedir}/../..</main.basedir>
<docker.image.prefix>apacherocketmq</docker.image.prefix> <docker.image.prefix>apacherocketmq</docker.image.prefix>
<spring.boot.version>2.6.0</spring.boot.version> <spring.boot.version>2.6.0</spring.boot.version>
@@ -104,7 +104,7 @@
<easyexcel.version>2.2.10</easyexcel.version> <easyexcel.version>2.2.10</easyexcel.version>
<asm.version>4.2</asm.version> <asm.version>4.2</asm.version>
<junit.version>4.12</junit.version> <junit.version>4.12</junit.version>
<snakeyaml.version>1.30</snakeyaml.version> <snakeyaml.version>1.32</snakeyaml.version>
<cglib.version>2.2.2</cglib.version> <cglib.version>2.2.2</cglib.version>
<joor.version>0.9.6</joor.version> <joor.version>0.9.6</joor.version>
<bcpkix-jdk15on.version>1.68</bcpkix-jdk15on.version> <bcpkix-jdk15on.version>1.68</bcpkix-jdk15on.version>
@@ -167,6 +167,7 @@
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-namesrv</artifactId> <artifactId>rocketmq-namesrv</artifactId>
<version>${rocketmq.version}</version> <version>${rocketmq.version}</version>
<scope>test</scope>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -186,6 +187,7 @@
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-broker</artifactId> <artifactId>rocketmq-broker</artifactId>
<version>${rocketmq.version}</version> <version>${rocketmq.version}</version>
<scope>test</scope>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -459,7 +461,7 @@
<configuration> <configuration>
<target> <target>
<copy todir="${project.build.directory}/classes/public"> <copy todir="${project.build.directory}/classes/public">
<fileset dir="${project.basedir}/frontend/build"/> <fileset dir="${project.basedir}/frontend/build" />
</copy> </copy>
</target> </target>
</configuration> </configuration>

View File

@@ -21,7 +21,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
@Slf4j @Slf4j

View File

@@ -43,6 +43,8 @@ public class RMQConfigure {
//use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env //use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env
private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
private volatile String proxyAddr;
private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"); private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
@@ -62,6 +64,8 @@ public class RMQConfigure {
private List<String> namesrvAddrs = new ArrayList<>(); private List<String> namesrvAddrs = new ArrayList<>();
private List<String> proxyAddrs = new ArrayList<>();
public String getAccessKey() { public String getAccessKey() {
return accessKey; return accessKey;
} }
@@ -86,6 +90,25 @@ public class RMQConfigure {
return namesrvAddrs; return namesrvAddrs;
} }
public List<String> getProxyAddrs() {
return this.proxyAddrs;
}
public void setProxyAddrs(List<String> proxyAddrs) {
this.proxyAddrs = proxyAddrs;
if (CollectionUtils.isNotEmpty(proxyAddrs)) {
this.setProxyAddr(proxyAddrs.get(0));
}
}
public String getProxyAddr() {
return proxyAddr;
}
public void setProxyAddr(String proxyAddr) {
this.proxyAddr = proxyAddr;
}
public void setNamesrvAddrs(List<String> namesrvAddrs) { public void setNamesrvAddrs(List<String> namesrvAddrs) {
this.namesrvAddrs = namesrvAddrs; this.namesrvAddrs = namesrvAddrs;
if (CollectionUtils.isNotEmpty(namesrvAddrs)) { if (CollectionUtils.isNotEmpty(namesrvAddrs)) {

View File

@@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.controller;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.model.ConnectionInfo;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
@@ -47,14 +47,14 @@ public class ConsumerController {
@RequestMapping(value = "/groupList.query") @RequestMapping(value = "/groupList.query")
@ResponseBody @ResponseBody
public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) { public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) {
return consumerService.queryGroupList(skipSysGroup); return consumerService.queryGroupList(skipSysGroup, address);
} }
@RequestMapping(value = "/group.query") @RequestMapping(value = "/group.query")
@ResponseBody @ResponseBody
public Object groupQuery(@RequestParam String consumerGroup) { public Object groupQuery(@RequestParam String consumerGroup, String address) {
return consumerService.queryGroup(consumerGroup); return consumerService.queryGroup(consumerGroup, address);
} }
@RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST}) @RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST})
@@ -99,14 +99,14 @@ public class ConsumerController {
@RequestMapping(value = "/queryTopicByConsumer.query") @RequestMapping(value = "/queryTopicByConsumer.query")
@ResponseBody @ResponseBody
public Object queryConsumerByTopic(@RequestParam String consumerGroup) { public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) {
return consumerService.queryConsumeStatsListByGroupName(consumerGroup); return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address);
} }
@RequestMapping(value = "/consumerConnection.query") @RequestMapping(value = "/consumerConnection.query")
@ResponseBody @ResponseBody
public Object consumerConnection(@RequestParam(required = false) String consumerGroup) { public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) {
ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup); ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address);
consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet())); consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet()));
return consumerConnection; return consumerConnection;
} }

View File

@@ -18,7 +18,7 @@ package org.apache.rocketmq.dashboard.controller;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.model.request.MessageQuery;

View File

@@ -17,7 +17,7 @@
package org.apache.rocketmq.dashboard.controller; package org.apache.rocketmq.dashboard.controller;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.dashboard.model.ConnectionInfo; import org.apache.rocketmq.dashboard.model.ConnectionInfo;
import org.apache.rocketmq.dashboard.permisssion.Permission; import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ProducerService; import org.apache.rocketmq.dashboard.service.ProducerService;

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.controller;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
@Controller
@RequestMapping("/proxy")
@Permission
public class ProxyController {
@Resource
private ProxyService proxyService;
@RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
@ResponseBody
public Object homePage() {
return proxyService.getProxyHomePage();
}
@RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
@ResponseBody
public Object addProxyAddr(@RequestParam String newProxyAddr) {
proxyService.addProxyAddrList(newProxyAddr);
return true;
}
@RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST)
@ResponseBody
public Object updateProxyAddr(@RequestParam String proxyAddr) {
proxyService.updateProxyAddrList(proxyAddr);
return true;
}
}

View File

@@ -56,6 +56,12 @@ public class TopicController {
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq); return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
} }
@RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET)
@ResponseBody
public Object listTopicType() {
return topicService.examineAllTopicType();
}
@RequestMapping(value = "/stats.query", method = RequestMethod.GET) @RequestMapping(value = "/stats.query", method = RequestMethod.GET)
@ResponseBody @ResponseBody
public Object stats(@RequestParam String topic) { public Object stats(@RequestParam String topic) {

View File

@@ -20,7 +20,7 @@ import com.google.common.collect.Sets;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
public class ConnectionInfo extends Connection { public class ConnectionInfo extends Connection {
private String versionDesc; private String versionDesc;

View File

@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.List; import java.util.List;

View File

@@ -17,8 +17,8 @@
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import lombok.Data; import lombok.Data;
import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
@Data @Data
public class DlqMessageResendResult { public class DlqMessageResendResult {

View File

@@ -16,8 +16,10 @@
*/ */
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import java.util.List;
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private String group; private String group;
@@ -25,8 +27,11 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private int count; private int count;
private ConsumeType consumeType; private ConsumeType consumeType;
private MessageModel messageModel; private MessageModel messageModel;
private List<String> address;
private int consumeTps; private int consumeTps;
private long diffTotal = -1; private long diffTotal = -1;
private String subGroupType = "NORMAL";
public String getGroup() { public String getGroup() {
return group; return group;
@@ -68,6 +73,22 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
this.diffTotal = diffTotal; this.diffTotal = diffTotal;
} }
public List<String> getAddress() {
return address;
}
public void setAddress(List<String> address) {
this.address = address;
}
public String getSubGroupType() {
return subGroupType;
}
public void setSubGroupType(String subGroupType) {
this.subGroupType = subGroupType;
}
@Override @Override
public int compareTo(GroupConsumeInfo o) { public int compareTo(GroupConsumeInfo o) {
if (this.count != o.count) { if (this.count != o.count) {

View File

@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.dashboard.model; package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;

View File

@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.dashboard.model.request; package org.apache.rocketmq.dashboard.model.request;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import java.util.List; import java.util.List;

View File

@@ -15,7 +15,6 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.dashboard.model.request; package org.apache.rocketmq.dashboard.model.request;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import java.util.List; import java.util.List;
@@ -32,6 +31,7 @@ public class TopicConfigInfo {
private int perm; private int perm;
private boolean order; private boolean order;
private String messageType;
public List<String> getClusterNameList() { public List<String> getClusterNameList() {
return clusterNameList; return clusterNameList;
} }
@@ -92,6 +92,18 @@ public class TopicConfigInfo {
this.order = order; this.order = order;
} }
public String getMessageType() {
return messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) if (this == o)
@@ -103,12 +115,13 @@ public class TopicConfigInfo {
readQueueNums == that.readQueueNums && readQueueNums == that.readQueueNums &&
perm == that.perm && perm == that.perm &&
order == that.order && order == that.order &&
Objects.equal(topicName, that.topicName); Objects.equal(topicName, that.topicName) &&
Objects.equal(messageType, that.messageType);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order); return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order,messageType);
} }
} }

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model.request;
import java.util.List;
public class TopicTypeList {
private List<String> topicNameList;
private List<String> messageTypeList;
public List<String> getTopicNameList() {
return topicNameList;
}
public void setTopicNameList(List<String> topicNameList) {
this.topicNameList = topicNameList;
}
public List<String> getMessageTypeList() {
return messageTypeList;
}
public void setMessageTypeList(List<String> messageTypeList) {
this.messageTypeList = messageTypeList;
}
public TopicTypeList(List<String> topicNameList, List<String> messageTypeList) {
this.topicNameList = topicNameList;
this.messageTypeList = messageTypeList;
}
}

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model.request;
public class TopicTypeMeta {
private String topicName;
private String messageType;
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getMessageType() {
return messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
}
}

View File

@@ -19,8 +19,8 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
@@ -28,7 +28,7 @@ import org.apache.commons.collections.CollectionUtils;
public abstract class AbstractCommonService { public abstract class AbstractCommonService {
@Resource @Resource
protected MQAdminExt mqAdminExt; protected MQAdminExt mqAdminExt;
protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable, protected final Set<String> changeToBrokerNameSet(Map<String, Set<String>> clusterAddrTable,
List<String> clusterNameList, List<String> brokerNameList) { List<String> clusterNameList, List<String> brokerNameList) {
Set<String> finalBrokerNameList = Sets.newHashSet(); Set<String> finalBrokerNameList = Sets.newHashSet();
if (CollectionUtils.isNotEmpty(clusterNameList)) { if (CollectionUtils.isNotEmpty(clusterNameList)) {
@@ -38,7 +38,8 @@ public abstract class AbstractCommonService {
} }
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
if (CollectionUtils.isNotEmpty(brokerNameList)) { if (CollectionUtils.isNotEmpty(brokerNameList)) {

View File

@@ -17,8 +17,8 @@
package org.apache.rocketmq.dashboard.service; package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo; import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo; import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
@@ -31,12 +31,12 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
public interface ConsumerService { public interface ConsumerService {
List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup); List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address);
GroupConsumeInfo queryGroup(String consumerGroup); GroupConsumeInfo queryGroup(String consumerGroup, String address);
List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName); List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address);
List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName); List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
@@ -52,7 +52,7 @@ public interface ConsumerService {
Set<String> fetchBrokerNameSetBySubscriptionGroup(String group); Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
ConsumerConnection getConsumerConnection(String consumerGroup); ConsumerConnection getConsumerConnection(String consumerGroup, String address);
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack); ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
} }

View File

@@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.api.MessageTrack;

View File

@@ -17,7 +17,7 @@
package org.apache.rocketmq.dashboard.service; package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
public interface ProducerService { public interface ProducerService {
ProducerConnection getProducerConnection(String producerGroup, String topic); ProducerConnection getProducerConnection(String producerGroup, String topic);

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service;
import java.util.Map;
public interface ProxyService {
void addProxyAddrList(String proxyAddr);
void updateProxyAddrList(String proxyAddr);
Map<String, Object> getProxyHomePage();
}

View File

@@ -19,10 +19,11 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.dashboard.model.request.TopicTypeList;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
@@ -31,6 +32,8 @@ import java.util.List;
public interface TopicService { public interface TopicService {
TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq); TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq);
TopicTypeList examineAllTopicType();
TopicStatsTable stats(String topic); TopicStatsTable stats(String topic);
TopicRouteData route(String topic); TopicRouteData route(String topic);

View File

@@ -17,43 +17,16 @@
package org.apache.rocketmq.dashboard.service.client; package org.apache.rocketmq.dashboard.service.client;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.util.JsonUtil; import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -62,13 +35,51 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.remoting.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.joor.Reflect; import org.apache.rocketmq.tools.admin.common.AdminToolResult;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@Service @Service
@@ -81,7 +92,7 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override @Override
public void updateBrokerConfig(String brokerAddr, Properties properties) public void updateBrokerConfig(String brokerAddr, Properties properties)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
UnsupportedEncodingException, InterruptedException, MQBrokerException { UnsupportedEncodingException, InterruptedException, MQBrokerException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties); MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
} }
@@ -128,7 +139,7 @@ public class MQAdminExtImpl implements MQAdminExt {
} }
@Override @Override
public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) throws MQBrokerException {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = null; RemotingCommand response = null;
@@ -136,7 +147,8 @@ public class MQAdminExtImpl implements MQAdminExt {
response = remotingClient.invokeSync(addr, request, 3000); response = remotingClient.invokeSync(addr, request, 3000);
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
assert response != null; assert response != null;
switch (response.getCode()) { switch (response.getCode()) {
@@ -145,12 +157,12 @@ public class MQAdminExtImpl implements MQAdminExt {
return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group); return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
} }
default: default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); throw new MQBrokerException(response.getCode(), response.getRemark());
} }
} }
@Override @Override
public TopicConfig examineTopicConfig(String addr, String topic) { public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null; RemotingCommand response = null;
@@ -158,7 +170,8 @@ public class MQAdminExtImpl implements MQAdminExt {
response = remotingClient.invokeSync(addr, request, 3000); response = remotingClient.invokeSync(addr, request, 3000);
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
@@ -166,7 +179,7 @@ public class MQAdminExtImpl implements MQAdminExt {
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
} }
default: default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); throw new MQBrokerException(response.getCode(), response.getRemark());
} }
} }
@@ -376,14 +389,14 @@ public class MQAdminExtImpl implements MQAdminExt {
} }
@Override @Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum); MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, attributes);
} }
@Override @Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes)
throws MQClientException { throws MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag); MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
} }
@Override @Override
@@ -451,16 +464,19 @@ public class MQAdminExtImpl implements MQAdminExt {
} }
catch (Exception e) { catch (Exception e) {
} }
MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32, Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get(); if (clusterList == null || clusterList.isEmpty()) {
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { return MQAdminInstance.threadLocalMQAdminExt().queryMessage("", topic, msgId);
return qr.getMessageList().get(0); }
for (String name : clusterList) {
MessageExt messageExt = MQAdminInstance.threadLocalMQAdminExt().queryMessage(name, topic, msgId);
if (messageExt != null) {
return messageExt;
}
} }
else {
return null; return null;
} }
}
@Override @Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic, public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic,
@@ -572,4 +588,256 @@ public class MQAdminExtImpl implements MQAdminExt {
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return false; return false;
} }
@Override
public void addBrokerToContainer(String brokerContainerAddr, String brokerConfig) throws InterruptedException,
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'addBrokerToContainer'");
}
@Override
public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,
long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'removeBrokerFromContainer'");
}
@Override
public void updateGlobalWhiteAddrConfig(String addr, String globalWhiteAddrs, String aclFileFullPath)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'updateGlobalWhiteAddrConfig'");
}
@Override
public TopicStatsTable examineTopicStats(String brokerAddr, String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineTopicStats'");
}
@Override
public AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(String topic) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineTopicStatsConcurrent'");
}
@Override
public ConsumeStats examineConsumeStats(String brokerAddr, String consumerGroup, String topicName,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
// TODO Auto-generated method stub
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis);
}
@Override
public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String consumerGroup, String topic) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStatsConcurrent'");
}
@Override
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr);
}
@Override
public ProducerTableInfo getAllProducerInfo(String brokerAddr)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getAllProducerInfo'");
}
@Override
public void deleteTopic(String topicName, String clusterName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteTopic'");
}
@Override
public AdminToolResult<BrokerOperatorResult> deleteTopicInBrokerConcurrent(Set<String> addrs, String topic) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInBrokerConcurrent'");
}
@Override
public void deleteTopicInNameServer(Set<String> addrs, String clusterName, String topic)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteTopicInNameServer'");
}
@Override
public AdminToolResult<BrokerOperatorResult> resetOffsetNewConcurrent(String group, String topic, long timestamp) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'resetOffsetNewConcurrent'");
}
@Override
public TopicList queryTopicsByConsumer(String group)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumer'");
}
@Override
public AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(String group) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryTopicsByConsumerConcurrent'");
}
@Override
public SubscriptionData querySubscription(String group, String topic)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'querySubscription'");
}
@Override
public AdminToolResult<List<QueueTimeSpan>> queryConsumeTimeSpanConcurrent(String topic, String group) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryConsumeTimeSpanConcurrent'");
}
@Override
public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLog'");
}
@Override
public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deleteExpiredCommitLogByAddr'");
}
@Override
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack,
boolean metrics) throws RemotingException, MQClientException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getConsumerRunningInfo'");
}
@Override
public List<MessageTrack> messageTrackDetailConcurrent(MessageExt msg)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'messageTrackDetailConcurrent'");
}
@Override
public void setMessageRequestMode(String brokerAddr, String topic, String consumerGroup, MessageRequestMode mode,
int popWorkGroupSize, long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'setMessageRequestMode'");
}
@Override
public long searchOffset(String brokerAddr, String topicName, int queueId, long timestamp, long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'searchOffset'");
}
@Override
public void resetOffsetByQueueId(String brokerAddr, String consumerGroup, String topicName, int queueId,
long resetOffset) throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'resetOffsetByQueueId'");
}
@Override
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig,
TopicQueueMappingDetail mappingDetail, boolean force)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'createStaticTopic'");
}
@Override
public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String groupName, String topicName,
Boolean readable) throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'updateAndGetGroupReadForbidden'");
}
@Override
public MessageExt queryMessage(String clusterName, String topic, String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'queryMessage'");
}
@Override
public HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getBrokerHAStatus'");
}
@Override
public BrokerReplicasInfo getInSyncStateData(String controllerAddress, List<String> brokers)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getInSyncStateData'");
}
@Override
public EpochEntryCache getBrokerEpochCache(String brokerAddr)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getBrokerEpochCache'");
}
@Override
public GetMetaDataResponseHeader getControllerMetaData(String controllerAddr)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getControllerMetaData'");
}
@Override
public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException,
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'resetMasterFlushOffset'");
}
@Override
public Map<String, Properties> getControllerConfig(List<String> controllerServers)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException, UnsupportedEncodingException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getControllerConfig'");
}
@Override
public void updateControllerConfig(Properties properties, List<String> controllers)
throws InterruptedException, RemotingConnectException, UnsupportedEncodingException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'updateControllerConfig'");
}
@Override
public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName,
String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'electMaster'");
}
@Override
public void cleanControllerBrokerData(String controllerAddr, String clusterName, String brokerName,
String brokerAddr, boolean isCleanLivingBroker)
throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'cleanControllerBrokerData'");
}
} }

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.client;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
public interface ProxyAdmin {
ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException;
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
@Slf4j
@Service
public class ProxyAdminImpl implements ProxyAdmin {
@Autowired
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
@Override
public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
try {
MQAdminInstance.createMQAdmin(mqAdminExtPool);
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader);
RemotingCommand response = remotingClient.invokeSync(addr, request, 3000);
switch (response.getCode()) {
case 0:
return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
} finally {
MQAdminInstance.returnMQAdmin(mqAdminExtPool);
}
}
}

View File

@@ -36,8 +36,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.dashboard.model.request.AclRequest; import org.apache.rocketmq.dashboard.model.request.AclRequest;
import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.AclService; import org.apache.rocketmq.dashboard.service.AclService;
@@ -68,7 +68,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} catch (Exception e) { } catch (Exception e) {
log.error("getAclConfig error.", e); log.error("getAclConfig error.", e);
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
AclConfig aclConfig = new AclConfig(); AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(Collections.emptyList()); aclConfig.setGlobalWhiteAddrs(Collections.emptyList());
@@ -100,7 +101,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -116,7 +118,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr); log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -142,7 +145,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -174,7 +178,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -206,7 +211,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -249,7 +255,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
} }
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -261,7 +268,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config); mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -281,7 +289,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -297,7 +306,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ",")); mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -311,7 +321,8 @@ public class AclServiceImpl extends AbstractCommonService implements AclService
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ",")); mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ","));
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }

View File

@@ -17,9 +17,10 @@
package org.apache.rocketmq.dashboard.service.impl; package org.apache.rocketmq.dashboard.service.impl;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.dashboard.service.ClusterService; import org.apache.rocketmq.dashboard.service.ClusterService;
import org.apache.rocketmq.dashboard.util.JsonUtil; import org.apache.rocketmq.dashboard.util.JsonUtil;
@@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Collectors;
@Service @Service
public class ClusterServiceImpl implements ClusterService { public class ClusterServiceImpl implements ClusterService {
@@ -56,10 +59,14 @@ public class ClusterServiceImpl implements ClusterService {
} }
resultMap.put("clusterInfo", clusterInfo); resultMap.put("clusterInfo", clusterInfo);
resultMap.put("brokerServer", brokerServer); resultMap.put("brokerServer", brokerServer);
// add messageType
resultMap.put("messageTypes", Arrays.stream(TopicMessageType.values()).sorted()
.collect(Collectors.toMap(TopicMessageType::getValue, messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue()))));
return resultMap; return resultMap;
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -70,7 +77,8 @@ public class ClusterServiceImpl implements ClusterService {
return mqAdminExt.getBrokerConfig(brokerAddr); return mqAdminExt.getBrokerConfig(brokerAddr);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
} }

View File

@@ -23,8 +23,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -44,18 +46,19 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat; import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
@@ -73,12 +76,12 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import static com.google.common.base.Throwables.propagate;
@Service @Service
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Resource
protected ProxyAdmin proxyAdmin;
@Resource @Resource
private RMQConfigure configure; private RMQConfigure configure;
@@ -121,24 +124,33 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) { public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String address) {
Set<String> consumerGroupSet = Sets.newHashSet(); HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
try { try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
if (!consumerGroupMap.containsKey(groupName)) {
consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());
}
List<String> addresses = consumerGroupMap.get(groupName);
addresses.add(brokerData.selectBrokerAddr());
consumerGroupMap.put(groupName, addresses);
} }
} }
catch (Exception err) { } catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size()); CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
for (String consumerGroup : consumerGroupSet) { for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
String consumerGroup = entry.getKey();
executorService.submit(() -> { executorService.submit(() -> {
try { try {
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup); GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address);
consumeInfo.setAddress(entry.getValue());
groupConsumeInfoList.add(consumeInfo); groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) { } catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
@@ -166,7 +178,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public GroupConsumeInfo queryGroup(String consumerGroup) { public GroupConsumeInfo queryGroup(String consumerGroup, String address) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try { try {
ConsumeStats consumeStats = null; ConsumeStats consumeStats = null;
@@ -178,14 +190,28 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
ConsumerConnection consumerConnection = null; ConsumerConnection consumerConnection = null;
boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup)
.stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig)
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
try { try {
if (StringUtils.isNotEmpty(address)) {
consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
} else {
consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
} }
catch (Exception e) { } catch (Exception e) {
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
} }
groupConsumeInfo.setGroup(consumerGroup); groupConsumeInfo.setGroup(consumerGroup);
if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
groupConsumeInfo.setSubGroupType("SYSTEM");
} else if (isFifoType) {
groupConsumeInfo.setSubGroupType("FIFO");
} else {
groupConsumeInfo.setSubGroupType("NORMAL");
}
if (consumeStats != null) { if (consumeStats != null) {
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
@@ -207,8 +233,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) { public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
return queryConsumeStatsList(null, groupName); ConsumeStats consumeStats;
String topic = null;
try {
String[] addresses = address.split(",");
String addr = addresses[0];
consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
return toTopicConsumerInfoList(topic, consumeStats, groupName);
} }
@Override @Override
@@ -218,8 +254,13 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return toTopicConsumerInfoList(topic, consumeStats, groupName);
}
private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) {
List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
@Override @Override
public boolean apply(MessageQueue o) { public boolean apply(MessageQueue o) {
@@ -278,7 +319,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
return group2ConsumerInfoMap; return group2ConsumerInfoMap;
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -341,7 +383,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return consumerConfigInfoList; return consumerConfigInfoList;
} }
@@ -366,7 +409,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception e) { catch (Exception e) {
throw propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return true; return true;
} }
@@ -393,7 +437,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
return true; return true;
} }
@@ -408,19 +453,22 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return brokerNameSet; return brokerNameSet;
} }
@Override @Override
public ConsumerConnection getConsumerConnection(String consumerGroup) { public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
try { try {
return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); String[] addresses = address.split(",");
} String addr = addresses[0];
catch (Exception e) { return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
throw Throwables.propagate(e); } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -430,7 +478,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
} }

View File

@@ -107,7 +107,8 @@ public class DashboardCollectServiceImpl implements DashboardCollectService {
strings = Files.readLines(file, Charsets.UTF_8); strings = Files.readLines(file, Charsets.UTF_8);
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
for (String string : strings) { for (String string : strings) {

View File

@@ -25,8 +25,8 @@ import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult; import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
@@ -62,10 +62,12 @@ public class DlqMessageServiceImpl implements DlqMessageService {
&& e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) { && e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
return new MessagePage(new PageImpl<>(messageViews, page, 0), query.getTaskId()); return new MessagePage(new PageImpl<>(messageViews, page, 0), query.getTaskId());
} else { } else {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return messageService.queryMessageByPage(query); return messageService.queryMessageByPage(query);
} }

View File

@@ -37,9 +37,9 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException; import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
@@ -115,7 +115,8 @@ public class MessageServiceImpl implements MessageService {
if (err instanceof MQClientException) { if (err instanceof MQClientException) {
throw new ServiceException(-1, ((MQClientException) err).getErrorMessage()); throw new ServiceException(-1, ((MQClientException) err).getErrorMessage());
} }
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -185,7 +186,8 @@ public class MessageServiceImpl implements MessageService {
}); });
return messageViewList; return messageViewList;
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
consumer.shutdown(); consumer.shutdown();
} }
@@ -209,7 +211,8 @@ public class MessageServiceImpl implements MessageService {
try { try {
return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -223,7 +226,8 @@ public class MessageServiceImpl implements MessageService {
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
} }
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
throw new IllegalStateException("NO CONSUMER"); throw new IllegalStateException("NO CONSUMER");
@@ -272,7 +276,7 @@ public class MessageServiceImpl implements MessageService {
int idx = 0; int idx = 0;
for (MessageQueue messageQueue : messageQueues) { for (MessageQueue messageQueue : messageQueues) {
Long minOffset = consumer.searchOffset(messageQueue, query.getBegin()); Long minOffset = consumer.searchOffset(messageQueue, query.getBegin());
Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd()) + 1; Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd());
queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue)); queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue));
} }
@@ -388,7 +392,8 @@ public class MessageServiceImpl implements MessageService {
PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total); PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total);
return new MessagePageTask(page, queueOffsetInfos); return new MessagePageTask(page, queueOffsetInfos);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
consumer.shutdown(); consumer.shutdown();
} }
@@ -455,7 +460,8 @@ public class MessageServiceImpl implements MessageService {
} }
return new PageImpl<>(messageViews, query.page(), total); return new PageImpl<>(messageViews, query.page(), total);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
consumer.shutdown(); consumer.shutdown();
} }

View File

@@ -82,7 +82,8 @@ public class MonitorServiceImpl implements MonitorService {
MixAll.string2File(dataStr, path); MixAll.string2File(dataStr, path);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }

View File

@@ -19,7 +19,7 @@ package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.dashboard.service.ProducerService; import org.apache.rocketmq.dashboard.service.ProducerService;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -35,7 +35,8 @@ public class ProducerServiceImpl implements ProducerService {
return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic); return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
} }

View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class ProxyServiceImpl implements ProxyService {
@Resource
protected ProxyAdmin proxyAdmin;
@Resource
private RMQConfigure configure;
@Override
public void addProxyAddrList(String proxyAddr) {
List<String> proxyAddrs = configure.getProxyAddrs();
if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) {
proxyAddrs.add(proxyAddr);
}
configure.setProxyAddrs(proxyAddrs);
}
@Override
public void updateProxyAddrList(String proxyAddr) {
configure.setProxyAddr(proxyAddr);
}
@Override
public Map<String, Object> getProxyHomePage() {
Map<String, Object> homePageInfoMap = Maps.newHashMap();
homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr());
homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs());
return homePageInfoMap;
}
}

View File

@@ -18,43 +18,57 @@
package org.apache.rocketmq.dashboard.service.impl; package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.model.request.TopicTypeList;
import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta;
import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.TopicService; import org.apache.rocketmq.dashboard.service.TopicService;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.CommandUtil;
import org.joor.Reflect; import org.joor.Reflect;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;
@Service @Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService { public class TopicServiceImpl extends AbstractCommonService implements TopicService {
@@ -84,7 +98,43 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
allTopics.getTopicList().addAll(topics); allTopics.getTopicList().addAll(topics);
return allTopics; return allTopics;
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
@Override
public TopicTypeList examineAllTopicType() {
ArrayList<TopicTypeMeta> topicTypes = new ArrayList<>();
ArrayList<String> names = new ArrayList<>();
ArrayList<String> messageTypes = new ArrayList<>();
TopicList topicList = fetchAllTopicList(false, false);
checkTopicType(topicList, topicTypes);
topicTypes.sort((t1, t2) -> t1.getTopicName().compareTo(t2.getTopicName()));
for (TopicTypeMeta topicTypeMeta : topicTypes) {
names.add(topicTypeMeta.getTopicName());
messageTypes.add(topicTypeMeta.getMessageType());
}
return new TopicTypeList(names, messageTypes);
}
private void checkTopicType(TopicList topicList, ArrayList<TopicTypeMeta> topicTypes) {
for (String topicName : topicList.getTopicList()) {
TopicTypeMeta topicType = new TopicTypeMeta();
topicType.setTopicName(topicName);
if (topicName.startsWith("%R")) {
topicType.setMessageType("RETRY");
} else if (topicName.startsWith("%D")) {
topicType.setMessageType("DELAY");
} else if (topicName.startsWith("%S")) {
topicType.setMessageType("SYSTEM");
} else {
List<TopicConfigInfo> topicConfigInfos = examineTopicConfig(topicName);
if (!CollectionUtils.isEmpty(topicConfigInfos)) {
topicType.setMessageType(topicConfigInfos.get(0).getMessageType());
}
}
topicTypes.add(topicType);
} }
} }
@@ -93,7 +143,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
return mqAdminExt.examineTopicStats(topic); return mqAdminExt.examineTopicStats(topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -102,7 +153,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
return mqAdminExt.examineTopicRouteInfo(topic); return mqAdminExt.examineTopicRouteInfo(topic);
} catch (Exception ex) { } catch (Exception ex) {
throw Throwables.propagate(ex); Throwables.throwIfUnchecked(ex);
throw new RuntimeException(ex);
} }
} }
@@ -111,7 +163,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
return mqAdminExt.queryTopicConsumeByWho(topic); return mqAdminExt.queryTopicConsumeByWho(topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -119,6 +172,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
TopicConfig topicConfig = new TopicConfig(); TopicConfig topicConfig = new TopicConfig();
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
String messageType = topicCreateOrUpdateRequest.getMessageType();
if (StringUtils.isBlank(messageType)) {
messageType = TopicMessageType.NORMAL.name();
}
topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()), messageType));
try { try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
@@ -126,7 +184,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
} }
} catch (Exception err) { } catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -137,7 +196,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
clusterInfo = mqAdminExt.examineBrokerClusterInfo(); clusterInfo = mqAdminExt.examineBrokerClusterInfo();
return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic); return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -150,6 +210,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName()); TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
BeanUtils.copyProperties(topicConfig, topicConfigInfo); BeanUtils.copyProperties(topicConfig, topicConfigInfo);
topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName())); topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
String messageType = topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
if (StringUtils.isBlank(messageType)) {
messageType = TopicMessageType.UNSPECIFIED.name();
}
topicConfigInfo.setMessageType(messageType);
topicConfigInfoList.add(topicConfigInfo); topicConfigInfoList.add(topicConfigInfo);
} }
return topicConfigInfoList; return topicConfigInfoList;
@@ -170,7 +235,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
} }
mqAdminExt.deleteTopicInNameServer(nameServerSet, topic); mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
} catch (Exception err) { } catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
return true; return true;
} }
@@ -181,7 +247,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
clusterInfo = mqAdminExt.examineBrokerClusterInfo(); clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception err) { } catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) { for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
deleteTopic(topic, clusterName); deleteTopic(topic, clusterName);
@@ -197,11 +264,13 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
try { try {
clusterInfo = mqAdminExt.examineBrokerClusterInfo(); clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic); mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
return true; return true;
} }
@@ -216,6 +285,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
return defaultMQProducer; return defaultMQProducer;
} }
public TransactionMQProducer buildTransactionMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
TransactionMQProducer defaultMQProducer = new TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC);
defaultMQProducer.setUseTLS(configure.isUseTLS());
return defaultMQProducer;
}
private TopicList getSystemTopicList() { private TopicList getSystemTopicList() {
RPCHook rpcHook = null; RPCHook rpcHook = null;
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
@@ -230,7 +305,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
producer.start(); producer.start();
return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L); return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
producer.shutdown(); producer.shutdown();
} }
@@ -238,7 +314,8 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
@Override @Override
public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
DefaultMQProducer producer = null; List<TopicConfigInfo> topicConfigInfos = examineTopicConfig(sendTopicMessageRequest.getTopic());
String messageType = topicConfigInfos.get(0).getMessageType();
AclClientRPCHook rpcHook = null; AclClientRPCHook rpcHook = null;
if (configure.isACLEnabled()) { if (configure.isACLEnabled()) {
rpcHook = new AclClientRPCHook(new SessionCredentials( rpcHook = new AclClientRPCHook(new SessionCredentials(
@@ -246,6 +323,32 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
configure.getSecretKey() configure.getSecretKey()
)); ));
} }
if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) {
// transaction message
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr());
producer.setTransactionListener(transactionListener);
try {
producer.start();
Message msg = new Message(sendTopicMessageRequest.getTopic(),
sendTopicMessageRequest.getTag(),
sendTopicMessageRequest.getKey(),
sendTopicMessageRequest.getMessageBody().getBytes()
);
return producer.sendMessageInTransaction(msg, null);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown();
}
} else {
// no transaction message
DefaultMQProducer producer = null;
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
producer.setInstanceName(String.valueOf(System.currentTimeMillis())); producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr()); producer.setNamesrvAddr(configure.getNamesrvAddr());
@@ -258,13 +361,16 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
); );
return producer.send(msg); return producer.send(msg);
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally { } finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown(); producer.shutdown();
} }
} }
}
private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
if (!traceEnabled) { if (!traceEnabled) {
return; return;
@@ -284,4 +390,20 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
} catch (Exception ignore) { } catch (Exception ignore) {
} }
} }
static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
} }

View File

@@ -24,12 +24,12 @@ import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.stats.Stats;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.service.DashboardCollectService; import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
@@ -63,7 +63,7 @@ public class CollectTaskRunnble implements Runnable {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) { if (masterAddr != null) {
try { try {
BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic); BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, Stats.TOPIC_PUT_NUMS, topic);
inTPS += bsd.getStatsMinute().getTps(); inTPS += bsd.getStatsMinute().getTps();
inMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd); inMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
} catch (Exception e) { } catch (Exception e) {
@@ -78,7 +78,7 @@ public class CollectTaskRunnble implements Runnable {
if (masterAddr != null) { if (masterAddr != null) {
try { try {
String statsKey = String.format("%s@%s", topic, group); String statsKey = String.format("%s@%s", topic, group);
BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey); BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, Stats.GROUP_GET_NUMS, statsKey);
outTPS += bsd.getStatsMinute().getTps(); outTPS += bsd.getStatsMinute().getTps();
outMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd); outMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
} catch (Exception e) { } catch (Exception e) {
@@ -93,7 +93,8 @@ public class CollectTaskRunnble implements Runnable {
try { try {
list = dashboardCollectService.getTopicMap().get(topic); list = dashboardCollectService.getTopicMap().get(topic);
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
if (null == list) { if (null == list) {
list = Lists.newArrayList(); list = Lists.newArrayList();

View File

@@ -34,10 +34,10 @@ import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.DashboardCollectService; import org.apache.rocketmq.dashboard.service.DashboardCollectService;
@@ -84,7 +84,8 @@ public class DashboardCollectTask {
} }
} }
catch (Exception err) { catch (Exception err) {
throw Throwables.propagate(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
} }
} }
@@ -128,7 +129,8 @@ public class DashboardCollectTask {
log.debug("Broker Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap())); log.debug("Broker Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap()));
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -144,10 +146,12 @@ public class DashboardCollectTask {
Thread.sleep(1000); Thread.sleep(1000);
} }
catch (InterruptedException e1) { catch (InterruptedException e1) {
throw Throwables.propagate(e1); Throwables.throwIfUnchecked(e1);
throw new RuntimeException(e1);
} }
fetchBrokerRuntimeStats(brokerAddr, retryTime - 1); fetchBrokerRuntimeStats(brokerAddr, retryTime - 1);
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -197,7 +201,8 @@ public class DashboardCollectTask {
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -250,7 +255,7 @@ public class DashboardCollectTask {
private void addSystemTopic() throws Exception { private void addSystemTopic() throws Exception {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterTable = clusterInfo.getClusterAddrTable(); Map<String, Set<String>> clusterTable = clusterInfo.getClusterAddrTable();
for (Map.Entry<String, Set<String>> entry : clusterTable.entrySet()) { for (Map.Entry<String, Set<String>> entry : clusterTable.entrySet()) {
String clusterName = entry.getKey(); String clusterName = entry.getKey();
TopicValidator.addSystemTopic(clusterName); TopicValidator.addSystemTopic(clusterName);

View File

@@ -40,7 +40,7 @@ public class MonitorTask {
// @Scheduled(cron = "* * * * * ?") // @Scheduled(cron = "* * * * * ?")
public void scanProblemConsumeGroup() { public void scanProblemConsumeGroup() {
for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) { for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) {
GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey()); GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey(), null);
if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) { if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system
} }

View File

@@ -42,7 +42,9 @@ rocketmq:
# configure multiple namesrv addresses to manage multiple different clusters # configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs: namesrvAddrs:
- 127.0.0.1:9876 - 127.0.0.1:9876
- 127.0.0.2:9876 # - 127.0.0.2:9876
# - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876
# - 10.151.47.30:9876
# if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
isVIPChannel: isVIPChannel:
# timeout for mqadminExt, default 5000ms # timeout for mqadminExt, default 5000ms
@@ -57,9 +59,12 @@ rocketmq:
# must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required # must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
loginRequired: false loginRequired: false
useTLS: false useTLS: false
proxyAddr: 127.0.0.1:8080
proxyAddrs:
- 127.0.0.1:8080
# set the accessKey and secretKey if you used acl # set the accessKey and secretKey if you used acl
accessKey: # if version > 4.4.0 # accessKey: rocketmq2
secretKey: # if version > 4.4.0 # secretKey: 12345678
threadpool: threadpool:
config: config:

View File

@@ -104,6 +104,7 @@
<script type="text/javascript" src="src/tools/tools.js?v=201703171710"></script> <script type="text/javascript" src="src/tools/tools.js?v=201703171710"></script>
<script type="text/javascript" src="src/cluster.js?timestamp=4"></script> <script type="text/javascript" src="src/cluster.js?timestamp=4"></script>
<script type="text/javascript" src="src/topic.js"></script> <script type="text/javascript" src="src/topic.js"></script>
<script type="text/javascript" src="src/proxy.js"></script>
<script type="text/javascript" src="src/consumer.js?timestamp=6"></script> <script type="text/javascript" src="src/consumer.js?timestamp=6"></script>
<script type="text/javascript" src="src/producer.js"></script> <script type="text/javascript" src="src/producer.js"></script>
<script type="text/javascript" src="src/message.js"></script> <script type="text/javascript" src="src/message.js"></script>

View File

@@ -213,6 +213,9 @@ app.config(['$routeProvider', '$httpProvider','$cookiesProvider','getDictNamePro
}).when('/ops', { }).when('/ops', {
templateUrl: 'view/pages/ops.html', templateUrl: 'view/pages/ops.html',
controller:'opsController' controller:'opsController'
}).when('/proxy', {
templateUrl: 'view/pages/proxy.html',
controller:'proxyController'
}).when('/acl', { }).when('/acl', {
templateUrl: 'view/pages/acl.html', templateUrl: 'view/pages/acl.html',
controller: 'aclController' controller: 'aclController'

View File

@@ -45,6 +45,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
$scope.filterNormal = true; $scope.filterNormal = true;
$scope.filterSystem = false; $scope.filterSystem = false;
$scope.filterFIFO = false;
$scope.doSort = function () {// todo how to change this fe's code ? (it's dirty) $scope.doSort = function () {// todo how to change this fe's code ? (it's dirty)
if ($scope.sortKey == 'diffTotal') { if ($scope.sortKey == 'diffTotal') {
@@ -75,7 +76,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
$http({ $http({
method: "GET", method: "GET",
url: "consumer/groupList.query" url: "consumer/groupList.query",
params: {
skipSysGroup: false,
address: localStorage.getItem('isV5') ? localStorage.getItem('proxyAddr') : null
}
}).success(function (resp) { }).success(function (resp) {
if (resp.status == 0) { if (resp.status == 0) {
$scope.allConsumerGrouopList = resp.data; $scope.allConsumerGrouopList = resp.data;
@@ -135,16 +140,28 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
$scope.filterList(1); $scope.filterList(1);
}); });
$scope.filterByType = function (str) { $scope.$watch('filterFIFO', function () {
$scope.filterList(1);
});
$scope.filterByType = function (str, type,version) {
if ($scope.filterSystem) { if ($scope.filterSystem) {
if (str.startsWith("%S")) { if (type === "SYSTEM") {
return true return true
} }
} }
if ($scope.filterNormal) { if ($scope.filterNormal) {
if (str.startsWith("%") == false) { if (type === "NORMAL") {
return true return true
} }
if(!version && type === "FIFO"){
return true;
}
}
if ($scope.filterFIFO) {
if (type === "FIFO") {
return true;
}
} }
return false; return false;
}; };
@@ -154,7 +171,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
var canShowList = []; var canShowList = [];
$scope.allConsumerGrouopList.forEach(function (element) { $scope.allConsumerGrouopList.forEach(function (element) {
console.log(element) console.log(element)
if ($scope.filterByType(element.group)) { if ($scope.filterByType(element.group, element.subGroupType, $scope.rmqVersion)) {
if (element.group.toLowerCase().indexOf(lowExceptStr) != -1) { if (element.group.toLowerCase().indexOf(lowExceptStr) != -1) {
canShowList.push(element); canShowList.push(element);
} }
@@ -189,6 +206,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
subscriptionGroupConfig: { subscriptionGroupConfig: {
groupName: "", groupName: "",
consumeEnable: true, consumeEnable: true,
consumeMessageOrderly: false,
consumeFromMinEnable: true, consumeFromMinEnable: true,
consumeBroadcastEnable: true, consumeBroadcastEnable: true,
retryQueueNums: 1, retryQueueNums: 1,
@@ -211,7 +229,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
// Refresh topic list // Refresh topic list
$scope.refreshConsumerData(); $scope.refreshConsumerData();
}, },
template: 'consumerModifyDialog', template: $scope.rmqVersion ? 'consumerModifyDialogForV5' : 'consumerModifyDialog',
controller: 'consumerModifyDialogController', controller: 'consumerModifyDialogController',
data: { data: {
consumerRequestList: request, consumerRequestList: request,
@@ -226,11 +244,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
} }
}); });
}; };
$scope.detail = function (consumerGroupName) { $scope.detail = function (consumerGroupName, address) {
$http({ $http({
method: "GET", method: "GET",
url: "consumer/queryTopicByConsumer.query", url: "consumer/queryTopicByConsumer.query",
params: {consumerGroup: consumerGroupName} params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) { }).success(function (resp) {
if (resp.status == 0) { if (resp.status == 0) {
console.log(resp); console.log(resp);
@@ -245,11 +263,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
}); });
}; };
$scope.client = function (consumerGroupName) { $scope.client = function (consumerGroupName, address) {
$http({ $http({
method: "GET", method: "GET",
url: "consumer/consumerConnection.query", url: "consumer/consumerConnection.query",
params: {consumerGroup: consumerGroupName} params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) { }).success(function (resp) {
if (resp.status == 0) { if (resp.status == 0) {
console.log(resp); console.log(resp);

View File

@@ -15,10 +15,18 @@
* limitations under the License. * limitations under the License.
*/ */
app.controller('AppCtrl', ['$scope','$window','$translate','$http','Notification', function ($scope,$window,$translate, $http, Notification) { app.controller('AppCtrl', ['$scope','$window','$translate','$http','Notification', function ($scope,$window,$translate, $http, Notification) {
$scope.rmqVersion = localStorage.getItem("isV5");
$scope.changeTranslate = function(langKey){ $scope.changeTranslate = function(langKey){
$translate.use(langKey); $translate.use(langKey);
} }
$scope.changeRMQVersion = function (version) {
$scope.rmqVersion = version === 5;
var v = version === 5;
localStorage.setItem("isV5", v);
}
$scope.logout = function(){ $scope.logout = function(){
$http({ $http({
method: "POST", method: "POST",

View File

@@ -54,9 +54,13 @@ var en = {
"RESET_CUS_OFFSET": "Reset Consumer Offset", "RESET_CUS_OFFSET": "Reset Consumer Offset",
"DELETE": "Delete", "DELETE": "Delete",
"CHANGE_LANG": "ChangeLanguage", "CHANGE_LANG": "ChangeLanguage",
"CHANGE_VERSION": "ChangeVersion",
"BROKER": "Broker", "BROKER": "Broker",
"NORMAL": "NORMAL", "NORMAL": "NORMAL",
"RETRY": "RETRY", "RETRY": "RETRY",
"FIFO": "FIFO",
"TRANSACTION": "TRANSACTION",
"UNSPECIFIED": "UNSPECIFIED",
"DLQ": "DLQ", "DLQ": "DLQ",
"QUANTITY":"Quantity", "QUANTITY":"Quantity",
"TYPE":"Type", "TYPE":"Type",
@@ -97,6 +101,7 @@ var en = {
"RESET_OFFSET":"resetOffset", "RESET_OFFSET":"resetOffset",
"CLUSTER_NAME":"clusterName", "CLUSTER_NAME":"clusterName",
"OPS":"OPS", "OPS":"OPS",
"PROXY":"Proxy",
"AUTO_REFRESH":"AUTO_REFRESH", "AUTO_REFRESH":"AUTO_REFRESH",
"REFRESH":"REFRESH", "REFRESH":"REFRESH",
"LOGOUT":"Logout", "LOGOUT":"Logout",
@@ -123,5 +128,11 @@ var en = {
"GROUP_PERM":"Group Permission", "GROUP_PERM":"Group Permission",
"SYNCHRONIZE":"Synchronize Data", "SYNCHRONIZE":"Synchronize Data",
"SHOW":"Show", "SHOW":"Show",
"HIDE":"Hide" "HIDE":"Hide",
"MESSAGE_TYPE":"messageType",
"MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL",
"MESSAGE_TYPE_NORMAL": "NORMAL",
"MESSAGE_TYPE_FIFO": "FIFO",
"MESSAGE_TYPE_DELAY": "DELAY",
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
} }

View File

@@ -55,9 +55,13 @@ var zh = {
"SKIP_MESSAGE_ACCUMULATE":"跳过堆积", "SKIP_MESSAGE_ACCUMULATE":"跳过堆积",
"DELETE": "删除", "DELETE": "删除",
"CHANGE_LANG": "更换语言", "CHANGE_LANG": "更换语言",
"CHANGE_VERSION": "更换版本",
"BROKER": "Broker", "BROKER": "Broker",
"NORMAL": "普通", "NORMAL": "普通",
"RETRY": "重试", "RETRY": "重试",
"FIFO": "顺序",
"TRANSACTION": "事务",
"UNSPECIFIED": "未指定",
"DLQ": "死信", "DLQ": "死信",
"QUANTITY":"数量", "QUANTITY":"数量",
"TYPE":"类型", "TYPE":"类型",
@@ -98,6 +102,7 @@ var zh = {
"RESET_OFFSET":"重置位点", "RESET_OFFSET":"重置位点",
"CLUSTER_NAME":"集群名", "CLUSTER_NAME":"集群名",
"OPS":"运维", "OPS":"运维",
"PROXY":"代理",
"AUTO_REFRESH":"自动刷新", "AUTO_REFRESH":"自动刷新",
"REFRESH":"刷新", "REFRESH":"刷新",
"LOGOUT":"退出", "LOGOUT":"退出",
@@ -124,5 +129,11 @@ var zh = {
"GROUP_PERM":"消费组权限", "GROUP_PERM":"消费组权限",
"SYNCHRONIZE":"同步", "SYNCHRONIZE":"同步",
"SHOW":"显示", "SHOW":"显示",
"HIDE":"隐藏" "HIDE":"隐藏",
"MESSAGE_TYPE":"消息类型",
"MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息",
"MESSAGE_TYPE_NORMAL": "普通消息",
"MESSAGE_TYPE_FIFO": "顺序消息",
"MESSAGE_TYPE_DELAY": "定时/延时消息",
"MESSAGE_TYPE_TRANSACTION": "事务消息",
} }

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var module = app;
module.controller('proxyController', ['$scope', '$location', '$http', 'Notification', 'remoteApi', 'tools', '$window',
function ($scope, $location, $http, Notification, remoteApi, tools, $window) {
$scope.proxyAddrList = [];
$scope.userRole = $window.sessionStorage.getItem("userrole");
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
$scope.inputReadonly = !$scope.writeOperationEnabled;
$scope.newProxyAddr = "";
$scope.allProxyConfig = {};
$http({
method: "GET",
url: "proxy/homePage.query"
}).success(function (resp) {
if (resp.status == 0) {
$scope.proxyAddrList = resp.data.proxyAddrList;
$scope.selectedProxy = resp.data.currentProxyAddr;
$scope.showProxyDetailConfig($scope.selectedProxy);
localStorage.setItem('proxyAddr',$scope.selectedProxy);
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
$scope.eleChange = function (data) {
$scope.proxyAddrList = data;
}
$scope.showDetailConf = function () {
$(".proxyModal").modal();
}
$scope.showProxyDetailConfig = function (proxyAddr) {
$http({
method: "GET",
url: "proxy/proxyDetailConfig.query",
params: {proxyAddress: proxyAddr}
}).success(function (resp) {
if (resp.status == 0) {
$scope.allProxyConfig = resp.data;
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
$scope.updateProxyAddr = function () {
$http({
method: "POST",
url: "proxy/updateProxyAddr.do",
params: {proxyAddr: $scope.selectedProxy}
}).success(function (resp) {
if (resp.status == 0) {
localStorage.setItem('proxyAddr', $scope.selectedProxy);
Notification.info({message: "SUCCESS", delay: 2000});
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
$scope.showProxyDetailConfig($scope.selectedProxy);
};
$scope.addProxyAddr = function () {
$http({
method: "POST",
url: "proxy/addProxyAddr.do",
params: {newProxyAddr: $scope.newProxyAddr}
}).success(function (resp) {
if (resp.status == 0) {
if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == -1) {
$scope.proxyAddrList.push($scope.newProxyAddr);
}
$("#proxyAddr").val("");
$scope.newProxyAddr = "";
Notification.info({message: "SUCCESS", delay: 2000});
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
}])

View File

@@ -45,10 +45,16 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
} }
}; };
$scope.filterNormal = true $scope.filterNormal = true
$scope.filterDelay = false
$scope.filterFifo = false
$scope.filterTransaction = false
$scope.filterUnspecified = false
$scope.filterRetry = false $scope.filterRetry = false
$scope.filterDLQ = false $scope.filterDLQ = false
$scope.filterSystem = false $scope.filterSystem = false
$scope.allTopicList = []; $scope.allTopicList = [];
$scope.allTopicNameList = [];
$scope.allMessageTypeList = [];
$scope.topicShowList = []; $scope.topicShowList = [];
$scope.userRole = $window.sessionStorage.getItem("userrole"); $scope.userRole = $window.sessionStorage.getItem("userrole");
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
@@ -56,13 +62,14 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
$scope.refreshTopicList = function () { $scope.refreshTopicList = function () {
$http({ $http({
method: "GET", method: "GET",
url: "topic/list.query" url: "topic/list.queryTopicType"
}).success(function (resp) { }).success(function (resp) {
if (resp.status == 0) { if (resp.status == 0) {
$scope.allTopicList = resp.data.topicList.sort(); $scope.allTopicNameList = resp.data.topicNameList;
console.log($scope.allTopicList); $scope.allMessageTypeList = resp.data.messageTypeList;
console.log($scope.allTopicNameList);
console.log(JSON.stringify(resp)); console.log(JSON.stringify(resp));
$scope.showTopicList(1, $scope.allTopicList.length); $scope.showTopicList(1, $scope.allTopicNameList.length);
} else { } else {
Notification.error({message: resp.errMsg, delay: 5000}); Notification.error({message: resp.errMsg, delay: 5000});
@@ -79,6 +86,18 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
$scope.$watch('filterNormal', function () { $scope.$watch('filterNormal', function () {
$scope.filterList(1); $scope.filterList(1);
}); });
$scope.$watch('filterFifo', function () {
$scope.filterList(1);
});
$scope.$watch('filterTransaction', function () {
$scope.filterList(1);
});
$scope.$watch('filterUnspecified', function () {
$scope.filterList(1);
});
$scope.$watch('filterDelay', function () {
$scope.filterList(1);
});
$scope.$watch('filterRetry', function () { $scope.$watch('filterRetry', function () {
$scope.filterList(1); $scope.filterList(1);
}); });
@@ -92,13 +111,13 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
var lowExceptStr = $scope.filterStr.toLowerCase(); var lowExceptStr = $scope.filterStr.toLowerCase();
var canShowList = []; var canShowList = [];
$scope.allTopicList.forEach(function (element) { for (let i = 0; i < $scope.allTopicNameList.length; ++i) {
if ($scope.filterByType(element)) { if ($scope.filterByType($scope.allTopicNameList[i], $scope.allMessageTypeList[i])) {
if (element.toLowerCase().indexOf(lowExceptStr) != -1) { if ($scope.allTopicNameList[i].toLowerCase().indexOf(lowExceptStr) != -1) {
canShowList.push(element); canShowList.push($scope.allTopicNameList[i]);
}
} }
} }
});
$scope.paginationConf.totalItems = canShowList.length; $scope.paginationConf.totalItems = canShowList.length;
var perPage = $scope.paginationConf.itemsPerPage; var perPage = $scope.paginationConf.itemsPerPage;
var from = (currentPage - 1) * perPage; var from = (currentPage - 1) * perPage;
@@ -106,7 +125,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
$scope.topicShowList = canShowList.slice(from, to); $scope.topicShowList = canShowList.slice(from, to);
}; };
$scope.filterByType = function (str) { $scope.filterByType = function (str, type) {
if ($scope.filterRetry) { if ($scope.filterRetry) {
if (str.startsWith("%R")) { if (str.startsWith("%R")) {
return true return true
@@ -122,8 +141,31 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
return true return true
} }
} }
if (localStorage.getItem('isV5') && $scope.filterUnspecified) {
if (type.includes("UNSPECIFIED")) {
return true
}
}
if ($scope.filterNormal) { if ($scope.filterNormal) {
if (str.startsWith("%") == false) { if (type.includes("NORMAL")) {
return true
}
if (!localStorage.getItem('isV5') && type.includes("UNSPECIFIED")) {
return true
}
}
if (localStorage.getItem('isV5') && $scope.filterDelay) {
if (type.includes("DELAY")) {
return true
}
}
if (localStorage.getItem('isV5') && $scope.filterFifo) {
if (type.includes("FIFO")) {
return true
}
}
if (localStorage.getItem('isV5') && $scope.filterTransaction) {
if (type.includes("TRANSACTION")) {
return true return true
} }
} }
@@ -138,10 +180,10 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
var perPage = $scope.paginationConf.itemsPerPage; var perPage = $scope.paginationConf.itemsPerPage;
var from = (currentPage - 1) * perPage; var from = (currentPage - 1) * perPage;
var to = (from + perPage) > totalItem ? totalItem : from + perPage; var to = (from + perPage) > totalItem ? totalItem : from + perPage;
console.log($scope.allTopicList); console.log($scope.allTopicNameList);
console.log(from) console.log(from)
console.log(to) console.log(to)
$scope.topicShowList = $scope.allTopicList.slice(from, to); $scope.topicShowList = $scope.allTopicNameList.slice(from, to);
$scope.paginationConf.totalItems = totalItem; $scope.paginationConf.totalItems = totalItem;
console.log($scope.topicShowList) console.log($scope.topicShowList)
console.log($scope.paginationConf.totalItems) console.log($scope.paginationConf.totalItems)
@@ -328,8 +370,8 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
var bIsUpdate = true; var bIsUpdate = true;
if (request == null) { if (request == null) {
request = [{ request = [{
writeQueueNums: 16, writeQueueNums: 8,
readQueueNums: 16, readQueueNums: 8,
perm: 6, perm: 6,
order: false, order: false,
topicName: "", topicName: "",
@@ -355,6 +397,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
topicRequestList: request, topicRequestList: request,
allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable), allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable),
allBrokerNameList: Object.keys(resp.data.brokerServer), allBrokerNameList: Object.keys(resp.data.brokerServer),
allMessageTypeList: resp.data.messageTypes,
bIsUpdate: bIsUpdate, bIsUpdate: bIsUpdate,
writeOperationEnabled: $scope.writeOperationEnabled writeOperationEnabled: $scope.writeOperationEnabled
} }

View File

@@ -28,6 +28,7 @@
<div class="navbar-collapse collapse navbar-warning-collapse"> <div class="navbar-collapse collapse navbar-warning-collapse">
<ul class="nav navbar-nav"> <ul class="nav navbar-nav">
<li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li> <li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li>
<li ng-show="rmqVersion" ng-class="path =='proxy' ? 'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
<li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li> <li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
<li ng-class="path =='cluster' ? 'active':''"><a ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li> <li ng-class="path =='cluster' ? 'active':''"><a ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li>
<li ng-class="path =='topic' ? 'active':''"><a ng-href="#/topic">{{'TOPIC' | translate}}</a></li> <li ng-class="path =='topic' ? 'active':''"><a ng-href="#/topic">{{'TOPIC' | translate}}</a></li>
@@ -47,6 +48,14 @@
<li><a href="javascript:void(0)" ng-click="changeTranslate('zh')">Simplified Chinese</a></li> <li><a href="javascript:void(0)" ng-click="changeTranslate('zh')">Simplified Chinese</a></li>
</ul> </ul>
</li> </li>
<li class="dropdown">
<a href="bootstrap-elements.html" data-target="#" class="dropdown-toggle" data-toggle="dropdown">{{'CHANGE_VERSION' | translate}}
<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="javascript:void(0)" ng-click="changeRMQVersion(5)">RocketMQ 5.x</a></li>
<li><a href="javascript:void(0)" ng-click="changeRMQVersion(4)">RocketMQ 4.x</a></li>
</ul>
</li>
<li class="dropdown" ng-show="username != ''"> <li class="dropdown" ng-show="username != ''">
<a href="bootstrap-elements.html" data-target="#" class="dropdown-toggle" data-toggle="dropdown">{{username}} <a href="bootstrap-elements.html" data-target="#" class="dropdown-toggle" data-toggle="dropdown">{{username}}
<b class="caret"></b></a> <b class="caret"></b></a>

View File

@@ -50,7 +50,14 @@
<td class="text-center">{{instance.address}}</td> <td class="text-center">{{instance.address}}</td>
<td class="text-center">{{instance.brokerVersionDesc}}</td> <td class="text-center">{{instance.brokerVersionDesc}}</td>
<td class="text-center">{{instance.putTps.split(' ')[0]| number:2}}</td> <td class="text-center">{{instance.putTps.split(' ')[0]| number:2}}</td>
<td class="text-center">{{instance.getTransferedTps.split(' ')[0]| number:2}}</td> <td class="text-center">
<span ng-if="!instance.getTransferedTps || !instance.getTransferedTps.trim()">
{{instance.getTransferredTps.split(' ')[0] | number:2}}
</span>
<span ng-if="instance.getTransferedTps && instance.getTransferedTps.trim()">
{{instance.getTransferedTps.split(' ')[0] | number:2}}
</span>
</td>
<td class="text-center">{{instance.msgPutTotalTodayMorning - <td class="text-center">{{instance.msgPutTotalTodayMorning -
instance.msgPutTotalYesterdayMorning}} instance.msgPutTotalYesterdayMorning}}
</td> </td>

View File

@@ -26,6 +26,8 @@
</div> </div>
<md-checkbox aria-label="Checkbox" ng-model="filterNormal" class="md-primary">{{'NORMAL' | translate}} <md-checkbox aria-label="Checkbox" ng-model="filterNormal" class="md-primary">{{'NORMAL' | translate}}
</md-checkbox> </md-checkbox>
<md-checkbox aria-label="Checkbox" ng-show="rmqVersion" ng-model="filterFIFO" class="md-primary">{{'FIFO' | translate}}
</md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterSystem" class="md-primary">{{'SYSTEM' | translate}} <md-checkbox aria-label="Checkbox" ng-model="filterSystem" class="md-primary">{{'SYSTEM' | translate}}
</md-checkbox> </md-checkbox>
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}" <button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}"
@@ -64,11 +66,11 @@
<td class="text-center">{{consumerGroup.consumeTps}}</td> <td class="text-center">{{consumerGroup.consumeTps}}</td>
<td class="text-center">{{consumerGroup.diffTotal}}</td> <td class="text-center">{{consumerGroup.diffTotal}}</td>
<td class="text-left"> <td class="text-left">
<button name="client" ng-click="client(consumerGroup.group)" <button name="client" ng-click="client(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary" class="btn btn-raised btn-sm btn-primary"
type="button">{{'CLIENT' | translate}} type="button">{{'CLIENT' | translate}}
</button> </button>
<button name="client" ng-click="detail(consumerGroup.group)" <button name="client" ng-click="detail(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary" class="btn btn-raised btn-sm btn-primary"
type="button">{{'CONSUME_DETAIL' | translate}} type="button">{{'CONSUME_DETAIL' | translate}}
</button> </button>
@@ -274,15 +276,6 @@
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span> <span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div> </div>
</div> </div>
<!--<div class="form-group">-->
<!--<label class="control-label col-sm-4">retryMaxTimes:</label>-->
<!--<div class="col-sm-8">-->
<!--<input class="form-control" ng-model="item.subscriptionGroupConfig.retryMaxTimes"-->
<!--type="text"-->
<!--required/>-->
<!--<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>-->
<!--</div>-->
<!--</div>-->
<div class="form-group"> <div class="form-group">
<label class="control-label col-sm-3">brokerId:</label> <label class="control-label col-sm-3">brokerId:</label>
<div class="col-sm-9"> <div class="col-sm-9">
@@ -291,6 +284,132 @@
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span> <span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div> </div>
</div> </div>
<div class="form-group">
<label class="control-label col-sm-3">retryMaxTimes:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.retryMaxTimes" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">whichBrokerWhenConsumeSlowly:</label>
<div class="col-sm-9">
<input class="form-control"
ng-model="item.subscriptionGroupConfig.whichBrokerWhenConsumeSlowly" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
</form>
<div class="modal-footer">
<div class="ngdialog-buttons">
<button type="button" class="ngdialog-button ngdialog-button-primary"
ng-disabled="addAppForm.$invalid"
ng-show="{{ngDialogData.writeOperationEnabled}}"
ng-click="postConsumerRequest(item)">{{ 'COMMIT' | translate }}
</button>
<button type="button" class="ngdialog-button ngdialog-button-secondary"
ng-click="closeThisDialog('Cancel')">{{ 'CLOSE' | translate }}
</button>
</div>
</div>
</div>
</div>
</div>
</div>
</script>
<script type="text/ng-template" id="consumerModifyDialogForV5">
<div>
<div>
<div class="modal-header">
<h4 class="modal-title">{{'SUBSCRIPTION_CHANGE'|translate}}</h4>
</div>
<div class="modal-body " ng-repeat="item in ngDialogData.consumerRequestList">
<form id="addAppForm1" name="addAppForm" class="form-horizontal" novalidate>
<div class="form-group" ng-hide="ngDialogData.bIsUpdate">
<label class="control-label col-sm-3">clusterName:</label>
<div class="col-sm-9">
<select name="mySelectClusterNameList" multiple chosen
ng-model="item.clusterNameList"
ng-options="clusterNameItem for clusterNameItem in ngDialogData.allClusterNameList">
<option value=""></option>
</select>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">brokerName:</label>
<div class="col-sm-9">
<select name="mySelectBrokerNameList" multiple chosen
ng-disabled="ngDialogData.bIsUpdate"
ng-model="item.brokerNameList"
ng-options="brokerNameItem for brokerNameItem in ngDialogData.allBrokerNameList">
<option value=""></option>
</select>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">groupName:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.groupName" type="text"
ng-disabled="ngDialogData.bIsUpdate" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">consumeEnable:</label>
<div class="col-sm-9">
<md-switch class="md-primary" ng-disabled="{{!ngDialogData.writeOperationEnabled}}" md-no-ink
aria-label="Switch No Ink" ng-model="item.subscriptionGroupConfig.consumeEnable">
</md-switch>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">consumeOrderlyEnable:</label>
<div class="col-sm-9">
<md-switch class="md-primary custom-md-switch" eng-disabled="{{!ngDialogData.writeOperationEnabled}}" md-no-ink
aria-label="Switch No Ink" ng-model="item.subscriptionGroupConfig.consumeMessageOrderly">
</md-switch>
<span style="font-size: 12px;">[Pay Attention: FIFO ConsumerGroup Need Open 'consumeOrderlyEnable' Option]</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">consumeBroadcastEnable:</label>
<div class="col-sm-9">
<md-switch class="md-primary" ng-disabled="{{!ngDialogData.writeOperationEnabled}}" md-no-ink
aria-label="Switch No Ink"
ng-model="item.subscriptionGroupConfig.consumeBroadcastEnable">
</md-switch>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">retryQueueNums:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.retryQueueNums"
type="text" ng-disabled="{{!ngDialogData.writeOperationEnabled}}"
required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">brokerId:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.brokerId" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">retryMaxTimes:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.retryMaxTimes" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group"> <div class="form-group">
<label class="control-label col-sm-3">whichBrokerWhenConsumeSlowly:</label> <label class="control-label col-sm-3">whichBrokerWhenConsumeSlowly:</label>
<div class="col-sm-9"> <div class="col-sm-9">

View File

@@ -0,0 +1,67 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="container-fluid" id="deployHistoryList">
<div class="page-content">
<h2 class="md-title">ProxyServerAddressList</h2>
<div class="pull-left" style="min-width: 400px; max-width: 500px; padding: 10px 10px 10px 0">
<select ng-model="selectedProxy" chosen
ng-options="x for x in proxyAddrList"
ng-change="updateProxyAddr()"
required></select>
</div>
<div class="pull-left">
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}"
ng-click="updateProxyAddr()">{{'UPDATE' | translate}}
</button>
</div>
<form class="form-inline pull-left" style="margin-left: 20px" ng-show="{{writeOperationEnabled}}">
<div class="form-group" style="margin: 0">
<label for="proxyAddr">ProxyAddr:</label>
<input id="proxyAddr" class="form-control" style="width: 300px; margin: 0 10px 0 10px" type="text" ng-model="newProxyAddr" required/>
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="addProxyAddr()"> {{ 'ADD' | translate}}
</button>
</div>
</form>
</div>
</div>
<div class="modal proxyModal fade" role="dialog" tabindex="-1" aria-hidden="true" aria-labelledby="config-modal-label">
<div class="modal-dialog modal-lg">
<div class="modal-content" >
<div class="modal-header">
<button class="close" type="button" data-dismiss="modal">&times;</button>
<h4 id="config-modal-label" class="modal-title">
[{{selectedProxy}}]
</h4>
</div>
<div class="modal-body limit_height">
<table class="table table-bordered">
<tr ng-repeat="(key, value) in allProxyConfig">
<td>{{key}}</td>
<td>{{value}}</td>
</tr>
</table>
</div>
<div class="modal-footer">
<div class="col-md-12 text-center">
<button type="button" class="btn btn-raised" data-dismiss="modal">{{ 'CLOSE' | translate }}</button>
</div>
</div>
</div>
</div>
</div>

View File

@@ -24,6 +24,14 @@
</div> </div>
<md-checkbox aria-label="Checkbox" ng-model="filterNormal" class="md-primary">{{'NORMAL' | translate}} <md-checkbox aria-label="Checkbox" ng-model="filterNormal" class="md-primary">{{'NORMAL' | translate}}
</md-checkbox> </md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterDelay" class="md-primary" ng-show="rmqVersion">{{'DELAY' | translate}}
</md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterFifo" class="md-primary" ng-show="rmqVersion">{{'FIFO' | translate}}
</md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterTransaction" class="md-primary" ng-show="rmqVersion">{{'TRANSACTION' | translate}}
</md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterUnspecified" class="md-primary" ng-show="rmqVersion">{{'UNSPECIFIED' | translate}}
</md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterRetry" class="md-primary">{{'RETRY' | translate}} <md-checkbox aria-label="Checkbox" ng-model="filterRetry" class="md-primary">{{'RETRY' | translate}}
</md-checkbox> </md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterDLQ" class="md-primary">{{'DLQ' | translate}} <md-checkbox aria-label="Checkbox" ng-model="filterDLQ" class="md-primary">{{'DLQ' | translate}}
@@ -63,6 +71,7 @@
<button class="btn btn-raised btn-sm btn-primary" type="button" <button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="openUpdateDialog(topic, sysFlag)">topic {{'CONFIG' |translate}} ng-click="openUpdateDialog(topic, sysFlag)">topic {{'CONFIG' |translate}}
</button> </button>
<!-- todo 发送消息,根据消息类型判断-->
<button class="btn btn-raised btn-sm btn-primary" type="button" <button class="btn btn-raised btn-sm btn-primary" type="button"
ng-show="{{!sysFlag}}" ng-show="{{!sysFlag}}"
ng-click="openSendTopicMessageDialog(topic)">{{'SEND_MSG' | translate}} ng-click="openSendTopicMessageDialog(topic)">{{'SEND_MSG' | translate}}
@@ -189,6 +198,18 @@
<span class="text-danger" ng-show="addAppForm.topicName.$error.required">{{'TOPIC_NAME'|translate}}不能为空.</span> <span class="text-danger" ng-show="addAppForm.topicName.$error.required">{{'TOPIC_NAME'|translate}}不能为空.</span>
</div> </div>
</div> </div>
<!-- 设置topic 类型 -->
<div class="form-group">
<label class="control-label col-sm-2">{{'MESSAGE_TYPE'|translate}}:</label>
<div class="col-sm-10">
<select name="mySelectMessageType" chosen ng-disabled="ngDialogData.bIsUpdate"
ng-model="item.messageType"
ng-options="messageType as value | translate disable when messageType=='UNSPECIFIED' for (messageType , value) in ngDialogData.allMessageTypeList"
>
<option value=""></option>
</select>
</div>
</div>
<div class="form-group"> <div class="form-group">
<label class="control-label col-sm-2">{{'WRITE_QUEUE_NUMS'|translate}}:</label> <label class="control-label col-sm-2">{{'WRITE_QUEUE_NUMS'|translate}}:</label>
<div class="col-sm-10"> <div class="col-sm-10">

View File

@@ -34,27 +34,27 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl; import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl;
import org.apache.rocketmq.dashboard.service.client.MQAdminInstance; import org.apache.rocketmq.dashboard.service.client.MQAdminInstance;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;
@@ -82,6 +82,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@@ -570,11 +571,13 @@ public class MQAdminExtImplTest {
public void testCreateTopic() throws Exception { public void testCreateTopic() throws Exception {
assertNotNull(mqAdminExtImpl); assertNotNull(mqAdminExtImpl);
{ {
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt()); doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyMap());
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt()); doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt(), anyMap());
} }
mqAdminExtImpl.createTopic("key", "topic_test", 8); Map<String, String> map = new HashMap<>();
mqAdminExtImpl.createTopic("key", "topic_test", 8, 1); map.put("message.type", "FIFO");
mqAdminExtImpl.createTopic("key", "topic_test", 8, map);
mqAdminExtImpl.createTopic("key", "topic_test", 8, 1, map);
} }
@Test @Test

View File

@@ -18,7 +18,7 @@ package org.apache.rocketmq.dashboard.admin;
import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;

View File

@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import java.util.List; import java.util.List;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.dashboard.model.request.AclRequest; import org.apache.rocketmq.dashboard.model.request.AclRequest;
import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl; import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;

View File

@@ -18,8 +18,8 @@ package org.apache.rocketmq.dashboard.controller;
import java.util.HashMap; import java.util.HashMap;
import java.util.Properties; import java.util.Properties;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.dashboard.service.impl.ClusterServiceImpl; import org.apache.rocketmq.dashboard.service.impl.ClusterServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil; import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.junit.Test; import org.junit.Test;

View File

@@ -23,17 +23,17 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest; import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest; import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;

View File

@@ -21,10 +21,10 @@ import com.google.common.collect.Lists;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.MessageView;

View File

@@ -32,9 +32,9 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo; import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl; import org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl;

View File

@@ -19,8 +19,8 @@ package org.apache.rocketmq.dashboard.controller;
import java.util.HashSet; import java.util.HashSet;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.dashboard.interceptor.AuthInterceptor; import org.apache.rocketmq.dashboard.interceptor.AuthInterceptor;
import org.apache.rocketmq.dashboard.service.impl.LoginServiceImpl; import org.apache.rocketmq.dashboard.service.impl.LoginServiceImpl;
import org.apache.rocketmq.dashboard.service.impl.ProducerServiceImpl; import org.apache.rocketmq.dashboard.service.impl.ProducerServiceImpl;

View File

@@ -29,16 +29,16 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl; import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl;

View File

@@ -33,12 +33,12 @@ import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.BaseTest; import org.apache.rocketmq.dashboard.BaseTest;
import org.apache.rocketmq.dashboard.config.CollectExecutorConfig; import org.apache.rocketmq.dashboard.config.CollectExecutorConfig;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;

View File

@@ -29,7 +29,7 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo; import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.ConsumerService; import org.apache.rocketmq.dashboard.service.ConsumerService;
@@ -78,7 +78,8 @@ public abstract class RocketMQConsoleTestBase {
} }
} }
} }
throw Throwables.propagate(exception); Throwables.throwIfUnchecked(exception);
throw new RuntimeException(exception);
} }
} }
@@ -91,7 +92,8 @@ public abstract class RocketMQConsoleTestBase {
producer.start(); producer.start();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }
@@ -137,7 +139,8 @@ public abstract class RocketMQConsoleTestBase {
consumer.start(); consumer.start();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} }
} }

View File

@@ -33,39 +33,39 @@ import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceConstants; import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.BrokerStatsItem; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsItem;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.checkerframework.checker.units.qual.A; import org.checkerframework.checker.units.qual.A;
import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; import static org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY;
public class MockObjectUtil { public class MockObjectUtil {