Proxy Support And ConsumerGroup Enhancement (#207)

* Support dashboard v4-v5 switch And query for v5 topic

* Modify tag name

* Support proxy-module And Fix the problem of showing wrong consumerGroup-info

---------

Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
This commit is contained in:
Akai
2024-06-12 09:12:19 +08:00
committed by GitHub
parent e7cb315050
commit d58e13da95
22 changed files with 516 additions and 48 deletions

View File

@@ -43,6 +43,8 @@ public class RMQConfigure {
//use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env //use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env
private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
private volatile String proxyAddr;
private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"); private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
@@ -62,6 +64,8 @@ public class RMQConfigure {
private List<String> namesrvAddrs = new ArrayList<>(); private List<String> namesrvAddrs = new ArrayList<>();
private List<String> proxyAddrs = new ArrayList<>();
public String getAccessKey() { public String getAccessKey() {
return accessKey; return accessKey;
} }
@@ -86,6 +90,25 @@ public class RMQConfigure {
return namesrvAddrs; return namesrvAddrs;
} }
public List<String> getProxyAddrs() {
return this.proxyAddrs;
}
public void setProxyAddrs(List<String> proxyAddrs) {
this.proxyAddrs = proxyAddrs;
if (CollectionUtils.isNotEmpty(proxyAddrs)) {
this.setProxyAddr(proxyAddrs.get(0));
}
}
public String getProxyAddr() {
return proxyAddr;
}
public void setProxyAddr(String proxyAddr) {
this.proxyAddr = proxyAddr;
}
public void setNamesrvAddrs(List<String> namesrvAddrs) { public void setNamesrvAddrs(List<String> namesrvAddrs) {
this.namesrvAddrs = namesrvAddrs; this.namesrvAddrs = namesrvAddrs;
if (CollectionUtils.isNotEmpty(namesrvAddrs)) { if (CollectionUtils.isNotEmpty(namesrvAddrs)) {

View File

@@ -47,14 +47,14 @@ public class ConsumerController {
@RequestMapping(value = "/groupList.query") @RequestMapping(value = "/groupList.query")
@ResponseBody @ResponseBody
public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) { public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) {
return consumerService.queryGroupList(skipSysGroup); return consumerService.queryGroupList(skipSysGroup, address);
} }
@RequestMapping(value = "/group.query") @RequestMapping(value = "/group.query")
@ResponseBody @ResponseBody
public Object groupQuery(@RequestParam String consumerGroup) { public Object groupQuery(@RequestParam String consumerGroup, String address) {
return consumerService.queryGroup(consumerGroup); return consumerService.queryGroup(consumerGroup, address);
} }
@RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST}) @RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST})
@@ -99,14 +99,14 @@ public class ConsumerController {
@RequestMapping(value = "/queryTopicByConsumer.query") @RequestMapping(value = "/queryTopicByConsumer.query")
@ResponseBody @ResponseBody
public Object queryConsumerByTopic(@RequestParam String consumerGroup) { public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) {
return consumerService.queryConsumeStatsListByGroupName(consumerGroup); return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address);
} }
@RequestMapping(value = "/consumerConnection.query") @RequestMapping(value = "/consumerConnection.query")
@ResponseBody @ResponseBody
public Object consumerConnection(@RequestParam(required = false) String consumerGroup) { public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) {
ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup); ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address);
consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet())); consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet()));
return consumerConnection; return consumerConnection;
} }

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.controller;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
@Controller
@RequestMapping("/proxy")
@Permission
public class ProxyController {
@Resource
private ProxyService proxyService;
@RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
@ResponseBody
public Object homePage() {
return proxyService.getProxyHomePage();
}
@RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
@ResponseBody
public Object addProxyAddr(@RequestParam String newProxyAddr) {
proxyService.addProxyAddrList(newProxyAddr);
return true;
}
@RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST)
@ResponseBody
public Object updateProxyAddr(@RequestParam String proxyAddr) {
proxyService.updateProxyAddrList(proxyAddr);
return true;
}
}

View File

