Compare commits

..

1 Commits

Author SHA1 Message Date
cndoit18
01e5fcb1b4 Merge a4a6000734 into 21dc2acfdc 2024-06-05 18:57:01 +08:00
23 changed files with 48 additions and 667 deletions

View File

@@ -43,8 +43,6 @@ public class RMQConfigure {
//use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env
private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
private volatile String proxyAddr;
private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
@@ -64,8 +62,6 @@ public class RMQConfigure {
private List<String> namesrvAddrs = new ArrayList<>();
private List<String> proxyAddrs = new ArrayList<>();
public String getAccessKey() {
return accessKey;
}
@@ -90,25 +86,6 @@ public class RMQConfigure {
return namesrvAddrs;
}
public List<String> getProxyAddrs() {
return this.proxyAddrs;
}
public void setProxyAddrs(List<String> proxyAddrs) {
this.proxyAddrs = proxyAddrs;
if (CollectionUtils.isNotEmpty(proxyAddrs)) {
this.setProxyAddr(proxyAddrs.get(0));
}
}
public String getProxyAddr() {
return proxyAddr;
}
public void setProxyAddr(String proxyAddr) {
this.proxyAddr = proxyAddr;
}
public void setNamesrvAddrs(List<String> namesrvAddrs) {
this.namesrvAddrs = namesrvAddrs;
if (CollectionUtils.isNotEmpty(namesrvAddrs)) {

View File

@@ -47,14 +47,14 @@ public class ConsumerController {
@RequestMapping(value = "/groupList.query")
@ResponseBody
public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) {
return consumerService.queryGroupList(skipSysGroup, address);
public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) {
return consumerService.queryGroupList(skipSysGroup);
}
@RequestMapping(value = "/group.query")
@ResponseBody
public Object groupQuery(@RequestParam String consumerGroup, String address) {
return consumerService.queryGroup(consumerGroup, address);
public Object groupQuery(@RequestParam String consumerGroup) {
return consumerService.queryGroup(consumerGroup);
}
@RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST})
@@ -99,14 +99,14 @@ public class ConsumerController {
@RequestMapping(value = "/queryTopicByConsumer.query")
@ResponseBody
public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) {
return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address);
public Object queryConsumerByTopic(@RequestParam String consumerGroup) {
return consumerService.queryConsumeStatsListByGroupName(consumerGroup);
}
@RequestMapping(value = "/consumerConnection.query")
@ResponseBody
public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) {
ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address);
public Object consumerConnection(@RequestParam(required = false) String consumerGroup) {
ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup);
consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet()));
return consumerConnection;
}

View File

@@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.controller;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
@Controller
@RequestMapping("/proxy")
@Permission
public class ProxyController {
@Resource
private ProxyService proxyService;
@RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
@ResponseBody
public Object homePage() {
return proxyService.getProxyHomePage();
}
@RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
@ResponseBody
public Object addProxyAddr(@RequestParam String newProxyAddr) {
proxyService.addProxyAddrList(newProxyAddr);
return true;
}
@RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST)
@ResponseBody
public Object updateProxyAddr(@RequestParam String proxyAddr) {
proxyService.updateProxyAddrList(proxyAddr);
return true;
}
}

View File

@@ -19,19 +19,14 @@ package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import java.util.List;
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private String group;
private String version;
private int count;
private ConsumeType consumeType;
private MessageModel messageModel;
private List<String> address;
private int consumeTps;
private long diffTotal = -1;
private String subGroupType = "NORMAL";
public String getGroup() {
return group;
@@ -73,22 +68,6 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
this.diffTotal = diffTotal;
}
public List<String> getAddress() {
return address;
}
public void setAddress(List<String> address) {
this.address = address;
}
public String getSubGroupType() {
return subGroupType;
}
public void setSubGroupType(String subGroupType) {
this.subGroupType = subGroupType;
}
@Override
public int compareTo(GroupConsumeInfo o) {
if (this.count != o.count) {

View File

@@ -31,12 +31,12 @@ import java.util.Map;
import java.util.Set;
public interface ConsumerService {
List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address);
List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup);
GroupConsumeInfo queryGroup(String consumerGroup, String address);
GroupConsumeInfo queryGroup(String consumerGroup);
List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address);
List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
@@ -52,7 +52,7 @@ public interface ConsumerService {
Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
ConsumerConnection getConsumerConnection(String consumerGroup, String address);
ConsumerConnection getConsumerConnection(String consumerGroup);
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
}

