feat: add topic message type

add message type
This commit is contained in:
guangdashao
2024-06-04 11:40:46 +08:00
committed by GitHub
parent 2fb0fce0b1
commit 823bce2b8b
8 changed files with 168 additions and 49 deletions

View File

@@ -31,6 +31,7 @@ public class TopicConfigInfo {
private int perm; private int perm;
private boolean order; private boolean order;
private String messageType;
public List<String> getClusterNameList() { public List<String> getClusterNameList() {
return clusterNameList; return clusterNameList;
} }
@@ -91,6 +92,18 @@ public class TopicConfigInfo {
this.order = order; this.order = order;
} }
public String getMessageType() {
return messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) if (this == o)
@@ -102,12 +115,13 @@ public class TopicConfigInfo {
readQueueNums == that.readQueueNums && readQueueNums == that.readQueueNums &&
perm == that.perm && perm == that.perm &&
order == that.order && order == that.order &&
Objects.equal(topicName, that.topicName); Objects.equal(topicName, that.topicName) &&
Objects.equal(messageType, that.messageType);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order); return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order,messageType);
} }
} }

View File

@@ -17,6 +17,7 @@
package org.apache.rocketmq.dashboard.service.impl; package org.apache.rocketmq.dashboard.service.impl;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Collectors;
@Service @Service
public class ClusterServiceImpl implements ClusterService { public class ClusterServiceImpl implements ClusterService {
@@ -56,6 +59,9 @@ public class ClusterServiceImpl implements ClusterService {
} }
resultMap.put("clusterInfo", clusterInfo); resultMap.put("clusterInfo", clusterInfo);
resultMap.put("brokerServer", brokerServer); resultMap.put("brokerServer", brokerServer);
// add messageType
resultMap.put("messageTypes", Arrays.stream(TopicMessageType.values()).sorted()
.collect(Collectors.toMap(TopicMessageType::getValue, messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue()))));
return resultMap; return resultMap;
} }
catch (Exception err) { catch (Exception err) {

View File

@@ -18,25 +18,24 @@
package org.apache.rocketmq.dashboard.service.impl; package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest; import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
@@ -44,6 +43,12 @@ import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.AbstractCommonService; import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.TopicService; import org.apache.rocketmq.dashboard.service.TopicService;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.CommandUtil;
import org.joor.Reflect; import org.joor.Reflect;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
@@ -55,6 +60,11 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;
@Service @Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService { public class TopicServiceImpl extends AbstractCommonService implements TopicService {
@@ -68,18 +78,18 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
TopicList allTopics = mqAdminExt.fetchAllTopicList(); TopicList allTopics = mqAdminExt.fetchAllTopicList();
TopicList sysTopics = getSystemTopicList(); TopicList sysTopics = getSystemTopicList();
Set<String> topics = Set<String> topics =
allTopics.getTopicList().stream().map(topic -> { allTopics.getTopicList().stream().map(topic -> {
if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) { if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) {
topic = String.format("%s%s", "%SYS%", topic); topic = String.format("%s%s", "%SYS%", topic);
} }
return topic; return topic;
}).filter(topic -> { }).filter(topic -> {
if (skipRetryAndDlq) { if (skipRetryAndDlq) {
return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)); || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
} }
return true; return true;
}).collect(Collectors.toSet()); }).collect(Collectors.toSet());
allTopics.getTopicList().clear(); allTopics.getTopicList().clear();
allTopics.getTopicList().addAll(topics); allTopics.getTopicList().addAll(topics);
return allTopics; return allTopics;
@@ -123,10 +133,15 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
TopicConfig topicConfig = new TopicConfig(); TopicConfig topicConfig = new TopicConfig();
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
String messageType = topicCreateOrUpdateRequest.getMessageType();
if (StringUtils.isBlank(messageType)) {
messageType = TopicMessageType.NORMAL.name();
}
topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()), messageType));
try { try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
} }
} catch (Exception err) { } catch (Exception err) {
@@ -156,6 +171,11 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName()); TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
BeanUtils.copyProperties(topicConfig, topicConfigInfo); BeanUtils.copyProperties(topicConfig, topicConfigInfo);
topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName())); topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
String messageType = topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
if (StringUtils.isBlank(messageType)) {
messageType = TopicMessageType.UNSPECIFIED.name();
}
topicConfigInfo.setMessageType(messageType);
topicConfigInfoList.add(topicConfigInfo); topicConfigInfoList.add(topicConfigInfo);
} }
return topicConfigInfoList; return topicConfigInfoList;
@@ -226,6 +246,12 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
return defaultMQProducer; return defaultMQProducer;
} }
public TransactionMQProducer buildTransactionMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
TransactionMQProducer defaultMQProducer = new TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC);
defaultMQProducer.setUseTLS(configure.isUseTLS());
return defaultMQProducer;
}
private TopicList getSystemTopicList() { private TopicList getSystemTopicList() {
RPCHook rpcHook = null; RPCHook rpcHook = null;
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
@@ -249,32 +275,61 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
@Override @Override
public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
DefaultMQProducer producer = null; List<TopicConfigInfo> topicConfigInfos = examineTopicConfig(sendTopicMessageRequest.getTopic());
String messageType = topicConfigInfos.get(0).getMessageType();
AclClientRPCHook rpcHook = null; AclClientRPCHook rpcHook = null;
if (configure.isACLEnabled()) { if (configure.isACLEnabled()) {
rpcHook = new AclClientRPCHook(new SessionCredentials( rpcHook = new AclClientRPCHook(new SessionCredentials(
configure.getAccessKey(), configure.getAccessKey(),
configure.getSecretKey() configure.getSecretKey()
)); ));
} }
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled()); if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) {
producer.setInstanceName(String.valueOf(System.currentTimeMillis())); // transaction message
producer.setNamesrvAddr(configure.getNamesrvAddr()); TransactionListener transactionListener = new TransactionListenerImpl();
try {
producer.start(); TransactionMQProducer producer = buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
Message msg = new Message(sendTopicMessageRequest.getTopic(), producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
sendTopicMessageRequest.getTag(), producer.setNamesrvAddr(configure.getNamesrvAddr());
sendTopicMessageRequest.getKey(), producer.setTransactionListener(transactionListener);
sendTopicMessageRequest.getMessageBody().getBytes() try {
); producer.start();
return producer.send(msg); Message msg = new Message(sendTopicMessageRequest.getTopic(),
} catch (Exception e) { sendTopicMessageRequest.getTag(),
Throwables.throwIfUnchecked(e); sendTopicMessageRequest.getKey(),
throw new RuntimeException(e); sendTopicMessageRequest.getMessageBody().getBytes()
} finally { );
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled()); return producer.sendMessageInTransaction(msg, null);
producer.shutdown(); } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown();
}
} else {
// no transaction message
DefaultMQProducer producer = null;
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr());
try {
producer.start();
Message msg = new Message(sendTopicMessageRequest.getTopic(),
sendTopicMessageRequest.getTag(),
sendTopicMessageRequest.getKey(),
sendTopicMessageRequest.getMessageBody().getBytes()
);
return producer.send(msg);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown();
}
} }
} }
private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) { private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
@@ -296,4 +351,20 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
} catch (Exception ignore) { } catch (Exception ignore) {
} }
} }
static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
} }