@@ -19,12 +19,15 @@ package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import java.util.List;
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private String group; private String group;
private String version; private String version;
private int count; private int count;
private ConsumeType consumeType; private ConsumeType consumeType;
private MessageModel messageModel; private MessageModel messageModel;
private List<String> address;
private int consumeTps; private int consumeTps;
private long diffTotal = -1; private long diffTotal = -1;
private String subGroupType = "NORMAL"; private String subGroupType = "NORMAL";
@@ -70,6 +73,22 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
this.diffTotal = diffTotal; this.diffTotal = diffTotal;
} }
public List<String> getAddress() {
return address;
}
public void setAddress(List<String> address) {
this.address = address;
}
public String getSubGroupType() {
return subGroupType;
}
public void setSubGroupType(String subGroupType) {
this.subGroupType = subGroupType;
}
@Override @Override
public int compareTo(GroupConsumeInfo o) { public int compareTo(GroupConsumeInfo o) {
if (this.count != o.count) { if (this.count != o.count) {
@@ -93,12 +112,4 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
public void setVersion(String version) { public void setVersion(String version) {
this.version = version; this.version = version;
} }
public String getSubGroupType() {
return subGroupType;
}
public void setSubGroupType(String subGroupType) {
this.subGroupType = subGroupType;
}
} }

View File

@@ -31,12 +31,12 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
public interface ConsumerService { public interface ConsumerService {
List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup); List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address);
GroupConsumeInfo queryGroup(String consumerGroup); GroupConsumeInfo queryGroup(String consumerGroup, String address);
List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName); List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address);
List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName); List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
@@ -52,7 +52,7 @@ public interface ConsumerService {
Set<String> fetchBrokerNameSetBySubscriptionGroup(String group); Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
ConsumerConnection getConsumerConnection(String consumerGroup); ConsumerConnection getConsumerConnection(String consumerGroup, String address);
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack); ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
} }

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service;
import java.util.Map;
public interface ProxyService {
void addProxyAddrList(String proxyAddr);
void updateProxyAddrList(String proxyAddr);
Map<String, Object> getProxyHomePage();
}

View File

@@ -627,7 +627,7 @@ public class MQAdminExtImpl implements MQAdminExt {
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException { RemotingConnectException, MQBrokerException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'"); return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis);
} }
@Override @Override
@@ -639,8 +639,7 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override @Override
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr) public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException { throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
// TODO Auto-generated method stub return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr);
throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'");
} }
@Override @Override

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.client;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
public interface ProxyAdmin {
ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException;
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
@Slf4j
@Service
public class ProxyAdminImpl implements ProxyAdmin {
@Autowired
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
@Override
public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
try {
MQAdminInstance.createMQAdmin(mqAdminExtPool);
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader);
RemotingCommand response = remotingClient.invokeSync(addr, request, 3000);
switch (response.getCode()) {
case 0:
return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
} finally {
MQAdminInstance.returnMQAdmin(mqAdminExtPool);
}
}
}

View File

