mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2026-02-20 03:55:42 +08:00
Remove the 'ng' suffix of sub projects.
This commit is contained in:
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.controller;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
@DirtiesContext
|
||||
public class ClusterControllerTests {
|
||||
@Test
|
||||
public void testList() throws Exception {
|
||||
//todo add. java.lang.Exception: No runnable methods
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.controller;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
@DirtiesContext
|
||||
public class OpsControllerTest {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void homePage() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateNameSvrAddr() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clusterStatus() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
|
||||
import org.apache.rocketmq.console.service.ClusterService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import static org.apache.rocketmq.console.testbase.TestConstant.BROKER_ADDRESS;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class ClusterServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void list() throws Exception {
|
||||
Map<String, Object> clusterMap = clusterService.list();
|
||||
ClusterInfo clusterInfo = (ClusterInfo)clusterMap.get("clusterInfo");
|
||||
Map<String/*brokerName*/, Map<Long/* brokerId */, Object/* brokerDetail */>> brokerServerMap = (Map<String, Map<Long, Object>>)clusterMap.get("brokerServer");
|
||||
Assert.assertNotNull(clusterInfo);
|
||||
Assert.assertNotNull(brokerServerMap);
|
||||
Assert.assertNotNull(clusterInfo.getBrokerAddrTable().get(TestConstant.TEST_BROKER_NAME));
|
||||
Assert.assertNotNull(brokerServerMap.get(TestConstant.TEST_BROKER_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getBrokerConfig() throws Exception {
|
||||
Properties properties = clusterService.getBrokerConfig(BROKER_ADDRESS);
|
||||
Assert.assertNotNull(properties);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,182 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.rocketmq.common.protocol.body.Connection;
|
||||
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
|
||||
import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
|
||||
import org.apache.rocketmq.console.model.GroupConsumeInfo;
|
||||
import org.apache.rocketmq.console.model.TopicConsumerInfo;
|
||||
import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
|
||||
import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
|
||||
import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class ConsumerServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
sendTestTopicMessage();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryGroupList() throws Exception {
|
||||
List<GroupConsumeInfo> consumeInfoList = consumerService.queryGroupList();
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(consumeInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryGroup() throws Exception {
|
||||
GroupConsumeInfo consumeInfo = consumerService.queryGroup(TEST_CONSUMER_GROUP);
|
||||
// if (consumeInfo.getCount() < 1) {
|
||||
// Thread.sleep(2000);
|
||||
// continue;
|
||||
// }
|
||||
Assert.assertNotNull(consumeInfo);
|
||||
Assert.assertEquals(consumeInfo.getGroup(), TEST_CONSUMER_GROUP);
|
||||
// Assert.assertTrue(consumeInfo.getCount() == 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumeStatsListByGroupName() throws Exception {
|
||||
// todo can't use if don't consume a message
|
||||
List<TopicConsumerInfo> topicConsumerInfoList = new RetryTempLate<List<TopicConsumerInfo>>() {
|
||||
@Override protected List<TopicConsumerInfo> process() throws Exception {
|
||||
return consumerService.queryConsumeStatsListByGroupName(TEST_CONSUMER_GROUP);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(topicConsumerInfoList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicConsumerInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumeStatsList() throws Exception {
|
||||
List<TopicConsumerInfo> topicConsumerInfoList = new RetryTempLate<List<TopicConsumerInfo>>() {
|
||||
@Override protected List<TopicConsumerInfo> process() throws Exception {
|
||||
return consumerService.queryConsumeStatsList(TEST_CONSOLE_TOPIC, TEST_CONSUMER_GROUP);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(topicConsumerInfoList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicConsumerInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumeStatsListByTopicName() throws Exception {
|
||||
Map<String, TopicConsumerInfo> consumeTopicMap = new RetryTempLate<Map<String, TopicConsumerInfo>>() {
|
||||
@Override protected Map<String, TopicConsumerInfo> process() throws Exception {
|
||||
return consumerService.queryConsumeStatsListByTopicName(TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(consumeTopicMap);
|
||||
Assert.assertNotNull(consumeTopicMap.get(TEST_CONSUMER_GROUP));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resetOffset() throws Exception {
|
||||
ResetOffsetRequest resetOffsetRequest = new ResetOffsetRequest();
|
||||
resetOffsetRequest.setConsumerGroupList(Lists.<String>newArrayList(TEST_CONSUMER_GROUP));
|
||||
resetOffsetRequest.setForce(true);
|
||||
resetOffsetRequest.setTopic(TEST_CONSOLE_TOPIC);
|
||||
resetOffsetRequest.setResetTime(System.currentTimeMillis() - 1000);
|
||||
Map<String /*consumerGroup*/, ConsumerGroupRollBackStat> consumerGroupRollBackStatMap = consumerService.resetOffset(resetOffsetRequest);
|
||||
Assert.assertNotNull(consumerGroupRollBackStatMap);
|
||||
Assert.assertNotNull(consumerGroupRollBackStatMap.get(TEST_CONSUMER_GROUP));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void examineSubscriptionGroupConfig() throws Exception {
|
||||
List<ConsumerConfigInfo> configInfoList = consumerService.examineSubscriptionGroupConfig(TEST_CONSUMER_GROUP);
|
||||
Assert.assertTrue(configInfoList.size() == 1);
|
||||
Assert.assertTrue(configInfoList.get(0).getSubscriptionGroupConfig().getGroupName().equals(TEST_CONSUMER_GROUP));
|
||||
Assert.assertTrue(configInfoList.get(0).getSubscriptionGroupConfig().getRetryQueueNums() == RETRY_QUEUE_NUMS);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteSubGroup() throws Exception {
|
||||
createAndUpdateSubscriptionGroupConfig();
|
||||
DeleteSubGroupRequest deleteSubGroupRequest = new DeleteSubGroupRequest();
|
||||
deleteSubGroupRequest.setBrokerNameList(Lists.<String>newArrayList(TestConstant.TEST_BROKER_NAME));
|
||||
deleteSubGroupRequest.setGroupName(TEST_CREATE_DELETE_CONSUMER_GROUP);
|
||||
Assert.assertTrue(consumerService.deleteSubGroup(deleteSubGroupRequest));
|
||||
List<ConsumerConfigInfo> groupConsumeInfoList = consumerService.examineSubscriptionGroupConfig(TEST_CREATE_DELETE_CONSUMER_GROUP);
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(groupConsumeInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createAndUpdateSubscriptionGroupConfig() throws Exception {
|
||||
ConsumerConfigInfo consumerConfigInfoForCreate = new ConsumerConfigInfo();
|
||||
BeanUtils.copyProperties(consumerConfigInfo, consumerConfigInfoForCreate);
|
||||
consumerConfigInfoForCreate.getSubscriptionGroupConfig().setGroupName(TEST_CREATE_DELETE_CONSUMER_GROUP);
|
||||
Assert.assertTrue(consumerService.createAndUpdateSubscriptionGroupConfig(consumerConfigInfoForCreate));
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(consumerService.examineSubscriptionGroupConfig(TEST_CREATE_DELETE_CONSUMER_GROUP)));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchBrokerNameSetBySubscriptionGroup() throws Exception {
|
||||
Set<String> xx = consumerService.fetchBrokerNameSetBySubscriptionGroup(TEST_CONSUMER_GROUP);
|
||||
Assert.assertTrue(xx.contains(TestConstant.TEST_BROKER_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConsumerConnection() throws Exception {
|
||||
ConsumerConnection consumerConnection = new RetryTempLate<ConsumerConnection>() {
|
||||
@Override protected ConsumerConnection process() throws Exception {
|
||||
return consumerService.getConsumerConnection(TEST_CONSUMER_GROUP);
|
||||
}
|
||||
}.execute(10, 2000);
|
||||
Assert.assertNotNull(consumerConnection);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(consumerConnection.getConnectionSet()));
|
||||
Assert.assertTrue(Lists.transform(Lists.newArrayList(consumerConnection.getConnectionSet()), new Function<Connection, String>() {
|
||||
@Override
|
||||
public String apply(Connection input) {
|
||||
return input.getClientAddr().split(":")[0];
|
||||
}
|
||||
}).contains(TestConstant.BROKER_IP));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.console.service.DashboardCollectService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class DashboardCollectServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Resource
|
||||
private DashboardCollectService dashboardCollectService;
|
||||
|
||||
@Test
|
||||
public void getBrokerMap() throws Exception {
|
||||
Assert.assertNotNull(dashboardCollectService.getBrokerMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getTopicMap() throws Exception {
|
||||
Assert.assertNotNull(dashboardCollectService.getTopicMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jsonDataFile2map() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getBrokerCache() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getTopicCache() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class DashboardServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Test
|
||||
public void queryBrokerData() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryTopicData() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryTopicData1() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryTopicCurrentData() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,135 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.rocketmq.common.Pair;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.console.model.MessageView;
|
||||
import org.apache.rocketmq.console.service.MessageService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.tools.admin.MQAdminExt;
|
||||
import org.apache.rocketmq.tools.admin.api.MessageTrack;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class MessageServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Resource
|
||||
private MessageService messageService;
|
||||
@Resource
|
||||
private MQAdminExt mqAdminExt;
|
||||
private String msgId;
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
msgId = sendTestTopicMessage().getMsgId();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
@Test
|
||||
public void viewMessage() throws Exception {
|
||||
final String messageId = msgId;
|
||||
Pair<MessageView, List<MessageTrack>> messageViewListPair=new RetryTempLate<Pair<MessageView, List<MessageTrack>>>() {
|
||||
@Override protected Pair<MessageView, List<MessageTrack>> process() throws Exception {
|
||||
return messageService.viewMessage(TEST_CONSOLE_TOPIC,messageId);
|
||||
}
|
||||
}.execute(10,1000);
|
||||
|
||||
MessageView messageView = messageViewListPair.getObject1();
|
||||
Assert.assertEquals(messageView.getMessageBody(),TEST_TOPIC_MESSAGE_BODY);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryMessageByTopicAndKey() throws Exception {
|
||||
final String messageId = msgId;
|
||||
List<MessageView> messageViewList=new RetryTempLate<List<MessageView>>() {
|
||||
@Override protected List<MessageView> process() throws Exception {
|
||||
return messageService.queryMessageByTopicAndKey(TEST_CONSOLE_TOPIC,TEST_TOPIC_MESSAGE_KEY);
|
||||
}
|
||||
}.execute(10,1000);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(messageViewList));
|
||||
Assert.assertTrue(Lists.transform(messageViewList, new Function<MessageView, String>() {
|
||||
@Override public String apply(MessageView input) {
|
||||
return input.getMsgId();
|
||||
}
|
||||
}).contains(messageId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryMessageByTopic() throws Exception {
|
||||
final String messageId = msgId;
|
||||
List<MessageView> messageViewList = null;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
messageViewList = messageService.queryMessageByTopic(TEST_CONSOLE_TOPIC, System.currentTimeMillis() - 100000, System.currentTimeMillis());
|
||||
if (CollectionUtils.isEmpty(messageViewList)) {
|
||||
Thread.sleep(1000);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(messageViewList));
|
||||
Assert.assertTrue(Lists.transform(messageViewList, new Function<MessageView, String>() {
|
||||
@Override public String apply(MessageView input) {
|
||||
return input.getMsgId();
|
||||
}
|
||||
}).contains(messageId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void messageTrackDetail() throws Exception {
|
||||
final String messageId = msgId;
|
||||
Pair<MessageView, List<MessageTrack>> messageViewListPair=new RetryTempLate<Pair<MessageView, List<MessageTrack>>>() {
|
||||
@Override protected Pair<MessageView, List<MessageTrack>> process() throws Exception {
|
||||
return messageService.viewMessage(TEST_CONSOLE_TOPIC,messageId);
|
||||
}
|
||||
}.execute(10,1000); // make the topic can be found
|
||||
final MessageExt messageExt = mqAdminExt.viewMessage(TEST_CONSOLE_TOPIC,messageId);
|
||||
Assert.assertNotNull(messageService.messageTrackDetail(messageExt));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void consumeMessageDirectly() throws Exception {
|
||||
// final String messageId = msgId;
|
||||
// ConsumeMessageDirectlyResult messageDirectlyResult=new RetryTempLate<ConsumeMessageDirectlyResult>() {
|
||||
// @Override protected ConsumeMessageDirectlyResult process() throws Exception {
|
||||
// return messageService.consumeMessageDirectly(TEST_CONSOLE_TOPIC,messageId,TEST_CONSUMER_GROUP,null);
|
||||
// }
|
||||
// }.execute(20,1000); // todo test
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.console.service.MonitorService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class MonitorServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private MonitorService monitorService;
|
||||
@Test
|
||||
public void createOrUpdateConsumerMonitor() throws Exception {
|
||||
// monitorService.createOrUpdateConsumerMonitor()
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumerMonitorConfig() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryConsumerMonitorConfigByGroupName() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteConsumerMonitor() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.console.service.OpsService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class OpsServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private OpsService opsService;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void homePageInfo() throws Exception {
|
||||
List<String> namesvrAddrList= (List<String>)opsService.homePageInfo().get("namesvrAddrList");
|
||||
Assert.assertTrue(namesvrAddrList.contains(TestConstant.NAME_SERVER_ADDRESS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateNameSvrAddrList() throws Exception {
|
||||
String testChangeNameSvrAddr = "110.110.100.110:1234";
|
||||
opsService.updateNameSvrAddrList(testChangeNameSvrAddr);
|
||||
Assert.assertEquals(opsService.getNameSvrList(),testChangeNameSvrAddr);
|
||||
|
||||
opsService.updateNameSvrAddrList(TestConstant.NAME_SERVER_ADDRESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getNameSvrList() throws Exception {
|
||||
Assert.assertEquals(opsService.getNameSvrList(),TestConstant.NAME_SERVER_ADDRESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rocketMqStatusCheck() throws Exception {
|
||||
// need enhance in milestone 2
|
||||
opsService.rocketMqStatusCheck();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.common.protocol.body.Connection;
|
||||
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
|
||||
import org.apache.rocketmq.console.service.ProducerService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class ProducerServiceImplTest extends RocketMQConsoleTestBase {
|
||||
@Resource
|
||||
private ProducerService producerService;
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
sendTestTopicMessage().getMsgId();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getProducerConnection() throws Exception {
|
||||
ProducerConnection producerConnection=new RetryTempLate<ProducerConnection>() {
|
||||
@Override protected ProducerConnection process() throws Exception {
|
||||
return producerService.getProducerConnection(TEST_PRODUCER_GROUP,TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
}.execute(10,1000);
|
||||
Assert.assertNotNull(producerConnection);
|
||||
Assert.assertTrue(Lists.transform(Lists.newArrayList(producerConnection.getConnectionSet()), new Function<Connection, String>() {
|
||||
@Override public String apply(Connection input) {
|
||||
return input.getClientAddr().split(":")[0];
|
||||
}
|
||||
}).contains(TestConstant.LOCAL_HOST));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,161 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.service.impl;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.TopicConfig;
|
||||
import org.apache.rocketmq.common.admin.TopicStatsTable;
|
||||
import org.apache.rocketmq.common.protocol.body.TopicList;
|
||||
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
|
||||
import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
|
||||
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
|
||||
import org.apache.rocketmq.console.service.TopicService;
|
||||
import org.apache.rocketmq.console.testbase.RocketMQConsoleTestBase;
|
||||
import org.apache.rocketmq.console.testbase.TestConstant;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
public class TopicServiceImplTest extends RocketMQConsoleTestBase {
|
||||
|
||||
@Resource
|
||||
private TopicService topicService;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initMQClientEnv();
|
||||
registerTestMQTopic();
|
||||
sendTestTopicMessage().getMsgId();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyMQClientEnv();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchAllTopicList() throws Exception {
|
||||
TopicList topicList = topicService.fetchAllTopicList();
|
||||
Assert.assertNotNull(topicList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicList.getTopicList()));
|
||||
Assert.assertTrue(topicList.getTopicList().contains(TEST_CONSOLE_TOPIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void stats() throws Exception {
|
||||
TopicStatsTable topicStatsTable = topicService.stats(TEST_CONSOLE_TOPIC);
|
||||
Assert.assertNotNull(topicStatsTable );
|
||||
Assert.assertEquals(topicStatsTable.getOffsetTable().size(),READ_QUEUE_NUM);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void route() throws Exception {
|
||||
TopicRouteData topicRouteData = topicService.route(TEST_CONSOLE_TOPIC);
|
||||
Assert.assertNotNull(topicRouteData);
|
||||
Assert.assertEquals(topicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID),TestConstant.BROKER_ADDRESS);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicRouteData.getQueueDatas()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryTopicConsumerInfo() throws Exception {
|
||||
// GroupList groupList = null; // todo
|
||||
// for(int i=0;i<20;i++){
|
||||
// sendTestTopicMessage();
|
||||
// }
|
||||
// for (int i = 0; i < 20; i++) {
|
||||
// groupList = topicService.queryTopicConsumerInfo(TEST_CONSOLE_TOPIC);
|
||||
// if (CollectionUtils.isNotEmpty(groupList.getGroupList())) {
|
||||
// break;
|
||||
// }
|
||||
// Thread.sleep(1000);
|
||||
// }
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createOrUpdate() throws Exception {
|
||||
TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
|
||||
topicConfigInfo.setBrokerNameList(Lists.newArrayList(TestConstant.TEST_BROKER_NAME));
|
||||
topicConfigInfo.setTopicName(TEST_CREATE_DELETE_TOPIC);
|
||||
topicService.createOrUpdate(topicConfigInfo);
|
||||
|
||||
TopicList topicList = topicService.fetchAllTopicList();
|
||||
|
||||
Assert.assertNotNull(topicList);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicList.getTopicList()));
|
||||
Assert.assertTrue(topicList.getTopicList().contains(TEST_CREATE_DELETE_TOPIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void examineTopicConfig() throws Exception {
|
||||
List<TopicConfigInfo> topicConfigInfoList = topicService.examineTopicConfig(TEST_CONSOLE_TOPIC);
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(topicConfigInfoList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void examineTopicConfigList() throws Exception {
|
||||
TopicConfig topicConfig = topicService.examineTopicConfig(TEST_CONSOLE_TOPIC,TestConstant.TEST_BROKER_NAME);
|
||||
Assert.assertNotNull(topicConfig);
|
||||
Assert.assertEquals(topicConfig.getReadQueueNums(),READ_QUEUE_NUM);
|
||||
Assert.assertEquals(topicConfig.getWriteQueueNums(),WRITE_QUEUE_NUM);
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void deleteTopic() throws Exception {
|
||||
Assert.assertTrue(topicService.deleteTopic(TEST_CONSOLE_TOPIC));
|
||||
topicService.examineTopicConfig(TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void deleteTopic1() throws Exception {
|
||||
Assert.assertTrue(topicService.deleteTopic(TEST_CONSOLE_TOPIC,TestConstant.TEST_CLUSTER_NAME));
|
||||
topicService.examineTopicConfig(TEST_CONSOLE_TOPIC);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteTopicInBroker() throws Exception {
|
||||
Assert.assertTrue(topicService.deleteTopic(TestConstant.TEST_BROKER_NAME,TEST_CONSOLE_TOPIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendTopicMessageRequest() throws Exception {
|
||||
SendTopicMessageRequest sendTopicMessageRequest = new SendTopicMessageRequest();
|
||||
sendTopicMessageRequest.setTopic(TEST_CONSOLE_TOPIC);
|
||||
sendTopicMessageRequest.setMessageBody("sendTopicMessageRequestMessageBody");
|
||||
sendTopicMessageRequest.setKey("sendTopicMessageRequestKey");
|
||||
sendTopicMessageRequest.setTag("sendTopicMessageRequestTag");
|
||||
|
||||
SendResult sendResult= topicService.sendTopicMessageRequest(sendTopicMessageRequest);
|
||||
Assert.assertNotNull(sendResult);
|
||||
Assert.assertTrue(StringUtils.isNoneBlank(sendResult.getMsgId()));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,168 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.rocketmq.console.testbase;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
|
||||
import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
|
||||
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
|
||||
import org.apache.rocketmq.console.service.ConsumerService;
|
||||
import org.apache.rocketmq.console.service.TopicService;
|
||||
import org.apache.rocketmq.console.util.JsonUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
|
||||
@ComponentScan(basePackageClasses = {TestRocketMQServer.class})
|
||||
public abstract class RocketMQConsoleTestBase {
|
||||
private Logger consoleTestBaseLog = LoggerFactory.getLogger(RocketMQConsoleTestBase.class);
|
||||
protected static final int RETRY_QUEUE_NUMS = 2;
|
||||
protected static final int WRITE_QUEUE_NUM = 16;
|
||||
protected static final int READ_QUEUE_NUM = 16;
|
||||
protected static final int PERM = 6;
|
||||
protected static final String TEST_CONSUMER_GROUP = "CONSOLE_TEST_CONSUMER_GROUP";
|
||||
protected static final String TEST_PRODUCER_GROUP = "CONSOLE_TEST_PRODUCER_GROUP";
|
||||
protected static final String TEST_CREATE_DELETE_CONSUMER_GROUP = "CREATE_DELETE_CONSUMER_GROUP";
|
||||
protected static final String TEST_CREATE_DELETE_TOPIC = "CREATE_DELETE_TOPIC";
|
||||
protected static final String TEST_TOPIC_MESSAGE_BODY = "hello world";
|
||||
protected static final String TEST_TOPIC_MESSAGE_KEY = "TEST_TOPIC_KEY";
|
||||
protected static final String TEST_CONSOLE_TOPIC = "TEST_CONSOLE_TOPIC";
|
||||
@Resource
|
||||
protected ConsumerService consumerService;
|
||||
protected ConsumerConfigInfo consumerConfigInfo = new ConsumerConfigInfo();
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
private DefaultMQProducer producer;
|
||||
|
||||
@Resource
|
||||
protected TopicService topicService;
|
||||
|
||||
public static abstract class RetryTempLate<T> {
|
||||
protected abstract T process() throws Exception;
|
||||
public T execute(int times, long waitTime) throws Exception {
|
||||
Exception exception = null;
|
||||
for (int i = 0; i < times; i++) {
|
||||
try {
|
||||
return process();
|
||||
}
|
||||
catch (Exception ignore) {
|
||||
exception = ignore;
|
||||
if (waitTime > 0) {
|
||||
Thread.sleep(waitTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
throw Throwables.propagate(exception);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected void startTestMQProducer() {
|
||||
producer = new DefaultMQProducer(TEST_PRODUCER_GROUP);
|
||||
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
|
||||
try {
|
||||
producer.start();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected SendResult sendTestTopicMessage() throws InterruptedException {
|
||||
if (producer == null) {
|
||||
startTestMQProducer();
|
||||
}
|
||||
|
||||
Message message = new Message(TEST_CONSOLE_TOPIC, TEST_TOPIC_MESSAGE_BODY.getBytes());
|
||||
message.setKeys(Lists.<String>newArrayList(TEST_TOPIC_MESSAGE_KEY));
|
||||
for (int i = 0; i < 3; i++) {
|
||||
try {
|
||||
return producer.send(message);
|
||||
}
|
||||
catch (Exception ignore) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("send message error");
|
||||
}
|
||||
|
||||
protected void startTestMQConsumer() {
|
||||
consumer = new DefaultMQPushConsumer(TEST_CONSUMER_GROUP); //test online consumer
|
||||
consumerConfigInfo.setBrokerNameList(Lists.newArrayList(TestConstant.TEST_BROKER_NAME));
|
||||
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
|
||||
subscriptionGroupConfig.setGroupName(TEST_CONSUMER_GROUP);
|
||||
subscriptionGroupConfig.setRetryQueueNums(RETRY_QUEUE_NUMS);
|
||||
consumerConfigInfo.setSubscriptionGroupConfig(subscriptionGroupConfig);
|
||||
consumerService.createAndUpdateSubscriptionGroupConfig(consumerConfigInfo);
|
||||
consumer.setNamesrvAddr(TestConstant.NAME_SERVER_ADDRESS);
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
||||
try {
|
||||
consumer.subscribe(TEST_CONSOLE_TOPIC, "*");
|
||||
|
||||
consumer.registerMessageListener(new MessageListenerConcurrently() {
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeConcurrentlyContext context) {
|
||||
consoleTestBaseLog.info("op=consumeMessage message={}", JsonUtil.obj2String(msgs));
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
});
|
||||
consumer.start();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void registerTestMQTopic() {
|
||||
TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
|
||||
topicConfigInfo.setBrokerNameList(Lists.newArrayList(TestConstant.TEST_BROKER_NAME));
|
||||
topicConfigInfo.setTopicName(TEST_CONSOLE_TOPIC);
|
||||
topicConfigInfo.setWriteQueueNums(WRITE_QUEUE_NUM);
|
||||
topicConfigInfo.setReadQueueNums(READ_QUEUE_NUM);
|
||||
topicConfigInfo.setPerm(PERM);
|
||||
topicService.createOrUpdate(topicConfigInfo);
|
||||
}
|
||||
|
||||
protected void initMQClientEnv() {
|
||||
startTestMQProducer();
|
||||
startTestMQConsumer();
|
||||
|
||||
}
|
||||
|
||||
protected void destroyMQClientEnv() {
|
||||
if (consumer != null) {
|
||||
consumer.shutdown();
|
||||
}
|
||||
if (producer != null) {
|
||||
producer.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.testbase;
|
||||
|
||||
import static java.io.File.separator;
|
||||
|
||||
public class TestConstant {
|
||||
|
||||
public static final String LOCAL_HOST = "127.0.0.1";
|
||||
public static final String NAME_SERVER_IP = LOCAL_HOST;
|
||||
public static final String BROKER_IP = LOCAL_HOST;
|
||||
public static final int NAME_SERVER_PORT = 19153;
|
||||
public static final int BROKER_PORT = 19055;
|
||||
public static final int BROKER_HA_PORT = 19043;
|
||||
|
||||
public static final String TEST_FILE_ROOT_DIR = System.getProperty("user.home") + separator + "rocketmq-console-test";
|
||||
public static final String TEST_CLUSTER_NAME = "ConsoleTestCluster";
|
||||
public static final String TEST_BROKER_NAME = "ConsoleTestBroker";
|
||||
public static final String BROKER_ADDRESS = BROKER_IP + ":" + BROKER_PORT;
|
||||
public static final String NAME_SERVER_ADDRESS = NAME_SERVER_IP + ":" + NAME_SERVER_PORT;
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.testbase;
|
||||
|
||||
import java.io.File;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import org.apache.rocketmq.broker.BrokerController;
|
||||
import org.apache.rocketmq.common.BrokerConfig;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
|
||||
import org.apache.rocketmq.namesrv.NamesrvController;
|
||||
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
|
||||
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
|
||||
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import static java.io.File.separator;
|
||||
import static org.apache.rocketmq.console.testbase.TestConstant.TEST_BROKER_NAME;
|
||||
import static org.apache.rocketmq.console.testbase.TestConstant.TEST_CLUSTER_NAME;
|
||||
import static org.apache.rocketmq.console.testbase.TestConstant.TEST_FILE_ROOT_DIR;
|
||||
|
||||
@Service
|
||||
public class TestRocketMQServer {
|
||||
public static Logger log = LoggerFactory.getLogger(TestRocketMQServer.class);
|
||||
private final SimpleDateFormat sf = new SimpleDateFormat("yyyyMMddHHmmss");
|
||||
|
||||
private String serverDir;
|
||||
private volatile boolean started = false;
|
||||
|
||||
//name server
|
||||
private NamesrvConfig namesrvConfig = new NamesrvConfig();
|
||||
private NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
|
||||
private NamesrvController namesrvController;
|
||||
|
||||
//broker
|
||||
private BrokerController brokerController;
|
||||
private BrokerConfig brokerConfig = new BrokerConfig();
|
||||
private NettyServerConfig nettyServerConfig = new NettyServerConfig();
|
||||
private NettyClientConfig nettyClientConfig = new NettyClientConfig();
|
||||
private MessageStoreConfig storeConfig = new MessageStoreConfig();
|
||||
|
||||
public TestRocketMQServer() {
|
||||
this.storeConfig.setDiskMaxUsedSpaceRatio(95);
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
|
||||
createServerDir();
|
||||
|
||||
startNameServer();
|
||||
|
||||
startBroker();
|
||||
|
||||
started = true;
|
||||
|
||||
log.info("Start RocketServer Successfully");
|
||||
}
|
||||
|
||||
private void createServerDir() {
|
||||
for (int i = 0; i < 5; i++) {
|
||||
serverDir = TEST_FILE_ROOT_DIR + separator + sf.format(new Date());
|
||||
final File file = new File(serverDir);
|
||||
if (!file.exists()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
log.error("Has retry 5 times to register base dir,but still failed.");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
private void startNameServer() {
|
||||
namesrvConfig.setKvConfigPath(serverDir + separator + "namesrv" + separator + "kvConfig.json");
|
||||
nameServerNettyServerConfig.setListenPort(TestConstant.NAME_SERVER_PORT);
|
||||
namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
|
||||
try {
|
||||
namesrvController.initialize();
|
||||
log.info("Success to start Name Server:{}", TestConstant.NAME_SERVER_ADDRESS);
|
||||
namesrvController.start();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Failed to start Name Server", e);
|
||||
System.exit(1);
|
||||
}
|
||||
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, TestConstant.NAME_SERVER_ADDRESS);
|
||||
}
|
||||
|
||||
private void startBroker() {
|
||||
brokerConfig.setBrokerName(TEST_BROKER_NAME);
|
||||
brokerConfig.setBrokerClusterName(TEST_CLUSTER_NAME);
|
||||
brokerConfig.setBrokerIP1(TestConstant.BROKER_IP);
|
||||
brokerConfig.setNamesrvAddr(TestConstant.NAME_SERVER_ADDRESS);
|
||||
storeConfig.setStorePathRootDir(serverDir);
|
||||
storeConfig.setStorePathCommitLog(serverDir + separator + "commitlog");
|
||||
storeConfig.setHaListenPort(TestConstant.BROKER_HA_PORT);
|
||||
nettyServerConfig.setListenPort(TestConstant.BROKER_PORT);
|
||||
brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
|
||||
|
||||
try {
|
||||
brokerController.initialize();
|
||||
log.info("Broker Start name:{} address:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
|
||||
brokerController.start();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Failed to start Broker", e);
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void shutdown() {
|
||||
brokerController.shutdown();
|
||||
namesrvController.shutdown();
|
||||
deleteFile(new File(TEST_FILE_ROOT_DIR));
|
||||
}
|
||||
|
||||
private void deleteFile(File file) {
|
||||
if (!file.exists()) {
|
||||
return;
|
||||
}
|
||||
if (file.isFile()) {
|
||||
file.delete();
|
||||
}
|
||||
else if (file.isDirectory()) {
|
||||
File[] files = file.listFiles();
|
||||
for (File file1 : files) {
|
||||
deleteFile(file1);
|
||||
}
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.rocketmq.console.web;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import java.util.Map;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
|
||||
@DirtiesContext
|
||||
public class WebStaticApplicationTests {
|
||||
|
||||
@Autowired
|
||||
private TestRestTemplate restTemplate;
|
||||
|
||||
@Test
|
||||
public void testHome() throws Exception {
|
||||
ResponseEntity<String> entity = this.restTemplate.getForEntity("/", String.class);
|
||||
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
assertThat(entity.getBody()).contains("<body ng-controller");
|
||||
}
|
||||
|
||||
//@Test
|
||||
public void testCss() throws Exception {
|
||||
ResponseEntity<String> entity = this.restTemplate.getForEntity(
|
||||
"/vendor/bootstrap/css/bootstrap.css", String.class);
|
||||
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
assertThat(entity.getBody()).contains("body");
|
||||
assertThat(entity.getHeaders().getContentType())
|
||||
.isEqualTo(MediaType.valueOf("text/css;charset=UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResources() throws Exception {
|
||||
for(Map.Entry<String,String> entry : resourcesMap().entrySet()){
|
||||
resources(entry.getValue(),entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String,String> resourcesMap(){
|
||||
Map<String,String> map = Maps.newHashMap();
|
||||
map.put("text/css;charset=UTF-8","/vendor/bootstrap/css/bootstrap.css");
|
||||
map.put("text/css;charset=UTF-8","/vendor/bootstrap/css/bootstrap-theme.css");
|
||||
map.put("text/css;charset=UTF-8","/vendor/bootstrap-material-design/css/bootstrap-material-design.css");
|
||||
map.put("text/css;charset=UTF-8","/vendor/bootstrap-material-design/css/ripples.css");
|
||||
return map;
|
||||
}
|
||||
|
||||
private void resources(String path,String type){
|
||||
ResponseEntity<String> entity = this.restTemplate.getForEntity(path, String.class);
|
||||
assertThat(entity.getHeaders().getContentType())
|
||||
.isEqualTo(MediaType.valueOf(type));
|
||||
}
|
||||
|
||||
}
|
||||
15
src/test/resources/application.properties
Normal file
15
src/test/resources/application.properties
Normal file
@@ -0,0 +1,15 @@
|
||||
server.contextPath=
|
||||
server.port=8080
|
||||
#spring.application.index=true
|
||||
spring.application.name=rocketmq-console
|
||||
spring.http.encoding.charset=UTF-8
|
||||
spring.http.encoding.enabled=true
|
||||
spring.http.encoding.force=true
|
||||
logging.config=classpath:logback.xml
|
||||
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
|
||||
rocketmq.config.namesrvAddr=
|
||||
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
|
||||
rocketmq.config.isVIPChannel=
|
||||
#rocketmq-console's data path:dashboard/monitor
|
||||
rocketmq.config.dataPath=/tmp/rocketmq-console/data
|
||||
rocketmq.config.enableDashBoardCollect=false
|
||||
13
src/test/resources/logback.xml
Normal file
13
src/test/resources/logback.xml
Normal file
@@ -0,0 +1,13 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder charset="UTF-8">
|
||||
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="WARN">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
|
||||
</configuration>
|
||||
44
src/test/resources/logback_rocketmq_client.xml
Normal file
44
src/test/resources/logback_rocketmq_client.xml
Normal file
@@ -0,0 +1,44 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder charset="UTF-8">
|
||||
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="RocketmqCommon" additivity="false">
|
||||
<level value="OFF"/>
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</logger>
|
||||
|
||||
|
||||
<logger name="RocketmqRemoting" additivity="false">
|
||||
<level value="OFF"/>
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</logger>
|
||||
|
||||
|
||||
<logger name="RocketmqClient" additivity="false">
|
||||
<level value="OFF"/>
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</logger>
|
||||
|
||||
|
||||
</configuration>
|
||||
Reference in New Issue
Block a user