From 8bb1b96d7b4f908384fc8a4dd7ffe14e0d74b110 Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Thu, 15 Jul 2021 18:31:31 +0800 Subject: [PATCH] [ISSUE #673] Add unit tests for ConsumerController. (#742) Co-authored-by: zhangjidi2016 --- .../controller/ConsumerControllerTest.java | 269 ++++++++++++++++++ .../controller/TopicControllerTest.java | 96 +------ .../rocketmq/console/util/MockObjectUtil.java | 186 ++++++++++++ 3 files changed, 464 insertions(+), 87 deletions(-) create mode 100644 src/test/java/org/apache/rocketmq/console/controller/ConsumerControllerTest.java create mode 100644 src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java diff --git a/src/test/java/org/apache/rocketmq/console/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/console/controller/ConsumerControllerTest.java new file mode 100644 index 0000000..ae40897 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/controller/ConsumerControllerTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.controller; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Lists; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.console.model.request.ConsumerConfigInfo; +import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest; +import org.apache.rocketmq.console.model.request.ResetOffsetRequest; +import org.apache.rocketmq.console.service.impl.ConsumerServiceImpl; +import org.apache.rocketmq.console.util.MockObjectUtil; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Spy; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.ResultActions; +import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; + +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +public class ConsumerControllerTest extends BaseControllerTest { + + @InjectMocks + private ConsumerController consumerController; + + private MockHttpServletRequestBuilder requestBuilder = null; + + private ResultActions perform; + + @Spy + private ConsumerServiceImpl consumerService; + + @Before + public void init() throws Exception { + super.mockRmqConfigure(); + ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); + when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + SubscriptionGroupWrapper wrapper = MockObjectUtil.createSubscriptionGroupWrapper(); + when(mqAdminExt.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(wrapper); + ConsumeStats stats = MockObjectUtil.createConsumeStats(); + when(mqAdminExt.examineConsumeStats(anyString())).thenReturn(stats); + when(mqAdminExt.examineConsumeStats(anyString(), isNull())).thenReturn(stats); + ConsumerConnection connection = MockObjectUtil.createConsumerConnection(); + when(mqAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(connection); + ConsumerRunningInfo runningInfo = MockObjectUtil.createConsumerRunningInfo(); + when(mqAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())) + .thenReturn(runningInfo); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + config.setGroupName("group-test"); + when(mqAdminExt.examineSubscriptionGroupConfig(anyString(), anyString())) + .thenReturn(config); + } + + @Test + public void testList() throws Exception { + final String url = "/consumer/groupList.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data", hasSize(1))) + .andExpect(jsonPath("$.data[0].group").value("group_test")) + .andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) + .andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name())); + } + + @Test + public void testGroupQuery() throws Exception { + final String url = "/consumer/group.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("consumerGroup", "group_test"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.group").value("group_test")) + .andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) + .andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name())); + } + + @Test + public void testSkipAccumulate() throws Exception { + final String url = "/consumer/skipAccumulate.do"; + resetOffsetOrSkipAccumulate(url, -1L); + } + + @Test + public void testResetOffset() throws Exception { + final String url = "/consumer/resetOffset.do"; + resetOffsetOrSkipAccumulate(url, System.currentTimeMillis()); + } + + private void resetOffsetOrSkipAccumulate(String url, Long resetTime) throws Exception { + RollbackStats rollbackStats = new RollbackStats(); + rollbackStats.setRollbackOffset(10L); + rollbackStats.setQueueId(5L); + rollbackStats.setBrokerName("broker-a"); + Map rollbackStatsMap = new HashMap<>(0); + rollbackStatsMap.put(new MessageQueue("topic_test", "broker-a", 5), 10L); + { + MQClientException exception = new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "不在线"); + when(mqAdminExt.resetOffsetByTimestamp(anyString(), anyString(), anyLong(), anyBoolean())) + .thenReturn(rollbackStatsMap).thenThrow(exception); + when(mqAdminExt.resetOffsetByTimestampOld(anyString(), anyString(), anyLong(), anyBoolean())) + .thenReturn(Lists.newArrayList(rollbackStats)); + } + ResetOffsetRequest request = new ResetOffsetRequest(); + String groupId = "group_test"; + request.setTopic("topic_test"); + request.setResetTime(resetTime); + request.setConsumerGroupList(Lists.newArrayList(groupId)); + // 1、consumer not online + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); + requestBuilder.content(JSON.toJSONString(request)); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data").isMap()) + .andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList").isArray()) + .andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList[0].rollbackOffset").value(10L)); + + // 2、consumer not online + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()).andExpect(jsonPath("$.data").isMap()); + } + + @Test + public void testFetchBrokerNameList() throws Exception { + final String url = "/consumer/fetchBrokerNameList.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("consumerGroup", "group_test"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data", hasSize(1))) + .andExpect(jsonPath("$.data[0]").value("broker-a")); + } + + @Test + public void testExamineSubscriptionGroupConfig() throws Exception { + final String url = "/consumer/examineSubscriptionGroupConfig.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("consumerGroup", "group_test"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data", hasSize(1))); + } + + @Test + public void testDelete() throws Exception { + final String url = "/consumer/deleteSubGroup.do"; + { + doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString()); + } + DeleteSubGroupRequest request = new DeleteSubGroupRequest(); + request.setBrokerNameList(Lists.newArrayList("broker-a")); + request.setGroupName("group_test"); + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); + requestBuilder.content(JSON.toJSONString(request)); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data").value(true)); + } + + @Test + public void testCreateOrUpdate() throws Exception { + final String url = "/consumer/createOrUpdate.do"; + // 1、clusterName and brokerName all blank + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); + ConsumerConfigInfo consumerConfigInfo = new ConsumerConfigInfo(); + requestBuilder.content(JSON.toJSONString(consumerConfigInfo)); + perform = mockMvc.perform(requestBuilder); + performErrorExpect(perform); + + { + doNothing().when(mqAdminExt).createAndUpdateSubscriptionGroupConfig(anyString(), any()); + } + + List clusterNameList = Lists.newArrayList("DefaultCluster"); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + config.setGroupName("group_test"); + consumerConfigInfo.setClusterNameList(clusterNameList); + consumerConfigInfo.setSubscriptionGroupConfig(config); + // 2、create consumer + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); + requestBuilder.content(JSON.toJSONString(consumerConfigInfo)); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data").value(true)); + } + + @Test + public void testQueryConsumerByTopic() throws Exception { + final String url = "/consumer/queryTopicByConsumer.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("consumerGroup", "group_test"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data", hasSize(1))) + .andExpect(jsonPath("$.data[0].queueStatInfoList", hasSize(2))); + } + + @Test + public void testConsumerConnection() throws Exception { + final String url = "/consumer/consumerConnection.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("consumerGroup", "group_test"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) + .andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name())); + } + + @Test + public void testGetConsumerRunningInfo() throws Exception { + final String url = "/consumer/consumerRunningInfo.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("consumerGroup", "group_test"); + requestBuilder.param("clientId", "group_test"); + requestBuilder.param("jstack", "true"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.jstack").value("test")); + } + + @Override protected Object getTestController() { + return consumerController; + } +} diff --git a/src/test/java/org/apache/rocketmq/console/controller/TopicControllerTest.java b/src/test/java/org/apache/rocketmq/console/controller/TopicControllerTest.java index ed52469..fbab910 100644 --- a/src/test/java/org/apache/rocketmq/console/controller/TopicControllerTest.java +++ b/src/test/java/org/apache/rocketmq/console/controller/TopicControllerTest.java @@ -14,14 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.console.controller; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -31,11 +28,8 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.OffsetWrapper; -import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; @@ -44,13 +38,12 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.console.model.request.SendTopicMessageRequest; import org.apache.rocketmq.console.model.request.TopicConfigInfo; import org.apache.rocketmq.console.service.impl.ConsumerServiceImpl; import org.apache.rocketmq.console.service.impl.TopicServiceImpl; +import org.apache.rocketmq.console.util.MockObjectUtil; import org.junit.Before; import org.junit.Test; import org.mockito.InjectMocks; @@ -142,7 +135,7 @@ public class TopicControllerTest extends BaseControllerTest { @Test public void testStat() throws Exception { { - TopicStatsTable topicStatsTable = createTopicStatsTable(); + TopicStatsTable topicStatsTable = MockObjectUtil.createTopicStatsTable(); when(mqAdminExt.examineTopicStats(anyString())).thenReturn(topicStatsTable); } final String url = "/topic/stats.query"; @@ -155,7 +148,7 @@ public class TopicControllerTest extends BaseControllerTest { @Test public void testRoute() throws Exception { { - TopicRouteData topicRouteData = createTopicRouteData(); + TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData(); when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData); } final String url = "/topic/route.query"; @@ -178,7 +171,7 @@ public class TopicControllerTest extends BaseControllerTest { performErrorExpect(perform); { - ClusterInfo clusterInfo = createClusterInfo(); + ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); doNothing().when(mqAdminExt).createAndUpdateTopicConfig(anyString(), any()); } @@ -202,9 +195,9 @@ public class TopicControllerTest extends BaseControllerTest { public void testExamineTopicConfig() throws Exception { final String url = "/topic/examineTopicConfig.query"; { - TopicRouteData topicRouteData = createTopicRouteData(); + TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData(); when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData); - ClusterInfo clusterInfo = createClusterInfo(); + ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); when(mqAdminExt.examineTopicConfig(anyString(), anyString())).thenReturn(new TopicConfig(topicName)); } @@ -222,7 +215,7 @@ public class TopicControllerTest extends BaseControllerTest { GroupList list = new GroupList(); list.setGroupList(Sets.newHashSet("group1")); when(mqAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(list); - ConsumeStats consumeStats = createConsumeStats(); + ConsumeStats consumeStats = MockObjectUtil.createConsumeStats(); when(mqAdminExt.examineConsumeStats(anyString(), anyString())).thenReturn(consumeStats); when(mqAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(new ConsumerConnection()); when(mqAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())).thenReturn(new ConsumerRunningInfo()); @@ -279,7 +272,7 @@ public class TopicControllerTest extends BaseControllerTest { public void testDelete() throws Exception { final String url = "/topic/deleteTopic.do"; { - ClusterInfo clusterInfo = createClusterInfo(); + ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString()); doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString()); @@ -302,7 +295,7 @@ public class TopicControllerTest extends BaseControllerTest { @Test public void testDeleteTopicByBroker() throws Exception { { - ClusterInfo clusterInfo = createClusterInfo(); + ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString()); } @@ -315,77 +308,6 @@ public class TopicControllerTest extends BaseControllerTest { .andExpect(jsonPath("$.data").value(true)); } - - private ConsumeStats createConsumeStats(){ - ConsumeStats stats = new ConsumeStats(); - HashMap offsetTable = new HashMap(); - OffsetWrapper wrapper = new OffsetWrapper(); - wrapper.setBrokerOffset(10); - wrapper.setConsumerOffset(7); - wrapper.setLastTimestamp(System.currentTimeMillis()); - offsetTable.put(new MessageQueue(topicName, "broker-a", 1), wrapper); - offsetTable.put(new MessageQueue(topicName, "broker-a", 2), wrapper); - stats.setOffsetTable(offsetTable); - return stats; - } - - private ClusterInfo createClusterInfo() { - ClusterInfo clusterInfo = new ClusterInfo(); - HashMap> clusterAddrTable = new HashMap<>(3); - Set brokerNameSet = new HashSet<>(3); - brokerNameSet.add("broker-a"); - clusterAddrTable.put("DefaultCluster", brokerNameSet); - clusterInfo.setClusterAddrTable(clusterAddrTable); - HashMap brokerAddrTable = new HashMap<>(3); - BrokerData brokerData = new BrokerData(); - brokerData.setBrokerName("broker-a"); - HashMap brokerAddrs = new HashMap<>(2); - brokerAddrs.put(MixAll.MASTER_ID, "127.0.0.1:10911"); - brokerData.setBrokerAddrs(brokerAddrs); - brokerAddrTable.put("broker-a", brokerData); - clusterInfo.setBrokerAddrTable(brokerAddrTable); - return clusterInfo; - } - - private TopicStatsTable createTopicStatsTable() { - TopicStatsTable topicStatsTable = new TopicStatsTable(); - HashMap offsetTable = new HashMap<>(); - MessageQueue queue = new MessageQueue("topic_test", "broker-a", 2); - TopicOffset offset = new TopicOffset(); - offset.setMinOffset(0); - offset.setMaxOffset(100); - offset.setLastUpdateTimestamp(System.currentTimeMillis()); - offsetTable.put(queue, offset); - topicStatsTable.setOffsetTable(offsetTable); - return topicStatsTable; - } - - private TopicRouteData createTopicRouteData() { - TopicRouteData topicRouteData = new TopicRouteData(); - - topicRouteData.setFilterServerTable(new HashMap<>()); - List brokerDataList = new ArrayList<>(); - BrokerData brokerData = new BrokerData(); - brokerData.setBrokerName("broker-a"); - brokerData.setCluster("DefaultCluster"); - HashMap brokerAddrs = new HashMap<>(); - brokerAddrs.put(0L, "127.0.0.1:10911"); - brokerData.setBrokerAddrs(brokerAddrs); - brokerDataList.add(brokerData); - topicRouteData.setBrokerDatas(brokerDataList); - - List queueDataList = new ArrayList<>(); - QueueData queueData = new QueueData(); - queueData.setBrokerName("broker-a"); - queueData.setPerm(6); - queueData.setReadQueueNums(4); - queueData.setWriteQueueNums(4); - queueData.setTopicSysFlag(0); - queueDataList.add(queueData); - topicRouteData.setQueueDatas(queueDataList); - return topicRouteData; - } - @Override protected Object getTestController() { return topicController; } diff --git a/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java b/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java new file mode 100644 index 0000000..733de1e --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumeStatus; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.protocol.LanguageCode; + +import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; + +public class MockObjectUtil { + + public static ConsumeStats createConsumeStats() { + ConsumeStats stats = new ConsumeStats(); + HashMap offsetTable = new HashMap(); + OffsetWrapper wrapper = new OffsetWrapper(); + wrapper.setBrokerOffset(10); + wrapper.setConsumerOffset(7); + wrapper.setLastTimestamp(System.currentTimeMillis()); + offsetTable.put(new MessageQueue("topic_test", "broker-a", 1), wrapper); + offsetTable.put(new MessageQueue("topic_test", "broker-a", 2), wrapper); + stats.setOffsetTable(offsetTable); + return stats; + } + + public static ClusterInfo createClusterInfo() { + ClusterInfo clusterInfo = new ClusterInfo(); + HashMap> clusterAddrTable = new HashMap<>(3); + Set brokerNameSet = new HashSet<>(3); + brokerNameSet.add("broker-a"); + clusterAddrTable.put("DefaultCluster", brokerNameSet); + clusterInfo.setClusterAddrTable(clusterAddrTable); + HashMap brokerAddrTable = new HashMap<>(3); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("broker-a"); + HashMap brokerAddrs = new HashMap<>(2); + brokerAddrs.put(MixAll.MASTER_ID, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerAddrTable.put("broker-a", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + return clusterInfo; + } + + public static TopicStatsTable createTopicStatsTable() { + TopicStatsTable topicStatsTable = new TopicStatsTable(); + HashMap offsetTable = new HashMap<>(); + MessageQueue queue = new MessageQueue("topic_test", "broker-a", 2); + TopicOffset offset = new TopicOffset(); + offset.setMinOffset(0); + offset.setMaxOffset(100); + offset.setLastUpdateTimestamp(System.currentTimeMillis()); + offsetTable.put(queue, offset); + topicStatsTable.setOffsetTable(offsetTable); + return topicStatsTable; + } + + public static TopicRouteData createTopicRouteData() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap<>()); + List brokerDataList = new ArrayList<>(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("broker-a"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList<>(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("broker-a"); + queueData.setPerm(6); + queueData.setReadQueueNums(4); + queueData.setWriteQueueNums(4); + queueData.setTopicSynFlag(0); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + public static SubscriptionGroupWrapper createSubscriptionGroupWrapper() { + SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper(); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + config.setGroupName("group_test"); + ConcurrentMap subscriptionGroupTable = new ConcurrentHashMap(2); + subscriptionGroupTable.put("group_test", config); + wrapper.setSubscriptionGroupTable(subscriptionGroupTable); + wrapper.setDataVersion(new DataVersion()); + return wrapper; + } + + public static ConsumerConnection createConsumerConnection() { + ConsumerConnection consumerConnection = new ConsumerConnection(); + HashSet connections = new HashSet(); + Connection conn = new Connection(); + conn.setClientAddr("127.0.0.1"); + conn.setClientId("clientId"); + conn.setVersion(LanguageCode.JAVA.getCode()); + connections.add(conn); + + ConcurrentHashMap subscriptionTable = new ConcurrentHashMap(); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionTable.put("topic_test", subscriptionData); + + ConsumeType consumeType = ConsumeType.CONSUME_ACTIVELY; + MessageModel messageModel = MessageModel.CLUSTERING; + ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET; + + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(subscriptionTable); + consumerConnection.setConsumeType(consumeType); + consumerConnection.setMessageModel(messageModel); + consumerConnection.setConsumeFromWhere(consumeFromWhere); + return consumerConnection; + } + + public static ConsumerRunningInfo createConsumerRunningInfo() { + ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setJstack("test"); + + TreeMap mqTable = new TreeMap(); + MessageQueue messageQueue = new MessageQueue("topic_test", "broker-a", 1); + mqTable.put(messageQueue, new ProcessQueueInfo()); + consumerRunningInfo.setMqTable(mqTable); + + TreeMap statusTable = new TreeMap(); + statusTable.put("topic_test", new ConsumeStatus()); + consumerRunningInfo.setStatusTable(statusTable); + + TreeSet subscriptionSet = new TreeSet(); + subscriptionSet.add(new SubscriptionData()); + consumerRunningInfo.setSubscriptionSet(subscriptionSet); + + Properties properties = new Properties(); + properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, CONSUME_ACTIVELY.name()); + properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, Long.toString(System.currentTimeMillis())); + consumerRunningInfo.setProperties(properties); + return consumerRunningInfo; + } +}