pref: optimize the response speed of the query api (#273)

This commit is contained in:
Xu Yichi
2025-03-27 12:22:50 +08:00
committed by GitHub
parent 1aad0cda25
commit de152dd6f3
4 changed files with 256 additions and 19 deletions

View File

@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
import org.apache.rocketmq.remoting.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
@@ -127,11 +128,11 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) { if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
} }
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS()); AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
List<MessageView> messageViewList = Lists.newArrayList(); List<MessageView> messageViewList = Lists.newArrayList();
try { try {
String subExpression = "*"; String subExpression = "*";
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
for (MessageQueue mq : mqs) { for (MessageQueue mq : mqs) {
long minOffset = consumer.searchOffset(mq, begin); long minOffset = consumer.searchOffset(mq, begin);
@@ -188,8 +189,6 @@ public class MessageServiceImpl implements MessageService {
} catch (Exception e) { } catch (Exception e) {
Throwables.throwIfUnchecked(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
consumer.shutdown();
} }
} }
@@ -263,7 +262,8 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) { if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
} }
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS()); AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
long total = 0; long total = 0;
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>(); List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
@@ -271,7 +271,6 @@ public class MessageServiceImpl implements MessageService {
List<MessageView> messageViews = new ArrayList<>(); List<MessageView> messageViews = new ArrayList<>();
try { try {
consumer.start();
Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic()); Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());
int idx = 0; int idx = 0;
for (MessageQueue messageQueue : messageQueues) { for (MessageQueue messageQueue : messageQueues) {
@@ -394,8 +393,6 @@ public class MessageServiceImpl implements MessageService {
} catch (Exception e) { } catch (Exception e) {
Throwables.throwIfUnchecked(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
consumer.shutdown();
} }
} }
@@ -405,14 +402,14 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) { if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
} }
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS()); AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
List<MessageView> messageViews = new ArrayList<>(); List<MessageView> messageViews = new ArrayList<>();
long offset = query.getPageNum() * query.getPageSize(); long offset = query.getPageNum() * query.getPageSize();
long total = 0; long total = 0;
try { try {
consumer.start();
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) { for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
long start = queueOffsetInfo.getStart(); long start = queueOffsetInfo.getStart();
long end = queueOffsetInfo.getEnd(); long end = queueOffsetInfo.getEnd();
@@ -462,8 +459,6 @@ public class MessageServiceImpl implements MessageService {
} catch (Exception e) { } catch (Exception e) {
Throwables.throwIfUnchecked(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
consumer.shutdown();
} }
} }

View File

