[ISSUE #673] Add unit tests for ConsumerController. (#742)

Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
zhangjidi2016
2021-07-15 18:31:31 +08:00
committed by GitHub
parent 7f413e10ee
commit 8bb1b96d7b
3 changed files with 464 additions and 87 deletions

View File

@@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.controller;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
import org.apache.rocketmq.console.service.impl.ConsumerServiceImpl;
import org.apache.rocketmq.console.util.MockObjectUtil;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Spy;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.ResultActions;
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public class ConsumerControllerTest extends BaseControllerTest {
@InjectMocks
private ConsumerController consumerController;
private MockHttpServletRequestBuilder requestBuilder = null;
private ResultActions perform;
@Spy
private ConsumerServiceImpl consumerService;
@Before
public void init() throws Exception {
super.mockRmqConfigure();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
SubscriptionGroupWrapper wrapper = MockObjectUtil.createSubscriptionGroupWrapper();
when(mqAdminExt.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(wrapper);
ConsumeStats stats = MockObjectUtil.createConsumeStats();
when(mqAdminExt.examineConsumeStats(anyString())).thenReturn(stats);
when(mqAdminExt.examineConsumeStats(anyString(), isNull())).thenReturn(stats);
ConsumerConnection connection = MockObjectUtil.createConsumerConnection();
when(mqAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(connection);
ConsumerRunningInfo runningInfo = MockObjectUtil.createConsumerRunningInfo();
when(mqAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean()))
.thenReturn(runningInfo);
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
config.setGroupName("group-test");
when(mqAdminExt.examineSubscriptionGroupConfig(anyString(), anyString()))
.thenReturn(config);
}
@Test
public void testList() throws Exception {
final String url = "/consumer/groupList.query";
requestBuilder = MockMvcRequestBuilders.get(url);
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(1)))
.andExpect(jsonPath("$.data[0].group").value("group_test"))
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
}
@Test
public void testGroupQuery() throws Exception {
final String url = "/consumer/group.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.group").value("group_test"))
.andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name()));
}
@Test
public void testSkipAccumulate() throws Exception {
final String url = "/consumer/skipAccumulate.do";
resetOffsetOrSkipAccumulate(url, -1L);
}
@Test
public void testResetOffset() throws Exception {
final String url = "/consumer/resetOffset.do";
resetOffsetOrSkipAccumulate(url, System.currentTimeMillis());
}
private void resetOffsetOrSkipAccumulate(String url, Long resetTime) throws Exception {
RollbackStats rollbackStats = new RollbackStats();
rollbackStats.setRollbackOffset(10L);
rollbackStats.setQueueId(5L);
rollbackStats.setBrokerName("broker-a");
Map<MessageQueue, Long> rollbackStatsMap = new HashMap<>(0);
rollbackStatsMap.put(new MessageQueue("topic_test", "broker-a", 5), 10L);
{
MQClientException exception = new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "不在线");
when(mqAdminExt.resetOffsetByTimestamp(anyString(), anyString(), anyLong(), anyBoolean()))
.thenReturn(rollbackStatsMap).thenThrow(exception);
when(mqAdminExt.resetOffsetByTimestampOld(anyString(), anyString(), anyLong(), anyBoolean()))
.thenReturn(Lists.newArrayList(rollbackStats));
}
ResetOffsetRequest request = new ResetOffsetRequest();
String groupId = "group_test";
request.setTopic("topic_test");
request.setResetTime(resetTime);
request.setConsumerGroupList(Lists.newArrayList(groupId));
// 1、consumer not online
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(request));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").isMap())
.andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList").isArray())
.andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList[0].rollbackOffset").value(10L));
// 2、consumer not online
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk()).andExpect(jsonPath("$.data").isMap());
}
@Test
public void testFetchBrokerNameList() throws Exception {
final String url = "/consumer/fetchBrokerNameList.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(1)))
.andExpect(jsonPath("$.data[0]").value("broker-a"));
}
@Test
public void testExamineSubscriptionGroupConfig() throws Exception {
final String url = "/consumer/examineSubscriptionGroupConfig.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(1)));
}
@Test
public void testDelete() throws Exception {
final String url = "/consumer/deleteSubGroup.do";
{
doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString());
}
DeleteSubGroupRequest request = new DeleteSubGroupRequest();
request.setBrokerNameList(Lists.newArrayList("broker-a"));
request.setGroupName("group_test");
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(request));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testCreateOrUpdate() throws Exception {
final String url = "/consumer/createOrUpdate.do";
// 1、clusterName and brokerName all blank
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
ConsumerConfigInfo consumerConfigInfo = new ConsumerConfigInfo();
requestBuilder.content(JSON.toJSONString(consumerConfigInfo));
perform = mockMvc.perform(requestBuilder);
performErrorExpect(perform);
{
doNothing().when(mqAdminExt).createAndUpdateSubscriptionGroupConfig(anyString(), any());
}
List<String> clusterNameList = Lists.newArrayList("DefaultCluster");
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
config.setGroupName("group_test");
consumerConfigInfo.setClusterNameList(clusterNameList);
consumerConfigInfo.setSubscriptionGroupConfig(config);
// 2、create consumer
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(consumerConfigInfo));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testQueryConsumerByTopic() throws Exception {
final String url = "/consumer/queryTopicByConsumer.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(1)))
.andExpect(jsonPath("$.data[0].queueStatInfoList", hasSize(2)));
}
@Test
public void testConsumerConnection() throws Exception {
final String url = "/consumer/consumerConnection.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name()));
}
@Test
public void testGetConsumerRunningInfo() throws Exception {
final String url = "/consumer/consumerRunningInfo.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
requestBuilder.param("clientId", "group_test");
requestBuilder.param("jstack", "true");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.jstack").value("test"));
}
@Override protected Object getTestController() {
return consumerController;
}
}

