From 54b8eb7ac1d99fdebdfbebec6ce7789534292c4d Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Thu, 12 Aug 2021 09:19:17 +0800 Subject: [PATCH] [ISSUE #673]Add unit tests to improve code coverage (#777) * [ISSUE #673]Add unit tests to improve code coverage * format code Co-authored-by: zhangjidi --- pom.xml | 25 +- .../console/admin/MQAdminAspectTest.java | 38 + .../console/admin/MQAdminExtImplTest.java | 800 ++++++++++++++++++ .../console/config/RMQConfigureTest.java | 50 ++ .../controller/ProducerControllerTest.java | 43 +- .../task/DashboardCollectTaskTest.java | 177 ++++ .../rocketmq/console/util/MockObjectUtil.java | 38 +- .../console/util/MsgTraceDecodeUtilTest.java | 30 + 8 files changed, 1186 insertions(+), 15 deletions(-) create mode 100644 src/test/java/org/apache/rocketmq/console/admin/MQAdminAspectTest.java create mode 100644 src/test/java/org/apache/rocketmq/console/admin/MQAdminExtImplTest.java create mode 100644 src/test/java/org/apache/rocketmq/console/config/RMQConfigureTest.java create mode 100644 src/test/java/org/apache/rocketmq/console/task/DashboardCollectTaskTest.java diff --git a/pom.xml b/pom.xml index 343ff51..1ac12f3 100644 --- a/pom.xml +++ b/pom.xml @@ -46,11 +46,11 @@ - - Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0 - repo - + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + repo + @@ -70,6 +70,7 @@ ${basedir}/../.. apacherocketmq 2.2.2.RELEASE + 3.3.3 @@ -179,15 +180,21 @@ 0.9.6 - org.bouncycastle - bcpkix-jdk15on - 1.68 - + org.bouncycastle + bcpkix-jdk15on + 1.68 + org.projectlombok lombok ${lombok.version} + + org.mockito + mockito-inline + ${mockito-inline.version} + test + diff --git a/src/test/java/org/apache/rocketmq/console/admin/MQAdminAspectTest.java b/src/test/java/org/apache/rocketmq/console/admin/MQAdminAspectTest.java new file mode 100644 index 0000000..fe4bad6 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/admin/MQAdminAspectTest.java @@ -0,0 +1,38 @@ +package org.apache.rocketmq.console.admin; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import org.apache.rocketmq.console.aspect.admin.MQAdminAspect; +import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod; +import org.apache.rocketmq.console.config.RMQConfigure; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.reflect.MethodSignature; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MQAdminAspectTest { + + @Test + public void testAroundMQAdminMethod() throws Throwable { + MQAdminAspect mqAdminAspect = new MQAdminAspect(); + ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class); + MethodSignature signature = mock(MethodSignature.class); + Method method = mock(Method.class); + MultiMQAdminCmdMethod annotationValue = mock(MultiMQAdminCmdMethod.class); + when(annotationValue.timeoutMillis()).thenReturn(0L).thenReturn(3000L); + when(method.getAnnotation(MultiMQAdminCmdMethod.class)).thenReturn(annotationValue); + when(signature.getMethod()).thenReturn(method); + when(joinPoint.getSignature()).thenReturn(signature); + + RMQConfigure rmqConfigure = mock(RMQConfigure.class); + when(rmqConfigure.getAccessKey()).thenReturn("rocketmq"); + when(rmqConfigure.getSecretKey()).thenReturn("12345678"); + Field field = mqAdminAspect.getClass().getDeclaredField("rmqConfigure"); + field.setAccessible(true); + field.set(mqAdminAspect, rmqConfigure); + + mqAdminAspect.aroundMQAdminMethod(joinPoint); + } +} diff --git a/src/test/java/org/apache/rocketmq/console/admin/MQAdminExtImplTest.java b/src/test/java/org/apache/rocketmq/console/admin/MQAdminExtImplTest.java new file mode 100644 index 0000000..705d7ab --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/admin/MQAdminExtImplTest.java @@ -0,0 +1,800 @@ +package org.apache.rocketmq.console.admin; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.console.service.client.MQAdminExtImpl; +import org.apache.rocketmq.console.service.client.MQAdminInstance; +import org.apache.rocketmq.console.util.MockObjectUtil; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.test.util.ReflectionTestUtils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class MQAdminExtImplTest { + + @InjectMocks + private MQAdminExtImpl mqAdminExtImpl; + + @Mock + private DefaultMQAdminExt defaultMQAdminExt; + + @Mock + private DefaultMQAdminExtImpl defaultMQAdminExtImpl; + + @Mock + private MQClientInstance mqClientInstance; + + @Mock + private MQClientAPIImpl mQClientAPIImpl; + + @Mock + private RemotingClient remotingClient; + + private String brokerAddr = "127.0.0.1:10911"; + + @Before + public void init() throws Exception { + Field field = MQAdminInstance.class.getDeclaredField("MQ_ADMIN_EXT_THREAD_LOCAL"); + field.setAccessible(true); + Object object = field.get(mqAdminExtImpl); + assertNotNull(object); + ThreadLocal threadLocal = (ThreadLocal) object; + defaultMQAdminExt = mock(DefaultMQAdminExt.class); + threadLocal.set(defaultMQAdminExt); + + field = MQAdminInstance.class.getDeclaredField("INIT_COUNTER"); + field.setAccessible(true); + object = field.get(mqAdminExtImpl); + assertNotNull(object); + ThreadLocal threadLocal1 = (ThreadLocal) object; + threadLocal1.set(1); + + ReflectionTestUtils.setField(defaultMQAdminExt, "defaultMQAdminExtImpl", defaultMQAdminExtImpl); + ReflectionTestUtils.setField(defaultMQAdminExtImpl, "mqClientInstance", mqClientInstance); + ReflectionTestUtils.setField(mqClientInstance, "mQClientAPIImpl", mQClientAPIImpl); + ReflectionTestUtils.setField(mQClientAPIImpl, "remotingClient", remotingClient); + } + + @After + public void destroy() { + MQAdminInstance.destroyMQAdminInstance(); + } + + @Test + public void testUpdateBrokerConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + doNothing() + .doThrow(new MQBrokerException(0, "")) + .when(defaultMQAdminExt).updateBrokerConfig(anyString(), any()); + mqAdminExtImpl.updateBrokerConfig(brokerAddr, new Properties()); + boolean hasException = false; + try { + mqAdminExtImpl.updateBrokerConfig(brokerAddr, new Properties()); + } catch (Exception e) { + hasException = true; + assertThat(e).isInstanceOf(MQBrokerException.class); + assertThat(((MQBrokerException) e).getResponseCode()).isEqualTo(0); + } + assertTrue(hasException); + } + + @Test + public void testCreateAndUpdateTopicConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + mqAdminExtImpl.createAndUpdateTopicConfig(brokerAddr, new TopicConfig()); + } + + @Test + public void testDeletePlainAccessConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + mqAdminExtImpl.deletePlainAccessConfig(brokerAddr, "rocketmq"); + } + + @Test + public void testUpdateGlobalWhiteAddrConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + mqAdminExtImpl.updateGlobalWhiteAddrConfig(brokerAddr, "192.168.*.*"); + } + + @Test + public void testCreateAndUpdatePlainAccessConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + mqAdminExtImpl.createAndUpdatePlainAccessConfig(brokerAddr, new PlainAccessConfig()); + } + + @Test + public void testExamineBrokerClusterAclVersionInfo() throws Exception { + assertNotNull(mqAdminExtImpl); + assertNull(mqAdminExtImpl.examineBrokerClusterAclVersionInfo(brokerAddr)); + } + + @Test + public void testExamineBrokerClusterAclConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + assertNull(mqAdminExtImpl.examineBrokerClusterAclConfig(brokerAddr)); + } + + @Test + public void testQueryConsumerStatus() throws Exception { + assertNotNull(mqAdminExtImpl); + } + + @Test + public void testCreateAndUpdateSubscriptionGroupConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + mqAdminExtImpl.createAndUpdateSubscriptionGroupConfig(brokerAddr, new SubscriptionGroupConfig()); + } + + @Test + public void testExamineSubscriptionGroupConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + { + RemotingCommand response1 = RemotingCommand.createResponseCommand(null); + RemotingCommand response2 = RemotingCommand.createResponseCommand(null); + response2.setCode(ResponseCode.SUCCESS); + response2.setBody(RemotingSerializable.encode(MockObjectUtil.createSubscriptionGroupWrapper())); + when(remotingClient.invokeSync(anyString(), any(), anyLong())) + .thenThrow(new RuntimeException("invokeSync exception")) + .thenReturn(response1).thenReturn(response2); + } + // invokeSync exception + try { + mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "topic_test"); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "invokeSync exception"); + } + + // responseCode is not success + try { + mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test"); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(MQBrokerException.class); + assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1); + } + // GET_ALL_SUBSCRIPTIONGROUP_CONFIG success + SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test"); + Assert.assertEquals(subscriptionGroupConfig.getGroupName(), "group_test"); + } + + @Test + public void testExamineTopicConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + { + RemotingCommand response1 = RemotingCommand.createResponseCommand(null); + RemotingCommand response2 = RemotingCommand.createResponseCommand(null); + response2.setCode(ResponseCode.SUCCESS); + response2.setBody(RemotingSerializable.encode(MockObjectUtil.createTopicConfigWrapper())); + when(remotingClient.invokeSync(anyString(), any(), anyLong())) + .thenThrow(new RuntimeException("invokeSync exception")) + .thenReturn(response1).thenReturn(response2); + } + // invokeSync exception + try { + mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test"); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "invokeSync exception"); + } + // responseCode is not success + try { + mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test"); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(MQBrokerException.class); + assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1); + } + // GET_ALL_TOPIC_CONFIG success + TopicConfig topicConfig = mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test"); + Assert.assertEquals(topicConfig.getTopicName(), "topic_test"); + } + + @Test + public void testExamineTopicStats() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.examineTopicStats(anyString())).thenReturn(MockObjectUtil.createTopicStatsTable()); + } + TopicStatsTable topicStatsTable = mqAdminExtImpl.examineTopicStats("topic_test"); + Assert.assertNotNull(topicStatsTable); + Assert.assertEquals(topicStatsTable.getOffsetTable().size(), 1); + } + + @Test + public void testExamineAllTopicConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + + } + + @Test + public void testFetchAllTopicList() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.fetchAllTopicList()).thenReturn(new TopicList()); + } + TopicList topicList = mqAdminExtImpl.fetchAllTopicList(); + Assert.assertNotNull(topicList); + } + + @Test + public void testFetchBrokerRuntimeStats() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.fetchBrokerRuntimeStats(anyString())).thenReturn(new KVTable()); + } + KVTable kvTable = mqAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr); + Assert.assertNotNull(kvTable); + } + + @Test + public void testExamineConsumeStats() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.examineConsumeStats(anyString())).thenReturn(MockObjectUtil.createConsumeStats()); + when(defaultMQAdminExt.examineConsumeStats(anyString(), anyString())).thenReturn(MockObjectUtil.createConsumeStats()); + } + ConsumeStats consumeStats = mqAdminExtImpl.examineConsumeStats("group_test"); + ConsumeStats consumeStatsWithTopic = mqAdminExtImpl.examineConsumeStats("group_test", "topic_test"); + Assert.assertNotNull(consumeStats); + Assert.assertEquals(consumeStats.getOffsetTable().size(), 2); + Assert.assertNotNull(consumeStatsWithTopic); + Assert.assertEquals(consumeStatsWithTopic.getOffsetTable().size(), 2); + } + + @Test + public void testExamineBrokerClusterInfo() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(MockObjectUtil.createClusterInfo()); + } + ClusterInfo clusterInfo = mqAdminExtImpl.examineBrokerClusterInfo(); + Assert.assertNotNull(clusterInfo); + Assert.assertEquals(clusterInfo.getBrokerAddrTable().size(), 1); + Assert.assertEquals(clusterInfo.getClusterAddrTable().size(), 1); + } + + @Test + public void testExamineTopicRouteInfo() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.examineTopicRouteInfo(anyString())).thenReturn(MockObjectUtil.createTopicRouteData()); + } + TopicRouteData topicRouteData = mqAdminExtImpl.examineTopicRouteInfo("topic_test"); + Assert.assertNotNull(topicRouteData); + } + + @Test + public void testExamineConsumerConnectionInfo() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(new ConsumerConnection()); + } + ConsumerConnection consumerConnection = mqAdminExtImpl.examineConsumerConnectionInfo("group_test"); + Assert.assertNotNull(consumerConnection); + } + + @Test + public void testExamineProducerConnectionInfo() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.examineProducerConnectionInfo(anyString(), anyString())).thenReturn(new ProducerConnection()); + } + ProducerConnection producerConnection = mqAdminExtImpl.examineProducerConnectionInfo("group_test", "topic_test"); + Assert.assertNotNull(producerConnection); + } + + @Test + public void testGetNameServerAddressList() { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getNameServerAddressList()).thenReturn(Lists.asList("127.0.0.1:9876", new String[] {"127.0.0.2:9876"})); + } + List list = mqAdminExtImpl.getNameServerAddressList(); + Assert.assertEquals(list.size(), 2); + } + + @Test + public void testWipeWritePermOfBroker() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.wipeWritePermOfBroker(anyString(), anyString())).thenReturn(6); + } + int result = mqAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "broker-a"); + Assert.assertEquals(result, 6); + } + + @Test + public void testPutKVConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).putKVConfig(anyString(), anyString(), anyString()); + } + mqAdminExtImpl.putKVConfig("namespace", "key", "value"); + } + + @Test + public void testGetKVConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getKVConfig(anyString(), anyString())).thenReturn("value"); + } + String value = mqAdminExtImpl.getKVConfig("namespace", "key"); + Assert.assertEquals(value, "value"); + } + + @Test + public void testGetKVListByNamespace() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getKVListByNamespace(anyString())).thenReturn(new KVTable()); + } + KVTable kvTable = mqAdminExtImpl.getKVListByNamespace("namespace"); + Assert.assertNotNull(kvTable); + } + + @Test + public void testDeleteTopicInBroker() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).deleteTopicInBroker(any(), anyString()); + } + mqAdminExtImpl.deleteTopicInBroker(Sets.newHashSet("127.0.0.1:10911"), "topic_test"); + } + + @Test + public void testDeleteTopicInNameServer() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).deleteTopicInNameServer(any(), anyString()); + } + mqAdminExtImpl.deleteTopicInNameServer(Sets.newHashSet("127.0.0.1:9876", "127.0.0.2:9876"), "topic_test"); + } + + @Test + public void testDeleteSubscriptionGroup() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).deleteSubscriptionGroup(anyString(), anyString()); + } + mqAdminExtImpl.deleteSubscriptionGroup(brokerAddr, "group_test"); + } + + @Test + public void testCreateAndUpdateKvConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).createAndUpdateKvConfig(anyString(), anyString(), anyString()); + } + mqAdminExtImpl.createAndUpdateKvConfig("namespace", "key", "value"); + } + + @Test + public void testDeleteKvConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).deleteKvConfig(anyString(), anyString()); + } + mqAdminExtImpl.deleteKvConfig("namespace", "key"); + } + + @Test + public void testDeleteConsumerOffset() throws Exception { + assertNotNull(mqAdminExtImpl); + } + + @Test + public void testResetOffsetByTimestampOld() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.resetOffsetByTimestampOld(anyString(), anyString(), anyLong(), anyBoolean())).thenReturn(new ArrayList()); + } + List stats = mqAdminExtImpl.resetOffsetByTimestampOld("group_test", "topic_test", 1628495765398L, false); + Assert.assertNotNull(stats); + } + + @Test + public void testResetOffsetByTimestamp() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.resetOffsetByTimestamp(anyString(), anyString(), anyLong(), anyBoolean())).thenReturn(new HashMap()); + } + Map map = mqAdminExtImpl.resetOffsetByTimestamp("group_test", "topic_test", 1628495765398L, false); + Assert.assertNotNull(map); + } + + @Test + public void testResetOffsetNew() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).resetOffsetNew(anyString(), anyString(), anyLong()); + } + mqAdminExtImpl.resetOffsetNew("group_test", "topic_test", 1628495765398L); + } + + @Test + public void testGetConsumeStatus() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getConsumeStatus(anyString(), anyString(), anyString())).thenReturn(new HashMap>()); + } + mqAdminExtImpl.getConsumeStatus("topic_test", "group_test", ""); + } + + @Test + public void testCreateOrUpdateOrderConf() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).createOrUpdateOrderConf(anyString(), anyString(), anyBoolean()); + } + mqAdminExtImpl.createOrUpdateOrderConf("key", "value", false); + } + + @Test + public void testQueryTopicConsumeByWho() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(new GroupList()); + } + GroupList groupList = mqAdminExtImpl.queryTopicConsumeByWho("topic_test"); + Assert.assertNotNull(groupList); + } + + @Test + public void testCleanExpiredConsumerQueue() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.cleanExpiredConsumerQueue(anyString())).thenReturn(true); + } + boolean result = mqAdminExtImpl.cleanExpiredConsumerQueue("DefaultCluster"); + Assert.assertEquals(result, true); + } + + @Test + public void testCleanExpiredConsumerQueueByAddr() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.cleanExpiredConsumerQueueByAddr(anyString())).thenReturn(true); + } + boolean result = mqAdminExtImpl.cleanExpiredConsumerQueueByAddr("DefaultCluster"); + Assert.assertEquals(result, true); + } + + @Test + public void testGetConsumerRunningInfo() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())).thenReturn(new ConsumerRunningInfo()); + } + ConsumerRunningInfo consumerRunningInfo = mqAdminExtImpl.getConsumerRunningInfo("group_test", "", true); + Assert.assertNotNull(consumerRunningInfo); + } + + @Test + public void testConsumeMessageDirectly() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString())).thenReturn(new ConsumeMessageDirectlyResult()); + when(defaultMQAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString(), anyString())).thenReturn(new ConsumeMessageDirectlyResult()); + } + ConsumeMessageDirectlyResult result1 = mqAdminExtImpl.consumeMessageDirectly("group_test", "", "7F000001ACC018B4AAC2116AF6500000"); + ConsumeMessageDirectlyResult result2 = mqAdminExtImpl.consumeMessageDirectly("group_test", "", "topic_test", "7F000001ACC018B4AAC2116AF6500000"); + Assert.assertNotNull(result1); + Assert.assertNotNull(result2); + } + + @Test + public void testMessageTrackDetail() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.messageTrackDetail(any())).thenReturn(new ArrayList()); + } + List tracks = mqAdminExtImpl.messageTrackDetail(new MessageExt()); + Assert.assertNotNull(tracks); + } + + @Test + public void testCloneGroupOffset() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).cloneGroupOffset(anyString(), anyString(), anyString(), anyBoolean()); + } + mqAdminExtImpl.cloneGroupOffset("group_test", "group_test1", "topic_test", false); + } + + @Test + public void testCreateTopic() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt()); + doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt()); + } + mqAdminExtImpl.createTopic("key", "topic_test", 8); + mqAdminExtImpl.createTopic("key", "topic_test", 8, 1); + } + + @Test + public void testSearchOffset() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.searchOffset(any(), anyLong())).thenReturn(Long.MAX_VALUE); + } + long offset = mqAdminExtImpl.searchOffset(new MessageQueue(), 1628495765398L); + Assert.assertEquals(offset, Long.MAX_VALUE); + } + + @Test + public void testMaxOffset() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.maxOffset(any())).thenReturn(Long.MAX_VALUE); + } + long offset = mqAdminExtImpl.maxOffset(new MessageQueue()); + Assert.assertEquals(offset, Long.MAX_VALUE); + } + + @Test + public void testMinOffset() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.minOffset(any())).thenReturn(Long.MIN_VALUE); + } + long offset = mqAdminExtImpl.minOffset(new MessageQueue()); + Assert.assertEquals(offset, Long.MIN_VALUE); + } + + @Test + public void testEarliestMsgStoreTime() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.earliestMsgStoreTime(any())).thenReturn(1628495765398L); + } + long storeTime = mqAdminExtImpl.earliestMsgStoreTime(new MessageQueue()); + Assert.assertEquals(storeTime, 1628495765398L); + } + + @Test + public void testViewMessage() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.viewMessage(anyString())).thenReturn(new MessageExt()); + } + MessageExt messageExt = mqAdminExtImpl.viewMessage("7F000001ACC018B4AAC2116AF6500000"); + Assert.assertNotNull(messageExt); + } + + @Test + public void testQueryMessage() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.queryMessage(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(mock(QueryResult.class)); + } + QueryResult result = mqAdminExtImpl.queryMessage("topic_test", "key", 32, 1627804565000L, System.currentTimeMillis()); + Assert.assertNotNull(result); + } + + @Test + public void testStart() { + assertNotNull(mqAdminExtImpl); + try { + mqAdminExtImpl.start(); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalStateException); + } + } + + @Test + public void testShutdown() { + assertNotNull(mqAdminExtImpl); + try { + mqAdminExtImpl.shutdown(); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalStateException); + } + } + + @Test + public void testQueryConsumeTimeSpan() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.queryConsumeTimeSpan(anyString(), anyString())).thenReturn(new ArrayList()); + } + List timeSpans = mqAdminExtImpl.queryConsumeTimeSpan("topic_test", "group_test"); + Assert.assertNotNull(timeSpans); + } + + @Test + public void testViewMessage2() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl()).thenReturn(mock(MQAdminImpl.class)); + when(defaultMQAdminExt.viewMessage(anyString())).thenThrow(new RuntimeException("viewMessage exception")); + } + mqAdminExtImpl.viewMessage("topic_test", "7F000001ACC018B4AAC2116AF6500000"); + } + + @Test + public void testGetBrokerConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getBrokerConfig(anyString())).thenReturn(new Properties()); + } + Properties brokerConfig = mqAdminExtImpl.getBrokerConfig(brokerAddr); + Assert.assertNotNull(brokerConfig); + } + + @Test + public void testFetchTopicsByCLuster() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.fetchTopicsByCLuster(anyString())).thenReturn(new TopicList()); + } + TopicList topicList = mqAdminExtImpl.fetchTopicsByCLuster("DefaultCluster"); + Assert.assertNotNull(topicList); + } + + @Test + public void testCleanUnusedTopic() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.cleanUnusedTopic(anyString())).thenReturn(true); + when(defaultMQAdminExt.cleanUnusedTopicByAddr(anyString())).thenReturn(true); + } + Boolean result1 = mqAdminExtImpl.cleanUnusedTopic("DefaultCluster"); + Boolean result2 = mqAdminExtImpl.cleanUnusedTopic(brokerAddr); + Assert.assertEquals(result1, true); + Assert.assertEquals(result2, true); + } + + @Test + public void testViewBrokerStatsData() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.viewBrokerStatsData(anyString(), anyString(), anyString())).thenReturn(new BrokerStatsData()); + } + BrokerStatsData brokerStatsData = mqAdminExtImpl.viewBrokerStatsData(brokerAddr, BrokerStatsManager.TOPIC_PUT_NUMS, "topic_test"); + Assert.assertNotNull(brokerStatsData); + } + + @Test + public void testGetClusterList() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getClusterList(anyString())).thenReturn(new HashSet<>()); + } + Set clusterList = mqAdminExtImpl.getClusterList("topic_test"); + Assert.assertNotNull(clusterList); + } + + @Test + public void testFetchConsumeStatsInBroker() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.fetchConsumeStatsInBroker(anyString(), anyBoolean(), anyLong())).thenReturn(new ConsumeStatsList()); + } + ConsumeStatsList consumeStatsList = mqAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, false, System.currentTimeMillis()); + Assert.assertNotNull(consumeStatsList); + } + + @Test + public void testGetTopicClusterList() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.fetchTopicsByCLuster(anyString())).thenReturn(new TopicList()); + } + TopicList topicList = mqAdminExtImpl.fetchTopicsByCLuster("DefaultCluster"); + Assert.assertNotNull(topicList); + } + + @Test + public void testGetAllSubscriptionGroup() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(new SubscriptionGroupWrapper()); + } + SubscriptionGroupWrapper wrapper = mqAdminExtImpl.getAllSubscriptionGroup(brokerAddr, 5000L); + Assert.assertNotNull(wrapper); + } + + @Test + public void testGetAllTopicGroup() throws Exception { + assertNotNull(mqAdminExtImpl); + { + when(defaultMQAdminExt.getAllTopicGroup(anyString(), anyLong())).thenReturn(new TopicConfigSerializeWrapper()); + } + TopicConfigSerializeWrapper wrapper = mqAdminExtImpl.getAllTopicGroup(brokerAddr, 5000L); + Assert.assertNotNull(wrapper); + } + + @Test + public void testUpdateConsumeOffset() throws Exception { + assertNotNull(mqAdminExtImpl); + { + doNothing().when(defaultMQAdminExt).updateConsumeOffset(anyString(), anyString(), any(), anyLong()); + } + mqAdminExtImpl.updateConsumeOffset(brokerAddr, "group_test", new MessageQueue(), 10000L); + } + + @Test + public void testUpdateNameServerConfig() { + assertNotNull(mqAdminExtImpl); + } + + @Test + public void testGetNameServerConfig() throws Exception { + assertNotNull(mqAdminExtImpl); + assertNull(mqAdminExtImpl.getNameServerConfig(new ArrayList<>())); + } + + @Test + public void testQueryConsumeQueue() throws Exception { + assertNotNull(mqAdminExtImpl); + assertNull(mqAdminExtImpl.queryConsumeQueue(brokerAddr, "topic_test", 2, 1, 10, "group_test")); + } + + @Test + public void testResumeCheckHalfMessage() throws Exception { + assertNotNull(mqAdminExtImpl); + Assert.assertFalse(mqAdminExtImpl.resumeCheckHalfMessage("7F000001ACC018B4AAC2116AF6500000")); + Assert.assertFalse(mqAdminExtImpl.resumeCheckHalfMessage("topic_test", "7F000001ACC018B4AAC2116AF6500000")); + } + +} diff --git a/src/test/java/org/apache/rocketmq/console/config/RMQConfigureTest.java b/src/test/java/org/apache/rocketmq/console/config/RMQConfigureTest.java new file mode 100644 index 0000000..058f540 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/config/RMQConfigureTest.java @@ -0,0 +1,50 @@ +package org.apache.rocketmq.console.config; + +import java.io.File; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.boot.web.server.ErrorPage; +import org.springframework.boot.web.server.ErrorPageRegistrar; +import org.springframework.boot.web.server.ErrorPageRegistry; + +public class RMQConfigureTest { + + private RMQConfigure rmqConfigure = new RMQConfigure(); + + @Test + public void testSet() { + rmqConfigure.setAccessKey("rocketmq"); + rmqConfigure.setSecretKey("12345678"); + rmqConfigure.setDataPath("/tmp/rocketmq-console/data/test"); + rmqConfigure.setEnableDashBoardCollect("true"); + rmqConfigure.setIsVIPChannel("true"); + rmqConfigure.setUseTLS(true); + rmqConfigure.setLoginRequired(true); + rmqConfigure.setMsgTrackTopicName(null); + rmqConfigure.setNamesrvAddr("127.0.0.1:9876"); + } + + @Test + public void testGet() { + testSet(); + Assert.assertEquals(rmqConfigure.getAccessKey(), "rocketmq"); + Assert.assertEquals(rmqConfigure.getSecretKey(), "12345678"); + Assert.assertTrue(rmqConfigure.isACLEnabled()); + Assert.assertTrue(rmqConfigure.isUseTLS()); + Assert.assertEquals(rmqConfigure.getConsoleCollectData(), "/tmp/rocketmq-console/data/test" + File.separator + "dashboard"); + Assert.assertEquals(rmqConfigure.getRocketMqConsoleDataPath(), "/tmp/rocketmq-console/data/test"); + Assert.assertEquals(rmqConfigure.getIsVIPChannel(), "true"); + Assert.assertTrue(rmqConfigure.isEnableDashBoardCollect()); + Assert.assertTrue(rmqConfigure.isLoginRequired()); + Assert.assertEquals(rmqConfigure.getMsgTrackTopicNameOrDefault(), TopicValidator.RMQ_SYS_TRACE_TOPIC); + Assert.assertEquals(rmqConfigure.getNamesrvAddr(), "127.0.0.1:9876"); + ErrorPageRegistrar registrar = rmqConfigure.errorPageRegistrar(); + registrar.registerErrorPages(new ErrorPageRegistry() { + @Override + public void addErrorPages(ErrorPage... errorPages) { + + } + }); + } +} diff --git a/src/test/java/org/apache/rocketmq/console/controller/ProducerControllerTest.java b/src/test/java/org/apache/rocketmq/console/controller/ProducerControllerTest.java index 9e9cc96..99cb3b5 100644 --- a/src/test/java/org/apache/rocketmq/console/controller/ProducerControllerTest.java +++ b/src/test/java/org/apache/rocketmq/console/controller/ProducerControllerTest.java @@ -21,12 +21,22 @@ import java.util.HashSet; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.console.interceptor.AuthInterceptor; +import org.apache.rocketmq.console.service.impl.LoginServiceImpl; import org.apache.rocketmq.console.service.impl.ProducerServiceImpl; +import org.apache.rocketmq.console.support.GlobalExceptionHandler; +import org.apache.rocketmq.console.support.GlobalRestfulResponseBodyAdvice; +import org.apache.rocketmq.console.util.MyPrintingResultHandler; +import org.apache.rocketmq.console.util.WebUtil; import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.junit.Before; import org.junit.Test; import org.mockito.InjectMocks; import org.mockito.Spy; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; import static org.hamcrest.Matchers.hasSize; import static org.mockito.ArgumentMatchers.anyString; @@ -42,9 +52,33 @@ public class ProducerControllerTest extends BaseControllerTest { @Spy private ProducerServiceImpl producerService; + @Override protected MockMvc createMockMvc() { + AuthInterceptor authInterceptor = new AuthInterceptor(); + ReflectionTestUtils.setField(authInterceptor, "loginService", new LoginServiceImpl()); + MockMvc innerMockMvc = MockMvcBuilders.standaloneSetup(getTestController()) + .addInterceptors(authInterceptor) + .alwaysDo(MyPrintingResultHandler.me()) + .setControllerAdvice(new GlobalExceptionHandler(), new GlobalRestfulResponseBodyAdvice()) + .build(); + this.mockMvc = innerMockMvc; + return innerMockMvc; + } + + @Before + public void init(){ + createMockMvc(); + } + @Test public void testProducerConnection() throws Exception { final String url = "/producer/producerConnection.query"; + // user not login, request will redirect + requestBuilder = MockMvcRequestBuilders.get(url); + requestBuilder.param("producerGroup", "producer_test") + .param("topic", "topic_test"); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().is3xxRedirection()); + // user login { ProducerConnection producerConnection = new ProducerConnection(); HashSet connections = new HashSet<>(); @@ -58,10 +92,8 @@ public class ProducerControllerTest extends BaseControllerTest { .thenThrow(new MQClientException("Not found the producer group connection", null)) .thenReturn(producerConnection); } - requestBuilder = MockMvcRequestBuilders.get(url); - requestBuilder.param("producerGroup", "producer_test") - .param("topic", "topic_test"); - // 1、no connection + // 1、no producer connection + requestBuilder.sessionAttr(WebUtil.USER_NAME, "admin"); perform = mockMvc.perform(requestBuilder); perform.andExpect(status().isOk()) .andExpect(jsonPath("$").exists()) @@ -79,7 +111,8 @@ public class ProducerControllerTest extends BaseControllerTest { .andExpect(jsonPath("$.data.connectionSet[0].clientId").value("clientId")); } - @Override protected Object getTestController() { + @Override + protected Object getTestController() { return producerController; } } diff --git a/src/test/java/org/apache/rocketmq/console/task/DashboardCollectTaskTest.java b/src/test/java/org/apache/rocketmq/console/task/DashboardCollectTaskTest.java new file mode 100644 index 0000000..11be909 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/console/task/DashboardCollectTaskTest.java @@ -0,0 +1,177 @@ +package org.apache.rocketmq.console.task; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.File; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.console.BaseTest; +import org.apache.rocketmq.console.config.RMQConfigure; +import org.apache.rocketmq.console.service.impl.DashboardCollectServiceImpl; +import org.apache.rocketmq.console.util.JsonUtil; +import org.apache.rocketmq.console.util.MockObjectUtil; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +public class DashboardCollectTaskTest extends BaseTest { + + @Spy + private DashboardCollectTask dashboardCollectTask; + + @Spy + private DashboardCollectServiceImpl dashboardCollectService; + + @Mock + private MQAdminExt mqAdminExt; + + @Mock + private RMQConfigure rmqConfigure; + + private int taskExecuteNum = 10; + + private File brokerFile; + + private File topicFile; + + @Before + public void init() throws Exception { + MockitoAnnotations.initMocks(this); + when(rmqConfigure.getConsoleCollectData()).thenReturn("/tmp/rocketmq-console/test/data"); + ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); + when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + String dataLocationPath = rmqConfigure.getConsoleCollectData(); + DateFormat format = new SimpleDateFormat("yyyy-MM-dd"); + String nowDateStr = format.format(new Date()); + brokerFile = new File(dataLocationPath + nowDateStr + ".json"); + topicFile = new File(dataLocationPath + nowDateStr + "_topic" + ".json"); + autoInjection(); + } + + @Test + public void testCollectTopic() throws Exception { + // enableDashBoardCollect = false + when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(false); + dashboardCollectTask.collectTopic(); + { + TopicList topicList = new TopicList(); + Set topicSet = new HashSet<>(); + topicSet.add("topic_test"); + topicSet.add("%RETRY%group_test"); + topicSet.add("%DLQ%group_test"); + topicList.setTopicList(topicSet); + when(mqAdminExt.fetchAllTopicList()) + .thenThrow(new RuntimeException("fetchAllTopicList exception")) + .thenReturn(topicList); + TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData(); + when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData); + GroupList list = new GroupList(); + list.setGroupList(Sets.newHashSet("group_test")); + when(mqAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(list); + BrokerStatsData brokerStatsData = MockObjectUtil.createBrokerStatsData(); + when(mqAdminExt.viewBrokerStatsData(anyString(), anyString(), anyString())) + .thenThrow(new RuntimeException("viewBrokerStatsData TOPIC_PUT_NUMS exception")) + .thenThrow(new RuntimeException("viewBrokerStatsData GROUP_GET_NUMS exception")) + .thenReturn(brokerStatsData); + when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(true); + } + // fetchAllTopicList exception + try { + dashboardCollectTask.collectTopic(); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "fetchAllTopicList exception"); + } + for (int i = 0; i < taskExecuteNum; i++) { + dashboardCollectTask.collectTopic(); + } + LoadingCache> map = dashboardCollectService.getTopicMap(); + Assert.assertEquals(map.size(), 1); + Assert.assertEquals(map.get("topic_test").size(), taskExecuteNum); + dashboardCollectTask.saveData(); + Assert.assertEquals(topicFile.exists(), true); + Map> topicData = + JsonUtil.string2Obj(MixAll.file2String(topicFile), + new TypeReference>>() { + }); + Assert.assertEquals(topicData.get("topic_test").size(), taskExecuteNum); + } + + @Test + public void testCollectBroker() throws Exception { + // enableDashBoardCollect = false + when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(false); + dashboardCollectTask.collectBroker(); + { + HashMap result = new HashMap<>(); + result.put("getTotalTps", "0.0 0.033330000333300004 0.03332972261338355"); + result.put("commitLogMinOffset", "0"); + KVTable kvTable = new KVTable(); + kvTable.setTable(result); + when(mqAdminExt.fetchBrokerRuntimeStats(anyString())) + .thenThrow(new RuntimeException("fetchBrokerRuntimeStats exception")) + .thenReturn(kvTable); + when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(true); + } + // fetchBrokerRuntimeStats exception + try { + dashboardCollectTask.collectBroker(); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "fetchBrokerRuntimeStats exception"); + } + + for (int i = 0; i < taskExecuteNum; i++) { + dashboardCollectTask.collectBroker(); + } + LoadingCache> map = dashboardCollectService.getBrokerMap(); + Assert.assertEquals(map.size(), 1); + Assert.assertEquals(map.get("broker-a" + ":" + MixAll.MASTER_ID).size(), taskExecuteNum); + mockBrokerFileExistBeforeSaveData(); + dashboardCollectTask.saveData(); + Assert.assertEquals(brokerFile.exists(), true); + Map> brokerData = + JsonUtil.string2Obj(MixAll.file2String(brokerFile), + new TypeReference>>() { + }); + Assert.assertEquals(brokerData.get("broker-a" + ":" + MixAll.MASTER_ID).size(), taskExecuteNum + 2); + } + + @After + public void after() { + if (brokerFile != null && brokerFile.exists()) { + brokerFile.delete(); + } + if (topicFile != null && topicFile.exists()) { + topicFile.delete(); + } + } + + private void mockBrokerFileExistBeforeSaveData() throws Exception { + Map> map = new HashMap<>(); + map.put("broker-a" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"})); + map.put("broker-b" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"})); + MixAll.string2File(JsonUtil.obj2String(map), brokerFile.getAbsolutePath()); + } +} diff --git a/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java b/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java index e58ec5d..75e3272 100644 --- a/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java +++ b/src/test/java/org/apache/rocketmq/console/util/MockObjectUtil.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.client.trace.TraceConstants; import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; @@ -39,7 +40,8 @@ import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.BrokerStatsItem; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumeStatus; @@ -47,6 +49,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; @@ -141,6 +144,17 @@ public class MockObjectUtil { return wrapper; } + public static TopicConfigSerializeWrapper createTopicConfigWrapper() { + TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper(); + TopicConfig config = new TopicConfig(); + config.setTopicName("topic_test"); + ConcurrentMap topicConfigTable = new ConcurrentHashMap(2); + topicConfigTable.put("topic_test", config); + wrapper.setTopicConfigTable(topicConfigTable); + wrapper.setDataVersion(new DataVersion()); + return wrapper; + } + public static ConsumerConnection createConsumerConnection() { ConsumerConnection consumerConnection = new ConsumerConnection(); HashSet connections = new HashSet(); @@ -259,4 +273,26 @@ public class MockObjectUtil { .append("true").append(TraceConstants.FIELD_SPLITOR); return sb.toString(); } + + public static BrokerStatsData createBrokerStatsData() { + BrokerStatsData brokerStatsData = new BrokerStatsData(); + BrokerStatsItem statsDay = new BrokerStatsItem(); + statsDay.setAvgpt(100.0); + statsDay.setSum(10000L); + statsDay.setTps(100.0); + brokerStatsData.setStatsDay(statsDay); + + BrokerStatsItem statsHour = new BrokerStatsItem(); + statsHour.setAvgpt(10.0); + statsHour.setSum(100L); + statsHour.setTps(100.0); + brokerStatsData.setStatsHour(statsHour); + + BrokerStatsItem statsMinute = new BrokerStatsItem(); + statsMinute.setAvgpt(10.0); + statsMinute.setSum(100L); + statsMinute.setTps(100.0); + brokerStatsData.setStatsMinute(statsMinute); + return brokerStatsData; + } } diff --git a/src/test/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtilTest.java b/src/test/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtilTest.java index 7403e1b..468a324 100644 --- a/src/test/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtilTest.java +++ b/src/test/java/org/apache/rocketmq/console/util/MsgTraceDecodeUtilTest.java @@ -48,6 +48,8 @@ public class MsgTraceDecodeUtilTest { @Test public void testDecodePubTraceMessage() { + List contexts = MsgTraceDecodeUtil.decoderFromTraceDataString(null); + Assert.assertEquals(contexts.size(), 0); String pubTraceData_V1 = new String(pubTraceDataBase); List traceContextListV1 = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_V1); Assert.assertEquals(traceContextListV1.size(), 1); @@ -96,6 +98,21 @@ public class MsgTraceDecodeUtilTest { Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getOffsetMsgId(), "0A741D02000078BF000000000132F7C9"); Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911"); Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getClientHost(), "10.10.10.11"); + + String pubTraceData_default = new StringBuilder(pubTraceDataBase) + .append("0A741D02000078BF000000000132F7C9").append(TraceConstants.CONTENT_SPLITOR) + .append("true").append(TraceConstants.CONTENT_SPLITOR) + .append("10.10.10.11").append(TraceConstants.CONTENT_SPLITOR) + .append("10.10.10.11").append(TraceConstants.CONTENT_SPLITOR) + .toString(); + List traceContextList = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_default); + Assert.assertEquals(traceContextList.size(), 1); + Assert.assertEquals(traceContextList.get(0).getTraceType().toString(), "Pub"); + Assert.assertEquals(traceContextList.get(0).isSuccess(), true); + Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc6980189"); + Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getOffsetMsgId(), "0A741D02000078BF000000000132F7C9"); + Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911"); + Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getClientHost(), "10.10.10.11"); } @Test @@ -134,5 +151,18 @@ public class MsgTraceDecodeUtilTest { Assert.assertEquals(traceContextListV3.get(1).isSuccess(), false); Assert.assertEquals(traceContextListV3.get(1).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f"); Assert.assertEquals(traceContextListV3.get(1).getGroupName(), "test_consumer_group"); + + String subTraceData_default = new StringBuilder(subTraceDataBase) + .append("4").append(TraceConstants.CONTENT_SPLITOR) + .append("1614666740499").append(TraceConstants.CONTENT_SPLITOR) + .append("test_consumer_group").append(TraceConstants.CONTENT_SPLITOR) + .append("test_consumer_group").append(TraceConstants.CONTENT_SPLITOR) + .toString(); + List traceContextList = MsgTraceDecodeUtil.decoderFromTraceDataString(subTraceData_default); + Assert.assertEquals(traceContextList.size(), 2); + Assert.assertEquals(traceContextList.get(1).getTraceType().toString(), "SubAfter"); + Assert.assertEquals(traceContextList.get(1).isSuccess(), false); + Assert.assertEquals(traceContextList.get(1).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f"); + Assert.assertEquals(traceContextList.get(1).getGroupName(), "test_consumer_group"); } }