[ISSUE #673]Add unit tests to improve code coverage (#777)

* [ISSUE #673]Add unit tests to improve code coverage

* format code

Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
zhangjidi2016
2021-08-12 09:19:17 +08:00
committed by GitHub
parent eb9103f082
commit 54b8eb7ac1
8 changed files with 1186 additions and 15 deletions

25
pom.xml
View File

@@ -46,11 +46,11 @@
</developers>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0</url>
<distribution>repo</distribution>
</license>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
@@ -70,6 +70,7 @@
<main.basedir>${basedir}/../..</main.basedir>
<docker.image.prefix>apacherocketmq</docker.image.prefix>
<spring.boot.version>2.2.2.RELEASE</spring.boot.version>
<mockito-inline.version>3.3.3</mockito-inline.version>
</properties>
<dependencies>
@@ -179,15 +180,21 @@
<version>0.9.6</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.68</version>
</dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.68</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito-inline.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@@ -0,0 +1,38 @@
package org.apache.rocketmq.console.admin;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import org.apache.rocketmq.console.aspect.admin.MQAdminAspect;
import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MQAdminAspectTest {
@Test
public void testAroundMQAdminMethod() throws Throwable {
MQAdminAspect mqAdminAspect = new MQAdminAspect();
ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
MethodSignature signature = mock(MethodSignature.class);
Method method = mock(Method.class);
MultiMQAdminCmdMethod annotationValue = mock(MultiMQAdminCmdMethod.class);
when(annotationValue.timeoutMillis()).thenReturn(0L).thenReturn(3000L);
when(method.getAnnotation(MultiMQAdminCmdMethod.class)).thenReturn(annotationValue);
when(signature.getMethod()).thenReturn(method);
when(joinPoint.getSignature()).thenReturn(signature);
RMQConfigure rmqConfigure = mock(RMQConfigure.class);
when(rmqConfigure.getAccessKey()).thenReturn("rocketmq");
when(rmqConfigure.getSecretKey()).thenReturn("12345678");
Field field = mqAdminAspect.getClass().getDeclaredField("rmqConfigure");
field.setAccessible(true);
field.set(mqAdminAspect, rmqConfigure);
mqAdminAspect.aroundMQAdminMethod(joinPoint);
}
}

View File

@@ -0,0 +1,800 @@
package org.apache.rocketmq.console.admin;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.console.service.client.MQAdminExtImpl;
import org.apache.rocketmq.console.service.client.MQAdminInstance;
import org.apache.rocketmq.console.util.MockObjectUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
public class MQAdminExtImplTest {
@InjectMocks
private MQAdminExtImpl mqAdminExtImpl;
@Mock
private DefaultMQAdminExt defaultMQAdminExt;
@Mock
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
@Mock
private MQClientInstance mqClientInstance;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private RemotingClient remotingClient;
private String brokerAddr = "127.0.0.1:10911";
@Before
public void init() throws Exception {
Field field = MQAdminInstance.class.getDeclaredField("MQ_ADMIN_EXT_THREAD_LOCAL");
field.setAccessible(true);
Object object = field.get(mqAdminExtImpl);
assertNotNull(object);
ThreadLocal<DefaultMQAdminExt> threadLocal = (ThreadLocal<DefaultMQAdminExt>) object;
defaultMQAdminExt = mock(DefaultMQAdminExt.class);
threadLocal.set(defaultMQAdminExt);
field = MQAdminInstance.class.getDeclaredField("INIT_COUNTER");
field.setAccessible(true);
object = field.get(mqAdminExtImpl);
assertNotNull(object);
ThreadLocal<Integer> threadLocal1 = (ThreadLocal<Integer>) object;
threadLocal1.set(1);
ReflectionTestUtils.setField(defaultMQAdminExt, "defaultMQAdminExtImpl", defaultMQAdminExtImpl);
ReflectionTestUtils.setField(defaultMQAdminExtImpl, "mqClientInstance", mqClientInstance);
ReflectionTestUtils.setField(mqClientInstance, "mQClientAPIImpl", mQClientAPIImpl);
ReflectionTestUtils.setField(mQClientAPIImpl, "remotingClient", remotingClient);
}
@After
public void destroy() {
MQAdminInstance.destroyMQAdminInstance();
}
@Test
public void testUpdateBrokerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
doNothing()
.doThrow(new MQBrokerException(0, ""))
.when(defaultMQAdminExt).updateBrokerConfig(anyString(), any());
mqAdminExtImpl.updateBrokerConfig(brokerAddr, new Properties());
boolean hasException = false;
try {
mqAdminExtImpl.updateBrokerConfig(brokerAddr, new Properties());
} catch (Exception e) {
hasException = true;
assertThat(e).isInstanceOf(MQBrokerException.class);
assertThat(((MQBrokerException) e).getResponseCode()).isEqualTo(0);
}
assertTrue(hasException);
}
@Test
public void testCreateAndUpdateTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.createAndUpdateTopicConfig(brokerAddr, new TopicConfig());
}
@Test
public void testDeletePlainAccessConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.deletePlainAccessConfig(brokerAddr, "rocketmq");
}
@Test
public void testUpdateGlobalWhiteAddrConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.updateGlobalWhiteAddrConfig(brokerAddr, "192.168.*.*");
}
@Test
public void testCreateAndUpdatePlainAccessConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.createAndUpdatePlainAccessConfig(brokerAddr, new PlainAccessConfig());
}
@Test
public void testExamineBrokerClusterAclVersionInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.examineBrokerClusterAclVersionInfo(brokerAddr));
}
@Test
public void testExamineBrokerClusterAclConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.examineBrokerClusterAclConfig(brokerAddr));
}
@Test
public void testQueryConsumerStatus() throws Exception {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testCreateAndUpdateSubscriptionGroupConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.createAndUpdateSubscriptionGroupConfig(brokerAddr, new SubscriptionGroupConfig());
}
@Test
public void testExamineSubscriptionGroupConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
RemotingCommand response1 = RemotingCommand.createResponseCommand(null);
RemotingCommand response2 = RemotingCommand.createResponseCommand(null);
response2.setCode(ResponseCode.SUCCESS);
response2.setBody(RemotingSerializable.encode(MockObjectUtil.createSubscriptionGroupWrapper()));
when(remotingClient.invokeSync(anyString(), any(), anyLong()))
.thenThrow(new RuntimeException("invokeSync exception"))
.thenReturn(response1).thenReturn(response2);
}
// invokeSync exception
try {
mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "topic_test");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "invokeSync exception");
}
// responseCode is not success
try {
mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test");
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(MQBrokerException.class);
assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1);
}
// GET_ALL_SUBSCRIPTIONGROUP_CONFIG success
SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test");
Assert.assertEquals(subscriptionGroupConfig.getGroupName(), "group_test");
}
@Test
public void testExamineTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
RemotingCommand response1 = RemotingCommand.createResponseCommand(null);
RemotingCommand response2 = RemotingCommand.createResponseCommand(null);
response2.setCode(ResponseCode.SUCCESS);
response2.setBody(RemotingSerializable.encode(MockObjectUtil.createTopicConfigWrapper()));
when(remotingClient.invokeSync(anyString(), any(), anyLong()))
.thenThrow(new RuntimeException("invokeSync exception"))
.thenReturn(response1).thenReturn(response2);
}
// invokeSync exception
try {
mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "invokeSync exception");
}
// responseCode is not success
try {
mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(MQBrokerException.class);
assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1);
}
// GET_ALL_TOPIC_CONFIG success
TopicConfig topicConfig = mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
Assert.assertEquals(topicConfig.getTopicName(), "topic_test");
}
@Test
public void testExamineTopicStats() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineTopicStats(anyString())).thenReturn(MockObjectUtil.createTopicStatsTable());
}
TopicStatsTable topicStatsTable = mqAdminExtImpl.examineTopicStats("topic_test");
Assert.assertNotNull(topicStatsTable);
Assert.assertEquals(topicStatsTable.getOffsetTable().size(), 1);
}
@Test
public void testExamineAllTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testFetchAllTopicList() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchAllTopicList()).thenReturn(new TopicList());
}
TopicList topicList = mqAdminExtImpl.fetchAllTopicList();
Assert.assertNotNull(topicList);
}
@Test
public void testFetchBrokerRuntimeStats() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchBrokerRuntimeStats(anyString())).thenReturn(new KVTable());
}
KVTable kvTable = mqAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr);
Assert.assertNotNull(kvTable);
}
@Test
public void testExamineConsumeStats() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineConsumeStats(anyString())).thenReturn(MockObjectUtil.createConsumeStats());
when(defaultMQAdminExt.examineConsumeStats(anyString(), anyString())).thenReturn(MockObjectUtil.createConsumeStats());
}
ConsumeStats consumeStats = mqAdminExtImpl.examineConsumeStats("group_test");
ConsumeStats consumeStatsWithTopic = mqAdminExtImpl.examineConsumeStats("group_test", "topic_test");
Assert.assertNotNull(consumeStats);
Assert.assertEquals(consumeStats.getOffsetTable().size(), 2);
Assert.assertNotNull(consumeStatsWithTopic);
Assert.assertEquals(consumeStatsWithTopic.getOffsetTable().size(), 2);
}
@Test
public void testExamineBrokerClusterInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(MockObjectUtil.createClusterInfo());
}
ClusterInfo clusterInfo = mqAdminExtImpl.examineBrokerClusterInfo();
Assert.assertNotNull(clusterInfo);
Assert.assertEquals(clusterInfo.getBrokerAddrTable().size(), 1);
Assert.assertEquals(clusterInfo.getClusterAddrTable().size(), 1);
}
@Test
public void testExamineTopicRouteInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineTopicRouteInfo(anyString())).thenReturn(MockObjectUtil.createTopicRouteData());
}
TopicRouteData topicRouteData = mqAdminExtImpl.examineTopicRouteInfo("topic_test");
Assert.assertNotNull(topicRouteData);
}
@Test
public void testExamineConsumerConnectionInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(new ConsumerConnection());
}
ConsumerConnection consumerConnection = mqAdminExtImpl.examineConsumerConnectionInfo("group_test");
Assert.assertNotNull(consumerConnection);
}
@Test
public void testExamineProducerConnectionInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineProducerConnectionInfo(anyString(), anyString())).thenReturn(new ProducerConnection());
}
ProducerConnection producerConnection = mqAdminExtImpl.examineProducerConnectionInfo("group_test", "topic_test");
Assert.assertNotNull(producerConnection);
}
@Test
public void testGetNameServerAddressList() {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getNameServerAddressList()).thenReturn(Lists.asList("127.0.0.1:9876", new String[] {"127.0.0.2:9876"}));
}
List<String> list = mqAdminExtImpl.getNameServerAddressList();
Assert.assertEquals(list.size(), 2);
}
@Test
public void testWipeWritePermOfBroker() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.wipeWritePermOfBroker(anyString(), anyString())).thenReturn(6);
}
int result = mqAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "broker-a");
Assert.assertEquals(result, 6);
}
@Test
public void testPutKVConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).putKVConfig(anyString(), anyString(), anyString());
}
mqAdminExtImpl.putKVConfig("namespace", "key", "value");
}
@Test
public void testGetKVConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getKVConfig(anyString(), anyString())).thenReturn("value");
}
String value = mqAdminExtImpl.getKVConfig("namespace", "key");
Assert.assertEquals(value, "value");
}
@Test
public void testGetKVListByNamespace() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getKVListByNamespace(anyString())).thenReturn(new KVTable());
}
KVTable kvTable = mqAdminExtImpl.getKVListByNamespace("namespace");
Assert.assertNotNull(kvTable);
}
@Test
public void testDeleteTopicInBroker() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteTopicInBroker(any(), anyString());
}
mqAdminExtImpl.deleteTopicInBroker(Sets.newHashSet("127.0.0.1:10911"), "topic_test");
}
@Test
public void testDeleteTopicInNameServer() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteTopicInNameServer(any(), anyString());
}
mqAdminExtImpl.deleteTopicInNameServer(Sets.newHashSet("127.0.0.1:9876", "127.0.0.2:9876"), "topic_test");
}
@Test
public void testDeleteSubscriptionGroup() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteSubscriptionGroup(anyString(), anyString());
}
mqAdminExtImpl.deleteSubscriptionGroup(brokerAddr, "group_test");
}
@Test
public void testCreateAndUpdateKvConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).createAndUpdateKvConfig(anyString(), anyString(), anyString());
}
mqAdminExtImpl.createAndUpdateKvConfig("namespace", "key", "value");
}
@Test
public void testDeleteKvConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteKvConfig(anyString(), anyString());
}
mqAdminExtImpl.deleteKvConfig("namespace", "key");
}
@Test
public void testDeleteConsumerOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testResetOffsetByTimestampOld() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.resetOffsetByTimestampOld(anyString(), anyString(), anyLong(), anyBoolean())).thenReturn(new ArrayList<RollbackStats>());
}
List<RollbackStats> stats = mqAdminExtImpl.resetOffsetByTimestampOld("group_test", "topic_test", 1628495765398L, false);
Assert.assertNotNull(stats);
}
@Test
public void testResetOffsetByTimestamp() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.resetOffsetByTimestamp(anyString(), anyString(), anyLong(), anyBoolean())).thenReturn(new HashMap<MessageQueue, Long>());
}
Map<MessageQueue, Long> map = mqAdminExtImpl.resetOffsetByTimestamp("group_test", "topic_test", 1628495765398L, false);
Assert.assertNotNull(map);
}
@Test
public void testResetOffsetNew() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).resetOffsetNew(anyString(), anyString(), anyLong());
}
mqAdminExtImpl.resetOffsetNew("group_test", "topic_test", 1628495765398L);
}
@Test
public void testGetConsumeStatus() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getConsumeStatus(anyString(), anyString(), anyString())).thenReturn(new HashMap<String, Map<MessageQueue, Long>>());
}
mqAdminExtImpl.getConsumeStatus("topic_test", "group_test", "");
}
@Test
public void testCreateOrUpdateOrderConf() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).createOrUpdateOrderConf(anyString(), anyString(), anyBoolean());
}
mqAdminExtImpl.createOrUpdateOrderConf("key", "value", false);
}
@Test
public void testQueryTopicConsumeByWho() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(new GroupList());
}
GroupList groupList = mqAdminExtImpl.queryTopicConsumeByWho("topic_test");
Assert.assertNotNull(groupList);
}
@Test
public void testCleanExpiredConsumerQueue() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.cleanExpiredConsumerQueue(anyString())).thenReturn(true);
}
boolean result = mqAdminExtImpl.cleanExpiredConsumerQueue("DefaultCluster");
Assert.assertEquals(result, true);
}
@Test
public void testCleanExpiredConsumerQueueByAddr() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.cleanExpiredConsumerQueueByAddr(anyString())).thenReturn(true);
}
boolean result = mqAdminExtImpl.cleanExpiredConsumerQueueByAddr("DefaultCluster");
Assert.assertEquals(result, true);
}
@Test
public void testGetConsumerRunningInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())).thenReturn(new ConsumerRunningInfo());
}
ConsumerRunningInfo consumerRunningInfo = mqAdminExtImpl.getConsumerRunningInfo("group_test", "", true);
Assert.assertNotNull(consumerRunningInfo);
}
@Test
public void testConsumeMessageDirectly() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString())).thenReturn(new ConsumeMessageDirectlyResult());
when(defaultMQAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString(), anyString())).thenReturn(new ConsumeMessageDirectlyResult());
}
ConsumeMessageDirectlyResult result1 = mqAdminExtImpl.consumeMessageDirectly("group_test", "", "7F000001ACC018B4AAC2116AF6500000");
ConsumeMessageDirectlyResult result2 = mqAdminExtImpl.consumeMessageDirectly("group_test", "", "topic_test", "7F000001ACC018B4AAC2116AF6500000");
Assert.assertNotNull(result1);
Assert.assertNotNull(result2);
}
@Test
public void testMessageTrackDetail() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.messageTrackDetail(any())).thenReturn(new ArrayList<MessageTrack>());
}
List<MessageTrack> tracks = mqAdminExtImpl.messageTrackDetail(new MessageExt());
Assert.assertNotNull(tracks);
}
@Test
public void testCloneGroupOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).cloneGroupOffset(anyString(), anyString(), anyString(), anyBoolean());
}
mqAdminExtImpl.cloneGroupOffset("group_test", "group_test1", "topic_test", false);
}
@Test
public void testCreateTopic() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt());
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt());
}
mqAdminExtImpl.createTopic("key", "topic_test", 8);
mqAdminExtImpl.createTopic("key", "topic_test", 8, 1);
}
@Test
public void testSearchOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.searchOffset(any(), anyLong())).thenReturn(Long.MAX_VALUE);
}
long offset = mqAdminExtImpl.searchOffset(new MessageQueue(), 1628495765398L);
Assert.assertEquals(offset, Long.MAX_VALUE);
}
@Test
public void testMaxOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.maxOffset(any())).thenReturn(Long.MAX_VALUE);
}
long offset = mqAdminExtImpl.maxOffset(new MessageQueue());
Assert.assertEquals(offset, Long.MAX_VALUE);
}
@Test
public void testMinOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.minOffset(any())).thenReturn(Long.MIN_VALUE);
}
long offset = mqAdminExtImpl.minOffset(new MessageQueue());
Assert.assertEquals(offset, Long.MIN_VALUE);
}
@Test
public void testEarliestMsgStoreTime() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.earliestMsgStoreTime(any())).thenReturn(1628495765398L);
}
long storeTime = mqAdminExtImpl.earliestMsgStoreTime(new MessageQueue());
Assert.assertEquals(storeTime, 1628495765398L);
}
@Test
public void testViewMessage() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.viewMessage(anyString())).thenReturn(new MessageExt());
}
MessageExt messageExt = mqAdminExtImpl.viewMessage("7F000001ACC018B4AAC2116AF6500000");
Assert.assertNotNull(messageExt);
}
@Test
public void testQueryMessage() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.queryMessage(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(mock(QueryResult.class));
}
QueryResult result = mqAdminExtImpl.queryMessage("topic_test", "key", 32, 1627804565000L, System.currentTimeMillis());
Assert.assertNotNull(result);
}
@Test
public void testStart() {
assertNotNull(mqAdminExtImpl);
try {
mqAdminExtImpl.start();
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalStateException);
}
}
@Test
public void testShutdown() {
assertNotNull(mqAdminExtImpl);
try {
mqAdminExtImpl.shutdown();
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalStateException);
}
}
@Test
public void testQueryConsumeTimeSpan() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.queryConsumeTimeSpan(anyString(), anyString())).thenReturn(new ArrayList<QueueTimeSpan>());
}
List<QueueTimeSpan> timeSpans = mqAdminExtImpl.queryConsumeTimeSpan("topic_test", "group_test");
Assert.assertNotNull(timeSpans);
}
@Test
public void testViewMessage2() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl()).thenReturn(mock(MQAdminImpl.class));
when(defaultMQAdminExt.viewMessage(anyString())).thenThrow(new RuntimeException("viewMessage exception"));
}
mqAdminExtImpl.viewMessage("topic_test", "7F000001ACC018B4AAC2116AF6500000");
}
@Test
public void testGetBrokerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getBrokerConfig(anyString())).thenReturn(new Properties());
}
Properties brokerConfig = mqAdminExtImpl.getBrokerConfig(brokerAddr);
Assert.assertNotNull(brokerConfig);
}
@Test
public void testFetchTopicsByCLuster() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchTopicsByCLuster(anyString())).thenReturn(new TopicList());
}
TopicList topicList = mqAdminExtImpl.fetchTopicsByCLuster("DefaultCluster");
Assert.assertNotNull(topicList);
}
@Test
public void testCleanUnusedTopic() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.cleanUnusedTopic(anyString())).thenReturn(true);
when(defaultMQAdminExt.cleanUnusedTopicByAddr(anyString())).thenReturn(true);
}
Boolean result1 = mqAdminExtImpl.cleanUnusedTopic("DefaultCluster");
Boolean result2 = mqAdminExtImpl.cleanUnusedTopic(brokerAddr);
Assert.assertEquals(result1, true);
Assert.assertEquals(result2, true);
}
@Test
public void testViewBrokerStatsData() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.viewBrokerStatsData(anyString(), anyString(), anyString())).thenReturn(new BrokerStatsData());
}
BrokerStatsData brokerStatsData = mqAdminExtImpl.viewBrokerStatsData(brokerAddr, BrokerStatsManager.TOPIC_PUT_NUMS, "topic_test");
Assert.assertNotNull(brokerStatsData);
}
@Test
public void testGetClusterList() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getClusterList(anyString())).thenReturn(new HashSet<>());
}
Set<String> clusterList = mqAdminExtImpl.getClusterList("topic_test");
Assert.assertNotNull(clusterList);
}
@Test
public void testFetchConsumeStatsInBroker() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchConsumeStatsInBroker(anyString(), anyBoolean(), anyLong())).thenReturn(new ConsumeStatsList());
}
ConsumeStatsList consumeStatsList = mqAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, false, System.currentTimeMillis());
Assert.assertNotNull(consumeStatsList);
}
@Test
public void testGetTopicClusterList() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchTopicsByCLuster(anyString())).thenReturn(new TopicList());
}
TopicList topicList = mqAdminExtImpl.fetchTopicsByCLuster("DefaultCluster");
Assert.assertNotNull(topicList);
}
@Test
public void testGetAllSubscriptionGroup() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(new SubscriptionGroupWrapper());
}
SubscriptionGroupWrapper wrapper = mqAdminExtImpl.getAllSubscriptionGroup(brokerAddr, 5000L);
Assert.assertNotNull(wrapper);
}
@Test
public void testGetAllTopicGroup() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getAllTopicGroup(anyString(), anyLong())).thenReturn(new TopicConfigSerializeWrapper());
}
TopicConfigSerializeWrapper wrapper = mqAdminExtImpl.getAllTopicGroup(brokerAddr, 5000L);
Assert.assertNotNull(wrapper);
}
@Test
public void testUpdateConsumeOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).updateConsumeOffset(anyString(), anyString(), any(), anyLong());
}
mqAdminExtImpl.updateConsumeOffset(brokerAddr, "group_test", new MessageQueue(), 10000L);
}
@Test
public void testUpdateNameServerConfig() {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testGetNameServerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.getNameServerConfig(new ArrayList<>()));
}
@Test
public void testQueryConsumeQueue() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.queryConsumeQueue(brokerAddr, "topic_test", 2, 1, 10, "group_test"));
}
@Test
public void testResumeCheckHalfMessage() throws Exception {
assertNotNull(mqAdminExtImpl);
Assert.assertFalse(mqAdminExtImpl.resumeCheckHalfMessage("7F000001ACC018B4AAC2116AF6500000"));
Assert.assertFalse(mqAdminExtImpl.resumeCheckHalfMessage("topic_test", "7F000001ACC018B4AAC2116AF6500000"));
}
}

