mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2026-05-31 15:23:26 +08:00
Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
@@ -80,7 +80,7 @@ public class MessageServiceImpl implements MessageService {
|
|||||||
.build();
|
.build();
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private RMQConfigure configure;
|
private RMQConfigure rMQConfigure;
|
||||||
/**
|
/**
|
||||||
* @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
|
* @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
|
||||||
* @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
|
* @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
|
||||||
@@ -117,12 +117,12 @@ public class MessageServiceImpl implements MessageService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
|
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
|
||||||
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
|
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
|
||||||
RPCHook rpcHook = null;
|
RPCHook rpcHook = null;
|
||||||
if (isEnableAcl) {
|
if (isEnableAcl) {
|
||||||
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
|
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey()));
|
||||||
}
|
}
|
||||||
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
|
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
|
||||||
List<MessageView> messageViewList = Lists.newArrayList();
|
List<MessageView> messageViewList = Lists.newArrayList();
|
||||||
try {
|
try {
|
||||||
String subExpression = "*";
|
String subExpression = "*";
|
||||||
@@ -250,12 +250,12 @@ public class MessageServiceImpl implements MessageService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
|
private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
|
||||||
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
|
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
|
||||||
RPCHook rpcHook = null;
|
RPCHook rpcHook = null;
|
||||||
if (isEnableAcl) {
|
if (isEnableAcl) {
|
||||||
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
|
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey()));
|
||||||
}
|
}
|
||||||
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
|
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
|
||||||
|
|
||||||
long total = 0;
|
long total = 0;
|
||||||
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
|
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
|
||||||
@@ -391,12 +391,12 @@ public class MessageServiceImpl implements MessageService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
|
private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
|
||||||
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
|
boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
|
||||||
RPCHook rpcHook = null;
|
RPCHook rpcHook = null;
|
||||||
if (isEnableAcl) {
|
if (isEnableAcl) {
|
||||||
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
|
rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey()));
|
||||||
}
|
}
|
||||||
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
|
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook);
|
||||||
List<MessageView> messageViews = new ArrayList<>();
|
List<MessageView> messageViews = new ArrayList<>();
|
||||||
|
|
||||||
long offset = query.getPageNum() * query.getPageSize();
|
long offset = query.getPageNum() * query.getPageSize();
|
||||||
@@ -536,4 +536,7 @@ public class MessageServiceImpl implements MessageService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DefaultMQPullConsumer buildDefaultMQPullConsumer(RPCHook rpcHook) {
|
||||||
|
return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,254 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.rocketmq.console.controller;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import org.apache.rocketmq.client.QueryResult;
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.PullResult;
|
||||||
|
import org.apache.rocketmq.client.consumer.PullStatus;
|
||||||
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.common.message.MessageClientIDSetter;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
|
import org.apache.rocketmq.common.message.MessageQueue;
|
||||||
|
import org.apache.rocketmq.common.protocol.body.CMResult;
|
||||||
|
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
|
||||||
|
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
|
||||||
|
import org.apache.rocketmq.console.model.QueueOffsetInfo;
|
||||||
|
import org.apache.rocketmq.console.model.request.MessageQuery;
|
||||||
|
import org.apache.rocketmq.console.service.impl.MessageServiceImpl;
|
||||||
|
import org.apache.rocketmq.console.util.MockObjectUtil;
|
||||||
|
import org.apache.rocketmq.tools.admin.api.MessageTrack;
|
||||||
|
import org.apache.rocketmq.tools.admin.api.TrackType;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Spy;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.test.web.servlet.ResultActions;
|
||||||
|
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
|
||||||
|
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
|
||||||
|
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||||
|
|
||||||
|
public class MessageControllerTest extends BaseControllerTest {
|
||||||
|
|
||||||
|
@InjectMocks
|
||||||
|
private MessageController messageController;
|
||||||
|
|
||||||
|
private MockHttpServletRequestBuilder requestBuilder = null;
|
||||||
|
|
||||||
|
private ResultActions perform;
|
||||||
|
|
||||||
|
@Spy
|
||||||
|
private MessageServiceImpl messageService;
|
||||||
|
|
||||||
|
private Set<MessageQueue> messageQueues;
|
||||||
|
private DefaultMQPullConsumer defaultMQPullConsumer;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws Exception {
|
||||||
|
super.mockRmqConfigure();
|
||||||
|
{
|
||||||
|
List<MessageExt> wrappers = new ArrayList<>(1);
|
||||||
|
wrappers.add(MockObjectUtil.createMessageExt());
|
||||||
|
defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
|
||||||
|
messageQueues = new HashSet<>(1);
|
||||||
|
MessageQueue messageQueue = new MessageQueue("topic_test", "broker-a", 0);
|
||||||
|
messageQueues.add(messageQueue);
|
||||||
|
when(defaultMQPullConsumer.fetchSubscribeMessageQueues(anyString())).thenReturn(messageQueues);
|
||||||
|
when(defaultMQPullConsumer.searchOffset(messageQueue, Long.MIN_VALUE)).thenReturn(Long.MIN_VALUE);
|
||||||
|
when(defaultMQPullConsumer.searchOffset(messageQueue, Long.MAX_VALUE)).thenReturn(Long.MAX_VALUE - 10L);
|
||||||
|
PullResult pullResult = mock(PullResult.class);
|
||||||
|
when(defaultMQPullConsumer.pull(any(), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
|
||||||
|
when(pullResult.getNextBeginOffset()).thenReturn(Long.MAX_VALUE);
|
||||||
|
when(pullResult.getPullStatus()).thenReturn(PullStatus.FOUND);
|
||||||
|
when(pullResult.getMsgFoundList()).thenReturn(wrappers);
|
||||||
|
when(messageService.buildDefaultMQPullConsumer(any())).thenReturn(defaultMQPullConsumer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testViewMessage() throws Exception {
|
||||||
|
final String url = "/message/viewMessage.query";
|
||||||
|
{
|
||||||
|
MessageExt messageExt = MockObjectUtil.createMessageExt();
|
||||||
|
when(mqAdminExt.viewMessage(anyString(), anyString()))
|
||||||
|
.thenThrow(new MQClientException(208, "no message"))
|
||||||
|
.thenReturn(messageExt);
|
||||||
|
MessageTrack track = new MessageTrack();
|
||||||
|
track.setConsumerGroup("group_test");
|
||||||
|
track.setTrackType(TrackType.CONSUMED);
|
||||||
|
List<MessageTrack> tracks = new ArrayList<>();
|
||||||
|
tracks.add(track);
|
||||||
|
when(mqAdminExt.messageTrackDetail(any()))
|
||||||
|
.thenThrow(new MQBrokerException(206, "consumer not online"))
|
||||||
|
.thenReturn(tracks);
|
||||||
|
}
|
||||||
|
// no message
|
||||||
|
requestBuilder = MockMvcRequestBuilders.get(url);
|
||||||
|
requestBuilder.param("topic", "topic_test");
|
||||||
|
requestBuilder.param("msgId", "0A9A003F00002A9F0000000000000319");
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
performErrorExpect(perform);
|
||||||
|
|
||||||
|
// consumer not online
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data.messageView.msgId").value("0A9A003F00002A9F0000000000000319"))
|
||||||
|
.andExpect(jsonPath("$.data.messageTrackList", hasSize(0)));
|
||||||
|
|
||||||
|
// query message success and has a group consumed.
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data.messageView.msgId").value("0A9A003F00002A9F0000000000000319"))
|
||||||
|
.andExpect(jsonPath("$.data.messageTrackList", hasSize(1)))
|
||||||
|
.andExpect(jsonPath("$.data.messageTrackList[0].consumerGroup").value("group_test"))
|
||||||
|
.andExpect(jsonPath("$.data.messageTrackList[0].trackType").value(TrackType.CONSUMED.name()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueryMessagePageByTopic() throws Exception {
|
||||||
|
final String url = "/message/queryMessagePageByTopic.query";
|
||||||
|
MessageQuery query = new MessageQuery();
|
||||||
|
query.setPageNum(1);
|
||||||
|
query.setPageSize(10);
|
||||||
|
query.setTopic("topic_test");
|
||||||
|
query.setTaskId("");
|
||||||
|
query.setBegin(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000);
|
||||||
|
query.setEnd(System.currentTimeMillis());
|
||||||
|
|
||||||
|
// missed cache
|
||||||
|
requestBuilder = MockMvcRequestBuilders.post(url);
|
||||||
|
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
|
||||||
|
requestBuilder.content(JSON.toJSONString(query));
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data.page.content", hasSize(1)))
|
||||||
|
.andExpect(jsonPath("$.data.page.content[0].msgId").value("0A9A003F00002A9F0000000000000319"));
|
||||||
|
|
||||||
|
String taskId = MessageClientIDSetter.createUniqID();
|
||||||
|
{
|
||||||
|
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
|
||||||
|
int idx = 0;
|
||||||
|
for (MessageQueue messageQueue : messageQueues) {
|
||||||
|
Long minOffset = defaultMQPullConsumer.searchOffset(messageQueue, query.getBegin());
|
||||||
|
Long maxOffset = defaultMQPullConsumer.searchOffset(messageQueue, query.getEnd()) + 1;
|
||||||
|
queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue));
|
||||||
|
}
|
||||||
|
// Use reflection to add data to the CACHE
|
||||||
|
Field field = MessageServiceImpl.class.getDeclaredField("CACHE");
|
||||||
|
field.setAccessible(true);
|
||||||
|
Cache<String, List<QueueOffsetInfo>> cache = (Cache<String, List<QueueOffsetInfo>>) field.get(messageService);
|
||||||
|
cache.put(taskId, queueOffsetInfos);
|
||||||
|
}
|
||||||
|
|
||||||
|
// hit cache
|
||||||
|
query.setTaskId(taskId);
|
||||||
|
requestBuilder.content(JSON.toJSONString(query));
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data.page.content", hasSize(1)))
|
||||||
|
.andExpect(jsonPath("$.data.page.content[0].msgId").value("0A9A003F00002A9F0000000000000319"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueryMessageByTopicAndKey() throws Exception {
|
||||||
|
final String url = "/message/queryMessageByTopicAndKey.query";
|
||||||
|
{
|
||||||
|
List<MessageExt> messageList = new ArrayList<>(2);
|
||||||
|
messageList.add(MockObjectUtil.createMessageExt());
|
||||||
|
QueryResult queryResult = new QueryResult(System.currentTimeMillis(), messageList);
|
||||||
|
when(mqAdminExt.queryMessage(anyString(), anyString(), anyInt(), anyLong(), anyLong()))
|
||||||
|
.thenReturn(queryResult);
|
||||||
|
}
|
||||||
|
requestBuilder = MockMvcRequestBuilders.get(url);
|
||||||
|
requestBuilder.param("topic", "topic_test");
|
||||||
|
requestBuilder.param("key", "KeyA");
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data", hasSize(1)))
|
||||||
|
.andExpect(jsonPath("$.data[0].msgId").value("0A9A003F00002A9F0000000000000319"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueryMessageByTopic() throws Exception {
|
||||||
|
final String url = "/message/queryMessageByTopic.query";
|
||||||
|
requestBuilder = MockMvcRequestBuilders.get(url);
|
||||||
|
requestBuilder.param("topic", "topic_test")
|
||||||
|
.param("begin", Long.toString(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000))
|
||||||
|
.param("end", Long.toString(System.currentTimeMillis()));
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data", hasSize(1)))
|
||||||
|
.andExpect(jsonPath("$.data[0].msgId").value("0A9A003F00002A9F0000000000000319"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConsumeMessageDirectly() throws Exception {
|
||||||
|
final String url = "/message/consumeMessageDirectly.do";
|
||||||
|
{
|
||||||
|
ConsumeMessageDirectlyResult result1 = new ConsumeMessageDirectlyResult();
|
||||||
|
result1.setConsumeResult(CMResult.CR_SUCCESS);
|
||||||
|
ConsumeMessageDirectlyResult result2 = new ConsumeMessageDirectlyResult();
|
||||||
|
result2.setConsumeResult(CMResult.CR_LATER);
|
||||||
|
when(mqAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString(), anyString()))
|
||||||
|
.thenReturn(result1).thenReturn(result2);
|
||||||
|
ConsumerConnection consumerConnection = MockObjectUtil.createConsumerConnection();
|
||||||
|
when(mqAdminExt.examineConsumerConnectionInfo(anyString()))
|
||||||
|
.thenReturn(consumerConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
// clientId is not empty
|
||||||
|
requestBuilder = MockMvcRequestBuilders.post(url);
|
||||||
|
requestBuilder.param("topic", "topic_test")
|
||||||
|
.param("consumerGroup", "group_test")
|
||||||
|
.param("msgId", "0A9A003F00002A9F0000000000000319")
|
||||||
|
.param("clientId", "127.0.0.1@37540#2295913058176000");
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data.consumeResult").value(CMResult.CR_SUCCESS.name()));
|
||||||
|
|
||||||
|
// clientId is empty
|
||||||
|
requestBuilder = MockMvcRequestBuilders.post(url);
|
||||||
|
requestBuilder.param("topic", "topic_test")
|
||||||
|
.param("consumerGroup", "group_test")
|
||||||
|
.param("msgId", "0A9A003F00002A9F0000000000000319");
|
||||||
|
perform = mockMvc.perform(requestBuilder);
|
||||||
|
perform.andExpect(status().isOk())
|
||||||
|
.andExpect(jsonPath("$.data.consumeResult").value(CMResult.CR_LATER.name()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override protected Object getTestController() {
|
||||||
|
return messageController;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.rocketmq.console.util;
|
package org.apache.rocketmq.console.util;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@@ -33,6 +34,7 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
|
|||||||
import org.apache.rocketmq.common.admin.TopicOffset;
|
import org.apache.rocketmq.common.admin.TopicOffset;
|
||||||
import org.apache.rocketmq.common.admin.TopicStatsTable;
|
import org.apache.rocketmq.common.admin.TopicStatsTable;
|
||||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
import org.apache.rocketmq.common.message.MessageQueue;
|
import org.apache.rocketmq.common.message.MessageQueue;
|
||||||
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
|
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
|
||||||
import org.apache.rocketmq.common.protocol.body.Connection;
|
import org.apache.rocketmq.common.protocol.body.Connection;
|
||||||
@@ -118,7 +120,7 @@ public class MockObjectUtil {
|
|||||||
queueData.setPerm(6);
|
queueData.setPerm(6);
|
||||||
queueData.setReadQueueNums(4);
|
queueData.setReadQueueNums(4);
|
||||||
queueData.setWriteQueueNums(4);
|
queueData.setWriteQueueNums(4);
|
||||||
queueData.setTopicSynFlag(0);
|
queueData.setTopicSysFlag(0);
|
||||||
queueDataList.add(queueData);
|
queueDataList.add(queueData);
|
||||||
topicRouteData.setQueueDatas(queueDataList);
|
topicRouteData.setQueueDatas(queueDataList);
|
||||||
return topicRouteData;
|
return topicRouteData;
|
||||||
@@ -183,4 +185,23 @@ public class MockObjectUtil {
|
|||||||
consumerRunningInfo.setProperties(properties);
|
consumerRunningInfo.setProperties(properties);
|
||||||
return consumerRunningInfo;
|
return consumerRunningInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static MessageExt createMessageExt() {
|
||||||
|
MessageExt messageExt = new MessageExt();
|
||||||
|
messageExt.setBrokerName("broker-a");
|
||||||
|
messageExt.setQueueId(0);
|
||||||
|
messageExt.setStoreSize(205);
|
||||||
|
messageExt.setQueueOffset(1L);
|
||||||
|
messageExt.setKeys("KeyA");
|
||||||
|
messageExt.setMsgId("0A9A003F00002A9F0000000000000319");
|
||||||
|
messageExt.setTopic("topic_test");
|
||||||
|
messageExt.setBody("this is message ext body".getBytes());
|
||||||
|
messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 8899));
|
||||||
|
messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 7788));
|
||||||
|
messageExt.setBornTimestamp(System.currentTimeMillis());
|
||||||
|
messageExt.setStoreTimestamp(System.currentTimeMillis());
|
||||||
|
messageExt.setCommitLogOffset(793);
|
||||||
|
messageExt.setReconsumeTimes(0);
|
||||||
|
return messageExt;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user