View File

@@ -14,14 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.controller;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -31,11 +28,8 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -44,13 +38,12 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
import org.apache.rocketmq.console.service.impl.ConsumerServiceImpl;
import org.apache.rocketmq.console.service.impl.TopicServiceImpl;
import org.apache.rocketmq.console.util.MockObjectUtil;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
@@ -142,7 +135,7 @@ public class TopicControllerTest extends BaseControllerTest {
@Test
public void testStat() throws Exception {
{
TopicStatsTable topicStatsTable = createTopicStatsTable();
TopicStatsTable topicStatsTable = MockObjectUtil.createTopicStatsTable();
when(mqAdminExt.examineTopicStats(anyString())).thenReturn(topicStatsTable);
}
final String url = "/topic/stats.query";
@@ -155,7 +148,7 @@ public class TopicControllerTest extends BaseControllerTest {
@Test
public void testRoute() throws Exception {
{
TopicRouteData topicRouteData = createTopicRouteData();
TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData();
when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData);
}
final String url = "/topic/route.query";
@@ -178,7 +171,7 @@ public class TopicControllerTest extends BaseControllerTest {
performErrorExpect(perform);
{
ClusterInfo clusterInfo = createClusterInfo();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
doNothing().when(mqAdminExt).createAndUpdateTopicConfig(anyString(), any());
}
@@ -202,9 +195,9 @@ public class TopicControllerTest extends BaseControllerTest {
public void testExamineTopicConfig() throws Exception {
final String url = "/topic/examineTopicConfig.query";
{
TopicRouteData topicRouteData = createTopicRouteData();
TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData();
when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData);
ClusterInfo clusterInfo = createClusterInfo();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
when(mqAdminExt.examineTopicConfig(anyString(), anyString())).thenReturn(new TopicConfig(topicName));
}
@@ -222,7 +215,7 @@ public class TopicControllerTest extends BaseControllerTest {
GroupList list = new GroupList();
list.setGroupList(Sets.newHashSet("group1"));
when(mqAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(list);
ConsumeStats consumeStats = createConsumeStats();
ConsumeStats consumeStats = MockObjectUtil.createConsumeStats();
when(mqAdminExt.examineConsumeStats(anyString(), anyString())).thenReturn(consumeStats);
when(mqAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(new ConsumerConnection());
when(mqAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())).thenReturn(new ConsumerRunningInfo());
@@ -279,7 +272,7 @@ public class TopicControllerTest extends BaseControllerTest {
public void testDelete() throws Exception {
final String url = "/topic/deleteTopic.do";
{
ClusterInfo clusterInfo = createClusterInfo();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString());
@@ -302,7 +295,7 @@ public class TopicControllerTest extends BaseControllerTest {
@Test
public void testDeleteTopicByBroker() throws Exception {
{
ClusterInfo clusterInfo = createClusterInfo();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
}
@@ -315,77 +308,6 @@ public class TopicControllerTest extends BaseControllerTest {
.andExpect(jsonPath("$.data").value(true));
}
private ConsumeStats createConsumeStats(){
ConsumeStats stats = new ConsumeStats();
HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
OffsetWrapper wrapper = new OffsetWrapper();
wrapper.setBrokerOffset(10);
wrapper.setConsumerOffset(7);
wrapper.setLastTimestamp(System.currentTimeMillis());
offsetTable.put(new MessageQueue(topicName, "broker-a", 1), wrapper);
offsetTable.put(new MessageQueue(topicName, "broker-a", 2), wrapper);
stats.setOffsetTable(offsetTable);
return stats;
}
private ClusterInfo createClusterInfo() {
ClusterInfo clusterInfo = new ClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = new HashMap<>(3);
Set<String> brokerNameSet = new HashSet<>(3);
brokerNameSet.add("broker-a");
clusterAddrTable.put("DefaultCluster", brokerNameSet);
clusterInfo.setClusterAddrTable(clusterAddrTable);
HashMap<String, BrokerData> brokerAddrTable = new HashMap<>(3);
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-a");
HashMap<Long, String> brokerAddrs = new HashMap<>(2);
brokerAddrs.put(MixAll.MASTER_ID, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerAddrTable.put("broker-a", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddrTable);
return clusterInfo;
}
private TopicStatsTable createTopicStatsTable() {
TopicStatsTable topicStatsTable = new TopicStatsTable();
HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<>();
MessageQueue queue = new MessageQueue("topic_test", "broker-a", 2);
TopicOffset offset = new TopicOffset();
offset.setMinOffset(0);
offset.setMaxOffset(100);
offset.setLastUpdateTimestamp(System.currentTimeMillis());
offsetTable.put(queue, offset);
topicStatsTable.setOffsetTable(offsetTable);
return topicStatsTable;
}
private TopicRouteData createTopicRouteData() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<>());
List<BrokerData> brokerDataList = new ArrayList<>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-a");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<>();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker-a");
queueData.setPerm(6);
queueData.setReadQueueNums(4);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
@Override protected Object getTestController() {
return topicController;
}

View File

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.util;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY;
public class MockObjectUtil {
public static ConsumeStats createConsumeStats() {
ConsumeStats stats = new ConsumeStats();
HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
OffsetWrapper wrapper = new OffsetWrapper();
wrapper.setBrokerOffset(10);
wrapper.setConsumerOffset(7);
wrapper.setLastTimestamp(System.currentTimeMillis());
offsetTable.put(new MessageQueue("topic_test", "broker-a", 1), wrapper);
offsetTable.put(new MessageQueue("topic_test", "broker-a", 2), wrapper);
stats.setOffsetTable(offsetTable);
return stats;
}
public static ClusterInfo createClusterInfo() {
ClusterInfo clusterInfo = new ClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = new HashMap<>(3);
Set<String> brokerNameSet = new HashSet<>(3);
brokerNameSet.add("broker-a");
clusterAddrTable.put("DefaultCluster", brokerNameSet);
clusterInfo.setClusterAddrTable(clusterAddrTable);
HashMap<String, BrokerData> brokerAddrTable = new HashMap<>(3);
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-a");
HashMap<Long, String> brokerAddrs = new HashMap<>(2);
brokerAddrs.put(MixAll.MASTER_ID, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerAddrTable.put("broker-a", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddrTable);
return clusterInfo;
}
public static TopicStatsTable createTopicStatsTable() {
TopicStatsTable topicStatsTable = new TopicStatsTable();
HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<>();
MessageQueue queue = new MessageQueue("topic_test", "broker-a", 2);
TopicOffset offset = new TopicOffset();
offset.setMinOffset(0);
offset.setMaxOffset(100);
offset.setLastUpdateTimestamp(System.currentTimeMillis());
offsetTable.put(queue, offset);
topicStatsTable.setOffsetTable(offsetTable);
return topicStatsTable;
}
public static TopicRouteData createTopicRouteData() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<>());
List<BrokerData> brokerDataList = new ArrayList<>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-a");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<>();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker-a");
queueData.setPerm(6);
queueData.setReadQueueNums(4);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
public static SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
config.setGroupName("group_test");
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap(2);
subscriptionGroupTable.put("group_test", config);
wrapper.setSubscriptionGroupTable(subscriptionGroupTable);
wrapper.setDataVersion(new DataVersion());
return wrapper;
}
public static ConsumerConnection createConsumerConnection() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connections = new HashSet<Connection>();
Connection conn = new Connection();
conn.setClientAddr("127.0.0.1");
conn.setClientId("clientId");
conn.setVersion(LanguageCode.JAVA.getCode());
connections.add(conn);
ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionTable.put("topic_test", subscriptionData);
ConsumeType consumeType = ConsumeType.CONSUME_ACTIVELY;
MessageModel messageModel = MessageModel.CLUSTERING;
ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
consumerConnection.setConnectionSet(connections);
consumerConnection.setSubscriptionTable(subscriptionTable);
consumerConnection.setConsumeType(consumeType);
consumerConnection.setMessageModel(messageModel);
consumerConnection.setConsumeFromWhere(consumeFromWhere);
return consumerConnection;
}
public static ConsumerRunningInfo createConsumerRunningInfo() {
ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
consumerRunningInfo.setJstack("test");
TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
MessageQueue messageQueue = new MessageQueue("topic_test", "broker-a", 1);
mqTable.put(messageQueue, new ProcessQueueInfo());
consumerRunningInfo.setMqTable(mqTable);
TreeMap<String, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
statusTable.put("topic_test", new ConsumeStatus());
consumerRunningInfo.setStatusTable(statusTable);
TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
subscriptionSet.add(new SubscriptionData());
consumerRunningInfo.setSubscriptionSet(subscriptionSet);
Properties properties = new Properties();
properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, CONSUME_ACTIVELY.name());
properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, Long.toString(System.currentTimeMillis()));
consumerRunningInfo.setProperties(properties);
return consumerRunningInfo;
}
}