View File

@@ -0,0 +1,50 @@
package org.apache.rocketmq.console.config;
import java.io.File;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.boot.web.server.ErrorPage;
import org.springframework.boot.web.server.ErrorPageRegistrar;
import org.springframework.boot.web.server.ErrorPageRegistry;
public class RMQConfigureTest {
private RMQConfigure rmqConfigure = new RMQConfigure();
@Test
public void testSet() {
rmqConfigure.setAccessKey("rocketmq");
rmqConfigure.setSecretKey("12345678");
rmqConfigure.setDataPath("/tmp/rocketmq-console/data/test");
rmqConfigure.setEnableDashBoardCollect("true");
rmqConfigure.setIsVIPChannel("true");
rmqConfigure.setUseTLS(true);
rmqConfigure.setLoginRequired(true);
rmqConfigure.setMsgTrackTopicName(null);
rmqConfigure.setNamesrvAddr("127.0.0.1:9876");
}
@Test
public void testGet() {
testSet();
Assert.assertEquals(rmqConfigure.getAccessKey(), "rocketmq");
Assert.assertEquals(rmqConfigure.getSecretKey(), "12345678");
Assert.assertTrue(rmqConfigure.isACLEnabled());
Assert.assertTrue(rmqConfigure.isUseTLS());
Assert.assertEquals(rmqConfigure.getConsoleCollectData(), "/tmp/rocketmq-console/data/test" + File.separator + "dashboard");
Assert.assertEquals(rmqConfigure.getRocketMqConsoleDataPath(), "/tmp/rocketmq-console/data/test");
Assert.assertEquals(rmqConfigure.getIsVIPChannel(), "true");
Assert.assertTrue(rmqConfigure.isEnableDashBoardCollect());
Assert.assertTrue(rmqConfigure.isLoginRequired());
Assert.assertEquals(rmqConfigure.getMsgTrackTopicNameOrDefault(), TopicValidator.RMQ_SYS_TRACE_TOPIC);
Assert.assertEquals(rmqConfigure.getNamesrvAddr(), "127.0.0.1:9876");
ErrorPageRegistrar registrar = rmqConfigure.errorPageRegistrar();
registrar.registerErrorPages(new ErrorPageRegistry() {
@Override
public void addErrorPages(ErrorPage... errorPages) {
}
});
}
}