View File

@@ -42,7 +42,9 @@ rocketmq:
# configure multiple namesrv addresses to manage multiple different clusters # configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs: namesrvAddrs:
- 127.0.0.1:9876 - 127.0.0.1:9876
- 127.0.0.2:9876 # - 127.0.0.2:9876
# - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876
# - 10.151.47.30:9876
# if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
isVIPChannel: isVIPChannel:
# timeout for mqadminExt, default 5000ms # timeout for mqadminExt, default 5000ms
@@ -58,8 +60,8 @@ rocketmq:
loginRequired: false loginRequired: false
useTLS: false useTLS: false
# set the accessKey and secretKey if you used acl # set the accessKey and secretKey if you used acl
accessKey: # if version > 4.4.0 # accessKey: rocketmq2
secretKey: # if version > 4.4.0 # secretKey: 12345678
threadpool: threadpool:
config: config:

View File

@@ -123,5 +123,11 @@ var en = {
"GROUP_PERM":"Group Permission", "GROUP_PERM":"Group Permission",
"SYNCHRONIZE":"Synchronize Data", "SYNCHRONIZE":"Synchronize Data",
"SHOW":"Show", "SHOW":"Show",
"HIDE":"Hide" "HIDE":"Hide",
"MESSAGE_TYPE":"messageType",
"MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL",
"MESSAGE_TYPE_NORMAL": "NORMAL",
"MESSAGE_TYPE_FIFO": "FIFO",
"MESSAGE_TYPE_DELAY": "DELAY",
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
} }