@@ -73,6 +73,10 @@ import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR
@Service @Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService { public class TopicServiceImpl extends AbstractCommonService implements TopicService {
private transient DefaultMQProducer systemTopicProducer;
private final Object producerLock = new Object();
@Autowired @Autowired
private RMQConfigure configure; private RMQConfigure configure;
@@ -297,18 +301,40 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
if (isEnableAcl) { if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
} }
DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr());
// ensures thread safety
if (systemTopicProducer == null) {
synchronized (producerLock) {
if (systemTopicProducer == null) {
systemTopicProducer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
systemTopicProducer.setInstanceName("SystemTopicProducer-" + System.currentTimeMillis());
systemTopicProducer.setNamesrvAddr(configure.getNamesrvAddr());
try { try {
producer.start(); systemTopicProducer.start();
return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
} catch (Exception e) { } catch (Exception e) {
systemTopicProducer = null;
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
}
}
try {
return systemTopicProducer.getDefaultMQProducerImpl()
.getmQClientFactory()
.getMQClientAPIImpl()
.getSystemTopicList(20000L);
} catch (Exception e) {
// If the call fails, close and clean up the producer, and it will be re-created next time.
synchronized (producerLock) {
if (systemTopicProducer != null) {
systemTopicProducer.shutdown();
systemTopicProducer = null;
}
}
Throwables.throwIfUnchecked(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
producer.shutdown();
} }
} }

View File

@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.support;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class AutoCloseConsumerWrapper {
private final Logger logger = LoggerFactory.getLogger(GlobalRestfulResponseBodyAdvice.class);
private static final AtomicReference<DefaultMQPullConsumer> CONSUMER_REF = new AtomicReference<>();
private final AtomicBoolean isTaskScheduled = new AtomicBoolean(false);
private final AtomicBoolean isClosing = new AtomicBoolean(false);
private static volatile Instant lastUsedTime = Instant.now();
private static final ScheduledExecutorService SCHEDULER =
Executors.newSingleThreadScheduledExecutor();
public AutoCloseConsumerWrapper() {
startIdleCheckTask();
}
public DefaultMQPullConsumer getConsumer(RPCHook rpcHook,Boolean useTLS) {
lastUsedTime = Instant.now();
DefaultMQPullConsumer consumer = CONSUMER_REF.get();
if (consumer == null) {
synchronized (this) {
consumer = CONSUMER_REF.get();
if (consumer == null) {
consumer = createNewConsumer(rpcHook,useTLS);
CONSUMER_REF.set(consumer);
}
try {
consumer.start();
} catch (MQClientException e) {
consumer.shutdown();
CONSUMER_REF.set(null);
throw new RuntimeException("Failed to start consumer", e);
}
}
}
return consumer;
}
protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) {
return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook) {{
setUseTLS(useTLS);
}};
}
private void startIdleCheckTask() {
if (!isTaskScheduled.get()) {
synchronized (this) {
if (!isTaskScheduled.get()) {
SCHEDULER.scheduleWithFixedDelay(() -> {
try {
checkAndCloseIdleConsumer();
} catch (Exception e) {
logger.error("Idle check failed", e);
}
}, 1, 1, TimeUnit.MINUTES);
isTaskScheduled.set(true);
}
}
}
}
public void checkAndCloseIdleConsumer() {
if (shouldClose()) {
synchronized (this) {
if (shouldClose()) {
close();
}
}
}
}
private boolean shouldClose() {
long idleTimeoutMs = 60_000;
return CONSUMER_REF.get() != null &&
Duration.between(lastUsedTime, Instant.now()).toMillis() > idleTimeoutMs;
}
public void close() {
if (isClosing.compareAndSet(false, true)) {
try {
DefaultMQPullConsumer consumer = CONSUMER_REF.getAndSet(null);
if (consumer != null) {
consumer.shutdown();
}
isTaskScheduled.set(false);
} finally {
isClosing.set(false);
}
}
}
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.util;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import java.lang.reflect.Field;
import static org.mockito.Mockito.mock;
import org.apache.rocketmq.client.exception.MQClientException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class AutoCloseConsumerWrapperTests {
private static class TestableWrapper extends AutoCloseConsumerWrapper {
private DefaultMQPullConsumer mockConsumer = mock(DefaultMQPullConsumer.class);
@Override
protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) {
return mockConsumer;
}
}
@Test
void shouldReuseConsumerInstance() throws Exception {
TestableWrapper wrapper = new TestableWrapper();
DefaultMQPullConsumer first = wrapper.getConsumer(mock(RPCHook.class), true);
assertNotNull(first);
DefaultMQPullConsumer second = wrapper.getConsumer(mock(RPCHook.class), true);
assertSame(first, second);
}
@Test
void shouldHandleStartFailure() throws Exception {
TestableWrapper wrapper = new TestableWrapper();
doThrow(new MQClientException("Simulated error", null))
.when(wrapper.mockConsumer).start();
assertThrows(RuntimeException.class, () ->
wrapper.getConsumer(mock(RPCHook.class), true));
verify(wrapper.mockConsumer).shutdown();
}
@Test
void shouldCloseIdleConsumer() throws Exception {
TestableWrapper wrapper = new TestableWrapper();
wrapper.getConsumer(mock(RPCHook.class), true);
Field lastUsedTime = AutoCloseConsumerWrapper.class.getDeclaredField("lastUsedTime");
lastUsedTime.setAccessible(true);
lastUsedTime.set(wrapper, Instant.now().minusSeconds(70));
wrapper.checkAndCloseIdleConsumer();
verify(wrapper.mockConsumer).shutdown();
}
}