View File

@@ -21,12 +21,22 @@ import java.util.HashSet;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.console.interceptor.AuthInterceptor;
import org.apache.rocketmq.console.service.impl.LoginServiceImpl;
import org.apache.rocketmq.console.service.impl.ProducerServiceImpl;
import org.apache.rocketmq.console.support.GlobalExceptionHandler;
import org.apache.rocketmq.console.support.GlobalRestfulResponseBodyAdvice;
import org.apache.rocketmq.console.util.MyPrintingResultHandler;
import org.apache.rocketmq.console.util.WebUtil;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Spy;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.anyString;
@@ -42,9 +52,33 @@ public class ProducerControllerTest extends BaseControllerTest {
@Spy
private ProducerServiceImpl producerService;
@Override protected MockMvc createMockMvc() {
AuthInterceptor authInterceptor = new AuthInterceptor();
ReflectionTestUtils.setField(authInterceptor, "loginService", new LoginServiceImpl());
MockMvc innerMockMvc = MockMvcBuilders.standaloneSetup(getTestController())
.addInterceptors(authInterceptor)
.alwaysDo(MyPrintingResultHandler.me())
.setControllerAdvice(new GlobalExceptionHandler(), new GlobalRestfulResponseBodyAdvice())
.build();
this.mockMvc = innerMockMvc;
return innerMockMvc;
}
@Before
public void init(){
createMockMvc();
}
@Test
public void testProducerConnection() throws Exception {
final String url = "/producer/producerConnection.query";
// user not login request will redirect
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("producerGroup", "producer_test")
.param("topic", "topic_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().is3xxRedirection());
// user login
{
ProducerConnection producerConnection = new ProducerConnection();
HashSet<Connection> connections = new HashSet<>();
@@ -58,10 +92,8 @@ public class ProducerControllerTest extends BaseControllerTest {
.thenThrow(new MQClientException("Not found the producer group connection", null))
.thenReturn(producerConnection);
}
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("producerGroup", "producer_test")
.param("topic", "topic_test");
// 1、no connection
// 1、no producer connection
requestBuilder.sessionAttr(WebUtil.USER_NAME, "admin");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$").exists())
@@ -79,7 +111,8 @@ public class ProducerControllerTest extends BaseControllerTest {
.andExpect(jsonPath("$.data.connectionSet[0].clientId").value("clientId"));
}
@Override protected Object getTestController() {
@Override
protected Object getTestController() {
return producerController;
}
}

View File

@@ -0,0 +1,177 @@
package org.apache.rocketmq.console.task;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.console.BaseTest;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.service.impl.DashboardCollectServiceImpl;
import org.apache.rocketmq.console.util.JsonUtil;
import org.apache.rocketmq.console.util.MockObjectUtil;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
public class DashboardCollectTaskTest extends BaseTest {
@Spy
private DashboardCollectTask dashboardCollectTask;
@Spy
private DashboardCollectServiceImpl dashboardCollectService;
@Mock
private MQAdminExt mqAdminExt;
@Mock
private RMQConfigure rmqConfigure;
private int taskExecuteNum = 10;
private File brokerFile;
private File topicFile;
@Before
public void init() throws Exception {
MockitoAnnotations.initMocks(this);
when(rmqConfigure.getConsoleCollectData()).thenReturn("/tmp/rocketmq-console/test/data");
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
String dataLocationPath = rmqConfigure.getConsoleCollectData();
DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
String nowDateStr = format.format(new Date());
brokerFile = new File(dataLocationPath + nowDateStr + ".json");
topicFile = new File(dataLocationPath + nowDateStr + "_topic" + ".json");
autoInjection();
}
@Test
public void testCollectTopic() throws Exception {
// enableDashBoardCollect = false
when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(false);
dashboardCollectTask.collectTopic();
{
TopicList topicList = new TopicList();
Set<String> topicSet = new HashSet<>();
topicSet.add("topic_test");
topicSet.add("%RETRY%group_test");
topicSet.add("%DLQ%group_test");
topicList.setTopicList(topicSet);
when(mqAdminExt.fetchAllTopicList())
.thenThrow(new RuntimeException("fetchAllTopicList exception"))
.thenReturn(topicList);
TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData();
when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData);
GroupList list = new GroupList();
list.setGroupList(Sets.newHashSet("group_test"));
when(mqAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(list);
BrokerStatsData brokerStatsData = MockObjectUtil.createBrokerStatsData();
when(mqAdminExt.viewBrokerStatsData(anyString(), anyString(), anyString()))
.thenThrow(new RuntimeException("viewBrokerStatsData TOPIC_PUT_NUMS exception"))
.thenThrow(new RuntimeException("viewBrokerStatsData GROUP_GET_NUMS exception"))
.thenReturn(brokerStatsData);
when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(true);
}
// fetchAllTopicList exception
try {
dashboardCollectTask.collectTopic();
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "fetchAllTopicList exception");
}
for (int i = 0; i < taskExecuteNum; i++) {
dashboardCollectTask.collectTopic();
}
LoadingCache<String, List<String>> map = dashboardCollectService.getTopicMap();
Assert.assertEquals(map.size(), 1);
Assert.assertEquals(map.get("topic_test").size(), taskExecuteNum);
dashboardCollectTask.saveData();
Assert.assertEquals(topicFile.exists(), true);
Map<String, List<String>> topicData =
JsonUtil.string2Obj(MixAll.file2String(topicFile),
new TypeReference<Map<String, List<String>>>() {
});
Assert.assertEquals(topicData.get("topic_test").size(), taskExecuteNum);
}
@Test
public void testCollectBroker() throws Exception {
// enableDashBoardCollect = false
when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(false);
dashboardCollectTask.collectBroker();
{
HashMap<String, String> result = new HashMap<>();
result.put("getTotalTps", "0.0 0.033330000333300004 0.03332972261338355");
result.put("commitLogMinOffset", "0");
KVTable kvTable = new KVTable();
kvTable.setTable(result);
when(mqAdminExt.fetchBrokerRuntimeStats(anyString()))
.thenThrow(new RuntimeException("fetchBrokerRuntimeStats exception"))
.thenReturn(kvTable);
when(rmqConfigure.isEnableDashBoardCollect()).thenReturn(true);
}
// fetchBrokerRuntimeStats exception
try {
dashboardCollectTask.collectBroker();
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "fetchBrokerRuntimeStats exception");
}
for (int i = 0; i < taskExecuteNum; i++) {
dashboardCollectTask.collectBroker();
}
LoadingCache<String, List<String>> map = dashboardCollectService.getBrokerMap();
Assert.assertEquals(map.size(), 1);
Assert.assertEquals(map.get("broker-a" + ":" + MixAll.MASTER_ID).size(), taskExecuteNum);
mockBrokerFileExistBeforeSaveData();
dashboardCollectTask.saveData();
Assert.assertEquals(brokerFile.exists(), true);
Map<String, List<String>> brokerData =
JsonUtil.string2Obj(MixAll.file2String(brokerFile),
new TypeReference<Map<String, List<String>>>() {
});
Assert.assertEquals(brokerData.get("broker-a" + ":" + MixAll.MASTER_ID).size(), taskExecuteNum + 2);
}
@After
public void after() {
if (brokerFile != null && brokerFile.exists()) {
brokerFile.delete();
}
if (topicFile != null && topicFile.exists()) {
topicFile.delete();
}
}
private void mockBrokerFileExistBeforeSaveData() throws Exception {
Map<String, List<String>> map = new HashMap<>();
map.put("broker-a" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"}));
map.put("broker-b" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"}));
MixAll.string2File(JsonUtil.obj2String(map), brokerFile.getAbsolutePath());
}
}

View File

@@ -32,6 +32,7 @@ import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
@@ -39,7 +40,8 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
@@ -47,6 +49,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
@@ -141,6 +144,17 @@ public class MockObjectUtil {
return wrapper;
}
public static TopicConfigSerializeWrapper createTopicConfigWrapper() {
TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper();
TopicConfig config = new TopicConfig();
config.setTopicName("topic_test");
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap(2);
topicConfigTable.put("topic_test", config);
wrapper.setTopicConfigTable(topicConfigTable);
wrapper.setDataVersion(new DataVersion());
return wrapper;
}
public static ConsumerConnection createConsumerConnection() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connections = new HashSet<Connection>();
@@ -259,4 +273,26 @@ public class MockObjectUtil {
.append("true").append(TraceConstants.FIELD_SPLITOR);
return sb.toString();
}
public static BrokerStatsData createBrokerStatsData() {
BrokerStatsData brokerStatsData = new BrokerStatsData();
BrokerStatsItem statsDay = new BrokerStatsItem();
statsDay.setAvgpt(100.0);
statsDay.setSum(10000L);
statsDay.setTps(100.0);
brokerStatsData.setStatsDay(statsDay);
BrokerStatsItem statsHour = new BrokerStatsItem();
statsHour.setAvgpt(10.0);
statsHour.setSum(100L);
statsHour.setTps(100.0);
brokerStatsData.setStatsHour(statsHour);
BrokerStatsItem statsMinute = new BrokerStatsItem();
statsMinute.setAvgpt(10.0);
statsMinute.setSum(100L);
statsMinute.setTps(100.0);
brokerStatsData.setStatsMinute(statsMinute);
return brokerStatsData;
}
}

View File

@@ -48,6 +48,8 @@ public class MsgTraceDecodeUtilTest {
@Test
public void testDecodePubTraceMessage() {
List<TraceContext> contexts = MsgTraceDecodeUtil.decoderFromTraceDataString(null);
Assert.assertEquals(contexts.size(), 0);
String pubTraceData_V1 = new String(pubTraceDataBase);
List<TraceContext> traceContextListV1 = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_V1);
Assert.assertEquals(traceContextListV1.size(), 1);
@@ -96,6 +98,21 @@ public class MsgTraceDecodeUtilTest {
Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getOffsetMsgId(), "0A741D02000078BF000000000132F7C9");
Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911");
Assert.assertEquals(traceContextListV4.get(0).getTraceBeans().get(0).getClientHost(), "10.10.10.11");
String pubTraceData_default = new StringBuilder(pubTraceDataBase)
.append("0A741D02000078BF000000000132F7C9").append(TraceConstants.CONTENT_SPLITOR)
.append("true").append(TraceConstants.CONTENT_SPLITOR)
.append("10.10.10.11").append(TraceConstants.CONTENT_SPLITOR)
.append("10.10.10.11").append(TraceConstants.CONTENT_SPLITOR)
.toString();
List<TraceContext> traceContextList = MsgTraceDecodeUtil.decoderFromTraceDataString(pubTraceData_default);
Assert.assertEquals(traceContextList.size(), 1);
Assert.assertEquals(traceContextList.get(0).getTraceType().toString(), "Pub");
Assert.assertEquals(traceContextList.get(0).isSuccess(), true);
Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc6980189");
Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getOffsetMsgId(), "0A741D02000078BF000000000132F7C9");
Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getStoreHost(), "10.10.10.10:30911");
Assert.assertEquals(traceContextList.get(0).getTraceBeans().get(0).getClientHost(), "10.10.10.11");
}
@Test
@@ -134,5 +151,18 @@ public class MsgTraceDecodeUtilTest {
Assert.assertEquals(traceContextListV3.get(1).isSuccess(), false);
Assert.assertEquals(traceContextListV3.get(1).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f");
Assert.assertEquals(traceContextListV3.get(1).getGroupName(), "test_consumer_group");
String subTraceData_default = new StringBuilder(subTraceDataBase)
.append("4").append(TraceConstants.CONTENT_SPLITOR)
.append("1614666740499").append(TraceConstants.CONTENT_SPLITOR)
.append("test_consumer_group").append(TraceConstants.CONTENT_SPLITOR)
.append("test_consumer_group").append(TraceConstants.CONTENT_SPLITOR)
.toString();
List<TraceContext> traceContextList = MsgTraceDecodeUtil.decoderFromTraceDataString(subTraceData_default);
Assert.assertEquals(traceContextList.size(), 2);
Assert.assertEquals(traceContextList.get(1).getTraceType().toString(), "SubAfter");
Assert.assertEquals(traceContextList.get(1).isSuccess(), false);
Assert.assertEquals(traceContextList.get(1).getTraceBeans().get(0).getMsgId(), "0A741C02622500000000080cc698003f");
Assert.assertEquals(traceContextList.get(1).getGroupName(), "test_consumer_group");
}
}