mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2025-09-10 19:48:29 +08:00
Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
@@ -33,7 +33,6 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.trace.TraceType;
|
||||
import org.apache.rocketmq.common.Pair;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.common.topic.TopicValidator;
|
||||
import org.apache.rocketmq.console.config.RMQConfigure;
|
||||
import org.apache.rocketmq.console.model.MessageTraceView;
|
||||
import org.apache.rocketmq.console.model.trace.ProducerNode;
|
||||
|
@@ -63,6 +63,7 @@ public abstract class BaseControllerTest extends BaseTest {
|
||||
when(configure.getSecretKey()).thenReturn("rocketmq");
|
||||
when(configure.getNamesrvAddr()).thenReturn("127.0.0.1:9876");
|
||||
when(configure.isACLEnabled()).thenReturn(true);
|
||||
when(configure.isUseTLS()).thenReturn(false);
|
||||
}
|
||||
|
||||
protected ResultActions performOkExpect(ResultActions perform) throws Exception {
|
||||
|
@@ -57,6 +57,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
|
||||
@@ -108,7 +109,7 @@ public class TopicControllerTest extends BaseControllerTest {
|
||||
when(defaultMQProducer.getmQClientFactory()).thenReturn(mqClientInstance);
|
||||
when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPIImpl);
|
||||
when(mqClientAPIImpl.getSystemTopicList(anyLong())).thenReturn(sysTopicList);
|
||||
when(topicService.buildDefaultMQProducer(anyString(), any(), anyBoolean())).thenReturn(producer);
|
||||
doReturn(producer).when(topicService).buildDefaultMQProducer(anyString(), any(), anyBoolean());
|
||||
}
|
||||
final String url = "/topic/list.query";
|
||||
|
||||
@@ -246,7 +247,7 @@ public class TopicControllerTest extends BaseControllerTest {
|
||||
SendResult result = new SendResult(SendStatus.SEND_OK, "7F000001E41A2E5D6D978B82C20F003D",
|
||||
"0A8E83C300002A9F00000000000013D3", new MessageQueue(), 1000L);
|
||||
when(producer.send(any(Message.class))).thenReturn(result);
|
||||
when(topicService.buildDefaultMQProducer(anyString(), any(), anyBoolean())).thenReturn(producer);
|
||||
doReturn(producer).when(topicService).buildDefaultMQProducer(anyString(), any(), anyBoolean());
|
||||
}
|
||||
|
||||
SendTopicMessageRequest request = new SendTopicMessageRequest();
|
||||
|
@@ -1,61 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
|
||||
import org.apache.rocketmq.console.service.ClusterService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import static org.apache.rocketmq.console.testbase.TestConstant.BROKER_ADDRESS;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class ClusterServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void list() throws Exception {
|
||||
Map<String, Object> clusterMap = clusterService.list();
|
||||
ClusterInfo clusterInfo = (ClusterInfo)clusterMap.get("clusterInfo");
|
||||
Map<String/*brokerName*/, Map<Long/* brokerId */, Object/* brokerDetail */>> brokerServerMap = (Map<String, Map<Long, Object>>)clusterMap.get("brokerServer");
|
||||
Assert.assertNotNull(clusterInfo);
|
||||
Assert.assertNotNull(brokerServerMap);
|
||||
Assert.assertNotNull(clusterInfo.getBrokerAddrTable().get(TestConstant.TEST_BROKER_NAME));
|
||||
Assert.assertNotNull(brokerServerMap.get(TestConstant.TEST_BROKER_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getBrokerConfig() throws Exception {
|
||||
Properties properties = clusterService.getBrokerConfig(BROKER_ADDRESS);
|
||||
Assert.assertNotNull(properties);
|
||||
}
|
||||
|
||||
}
|
@@ -1,182 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.rocketmq.common.protocol.body.Connection;
|
||||
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
|
||||
import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
|
||||
import org.apache.rocketmq.console.model.GroupConsumeInfo;
|
||||
import org.apache.rocketmq.console.model.TopicConsumerInfo;
|
||||
import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
|
||||
import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
|
||||
import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class ConsumerServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
sendTestTopicMessage();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryGroupList() throws Exception {
|
||||
List<GroupConsumeInfo> consumeInfoList = consumerService.queryGroupList();
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(consumeInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryGroup() throws Exception {
|
||||
GroupConsumeInfo consumeInfo = consumerService.queryGroup(TEST_CONSUMER_GROUP);
|
||||
// if (consumeInfo.getCount() < 1) {
|
||||
// Thread.sleep(2000);
|
||||
// continue;
|
||||
// }
|
||||
Assert.assertNotNull(consumeInfo);
|
||||
Assert.assertEquals(consumeInfo.getGroup(), TEST_CONSUMER_GROUP);
|
||||
// Assert.assertTrue(consumeInfo.getCount() == 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumeStatsListByGroupName() throws Exception {
|
||||
// todo can't use if don't consume a message
|
||||
List<TopicConsumerInfo> topicConsumerInfoList = new RetryTempLate<List<TopicConsumerInfo>>() {
|
||||
@Override protected List<TopicConsumerInfo> process() throws Exception {
|
||||
return consumerService.queryConsumeStatsListByGroupName(TEST_CONSUMER_GROUP);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(topicConsumerInfoList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicConsumerInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumeStatsList() throws Exception {
|
||||
List<TopicConsumerInfo> topicConsumerInfoList = new RetryTempLate<List<TopicConsumerInfo>>() {
|
||||
@Override protected List<TopicConsumerInfo> process() throws Exception {
|
||||
return consumerService.queryConsumeStatsList(TEST_CONSOLE_TOPIC, TEST_CONSUMER_GROUP);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(topicConsumerInfoList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicConsumerInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumeStatsListByTopicName() throws Exception {
|
||||
Map<String, TopicConsumerInfo> consumeTopicMap = new RetryTempLate<Map<String, TopicConsumerInfo>>() {
|
||||
@Override protected Map<String, TopicConsumerInfo> process() throws Exception {
|
||||
return consumerService.queryConsumeStatsListByTopicName(TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(consumeTopicMap);
|
||||
Assert.assertNotNull(consumeTopicMap.get(TEST_CONSUMER_GROUP));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resetOffset() throws Exception {
|
||||
ResetOffsetRequest resetOffsetRequest = new ResetOffsetRequest();
|
||||
resetOffsetRequest.setConsumerGroupList(Lists.<String>newArrayList(TEST_CONSUMER_GROUP));
|
||||
resetOffsetRequest.setForce(true);
|
||||
resetOffsetRequest.setTopic(TEST_CONSOLE_TOPIC);
|
||||
resetOffsetRequest.setResetTime(System.currentTimeMillis() - 1000);
|
||||
Map<String /*consumerGroup*/, ConsumerGroupRollBackStat> consumerGroupRollBackStatMap = consumerService.resetOffset(resetOffsetRequest);
|
||||
Assert.assertNotNull(consumerGroupRollBackStatMap);
|
||||
Assert.assertNotNull(consumerGroupRollBackStatMap.get(TEST_CONSUMER_GROUP));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void examineSubscriptionGroupConfig() throws Exception {
|
||||
List<ConsumerConfigInfo> configInfoList = consumerService.examineSubscriptionGroupConfig(TEST_CONSUMER_GROUP);
|
||||
Assert.assertTrue(configInfoList.size() == 1);
|
||||
Assert.assertTrue(configInfoList.get(0).getSubscriptionGroupConfig().getGroupName().equals(TEST_CONSUMER_GROUP));
|
||||
Assert.assertTrue(configInfoList.get(0).getSubscriptionGroupConfig().getRetryQueueNums() == RETRY_QUEUE_NUMS);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteSubGroup() throws Exception {
|
||||
createAndUpdateSubscriptionGroupConfig();
|
||||
DeleteSubGroupRequest deleteSubGroupRequest = new DeleteSubGroupRequest();
|
||||
deleteSubGroupRequest.setBrokerNameList(Lists.<String>newArrayList(TestConstant.TEST_BROKER_NAME));
|
||||
deleteSubGroupRequest.setGroupName(TEST_CREATE_DELETE_CONSUMER_GROUP);
|
||||
Assert.assertTrue(consumerService.deleteSubGroup(deleteSubGroupRequest));
|
||||
List<ConsumerConfigInfo> groupConsumeInfoList = consumerService.examineSubscriptionGroupConfig(TEST_CREATE_DELETE_CONSUMER_GROUP);
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(groupConsumeInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createAndUpdateSubscriptionGroupConfig() throws Exception {
|
||||
ConsumerConfigInfo consumerConfigInfoForCreate = new ConsumerConfigInfo();
|
||||
BeanUtils.copyProperties(consumerConfigInfo, consumerConfigInfoForCreate);
|
||||
consumerConfigInfoForCreate.getSubscriptionGroupConfig().setGroupName(TEST_CREATE_DELETE_CONSUMER_GROUP);
|
||||
Assert.assertTrue(consumerService.createAndUpdateSubscriptionGroupConfig(consumerConfigInfoForCreate));
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(consumerService.examineSubscriptionGroupConfig(TEST_CREATE_DELETE_CONSUMER_GROUP)));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchBrokerNameSetBySubscriptionGroup() throws Exception {
|
||||
Set<String> xx = consumerService.fetchBrokerNameSetBySubscriptionGroup(TEST_CONSUMER_GROUP);
|
||||
Assert.assertTrue(xx.contains(TestConstant.TEST_BROKER_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConsumerConnection() throws Exception {
|
||||
ConsumerConnection consumerConnection = new RetryTempLate<ConsumerConnection>() {
|
||||
@Override protected ConsumerConnection process() throws Exception {
|
||||
return consumerService.getConsumerConnection(TEST_CONSUMER_GROUP);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(consumerConnection);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(consumerConnection.getConnectionSet()));
|
||||
Assert.assertTrue(Lists.transform(Lists.newArrayList(consumerConnection.getConnectionSet()), new Function<Connection, String>() {
|
||||
@Override
|
||||
public String apply(Connection input) {
|
||||
return input.getClientAddr().split(":")[0];
|
||||
}
|
||||
}).contains(TestConstant.BROKER_IP));
|
||||
}
|
||||
|
||||
}
|
@@ -1,62 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.console.service.DashboardCollectService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class DashboardCollectServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Resource
|
||||
private DashboardCollectService dashboardCollectService;
|
||||
|
||||
@Test
|
||||
public void getBrokerMap() throws Exception {
|
||||
Assert.assertNotNull(dashboardCollectService.getBrokerMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getTopicMap() throws Exception {
|
||||
Assert.assertNotNull(dashboardCollectService.getTopicMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jsonDataFile2map() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getBrokerCache() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getTopicCache() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -1,26 +0,0 @@
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import org.apache.rocketmq.console.config.RMQConfigure;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class LoginFileTest {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoad() throws Exception {
|
||||
RMQConfigure configure = new RMQConfigure();
|
||||
configure.setDataPath(this.getClass().getResource("/").getPath());
|
||||
|
||||
UserServiceImpl.FileBasedUserInfoStore fileBasedUserInfoStore = new UserServiceImpl.FileBasedUserInfoStore(configure);
|
||||
Assert.assertTrue("No exception raise for FileBasedUserInfoStore", true);
|
||||
}
|
||||
}
|
@@ -1,135 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.rocketmq.common.Pair;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.console.model.MessageView;
|
||||
import org.apache.rocketmq.console.service.MessageService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.tools.admin.MQAdminExt;
|
||||
import org.apache.rocketmq.tools.admin.api.MessageTrack;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class MessageServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Resource
|
||||
private MessageService messageService;
|
||||
@Resource
|
||||
private MQAdminExt mqAdminExt;
|
||||
private String msgId;
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
msgId = sendTestTopicMessage().getMsgId();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
@Test
|
||||
public void viewMessage() throws Exception {
|
||||
final String messageId = msgId;
|
||||
Pair<MessageView, List<MessageTrack>> messageViewListPair=new RetryTempLate<Pair<MessageView, List<MessageTrack>>>() {
|
||||
@Override protected Pair<MessageView, List<MessageTrack>> process() throws Exception {
|
||||
return messageService.viewMessage(TEST_CONSOLE_TOPIC,messageId);
|
||||
}
|
||||
}.execute(10,1000);
|
||||
|
||||
MessageView messageView = messageViewListPair.getObject1();
|
||||
Assert.assertEquals(messageView.getMessageBody(),TEST_TOPIC_MESSAGE_BODY);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryMessageByTopicAndKey() throws Exception {
|
||||
final String messageId = msgId;
|
||||
List<MessageView> messageViewList=new RetryTempLate<List<MessageView>>() {
|
||||
@Override protected List<MessageView> process() throws Exception {
|
||||
return messageService.queryMessageByTopicAndKey(TEST_CONSOLE_TOPIC,TEST_TOPIC_MESSAGE_KEY);
|
||||
}
|
||||
}.execute(10,1000);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(messageViewList));
|
||||
Assert.assertTrue(Lists.transform(messageViewList, new Function<MessageView, String>() {
|
||||
@Override public String apply(MessageView input) {
|
||||
return input.getMsgId();
|
||||
}
|
||||
}).contains(messageId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryMessageByTopic() throws Exception {
|
||||
final String messageId = msgId;
|
||||
List<MessageView> messageViewList = null;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
messageViewList = messageService.queryMessageByTopic(TEST_CONSOLE_TOPIC, System.currentTimeMillis() - 100000, System.currentTimeMillis());
|
||||
if (CollectionUtils.isEmpty(messageViewList)) {
|
||||
Thread.sleep(1000);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(messageViewList));
|
||||
Assert.assertTrue(Lists.transform(messageViewList, new Function<MessageView, String>() {
|
||||
@Override public String apply(MessageView input) {
|
||||
return input.getMsgId();
|
||||
}
|
||||
}).contains(messageId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void messageTrackDetail() throws Exception {
|
||||
final String messageId = msgId;
|
||||
Pair<MessageView, List<MessageTrack>> messageViewListPair=new RetryTempLate<Pair<MessageView, List<MessageTrack>>>() {
|
||||
@Override protected Pair<MessageView, List<MessageTrack>> process() throws Exception {
|
||||
return messageService.viewMessage(TEST_CONSOLE_TOPIC,messageId);
|
||||
}
|
||||
}.execute(10,1000); // make the topic can be found
|
||||
final MessageExt messageExt = mqAdminExt.viewMessage(TEST_CONSOLE_TOPIC,messageId);
|
||||
Assert.assertNotNull(messageService.messageTrackDetail(messageExt));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void consumeMessageDirectly() throws Exception {
|
||||
// final String messageId = msgId;
|
||||
// ConsumeMessageDirectlyResult messageDirectlyResult=new RetryTempLate<ConsumeMessageDirectlyResult>() {
|
||||
// @Override protected ConsumeMessageDirectlyResult process() throws Exception {
|
||||
// return messageService.consumeMessageDirectly(TEST_CONSOLE_TOPIC,messageId,TEST_CONSUMER_GROUP,null);
|
||||
// }
|
||||
// }.execute(20,1000); // todo test
|
||||
}
|
||||
|
||||
|
||||
}
|
@@ -1,139 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.rocketmq.client.QueryResult;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.common.topic.TopicValidator;
|
||||
import org.apache.rocketmq.console.config.RMQConfigure;
|
||||
import org.apache.rocketmq.console.model.MessageTraceView;
|
||||
import org.apache.rocketmq.console.model.trace.MessageTraceGraph;
|
||||
import org.apache.rocketmq.tools.admin.MQAdminExt;
|
||||
import org.assertj.core.util.Lists;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class MessageTraceServiceImplTest {
|
||||
|
||||
@InjectMocks
|
||||
private MessageTraceServiceImpl messageTraceService;
|
||||
@Mock
|
||||
private MQAdminExt mqAdminExt;
|
||||
@Mock
|
||||
private RMQConfigure rmqConfigure;
|
||||
|
||||
private static final String TEST_MESSAGE_ID = "7F0000016D48512DDF172E788E5E0000";
|
||||
private static final String FAKE_SUBSCRIPTION_GROUP = "CID_JODIE";
|
||||
private static final String TEST_KEY = "TEST_KEY";
|
||||
private static final String PUB_TRACE = "Pub\u00011625848452706\u0001DefaultRegion\u0001sendGroup\u0001" +
|
||||
"TopicTraceTest\u0001" + TEST_MESSAGE_ID + "\u0001TagA\u0001OrderID188\u0001" +
|
||||
"192.168.0.101:10911\u000111\u000117\u00010\u0001C0A8006500002A9F0000000000003866\u0001true\u0002";
|
||||
private static final String SUB_TRACE1 = "SubBefore\u00011625848452722\u0001null\u0001" + FAKE_SUBSCRIPTION_GROUP +
|
||||
"\u00017F0000016801512DDF172E788E720014\u0001" + TEST_MESSAGE_ID + "\u00010\u0001OrderID188\u0002";
|
||||
private static final String SUB_TRACE2 = "SubAfter\u00017F0000016801512DDF172E788E720014\u0001" + TEST_MESSAGE_ID +
|
||||
"\u000140\u0001true\u0001OrderID188\u00010\u0002";
|
||||
private static final String END_TRANSACTION_TRACE = "EndTransaction\u00011625913838389\u0001DefaultRegion\u0001" +
|
||||
FAKE_SUBSCRIPTION_GROUP + "\u0001TopicTraceTest\u0001" + TEST_MESSAGE_ID +
|
||||
"\u0001TagA\u0001OrderID188\u0001192.168.0.101:10911\u00012\u00017F000001ACFE512DDF17325DBAEA0000" +
|
||||
"\u0001UNKNOW\u0001true\u0002";
|
||||
private MessageExt fakeMessageExt;
|
||||
private MessageExt fakeMessageExt2;
|
||||
private MessageExt fakeMessageExt3;
|
||||
private MessageExt fakeMessageExt4;
|
||||
|
||||
@BeforeEach
|
||||
public void init() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
Mockito.when(rmqConfigure.getMsgTrackTopicNameOrDefault()).thenReturn(TopicValidator.RMQ_SYS_TRACE_TOPIC);
|
||||
fakeMessageExt = new MessageExt();
|
||||
fakeMessageExt.setKeys(Lists.newArrayList(TEST_KEY));
|
||||
fakeMessageExt.setBody(PUB_TRACE.getBytes(StandardCharsets.UTF_8));
|
||||
fakeMessageExt2 = new MessageExt();
|
||||
fakeMessageExt2.setKeys(Lists.newArrayList(TEST_KEY));
|
||||
fakeMessageExt2.setBody(SUB_TRACE1.getBytes(StandardCharsets.UTF_8));
|
||||
fakeMessageExt3 = new MessageExt();
|
||||
fakeMessageExt3.setKeys(Lists.newArrayList(TEST_KEY));
|
||||
fakeMessageExt3.setBody(SUB_TRACE2.getBytes(StandardCharsets.UTF_8));
|
||||
fakeMessageExt4 = new MessageExt();
|
||||
fakeMessageExt4.setKeys(Lists.newArrayList(TEST_KEY));
|
||||
fakeMessageExt4.setBody(END_TRANSACTION_TRACE.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SneakyThrows
|
||||
public void queryMessageTraceKeyTest() {
|
||||
List<MessageExt> messageTraceList = Lists.newArrayList(fakeMessageExt);
|
||||
QueryResult queryResult = new QueryResult(1, messageTraceList);
|
||||
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(queryResult);
|
||||
List<MessageTraceView> views = messageTraceService.queryMessageTraceKey(TEST_MESSAGE_ID);
|
||||
Assertions.assertEquals(1, views.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SneakyThrows
|
||||
public void queryMessageTraceKeyWithExceptionTest() {
|
||||
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenThrow(new RuntimeException());
|
||||
Assertions.assertThrows(RuntimeException.class, () -> messageTraceService.queryMessageTraceKey(TEST_MESSAGE_ID));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SneakyThrows
|
||||
public void queryMessageTraceWithNoResultTest() {
|
||||
QueryResult queryResult = new QueryResult(1, Collections.emptyList());
|
||||
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(queryResult);
|
||||
MessageTraceGraph messageTraceGraph = messageTraceService.queryMessageTraceGraph(TEST_MESSAGE_ID);
|
||||
Assertions.assertNotNull(messageTraceGraph);
|
||||
Assertions.assertEquals(0, messageTraceGraph.getMessageTraceViews().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SneakyThrows
|
||||
public void queryMessageTraceTest() {
|
||||
List<MessageExt> messageTraceList = Lists.newArrayList(fakeMessageExt, fakeMessageExt2, fakeMessageExt3, fakeMessageExt4);
|
||||
QueryResult queryResult = new QueryResult(1, messageTraceList);
|
||||
Mockito.when(mqAdminExt.queryMessage(Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(queryResult);
|
||||
MessageTraceGraph messageTraceGraph = messageTraceService.queryMessageTraceGraph(TEST_MESSAGE_ID);
|
||||
Assertions.assertNotNull(messageTraceGraph);
|
||||
Assertions.assertEquals(TEST_MESSAGE_ID, messageTraceGraph.getProducerNode().getMsgId());
|
||||
Assertions.assertEquals(1, messageTraceGraph.getSubscriptionNodeList().size());
|
||||
Assertions.assertEquals(FAKE_SUBSCRIPTION_GROUP, messageTraceGraph.getSubscriptionNodeList()
|
||||
.get(0).getSubscriptionGroup());
|
||||
Assertions.assertEquals(4, messageTraceGraph.getMessageTraceViews().size());
|
||||
Assertions.assertEquals(1, messageTraceGraph.getProducerNode().getTransactionNodeList().size());
|
||||
Assertions.assertEquals(LocalTransactionState.UNKNOW.name(),
|
||||
messageTraceGraph.getProducerNode().getTransactionNodeList().get(0).getTransactionState());
|
||||
for (MessageTraceView view : messageTraceGraph.getMessageTraceViews()) {
|
||||
Assertions.assertEquals(0, view.getRetryTimes());
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,55 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.console.service.MonitorService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class MonitorServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private MonitorService monitorService;
|
||||
@Test
|
||||
public void createOrUpdateConsumerMonitor() throws Exception {
|
||||
// monitorService.createOrUpdateConsumerMonitor()
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumerMonitorConfig() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumerMonitorConfigByGroupName() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteConsumerMonitor() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -1,66 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.console.service.OpsService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class OpsServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private OpsService opsService;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void homePageInfo() throws Exception {
|
||||
List<String> namesvrAddrList= (List<String>)opsService.homePageInfo().get("namesvrAddrList");
|
||||
Assert.assertTrue(namesvrAddrList.contains(TestConstant.NAME_SERVER_ADDRESS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateNameSvrAddrList() throws Exception {
|
||||
String testChangeNameSvrAddr = "110.110.100.110:1234";
|
||||
opsService.updateNameSvrAddrList(testChangeNameSvrAddr);
|
||||
Assert.assertEquals(opsService.getNameSvrList(),testChangeNameSvrAddr);
|
||||
|
||||
opsService.updateNameSvrAddrList(TestConstant.NAME_SERVER_ADDRESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getNameSvrList() throws Exception {
|
||||
Assert.assertEquals(opsService.getNameSvrList(),TestConstant.NAME_SERVER_ADDRESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rocketMqStatusCheck() throws Exception {
|
||||
// need enhance in milestone 2
|
||||
opsService.rocketMqStatusCheck();
|
||||
}
|
||||
|
||||
}
|
@@ -1,71 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.common.protocol.body.Connection;
|
||||
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
|
||||
import org.apache.rocketmq.console.service.ProducerService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class ProducerServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private ProducerService producerService;
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
sendTestTopicMessage().getMsgId();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getProducerConnection() throws Exception {
|
||||
ProducerConnection producerConnection=new RetryTempLate<ProducerConnection>() {
|
||||
@Override protected ProducerConnection process() throws Exception {
|
||||
return producerService.getProducerConnection(TEST_PRODUCER_GROUP,TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
}.execute(10,1000);
|
||||
Assert.assertNotNull(producerConnection);
|
||||
Assert.assertTrue(Lists.transform(Lists.newArrayList(producerConnection.getConnectionSet()), new Function<Connection, String>() {
|
||||
@Override public String apply(Connection input) {
|
||||
return input.getClientAddr().split(":")[0];
|
||||
}
|
||||
}).contains(TestConstant.LOCAL_HOST));
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -1,175 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.TopicConfig;
|
||||
import org.apache.rocketmq.common.admin.TopicStatsTable;
|
||||
import org.apache.rocketmq.common.protocol.body.TopicList;
|
||||
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
|
||||
import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
|
||||
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
|
||||
import org.apache.rocketmq.console.service.TopicService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class TopicServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Resource
|
||||
private TopicService topicService;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
sendTestTopicMessage().getMsgId();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchAllTopicList() throws Exception {
|
||||
TopicList topicList = topicService.fetchAllTopicList(true);
|
||||
Assert.assertNotNull(topicList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicList.getTopicList()));
|
||||
Assert.assertTrue(topicList.getTopicList().contains(TEST_CONSOLE_TOPIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void stats() throws Exception {
|
||||
TopicStatsTable topicStatsTable = topicService.stats(TEST_CONSOLE_TOPIC);
|
||||
Assert.assertNotNull(topicStatsTable );
|
||||
Assert.assertEquals(topicStatsTable.getOffsetTable().size(),READ_QUEUE_NUM);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void route() throws Exception {
|
||||
TopicRouteData topicRouteData = topicService.route(TEST_CONSOLE_TOPIC);
|
||||
Assert.assertNotNull(topicRouteData);
|
||||
Assert.assertEquals(topicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID),TestConstant.BROKER_ADDRESS);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicRouteData.getQueueDatas()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryTopicConsumerInfo() throws Exception {
|
||||
// GroupList groupList = null; // todo
|
||||
// for(int i=0;i<20;i++){
|
||||
// sendTestTopicMessage();
|
||||
// }
|
||||
// for (int i = 0; i < 20; i++) {
|
||||
// groupList = topicService.queryTopicConsumerInfo(TEST_CONSOLE_TOPIC);
|
||||
// if (CollectionUtils.isNotEmpty(groupList.getGroupList())) {
|
||||
// break;
|
||||
// }
|
||||
// Thread.sleep(1000);
|
||||
// }
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createOrUpdate() throws Exception {
|
||||
TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
|
||||
topicConfigInfo.setBrokerNameList(Lists.newArrayList(TestConstant.TEST_BROKER_NAME));
|
||||
topicConfigInfo.setTopicName(TEST_CREATE_DELETE_TOPIC);
|
||||
topicService.createOrUpdate(topicConfigInfo);
|
||||
|
||||
TopicList topicList = topicService.fetchAllTopicList(true);
|
||||
|
||||
Assert.assertNotNull(topicList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicList.getTopicList()));
|
||||
Assert.assertTrue(topicList.getTopicList().contains(TEST_CREATE_DELETE_TOPIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void examineTopicConfig() throws Exception {
|
||||
List<TopicConfigInfo> topicConfigInfoList = topicService.examineTopicConfig(TEST_CONSOLE_TOPIC);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicConfigInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void examineTopicConfigList() throws Exception {
|
||||
TopicConfig topicConfig = topicService.examineTopicConfig(TEST_CONSOLE_TOPIC,TestConstant.TEST_BROKER_NAME);
|
||||
Assert.assertNotNull(topicConfig);
|
||||
Assert.assertEquals(topicConfig.getReadQueueNums(),READ_QUEUE_NUM);
|
||||
Assert.assertEquals(topicConfig.getWriteQueueNums(),WRITE_QUEUE_NUM);
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void deleteTopic() throws Exception {
|
||||
Assert.assertTrue(topicService.deleteTopic(TEST_CONSOLE_TOPIC));
|
||||
topicService.examineTopicConfig(TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void deleteTopic1() throws Exception {
|
||||
Assert.assertTrue(topicService.deleteTopic(TEST_CONSOLE_TOPIC,TestConstant.TEST_CLUSTER_NAME));
|
||||
topicService.examineTopicConfig(TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteTopicInBroker() throws Exception {
|
||||
Assert.assertTrue(topicService.deleteTopic(TestConstant.TEST_BROKER_NAME,TEST_CONSOLE_TOPIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendTopicMessageRequest() throws Exception {
|
||||
SendTopicMessageRequest sendTopicMessageRequest = new SendTopicMessageRequest();
|
||||
sendTopicMessageRequest.setTopic(TEST_CONSOLE_TOPIC);
|
||||
sendTopicMessageRequest.setMessageBody("sendTopicMessageRequestMessageBody");
|
||||
sendTopicMessageRequest.setKey("sendTopicMessageRequestKey");
|
||||
sendTopicMessageRequest.setTag("sendTopicMessageRequestTag");
|
||||
|
||||
SendResult sendResult= topicService.sendTopicMessageRequest(sendTopicMessageRequest);
|
||||
Assert.assertNotNull(sendResult);
|
||||
Assert.assertTrue(StringUtils.isNoneBlank(sendResult.getMsgId()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendTopicMessageRequestWithMessageTrace() throws Exception {
|
||||
SendTopicMessageRequest sendTopicMessageRequest = new SendTopicMessageRequest();
|
||||
sendTopicMessageRequest.setTopic(TEST_CONSOLE_TOPIC);
|
||||
sendTopicMessageRequest.setMessageBody("sendTopicMessageRequestMessageBody");
|
||||
sendTopicMessageRequest.setKey("sendTopicMessageRequestKey");
|
||||
sendTopicMessageRequest.setTag("sendTopicMessageRequestTag");
|
||||
sendTopicMessageRequest.setTraceEnabled(true);
|
||||
|
||||
SendResult sendResult= topicService.sendTopicMessageRequest(sendTopicMessageRequest);
|
||||
Assert.assertNotNull(sendResult);
|
||||
Assert.assertTrue(StringUtils.isNoneBlank(sendResult.getMsgId()));
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user