diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java index 2c633cd..6b9eb67 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java @@ -31,6 +31,7 @@ public class TopicConfigInfo { private int perm; private boolean order; + private String messageType; public List getClusterNameList() { return clusterNameList; } @@ -91,6 +92,18 @@ public class TopicConfigInfo { this.order = order; } + + public String getMessageType() { + return messageType; + } + + public void setMessageType(String messageType) { + this.messageType = messageType; + } + + + + @Override public boolean equals(Object o) { if (this == o) @@ -102,12 +115,13 @@ public class TopicConfigInfo { readQueueNums == that.readQueueNums && perm == that.perm && order == that.order && - Objects.equal(topicName, that.topicName); + Objects.equal(topicName, that.topicName) && + Objects.equal(messageType, that.messageType); } @Override public int hashCode() { - return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order); + return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order,messageType); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java index facf448..12e7f71 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.dashboard.service.impl; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.route.BrokerData; @@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.Arrays; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; @Service public class ClusterServiceImpl implements ClusterService { @@ -56,6 +59,9 @@ public class ClusterServiceImpl implements ClusterService { } resultMap.put("clusterInfo", clusterInfo); resultMap.put("brokerServer", brokerServer); + // add messageType + resultMap.put("messageTypes", Arrays.stream(TopicMessageType.values()).sorted() + .collect(Collectors.toMap(TopicMessageType::getValue, messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue())))); return resultMap; } catch (Exception err) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index 1d7d571..ecd08de 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -18,25 +18,24 @@ package org.apache.rocketmq.dashboard.service.impl; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; -import org.apache.rocketmq.remoting.protocol.body.GroupList; -import org.apache.rocketmq.remoting.protocol.body.TopicList; -import org.apache.rocketmq.remoting.protocol.route.BrokerData; -import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; @@ -44,6 +43,12 @@ import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo; import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.TopicService; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.tools.command.CommandUtil; import org.joor.Reflect; import org.springframework.beans.BeanUtils; @@ -55,6 +60,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE; @Service public class TopicServiceImpl extends AbstractCommonService implements TopicService { @@ -68,18 +78,18 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ TopicList allTopics = mqAdminExt.fetchAllTopicList(); TopicList sysTopics = getSystemTopicList(); Set topics = - allTopics.getTopicList().stream().map(topic -> { - if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) { - topic = String.format("%s%s", "%SYS%", topic); - } - return topic; - }).filter(topic -> { - if (skipRetryAndDlq) { - return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) - || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)); - } - return true; - }).collect(Collectors.toSet()); + allTopics.getTopicList().stream().map(topic -> { + if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) { + topic = String.format("%s%s", "%SYS%", topic); + } + return topic; + }).filter(topic -> { + if (skipRetryAndDlq) { + return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) + || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)); + } + return true; + }).collect(Collectors.toSet()); allTopics.getTopicList().clear(); allTopics.getTopicList().addAll(topics); return allTopics; @@ -123,10 +133,15 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { TopicConfig topicConfig = new TopicConfig(); BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); + String messageType = topicCreateOrUpdateRequest.getMessageType(); + if (StringUtils.isBlank(messageType)) { + messageType = TopicMessageType.NORMAL.name(); + } + topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()), messageType)); try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), - topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { + topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); } } catch (Exception err) { @@ -156,6 +171,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName()); BeanUtils.copyProperties(topicConfig, topicConfigInfo); topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName())); + String messageType = topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()); + if (StringUtils.isBlank(messageType)) { + messageType = TopicMessageType.UNSPECIFIED.name(); + } + topicConfigInfo.setMessageType(messageType); topicConfigInfoList.add(topicConfigInfo); } return topicConfigInfoList; @@ -226,6 +246,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ return defaultMQProducer; } + public TransactionMQProducer buildTransactionMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) { + TransactionMQProducer defaultMQProducer = new TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC); + defaultMQProducer.setUseTLS(configure.isUseTLS()); + return defaultMQProducer; + } + private TopicList getSystemTopicList() { RPCHook rpcHook = null; boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); @@ -249,32 +275,61 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ @Override public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { - DefaultMQProducer producer = null; + List topicConfigInfos = examineTopicConfig(sendTopicMessageRequest.getTopic()); + String messageType = topicConfigInfos.get(0).getMessageType(); AclClientRPCHook rpcHook = null; if (configure.isACLEnabled()) { rpcHook = new AclClientRPCHook(new SessionCredentials( - configure.getAccessKey(), - configure.getSecretKey() + configure.getAccessKey(), + configure.getSecretKey() )); } - producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); - producer.setInstanceName(String.valueOf(System.currentTimeMillis())); - producer.setNamesrvAddr(configure.getNamesrvAddr()); - try { - producer.start(); - Message msg = new Message(sendTopicMessageRequest.getTopic(), - sendTopicMessageRequest.getTag(), - sendTopicMessageRequest.getKey(), - sendTopicMessageRequest.getMessageBody().getBytes() - ); - return producer.send(msg); - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } finally { - waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); - producer.shutdown(); + if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) { + // transaction message + TransactionListener transactionListener = new TransactionListenerImpl(); + + TransactionMQProducer producer = buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); + producer.setNamesrvAddr(configure.getNamesrvAddr()); + producer.setTransactionListener(transactionListener); + try { + producer.start(); + Message msg = new Message(sendTopicMessageRequest.getTopic(), + sendTopicMessageRequest.getTag(), + sendTopicMessageRequest.getKey(), + sendTopicMessageRequest.getMessageBody().getBytes() + ); + return producer.sendMessageInTransaction(msg, null); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } finally { + waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); + producer.shutdown(); + } + } else { + // no transaction message + DefaultMQProducer producer = null; + producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); + producer.setNamesrvAddr(configure.getNamesrvAddr()); + try { + producer.start(); + Message msg = new Message(sendTopicMessageRequest.getTopic(), + sendTopicMessageRequest.getTag(), + sendTopicMessageRequest.getKey(), + sendTopicMessageRequest.getMessageBody().getBytes() + ); + return producer.send(msg); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } finally { + waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); + producer.shutdown(); + } } + } private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { @@ -296,4 +351,20 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ } catch (Exception ignore) { } } + + static class TransactionListenerImpl implements TransactionListener { + private AtomicInteger transactionIndex = new AtomicInteger(0); + + private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); + + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0ab405e..090e421 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -42,7 +42,9 @@ rocketmq: # configure multiple namesrv addresses to manage multiple different clusters namesrvAddrs: - 127.0.0.1:9876 - - 127.0.0.2:9876 + # - 127.0.0.2:9876 + # - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876 + # - 10.151.47.30:9876 # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true isVIPChannel: # timeout for mqadminExt, default 5000ms @@ -58,8 +60,8 @@ rocketmq: loginRequired: false useTLS: false # set the accessKey and secretKey if you used acl - accessKey: # if version > 4.4.0 - secretKey: # if version > 4.4.0 +# accessKey: rocketmq2 +# secretKey: 12345678 threadpool: config: diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index f9a4e3c..943ce48 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -123,5 +123,11 @@ var en = { "GROUP_PERM":"Group Permission", "SYNCHRONIZE":"Synchronize Data", "SHOW":"Show", - "HIDE":"Hide" + "HIDE":"Hide", + "MESSAGE_TYPE":"messageType", + "MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL", + "MESSAGE_TYPE_NORMAL": "NORMAL", + "MESSAGE_TYPE_FIFO": "FIFO", + "MESSAGE_TYPE_DELAY": "DELAY", + "MESSAGE_TYPE_TRANSACTION": "TRANSACTION", } diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index b6fa589..8a3b3ff 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -124,5 +124,11 @@ var zh = { "GROUP_PERM":"消费组权限", "SYNCHRONIZE":"同步", "SHOW":"显示", - "HIDE":"隐藏" + "HIDE":"隐藏", + "MESSAGE_TYPE":"消息类型", + "MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息", + "MESSAGE_TYPE_NORMAL": "普通消息", + "MESSAGE_TYPE_FIFO": "顺序消息", + "MESSAGE_TYPE_DELAY": "定时/延时消息", + "MESSAGE_TYPE_TRANSACTION": "事务消息", } \ No newline at end of file diff --git a/src/main/resources/static/src/topic.js b/src/main/resources/static/src/topic.js index 998f219..bce0df8 100644 --- a/src/main/resources/static/src/topic.js +++ b/src/main/resources/static/src/topic.js @@ -328,8 +328,8 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati var bIsUpdate = true; if (request == null) { request = [{ - writeQueueNums: 16, - readQueueNums: 16, + writeQueueNums: 8, + readQueueNums: 8, perm: 6, order: false, topicName: "", @@ -355,6 +355,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati topicRequestList: request, allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable), allBrokerNameList: Object.keys(resp.data.brokerServer), + allMessageTypeList: resp.data.messageTypes, bIsUpdate: bIsUpdate, writeOperationEnabled: $scope.writeOperationEnabled } diff --git a/src/main/resources/static/view/pages/topic.html b/src/main/resources/static/view/pages/topic.html index bea5ac7..7547c10 100644 --- a/src/main/resources/static/view/pages/topic.html +++ b/src/main/resources/static/view/pages/topic.html @@ -63,6 +63,7 @@ +