View File

@@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service;
import java.util.Map;
public interface ProxyService {
void addProxyAddrList(String proxyAddr);
void updateProxyAddrList(String proxyAddr);
Map<String, Object> getProxyHomePage();
}

View File

@@ -627,7 +627,7 @@ public class MQAdminExtImpl implements MQAdminExt {
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
// TODO Auto-generated method stub
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis);
throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'");
}
@Override
@@ -639,7 +639,8 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr);
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'");
}
@Override

View File

@@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.client;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
public interface ProxyAdmin {
ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException;
}

View File

@@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
@Slf4j
@Service
public class ProxyAdminImpl implements ProxyAdmin {
@Autowired
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
@Override
public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
try {
MQAdminInstance.createMQAdmin(mqAdminExtPool);
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader);
RemotingCommand response = remotingClient.invokeSync(addr, request, 3000);
switch (response.getCode()) {
case 0:
return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
} finally {
MQAdminInstance.returnMQAdmin(mqAdminExtPool);
}
}
}

View File

@@ -23,10 +23,8 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -46,7 +44,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -80,8 +77,6 @@ import org.springframework.stereotype.Service;
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Resource
protected ProxyAdmin proxyAdmin;
@Resource
private RMQConfigure configure;
@@ -124,33 +119,25 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String address) {
HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) {
Set<String> consumerGroupSet = Sets.newHashSet();
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
if (!consumerGroupMap.containsKey(groupName)) {
consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());
}
List<String> addresses = consumerGroupMap.get(groupName);
addresses.add(brokerData.selectBrokerAddr());
consumerGroupMap.put(groupName, addresses);
}
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
}
} catch (Exception err) {
}
catch (Exception err) {
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
String consumerGroup = entry.getKey();
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
for (String consumerGroup : consumerGroupSet) {
executorService.submit(() -> {
try {
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address);
consumeInfo.setAddress(entry.getValue());
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
@@ -178,7 +165,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
public GroupConsumeInfo queryGroup(String consumerGroup, String address) {
public GroupConsumeInfo queryGroup(String consumerGroup) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try {
ConsumeStats consumeStats = null;
@@ -190,28 +177,14 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
ConsumerConnection consumerConnection = null;
boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup)
.stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig)
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
try {
if (StringUtils.isNotEmpty(address)) {
consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
} else {
consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
}
} catch (Exception e) {
consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
}
catch (Exception e) {
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
}
groupConsumeInfo.setGroup(consumerGroup);
if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
groupConsumeInfo.setSubGroupType("SYSTEM");
} else if (isFifoType) {
groupConsumeInfo.setSubGroupType("FIFO");
} else {
groupConsumeInfo.setSubGroupType("NORMAL");
}
if (consumeStats != null) {
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
@@ -233,18 +206,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
ConsumeStats consumeStats;
String topic = null;
try {
String[] addresses = address.split(",");
String addr = addresses[0];
consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
return toTopicConsumerInfoList(topic, consumeStats, groupName);
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) {
return queryConsumeStatsList(null, groupName);
}
@Override
@@ -257,10 +220,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
return toTopicConsumerInfoList(topic, consumeStats, groupName);
}
private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) {
List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
@Override
public boolean apply(MessageQueue o) {
@@ -461,12 +420,11 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
public ConsumerConnection getConsumerConnection(String consumerGroup) {
try {
String[] addresses = address.split(",");
String addr = addresses[0];
return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
} catch (Exception e) {
return mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
}
catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}

View File

@@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class ProxyServiceImpl implements ProxyService {
@Resource
protected ProxyAdmin proxyAdmin;
@Resource
private RMQConfigure configure;
@Override
public void addProxyAddrList(String proxyAddr) {
List<String> proxyAddrs = configure.getProxyAddrs();
if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) {
proxyAddrs.add(proxyAddr);
}
configure.setProxyAddrs(proxyAddrs);
}
@Override
public void updateProxyAddrList(String proxyAddr) {
configure.setProxyAddr(proxyAddr);
}
@Override
public Map<String, Object> getProxyHomePage() {
Map<String, Object> homePageInfoMap = Maps.newHashMap();
homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr());
homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs());
return homePageInfoMap;
}
}

View File

@@ -40,7 +40,7 @@ public class MonitorTask {
// @Scheduled(cron = "* * * * * ?")
public void scanProblemConsumeGroup() {
for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) {
GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey(), null);
GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey());
if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system
}