View File

@@ -124,5 +124,11 @@ var zh = {
"GROUP_PERM":"消费组权限", "GROUP_PERM":"消费组权限",
"SYNCHRONIZE":"同步", "SYNCHRONIZE":"同步",
"SHOW":"显示", "SHOW":"显示",
"HIDE":"隐藏" "HIDE":"隐藏",
"MESSAGE_TYPE":"消息类型",
"MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息",
"MESSAGE_TYPE_NORMAL": "普通消息",
"MESSAGE_TYPE_FIFO": "顺序消息",
"MESSAGE_TYPE_DELAY": "定时/延时消息",
"MESSAGE_TYPE_TRANSACTION": "事务消息",
} }

View File

@@ -328,8 +328,8 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
var bIsUpdate = true; var bIsUpdate = true;
if (request == null) { if (request == null) {
request = [{ request = [{
writeQueueNums: 16, writeQueueNums: 8,
readQueueNums: 16, readQueueNums: 8,
perm: 6, perm: 6,
order: false, order: false,
topicName: "", topicName: "",
@@ -355,6 +355,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
topicRequestList: request, topicRequestList: request,
allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable), allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable),
allBrokerNameList: Object.keys(resp.data.brokerServer), allBrokerNameList: Object.keys(resp.data.brokerServer),
allMessageTypeList: resp.data.messageTypes,
bIsUpdate: bIsUpdate, bIsUpdate: bIsUpdate,
writeOperationEnabled: $scope.writeOperationEnabled writeOperationEnabled: $scope.writeOperationEnabled
} }

View File

@@ -63,6 +63,7 @@
<button class="btn btn-raised btn-sm btn-primary" type="button" <button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="openUpdateDialog(topic, sysFlag)">topic {{'CONFIG' |translate}} ng-click="openUpdateDialog(topic, sysFlag)">topic {{'CONFIG' |translate}}
</button> </button>
<!-- todo 发送消息,根据消息类型判断-->
<button class="btn btn-raised btn-sm btn-primary" type="button" <button class="btn btn-raised btn-sm btn-primary" type="button"
ng-show="{{!sysFlag}}" ng-show="{{!sysFlag}}"
ng-click="openSendTopicMessageDialog(topic)">{{'SEND_MSG' | translate}} ng-click="openSendTopicMessageDialog(topic)">{{'SEND_MSG' | translate}}
@@ -189,6 +190,18 @@
<span class="text-danger" ng-show="addAppForm.topicName.$error.required">{{'TOPIC_NAME'|translate}}不能为空.</span> <span class="text-danger" ng-show="addAppForm.topicName.$error.required">{{'TOPIC_NAME'|translate}}不能为空.</span>
</div> </div>
</div> </div>
<!-- 设置topic 类型 -->
<div class="form-group">
<label class="control-label col-sm-2">{{'MESSAGE_TYPE'|translate}}:</label>
<div class="col-sm-10">
<select name="mySelectMessageType" chosen ng-disabled="ngDialogData.bIsUpdate"
ng-model="item.messageType"
ng-options="messageType as value | translate disable when messageType=='UNSPECIFIED' for (messageType , value) in ngDialogData.allMessageTypeList"
>
<option value=""></option>
</select>
</div>
</div>
<div class="form-group"> <div class="form-group">
<label class="control-label col-sm-2">{{'WRITE_QUEUE_NUMS'|translate}}:</label> <label class="control-label col-sm-2">{{'WRITE_QUEUE_NUMS'|translate}}:</label>
<div class="col-sm-10"> <div class="col-sm-10">