From 38c1b97589d583e1ff8279f282be69d62e7fa182 Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Mon, 19 Jul 2021 12:26:08 +0800 Subject: [PATCH] [ISSUE #673] Add unit tests for MessageController. (#747) Co-authored-by: zhangjidi2016 --- .../service/impl/MessageServiceImpl.java | 23 +- .../controller/MessageControllerTest.java | 254 ++++++++++++++++++ .../rocketmq/console/util/MockObjectUtil.java | 23 +- 3 files changed, 289 insertions(+), 11 deletions(-) create mode 100644 src/test/java/org/apache/rocketmq/console/controller/MessageControllerTest.java diff --git a/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java index a7961b4..8e2ac19 100644 --- a/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java @@ -80,7 +80,7 @@ public class MessageServiceImpl implements MessageService { .build(); @Autowired - private RMQConfigure configure; + private RMQConfigure rMQConfigure; /** * @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64; * @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); @@ -117,12 +117,12 @@ public class MessageServiceImpl implements MessageService { @Override public List queryMessageByTopic(String topic, final long begin, final long end) { - boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); + boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey()); RPCHook rpcHook = null; if (isEnableAcl) { - rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); + rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey())); } - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook); List messageViewList = Lists.newArrayList(); try { String subExpression = "*"; @@ -250,12 +250,12 @@ public class MessageServiceImpl implements MessageService { } private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) { - boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); + boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey()); RPCHook rpcHook = null; if (isEnableAcl) { - rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); + rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey())); } - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook); long total = 0; List queueOffsetInfos = new ArrayList<>(); @@ -391,12 +391,12 @@ public class MessageServiceImpl implements MessageService { } private Page queryMessageByTaskPage(MessageQueryByPage query, List queueOffsetInfos) { - boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); + boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey()); RPCHook rpcHook = null; if (isEnableAcl) { - rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); + rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(), rMQConfigure.getSecretKey())); } - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook); List messageViews = new ArrayList<>(); long offset = query.getPageNum() * query.getPageSize(); @@ -536,4 +536,7 @@ public class MessageServiceImpl implements MessageService { } } + public DefaultMQPullConsumer buildDefaultMQPullConsumer(RPCHook rpcHook) { + return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + } } diff --git a/src/test/java/org/apache/rocketmq/console/controller/MessageControllerTest.java b/src/test/java/org/apache/rocketmq/console/controller/MessageControllerTest.java new file mode 100644 index 0000000..ab8cd4d --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/controller/MessageControllerTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.controller; + +import com.alibaba.fastjson.JSON; +import com.google.common.cache.Cache; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.CMResult; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.console.model.QueueOffsetInfo; +import org.apache.rocketmq.console.model.request.MessageQuery; +import org.apache.rocketmq.console.service.impl.MessageServiceImpl; +import org.apache.rocketmq.console.util.MockObjectUtil; +import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.apache.rocketmq.tools.admin.api.TrackType; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Spy; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.ResultActions; +import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; + +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +public class MessageControllerTest extends BaseControllerTest { + + @InjectMocks + private MessageController messageController; + + private MockHttpServletRequestBuilder requestBuilder = null; + + private ResultActions perform; + + @Spy + private MessageServiceImpl messageService; + + private Set messageQueues; + private DefaultMQPullConsumer defaultMQPullConsumer; + + @Before + public void init() throws Exception { + super.mockRmqConfigure(); + { + List wrappers = new ArrayList<>(1); + wrappers.add(MockObjectUtil.createMessageExt()); + defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); + messageQueues = new HashSet<>(1); + MessageQueue messageQueue = new MessageQueue("topic_test", "broker-a", 0); + messageQueues.add(messageQueue); + when(defaultMQPullConsumer.fetchSubscribeMessageQueues(anyString())).thenReturn(messageQueues); + when(defaultMQPullConsumer.searchOffset(messageQueue, Long.MIN_VALUE)).thenReturn(Long.MIN_VALUE); + when(defaultMQPullConsumer.searchOffset(messageQueue, Long.MAX_VALUE)).thenReturn(Long.MAX_VALUE - 10L); + PullResult pullResult = mock(PullResult.class); + when(defaultMQPullConsumer.pull(any(), anyString(), anyLong(), anyInt())).thenReturn(pullResult); + when(pullResult.getNextBeginOffset()).thenReturn(Long.MAX_VALUE); + when(pullResult.getPullStatus()).thenReturn(PullStatus.FOUND); + when(pullResult.getMsgFoundList()).thenReturn(wrappers); + when(messageService.buildDefaultMQPullConsumer(any())).thenReturn(defaultMQPullConsumer); + } + } + + @Test + public void testViewMessage() throws Exception { + final String url = "/message/viewMessage.query"; + { + MessageExt messageExt = MockObjectUtil.createMessageExt(); + when(mqAdminExt.viewMessage(anyString(), anyString())) + .thenThrow(new MQClientException(208, "no message")) + .thenReturn(messageExt); + MessageTrack track = new MessageTrack(); + track.setConsumerGroup("group_test"); + track.setTrackType(TrackType.CONSUMED); + List tracks = new ArrayList<>(); + tracks.add(track); + when(mqAdminExt.messageTrackDetail(any())) + .thenThrow(new MQBrokerException(206, "consumer not online")) + .thenReturn(tracks); + } + // no message + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("topic", "topic_test"); + requestBuilder.param("msgId", "0A9A003F00002A9F0000000000000319"); + perform = mockMvc.perform(requestBuilder); + performErrorExpect(perform); + + // consumer not online + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.messageView.msgId").value("0A9A003F00002A9F0000000000000319")) + .andExpect(jsonPath("$.data.messageTrackList", hasSize(0))); + + // query message success and has a group consumed. + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.messageView.msgId").value("0A9A003F00002A9F0000000000000319")) + .andExpect(jsonPath("$.data.messageTrackList", hasSize(1))) + .andExpect(jsonPath("$.data.messageTrackList[0].consumerGroup").value("group_test")) + .andExpect(jsonPath("$.data.messageTrackList[0].trackType").value(TrackType.CONSUMED.name())); + } + + @Test + public void testQueryMessagePageByTopic() throws Exception { + final String url = "/message/queryMessagePageByTopic.query"; + MessageQuery query = new MessageQuery(); + query.setPageNum(1); + query.setPageSize(10); + query.setTopic("topic_test"); + query.setTaskId(""); + query.setBegin(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000); + query.setEnd(System.currentTimeMillis()); + + // missed cache + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); + requestBuilder.content(JSON.toJSONString(query)); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.page.content", hasSize(1))) + .andExpect(jsonPath("$.data.page.content[0].msgId").value("0A9A003F00002A9F0000000000000319")); + + String taskId = MessageClientIDSetter.createUniqID(); + { + List queueOffsetInfos = new ArrayList<>(); + int idx = 0; + for (MessageQueue messageQueue : messageQueues) { + Long minOffset = defaultMQPullConsumer.searchOffset(messageQueue, query.getBegin()); + Long maxOffset = defaultMQPullConsumer.searchOffset(messageQueue, query.getEnd()) + 1; + queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue)); + } + // Use reflection to add data to the CACHE + Field field = MessageServiceImpl.class.getDeclaredField("CACHE"); + field.setAccessible(true); + Cache> cache = (Cache>) field.get(messageService); + cache.put(taskId, queueOffsetInfos); + } + + // hit cache + query.setTaskId(taskId); + requestBuilder.content(JSON.toJSONString(query)); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.page.content", hasSize(1))) + .andExpect(jsonPath("$.data.page.content[0].msgId").value("0A9A003F00002A9F0000000000000319")); + } + + @Test + public void testQueryMessageByTopicAndKey() throws Exception { + final String url = "/message/queryMessageByTopicAndKey.query"; + { + List messageList = new ArrayList<>(2); + messageList.add(MockObjectUtil.createMessageExt()); + QueryResult queryResult = new QueryResult(System.currentTimeMillis(), messageList); + when(mqAdminExt.queryMessage(anyString(), anyString(), anyInt(), anyLong(), anyLong())) + .thenReturn(queryResult); + } + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("topic", "topic_test"); + requestBuilder.param("key", "KeyA"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data", hasSize(1))) + .andExpect(jsonPath("$.data[0].msgId").value("0A9A003F00002A9F0000000000000319")); + } + + @Test + public void testQueryMessageByTopic() throws Exception { + final String url = "/message/queryMessageByTopic.query"; + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("topic", "topic_test") + .param("begin", Long.toString(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000)) + .param("end", Long.toString(System.currentTimeMillis())); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data", hasSize(1))) + .andExpect(jsonPath("$.data[0].msgId").value("0A9A003F00002A9F0000000000000319")); + } + + @Test + public void testConsumeMessageDirectly() throws Exception { + final String url = "/message/consumeMessageDirectly.do"; + { + ConsumeMessageDirectlyResult result1 = new ConsumeMessageDirectlyResult(); + result1.setConsumeResult(CMResult.CR_SUCCESS); + ConsumeMessageDirectlyResult result2 = new ConsumeMessageDirectlyResult(); + result2.setConsumeResult(CMResult.CR_LATER); + when(mqAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString(), anyString())) + .thenReturn(result1).thenReturn(result2); + ConsumerConnection consumerConnection = MockObjectUtil.createConsumerConnection(); + when(mqAdminExt.examineConsumerConnectionInfo(anyString())) + .thenReturn(consumerConnection); + } + + // clientId is not empty + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.param("topic", "topic_test") + .param("consumerGroup", "group_test") + .param("msgId", "0A9A003F00002A9F0000000000000319") + .param("clientId", "127.0.0.1@37540#2295913058176000"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.consumeResult").value(CMResult.CR_SUCCESS.name())); + + // clientId is empty + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.param("topic", "topic_test") + .param("consumerGroup", "group_test") + .param("msgId", "0A9A003F00002A9F0000000000000319"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data.consumeResult").value(CMResult.CR_LATER.name())); + } + + @Override protected Object getTestController() { + return messageController; + } +} diff --git a/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java b/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java index 733de1e..cb89355 100644 --- a/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java +++ b/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.console.util; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -33,6 +34,7 @@ import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.Connection; @@ -118,7 +120,7 @@ public class MockObjectUtil { queueData.setPerm(6); queueData.setReadQueueNums(4); queueData.setWriteQueueNums(4); - queueData.setTopicSynFlag(0); + queueData.setTopicSysFlag(0); queueDataList.add(queueData); topicRouteData.setQueueDatas(queueDataList); return topicRouteData; @@ -183,4 +185,23 @@ public class MockObjectUtil { consumerRunningInfo.setProperties(properties); return consumerRunningInfo; } + + public static MessageExt createMessageExt() { + MessageExt messageExt = new MessageExt(); + messageExt.setBrokerName("broker-a"); + messageExt.setQueueId(0); + messageExt.setStoreSize(205); + messageExt.setQueueOffset(1L); + messageExt.setKeys("KeyA"); + messageExt.setMsgId("0A9A003F00002A9F0000000000000319"); + messageExt.setTopic("topic_test"); + messageExt.setBody("this is message ext body".getBytes()); + messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 8899)); + messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 7788)); + messageExt.setBornTimestamp(System.currentTimeMillis()); + messageExt.setStoreTimestamp(System.currentTimeMillis()); + messageExt.setCommitLogOffset(793); + messageExt.setReconsumeTimes(0); + return messageExt; + } }