View File

@@ -59,9 +59,6 @@ rocketmq:
# must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
loginRequired: false
useTLS: false
proxyAddr: 127.0.0.1:8080
proxyAddrs:
- 127.0.0.1:8080
# set the accessKey and secretKey if you used acl
# accessKey: rocketmq2
# secretKey: 12345678

View File

@@ -104,7 +104,6 @@
<script type="text/javascript" src="src/tools/tools.js?v=201703171710"></script>
<script type="text/javascript" src="src/cluster.js?timestamp=4"></script>
<script type="text/javascript" src="src/topic.js"></script>
<script type="text/javascript" src="src/proxy.js"></script>
<script type="text/javascript" src="src/consumer.js?timestamp=6"></script>
<script type="text/javascript" src="src/producer.js"></script>
<script type="text/javascript" src="src/message.js"></script>

View File

@@ -213,9 +213,6 @@ app.config(['$routeProvider', '$httpProvider','$cookiesProvider','getDictNamePro
}).when('/ops', {
templateUrl: 'view/pages/ops.html',
controller:'opsController'
}).when('/proxy', {
templateUrl: 'view/pages/proxy.html',
controller:'proxyController'
}).when('/acl', {
templateUrl: 'view/pages/acl.html',
controller: 'aclController'

View File

@@ -45,7 +45,6 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
$scope.filterNormal = true;
$scope.filterSystem = false;
$scope.filterFIFO = false;
$scope.doSort = function () {// todo how to change this fe's code ? (it's dirty)
if ($scope.sortKey == 'diffTotal') {
@@ -76,11 +75,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
$http({
method: "GET",
url: "consumer/groupList.query",
params: {
skipSysGroup: false,
address: localStorage.getItem('isV5') ? localStorage.getItem('proxyAddr') : null
}
url: "consumer/groupList.query"
}).success(function (resp) {
if (resp.status == 0) {
$scope.allConsumerGrouopList = resp.data;
@@ -140,28 +135,16 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
$scope.filterList(1);
});
$scope.$watch('filterFIFO', function () {
$scope.filterList(1);
});
$scope.filterByType = function (str, type,version) {
$scope.filterByType = function (str) {
if ($scope.filterSystem) {
if (type === "SYSTEM") {
if (str.startsWith("%S")) {
return true
}
}
if ($scope.filterNormal) {
if (type === "NORMAL") {
if (str.startsWith("%") == false) {
return true
}
if(!version && type === "FIFO"){
return true;
}
}
if ($scope.filterFIFO) {
if (type === "FIFO") {
return true;
}
}
return false;
};
@@ -171,7 +154,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
var canShowList = [];
$scope.allConsumerGrouopList.forEach(function (element) {
console.log(element)
if ($scope.filterByType(element.group, element.subGroupType, $scope.rmqVersion)) {
if ($scope.filterByType(element.group)) {
if (element.group.toLowerCase().indexOf(lowExceptStr) != -1) {
canShowList.push(element);
}
@@ -206,7 +189,6 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
subscriptionGroupConfig: {
groupName: "",
consumeEnable: true,
consumeMessageOrderly: false,
consumeFromMinEnable: true,
consumeBroadcastEnable: true,
retryQueueNums: 1,
@@ -229,7 +211,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
// Refresh topic list
$scope.refreshConsumerData();
},
template: $scope.rmqVersion ? 'consumerModifyDialogForV5' : 'consumerModifyDialog',
template: 'consumerModifyDialog',
controller: 'consumerModifyDialogController',
data: {
consumerRequestList: request,
@@ -244,11 +226,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
}
});
};
$scope.detail = function (consumerGroupName, address) {
$scope.detail = function (consumerGroupName) {
$http({
method: "GET",
url: "consumer/queryTopicByConsumer.query",
params: {consumerGroup: consumerGroupName, address: address}
params: {consumerGroup: consumerGroupName}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);
@@ -263,11 +245,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
});
};
$scope.client = function (consumerGroupName, address) {
$scope.client = function (consumerGroupName) {
$http({
method: "GET",
url: "consumer/consumerConnection.query",
params: {consumerGroup: consumerGroupName, address: address}
params: {consumerGroup: consumerGroupName}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);

View File

@@ -15,7 +15,7 @@
* limitations under the License.
*/
app.controller('AppCtrl', ['$scope','$window','$translate','$http','Notification', function ($scope,$window,$translate, $http, Notification) {
$scope.rmqVersion = localStorage.getItem("isV5");
$scope.rmqVersion = localStorage.getItem("isV5") === "true" ? true : false;
$scope.changeTranslate = function(langKey){
$translate.use(langKey);
@@ -23,8 +23,7 @@ app.controller('AppCtrl', ['$scope','$window','$translate','$http','Notification
$scope.changeRMQVersion = function (version) {
$scope.rmqVersion = version === 5;
var v = version === 5;
localStorage.setItem("isV5", v);
localStorage.setItem("isV5", $scope.rmqVersion);
}
$scope.logout = function(){

View File

@@ -100,7 +100,6 @@ var en = {
"RESET_OFFSET":"resetOffset",
"CLUSTER_NAME":"clusterName",
"OPS":"OPS",
"PROXY":"Proxy",
"AUTO_REFRESH":"AUTO_REFRESH",
"REFRESH":"REFRESH",
"LOGOUT":"Logout",

View File

@@ -101,7 +101,6 @@ var zh = {
"RESET_OFFSET":"重置位点",
"CLUSTER_NAME":"集群名",
"OPS":"运维",
"PROXY":"代理",
"AUTO_REFRESH":"自动刷新",
"REFRESH":"刷新",
"LOGOUT":"退出",

View File

@@ -1,97 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var module = app;
module.controller('proxyController', ['$scope', '$location', '$http', 'Notification', 'remoteApi', 'tools', '$window',
function ($scope, $location, $http, Notification, remoteApi, tools, $window) {
$scope.proxyAddrList = [];
$scope.userRole = $window.sessionStorage.getItem("userrole");
$scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
$scope.inputReadonly = !$scope.writeOperationEnabled;
$scope.newProxyAddr = "";
$scope.allProxyConfig = {};
$http({
method: "GET",
url: "proxy/homePage.query"
}).success(function (resp) {
if (resp.status == 0) {
$scope.proxyAddrList = resp.data.proxyAddrList;
$scope.selectedProxy = resp.data.currentProxyAddr;
$scope.showProxyDetailConfig($scope.selectedProxy);
localStorage.setItem('proxyAddr',$scope.selectedProxy);
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
$scope.eleChange = function (data) {
$scope.proxyAddrList = data;
}
$scope.showDetailConf = function () {
$(".proxyModal").modal();
}
$scope.showProxyDetailConfig = function (proxyAddr) {
$http({
method: "GET",
url: "proxy/proxyDetailConfig.query",
params: {proxyAddress: proxyAddr}
}).success(function (resp) {
if (resp.status == 0) {
$scope.allProxyConfig = resp.data;
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
$scope.updateProxyAddr = function () {
$http({
method: "POST",
url: "proxy/updateProxyAddr.do",
params: {proxyAddr: $scope.selectedProxy}
}).success(function (resp) {
if (resp.status == 0) {
localStorage.setItem('proxyAddr', $scope.selectedProxy);
Notification.info({message: "SUCCESS", delay: 2000});
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
$scope.showProxyDetailConfig($scope.selectedProxy);
};
$scope.addProxyAddr = function () {
$http({
method: "POST",
url: "proxy/addProxyAddr.do",
params: {newProxyAddr: $scope.newProxyAddr}
}).success(function (resp) {
if (resp.status == 0) {
if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == -1) {
$scope.proxyAddrList.push($scope.newProxyAddr);
}
$("#proxyAddr").val("");
$scope.newProxyAddr = "";
Notification.info({message: "SUCCESS", delay: 2000});
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
};
}])

View File

@@ -28,7 +28,6 @@
<div class="navbar-collapse collapse navbar-warning-collapse">
<ul class="nav navbar-nav">
<li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li>
<li ng-show="rmqVersion" ng-class="path =='proxy' ? 'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
<li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
<li ng-class="path =='cluster' ? 'active':''"><a ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li>
<li ng-class="path =='topic' ? 'active':''"><a ng-href="#/topic">{{'TOPIC' | translate}}</a></li>

View File

@@ -26,8 +26,6 @@
</div>
<md-checkbox aria-label="Checkbox" ng-model="filterNormal" class="md-primary">{{'NORMAL' | translate}}
</md-checkbox>
<md-checkbox aria-label="Checkbox" ng-show="rmqVersion" ng-model="filterFIFO" class="md-primary">{{'FIFO' | translate}}
</md-checkbox>
<md-checkbox aria-label="Checkbox" ng-model="filterSystem" class="md-primary">{{'SYSTEM' | translate}}
</md-checkbox>
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}"
@@ -66,11 +64,11 @@
<td class="text-center">{{consumerGroup.consumeTps}}</td>
<td class="text-center">{{consumerGroup.diffTotal}}</td>
<td class="text-left">
<button name="client" ng-click="client(consumerGroup.group, consumerGroup.address)"
<button name="client" ng-click="client(consumerGroup.group)"
class="btn btn-raised btn-sm btn-primary"
type="button">{{'CLIENT' | translate}}
</button>
<button name="client" ng-click="detail(consumerGroup.group, consumerGroup.address)"
<button name="client" ng-click="detail(consumerGroup.group)"
class="btn btn-raised btn-sm btn-primary"
type="button">{{'CONSUME_DETAIL' | translate}}
</button>
@@ -322,116 +320,6 @@
</div>
</script>
<script type="text/ng-template" id="consumerModifyDialogForV5">
<div>
<div>
<div class="modal-header">
<h4 class="modal-title">{{'SUBSCRIPTION_CHANGE'|translate}}</h4>
</div>
<div class="modal-body " ng-repeat="item in ngDialogData.consumerRequestList">
<form id="addAppForm1" name="addAppForm" class="form-horizontal" novalidate>
<div class="form-group" ng-hide="ngDialogData.bIsUpdate">
<label class="control-label col-sm-3">clusterName:</label>
<div class="col-sm-9">
<select name="mySelectClusterNameList" multiple chosen
ng-model="item.clusterNameList"
ng-options="clusterNameItem for clusterNameItem in ngDialogData.allClusterNameList">
<option value=""></option>
</select>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">brokerName:</label>
<div class="col-sm-9">
<select name="mySelectBrokerNameList" multiple chosen
ng-disabled="ngDialogData.bIsUpdate"
ng-model="item.brokerNameList"
ng-options="brokerNameItem for brokerNameItem in ngDialogData.allBrokerNameList">
<option value=""></option>
</select>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">groupName:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.groupName" type="text"
ng-disabled="ngDialogData.bIsUpdate" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">consumeEnable:</label>
<div class="col-sm-9">
<md-switch class="md-primary" ng-disabled="{{!ngDialogData.writeOperationEnabled}}" md-no-ink
aria-label="Switch No Ink" ng-model="item.subscriptionGroupConfig.consumeEnable">
</md-switch>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">consumeOrderlyEnable:</label>
<div class="col-sm-9">
<md-switch class="md-primary custom-md-switch" eng-disabled="{{!ngDialogData.writeOperationEnabled}}" md-no-ink
aria-label="Switch No Ink" ng-model="item.subscriptionGroupConfig.consumeMessageOrderly">
</md-switch>
<span style="font-size: 12px;">[Pay Attention: FIFO ConsumerGroup Need Open 'consumeOrderlyEnable' Option]</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">consumeBroadcastEnable:</label>
<div class="col-sm-9">
<md-switch class="md-primary" ng-disabled="{{!ngDialogData.writeOperationEnabled}}" md-no-ink
aria-label="Switch No Ink"
ng-model="item.subscriptionGroupConfig.consumeBroadcastEnable">
</md-switch>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">retryQueueNums:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.retryQueueNums"
type="text" ng-disabled="{{!ngDialogData.writeOperationEnabled}}"
required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">brokerId:</label>
<div class="col-sm-9">
<input class="form-control" ng-model="item.subscriptionGroupConfig.brokerId" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-3">whichBrokerWhenConsumeSlowly:</label>
<div class="col-sm-9">
<input class="form-control"
ng-model="item.subscriptionGroupConfig.whichBrokerWhenConsumeSlowly" type="text"
ng-disabled="{{!ngDialogData.writeOperationEnabled}}" required/>
<span class="text-danger" ng-show="addAppForm.name.$error.required">编号不能为空.</span>
</div>
</div>
</form>
<div class="modal-footer">
<div class="ngdialog-buttons">
<button type="button" class="ngdialog-button ngdialog-button-primary"
ng-disabled="addAppForm.$invalid"
ng-show="{{ngDialogData.writeOperationEnabled}}"
ng-click="postConsumerRequest(item)">{{ 'COMMIT' | translate }}
</button>
<button type="button" class="ngdialog-button ngdialog-button-secondary"
ng-click="closeThisDialog('Cancel')">{{ 'CLOSE' | translate }}
</button>
</div>
</div>
</div>
</div>
</div>
</div>
</script>
<!--consumer monitor config-->
<script type="text/ng-template" id="consumerMonitorDialog">
<div class="modal-header">

View File

@@ -1,67 +0,0 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="container-fluid" id="deployHistoryList">
<div class="page-content">
<h2 class="md-title">ProxyServerAddressList</h2>
<div class="pull-left" style="min-width: 400px; max-width: 500px; padding: 10px 10px 10px 0">
<select ng-model="selectedProxy" chosen
ng-options="x for x in proxyAddrList"
ng-change="updateProxyAddr()"
required></select>
</div>
<div class="pull-left">
<button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}"
ng-click="updateProxyAddr()">{{'UPDATE' | translate}}
</button>
</div>
<form class="form-inline pull-left" style="margin-left: 20px" ng-show="{{writeOperationEnabled}}">
<div class="form-group" style="margin: 0">
<label for="proxyAddr">ProxyAddr:</label>
<input id="proxyAddr" class="form-control" style="width: 300px; margin: 0 10px 0 10px" type="text" ng-model="newProxyAddr" required/>
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="addProxyAddr()"> {{ 'ADD' | translate}}
</button>
</div>
</form>
</div>
</div>
<div class="modal proxyModal fade" role="dialog" tabindex="-1" aria-hidden="true" aria-labelledby="config-modal-label">
<div class="modal-dialog modal-lg">
<div class="modal-content" >
<div class="modal-header">
<button class="close" type="button" data-dismiss="modal">&times;</button>
<h4 id="config-modal-label" class="modal-title">
[{{selectedProxy}}]
</h4>
</div>
<div class="modal-body limit_height">
<table class="table table-bordered">
<tr ng-repeat="(key, value) in allProxyConfig">
<td>{{key}}</td>
<td>{{value}}</td>
</tr>
</table>
</div>
<div class="modal-footer">
<div class="col-md-12 text-center">
<button type="button" class="btn btn-raised" data-dismiss="modal">{{ 'CLOSE' | translate }}</button>
</div>
</div>
</div>
</div>
</div>