@@ -23,8 +23,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -44,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
@@ -77,6 +80,8 @@ import org.springframework.stereotype.Service;
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Resource
protected ProxyAdmin proxyAdmin;
@Resource @Resource
private RMQConfigure configure; private RMQConfigure configure;
@@ -119,25 +124,33 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) { public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String address) {
Set<String> consumerGroupSet = Sets.newHashSet(); HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
try { try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
if (!consumerGroupMap.containsKey(groupName)) {
consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());
}
List<String> addresses = consumerGroupMap.get(groupName);
addresses.add(brokerData.selectBrokerAddr());
consumerGroupMap.put(groupName, addresses);
}
} }
} } catch (Exception err) {
catch (Exception err) {
Throwables.throwIfUnchecked(err); Throwables.throwIfUnchecked(err);
throw new RuntimeException(err); throw new RuntimeException(err);
} }
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size()); CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
for (String consumerGroup : consumerGroupSet) { for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
String consumerGroup = entry.getKey();
executorService.submit(() -> { executorService.submit(() -> {
try { try {
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup); GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address);
consumeInfo.setAddress(entry.getValue());
groupConsumeInfoList.add(consumeInfo); groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) { } catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
@@ -165,7 +178,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public GroupConsumeInfo queryGroup(String consumerGroup) { public GroupConsumeInfo queryGroup(String consumerGroup, String address) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try { try {
ConsumeStats consumeStats = null; ConsumeStats consumeStats = null;
@@ -182,9 +195,12 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly); .allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
try { try {
consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); if (StringUtils.isNotEmpty(address)) {
} consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
catch (Exception e) { } else {
consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
}
} catch (Exception e) {
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
} }
@@ -217,8 +233,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) { public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
return queryConsumeStatsList(null, groupName); ConsumeStats consumeStats;
String topic = null;
try {
String[] addresses = address.split(",");
String addr = addresses[0];
consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
return toTopicConsumerInfoList(topic, consumeStats, groupName);
} }
@Override @Override
@@ -231,6 +257,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
Throwables.throwIfUnchecked(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return toTopicConsumerInfoList(topic, consumeStats, groupName);
}
private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) {
List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
@Override @Override
public boolean apply(MessageQueue o) { public boolean apply(MessageQueue o) {
@@ -431,11 +461,12 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
} }
@Override @Override
public ConsumerConnection getConsumerConnection(String consumerGroup) { public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
try { try {
return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); String[] addresses = address.split(",");
} String addr = addresses[0];
catch (Exception e) { return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
} catch (Exception e) {
Throwables.throwIfUnchecked(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class ProxyServiceImpl implements ProxyService {
@Resource
protected ProxyAdmin proxyAdmin;
@Resource
private RMQConfigure configure;
@Override
public void addProxyAddrList(String proxyAddr) {
List<String> proxyAddrs = configure.getProxyAddrs();
if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) {
proxyAddrs.add(proxyAddr);
}
configure.setProxyAddrs(proxyAddrs);
}
@Override
public void updateProxyAddrList(String proxyAddr) {
configure.setProxyAddr(proxyAddr);
}
@Override
public Map<String, Object> getProxyHomePage() {
Map<String, Object> homePageInfoMap = Maps.newHashMap();
homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr());
homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs());
return homePageInfoMap;
}
}

View File

@@ -40,7 +40,7 @@ public class MonitorTask {
// @Scheduled(cron = "* * * * * ?") // @Scheduled(cron = "* * * * * ?")
public void scanProblemConsumeGroup() { public void scanProblemConsumeGroup() {
for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) { for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) {
GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey()); GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey(), null);
if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) { if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system
} }

View File

@@ -59,6 +59,9 @@ rocketmq:
# must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required # must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
loginRequired: false loginRequired: false
useTLS: false useTLS: false
proxyAddr: 127.0.0.1:8080
proxyAddrs:
- 127.0.0.1:8080
# set the accessKey and secretKey if you used acl # set the accessKey and secretKey if you used acl
# accessKey: rocketmq2 # accessKey: rocketmq2
# secretKey: 12345678 # secretKey: 12345678

View File

@@ -104,6 +104,7 @@
<script type="text/javascript" src="src/tools/tools.js?v=201703171710"></script> <script type="text/javascript" src="src/tools/tools.js?v=201703171710"></script>
<script type="text/javascript" src="src/cluster.js?timestamp=4"></script> <script type="text/javascript" src="src/cluster.js?timestamp=4"></script>
<script type="text/javascript" src="src/topic.js"></script> <script type="text/javascript" src="src/topic.js"></script>
<script type="text/javascript" src="src/proxy.js"></script>
<script type="text/javascript" src="src/consumer.js?timestamp=6"></script> <script type="text/javascript" src="src/consumer.js?timestamp=6"></script>
<script type="text/javascript" src="src/producer.js"></script> <script type="text/javascript" src="src/producer.js"></script>
<script type="text/javascript" src="src/message.js"></script> <script type="text/javascript" src="src/message.js"></script>

View File

@@ -213,6 +213,9 @@ app.config(['$routeProvider', '$httpProvider','$cookiesProvider','getDictNamePro
}).when('/ops', { }).when('/ops', {
templateUrl: 'view/pages/ops.html', templateUrl: 'view/pages/ops.html',
controller:'opsController' controller:'opsController'
}).when('/proxy', {
templateUrl: 'view/pages/proxy.html',
controller:'proxyController'
}).when('/acl', { }).when('/acl', {
templateUrl: 'view/pages/acl.html', templateUrl: 'view/pages/acl.html',
controller: 'aclController' controller: 'aclController'

View File

@@ -79,6 +79,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
url: "consumer/groupList.query", url: "consumer/groupList.query",
params: { params: {
skipSysGroup: false, skipSysGroup: false,
address: localStorage.getItem('isV5') ? localStorage.getItem('proxyAddr') : null
} }
}).success(function (resp) { }).success(function (resp) {
if (resp.status == 0) { if (resp.status == 0) {
@@ -243,11 +244,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
} }
}); });
}; };
$scope.detail = function (consumerGroupName) { $scope.detail = function (consumerGroupName, address) {
$http({ $http({
method: "GET", method: "GET",
url: "consumer/queryTopicByConsumer.query", url: "consumer/queryTopicByConsumer.query",
params: {consumerGroup: consumerGroupName} params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) { }).success(function (resp) {
if (resp.status == 0) { if (resp.status == 0) {
console.log(resp); console.log(resp);
@@ -262,11 +263,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
}); });
}; };
$scope.client = function (consumerGroupName) { $scope.client = function (consumerGroupName, address) {
$http({ $http({
method: "GET", method: "GET",
url: "consumer/consumerConnection.query", url: "consumer/consumerConnection.query",
params: {consumerGroup: consumerGroupName} params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) { }).success(function (resp) {
if (resp.status == 0) { if (resp.status == 0) {
console.log(resp); console.log(resp);

View File

@@ -100,6 +100,7 @@ var en = {
"RESET_OFFSET":"resetOffset", "RESET_OFFSET":"resetOffset",
"CLUSTER_NAME":"clusterName", "CLUSTER_NAME":"clusterName",
"OPS":"OPS", "OPS":"OPS",
"PROXY":"Proxy",
"AUTO_REFRESH":"AUTO_REFRESH", "AUTO_REFRESH":"AUTO_REFRESH",
"REFRESH":"REFRESH", "REFRESH":"REFRESH",
"LOGOUT":"Logout", "LOGOUT":"Logout",

View File

@@ -101,6 +101,7 @@ var zh = {
"RESET_OFFSET":"重置位点", "RESET_OFFSET":"重置位点",
"CLUSTER_NAME":"集群名", "CLUSTER_NAME":"集群名",
"OPS":"运维", "OPS":"运维",
"PROXY":"代理",
"AUTO_REFRESH":"自动刷新", "AUTO_REFRESH":"自动刷新",
"REFRESH":"刷新", "REFRESH":"刷新",
"LOGOUT":"退出", "LOGOUT":"退出",

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var module = app;
module.controller('proxyController', ['$scope', '$location', '$http', 'Notification', 'remoteApi', 'tools', '$window',
function ($scope, $location, $http, Notification, remoteApi, tools, $window) {
$scope.proxyAddrList = [];
$scope.userRole = $window.sessionStorage.getItem("userrole");
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
$scope.inputReadonly = !$scope.writeOperationEnabled;
$scope.newProxyAddr = "";
$scope.allProxyConfig = {};
$http({
method: "GET",
url: "proxy/homePage.query"
}).success(function (resp) {
if (resp.status == 0) {
$scope.proxyAddrList = resp.data.proxyAddrList;
$scope.selectedProxy = resp.data.currentProxyAddr;
$scope.showProxyDetailConfig($scope.selectedProxy);
localStorage.setItem('proxyAddr',$scope.selectedProxy);
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
$scope.eleChange = function (data) {
$scope.proxyAddrList = data;
}
$scope.showDetailConf = function () {
$(".proxyModal").modal();
}
$scope.showProxyDetailConfig = function (proxyAddr) {
$http({
method: "GET",
url: "proxy/proxyDetailConfig.query",
params: {proxyAddress: proxyAddr}
}).success(function (resp) {
if (resp.status == 0) {
$scope.allProxyConfig = resp.data;
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
$scope.updateProxyAddr = function () {
$http({
method: "POST",
url: "proxy/updateProxyAddr.do",
params: {proxyAddr: $scope.selectedProxy}
}).success(function (resp) {
if (resp.status == 0) {
localStorage.setItem('proxyAddr', $scope.selectedProxy);
Notification.info({message: "SUCCESS", delay: 2000});
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
$scope.showProxyDetailConfig($scope.selectedProxy);
};
$scope.addProxyAddr = function () {
$http({
method: "POST",
url: "proxy/addProxyAddr.do",
params: {newProxyAddr: $scope.newProxyAddr}
}).success(function (resp) {
if (resp.status == 0) {
if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == -1) {
$scope.proxyAddrList.push($scope.newProxyAddr);
}
$("#proxyAddr").val("");
$scope.newProxyAddr = "";
Notification.info({message: "SUCCESS", delay: 2000});
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
}])

View File

@@ -28,6 +28,7 @@
<div class="navbar-collapse collapse navbar-warning-collapse"> <div class="navbar-collapse collapse navbar-warning-collapse">
<ul class="nav navbar-nav"> <ul class="nav navbar-nav">
<li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li> <li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li>
<li ng-show="rmqVersion" ng-class="path =='proxy' ? 'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
<li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li> <li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
<li ng-class="path =='cluster' ? 'active':''"><a ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li> <li ng-class="path =='cluster' ? 'active':''"><a ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li>
<li ng-class="path =='topic' ? 'active':''"><a ng-href="#/topic">{{'TOPIC' | translate}}</a></li> <li ng-class="path =='topic' ? 'active':''"><a ng-href="#/topic">{{'TOPIC' | translate}}</a></li>

View File

@@ -66,11 +66,11 @@
<td class="text-center">{{consumerGroup.consumeTps}}</td> <td class="text-center">{{consumerGroup.consumeTps}}</td>
<td class="text-center">{{consumerGroup.diffTotal}}</td> <td class="text-center">{{consumerGroup.diffTotal}}</td>
<td class="text-left"> <td class="text-left">
<button name="client" ng-click="client(consumerGroup.group)" <button name="client" ng-click="client(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary" class="btn btn-raised btn-sm btn-primary"
type="button">{{'CLIENT' | translate}} type="button">{{'CLIENT' | translate}}
</button> </button>
<button name="client" ng-click="detail(consumerGroup.group)" <button name="client" ng-click="detail(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary" class="btn btn-raised btn-sm btn-primary"
type="button">{{'CONSUME_DETAIL' | translate}} type="button">{{'CONSUME_DETAIL' | translate}}
</button> </button>

View File

@@ -0,0 +1,67 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="container-fluid" id="deployHistoryList">
<div class="page-content">
<h2 class="md-title">ProxyServerAddressList</h2>
<div class="pull-left" style="min-width: 400px; max-width: 500px; padding: 10px 10px 10px 0">
<select ng-model="selectedProxy" chosen
ng-options="x for x in proxyAddrList"
ng-change="updateProxyAddr()"
required></select>
</div>
<div class="pull-left">
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}"
ng-click="updateProxyAddr()">{{'UPDATE' | translate}}
</button>
</div>
<form class="form-inline pull-left" style="margin-left: 20px" ng-show="{{writeOperationEnabled}}">
<div class="form-group" style="margin: 0">
<label for="proxyAddr">ProxyAddr:</label>
<input id="proxyAddr" class="form-control" style="width: 300px; margin: 0 10px 0 10px" type="text" ng-model="newProxyAddr" required/>
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="addProxyAddr()"> {{ 'ADD' | translate}}
</button>
</div>
</form>
</div>
</div>
<div class="modal proxyModal fade" role="dialog" tabindex="-1" aria-hidden="true" aria-labelledby="config-modal-label">
<div class="modal-dialog modal-lg">
<div class="modal-content" >
<div class="modal-header">
<button class="close" type="button" data-dismiss="modal">&times;</button>
<h4 id="config-modal-label" class="modal-title">
[{{selectedProxy}}]
</h4>
</div>
<div class="modal-body limit_height">
<table class="table table-bordered">
<tr ng-repeat="(key, value) in allProxyConfig">
<td>{{key}}</td>
<td>{{value}}</td>
</tr>
</table>
</div>
<div class="modal-footer">
<div class="col-md-12 text-center">
<button type="button" class="btn btn-raised" data-dismiss="modal">{{ 'CLOSE' | translate }}</button>
</div>
</div>
</div>
</div>
</div>