From c440b443a57354b982f07325db232fe25fa848ff Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 11 Mar 2021 18:24:11 +0800 Subject: [PATCH] Support query message by page (#688) --- pom.xml | 7 +- .../console/controller/MessageController.java | 24 +- .../rocketmq/console/model/MessagePage.java | 54 +++ .../console/model/MessagePageTask.java | 55 +++ .../console/model/MessageQueryByPage.java | 107 +++++ .../console/model/QueueOffsetInfo.java | 104 +++++ .../console/model/request/MessageQuery.java | 82 ++++ .../console/service/MessageService.java | 8 + .../service/impl/MessageServiceImpl.java | 374 ++++++++++++++++-- src/main/resources/static/src/message.js | 46 ++- .../resources/static/view/pages/message.html | 9 +- 11 files changed, 831 insertions(+), 39 deletions(-) create mode 100644 src/main/java/org/apache/rocketmq/console/model/MessagePage.java create mode 100644 src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java create mode 100644 src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java create mode 100644 src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java create mode 100644 src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java diff --git a/pom.xml b/pom.xml index d454166..5cad525 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ 1.8 1.8 - 16.0.1 + 29.0-jre 2.1 2.6 2.4 @@ -82,6 +82,11 @@ spring-boot-starter-actuator ${spring.boot.version} + + org.springframework.data + spring-data-commons + ${spring.boot.version} + org.springframework.boot spring-boot-starter-test diff --git a/src/main/java/org/apache/rocketmq/console/controller/MessageController.java b/src/main/java/org/apache/rocketmq/console/controller/MessageController.java index dd3cdb8..fa0fe00 100644 --- a/src/main/java/org/apache/rocketmq/console/controller/MessageController.java +++ b/src/main/java/org/apache/rocketmq/console/controller/MessageController.java @@ -16,24 +16,28 @@ */ package org.apache.rocketmq.console.controller; +import com.google.common.collect.Maps; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.apache.rocketmq.console.model.MessagePage; import org.apache.rocketmq.console.model.MessageView; +import org.apache.rocketmq.console.model.request.MessageQuery; import org.apache.rocketmq.console.service.MessageService; import org.apache.rocketmq.console.util.JsonUtil; -import com.google.common.collect.Maps; +import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import javax.annotation.Resource; import java.util.List; import java.util.Map; -import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping("/message") @@ -52,6 +56,12 @@ public class MessageController { return messageViewMap; } + @PostMapping("/queryMessagePageByTopic.query") + @ResponseBody + public MessagePage queryMessagePageByTopic(@RequestBody MessageQuery query) { + return messageService.queryMessageByPage(query); + } + @RequestMapping(value = "/queryMessageByTopicAndKey.query", method = RequestMethod.GET) @ResponseBody public Object queryMessageByTopicAndKey(@RequestParam String topic, @RequestParam String key) { @@ -61,15 +71,15 @@ public class MessageController { @RequestMapping(value = "/queryMessageByTopic.query", method = RequestMethod.GET) @ResponseBody public Object queryMessageByTopic(@RequestParam String topic, @RequestParam long begin, - @RequestParam long end) { + @RequestParam long end) { return messageService.queryMessageByTopic(topic, begin, end); } @RequestMapping(value = "/consumeMessageDirectly.do", method = RequestMethod.POST) @ResponseBody public Object consumeMessageDirectly(@RequestParam String topic, @RequestParam String consumerGroup, - @RequestParam String msgId, - @RequestParam(required = false) String clientId) { + @RequestParam String msgId, + @RequestParam(required = false) String clientId) { logger.info("msgId={} consumerGroup={} clientId={}", msgId, consumerGroup, clientId); ConsumeMessageDirectlyResult consumeMessageDirectlyResult = messageService.consumeMessageDirectly(topic, msgId, consumerGroup, clientId); logger.info("consumeMessageDirectlyResult={}", JsonUtil.obj2String(consumeMessageDirectlyResult)); diff --git a/src/main/java/org/apache/rocketmq/console/model/MessagePage.java b/src/main/java/org/apache/rocketmq/console/model/MessagePage.java new file mode 100644 index 0000000..2a803d8 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/console/model/MessagePage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model; + + +import org.springframework.data.domain.Page; + +public class MessagePage { + private Page page; + private String taskId; + + public MessagePage(Page page, String taskId) { + this.page = page; + this.taskId = taskId; + } + + public Page getPage() { + return page; + } + + public void setPage(Page page) { + this.page = page; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + @Override + public String toString() { + return "MessagePage{" + + "page=" + page + + ", taskId='" + taskId + '\'' + + '}'; + } +} diff --git a/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java b/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java new file mode 100644 index 0000000..31cc18a --- /dev/null +++ b/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model; + +import org.springframework.data.domain.Page; + +import java.util.List; + +public class MessagePageTask { + private Page page; + private List queueOffsetInfos; + + public MessagePageTask(Page page, List queueOffsetInfos) { + this.page = page; + this.queueOffsetInfos = queueOffsetInfos; + } + + public Page getPage() { + return page; + } + + public void setPage(Page page) { + this.page = page; + } + + public List getQueueOffsetInfos() { + return queueOffsetInfos; + } + + public void setQueueOffsetInfos(List queueOffsetInfos) { + this.queueOffsetInfos = queueOffsetInfos; + } + + @Override + public String toString() { + return "MessagePageTask{" + + "page=" + page + + ", queueOffsetInfos=" + queueOffsetInfos + + '}'; + } +} diff --git a/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java b/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java new file mode 100644 index 0000000..b3370c4 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model; + + +import org.springframework.data.domain.PageRequest; + +public class MessageQueryByPage { + public static final int DEFAULT_PAGE = 0; + + public static final int MIN_PAGE_SIZE = 10; + + public static final int MAX_PAGE_SIZE = 100; + + /** + * current page num + */ + private int pageNum; + + private int pageSize; + + private String topic; + private long begin; + private long end; + + public MessageQueryByPage(int pageNum, int pageSize, String topic, long begin, long end) { + this.pageNum = pageNum; + this.pageSize = pageSize; + this.topic = topic; + this.begin = begin; + this.end = end; + } + + public void setPageNum(int pageNum) { + this.pageNum = pageNum; + } + + public void setPageSize(int pageSize) { + this.pageSize = pageSize; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public long getBegin() { + return begin; + } + + public void setBegin(long begin) { + this.begin = begin; + } + + public long getEnd() { + return end; + } + + public void setEnd(long end) { + this.end = end; + } + + public int getPageNum() { + return pageNum <= 0 ? DEFAULT_PAGE : pageNum - 1; + } + + public int getPageSize() { + if (pageSize <= 1) { + return MIN_PAGE_SIZE; + } else if (pageSize > MAX_PAGE_SIZE) { + return MAX_PAGE_SIZE; + } + return this.pageSize; + } + + public PageRequest page() { + return PageRequest.of(this.getPageNum(), this.getPageSize()); + } + + @Override + public String toString() { + return "MessageQueryByPage{" + + "pageNum=" + pageNum + + ", pageSize=" + pageSize + + ", topic='" + topic + '\'' + + ", begin=" + begin + + ", end=" + end + + '}'; + } +} diff --git a/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java b/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java new file mode 100644 index 0000000..0940712 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model; + +import org.apache.rocketmq.common.message.MessageQueue; + +public class QueueOffsetInfo { + private Integer idx; + + private Long start; + private Long end; + + private Long startOffset; + private Long endOffset; + private MessageQueue messageQueues; + + public QueueOffsetInfo() { + } + + public QueueOffsetInfo(Integer idx, Long start, Long end, Long startOffset, Long endOffset, MessageQueue messageQueues) { + this.idx = idx; + this.start = start; + this.end = end; + this.startOffset = startOffset; + this.endOffset = endOffset; + this.messageQueues = messageQueues; + } + + public Integer getIdx() { + return idx; + } + + public void setIdx(Integer idx) { + this.idx = idx; + } + + public Long getStart() { + return start; + } + + public void setStart(Long start) { + this.start = start; + } + + public Long getEnd() { + return end; + } + + public void setEnd(Long end) { + this.end = end; + } + + public Long getStartOffset() { + return startOffset; + } + + public void setStartOffset(Long startOffset) { + this.startOffset = startOffset; + } + + public Long getEndOffset() { + return endOffset; + } + + public void setEndOffset(Long endOffset) { + this.endOffset = endOffset; + } + + public MessageQueue getMessageQueues() { + return messageQueues; + } + + public void setMessageQueues(MessageQueue messageQueues) { + this.messageQueues = messageQueues; + } + + public void incStartOffset() { + this.startOffset++; + this.endOffset++; + } + + public void incEndOffset() { + this.endOffset++; + } + + public void incStartOffset(long size) { + this.startOffset += size; + this.endOffset += size; + } +} diff --git a/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java b/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java new file mode 100644 index 0000000..f78fe09 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model.request; + +public class MessageQuery { + /** + * current page num + */ + private int pageNum; + + private int pageSize; + + private String topic; + + private String taskId; + + private long begin; + + private long end; + + public int getPageNum() { + return pageNum; + } + + public void setPageNum(int pageNum) { + this.pageNum = pageNum; + } + + public int getPageSize() { + return pageSize; + } + + public void setPageSize(int pageSize) { + this.pageSize = pageSize; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public long getBegin() { + return begin; + } + + public void setBegin(long begin) { + this.begin = begin; + } + + public long getEnd() { + return end; + } + + public void setEnd(long end) { + this.end = end; + } +} diff --git a/src/main/java/org/apache/rocketmq/console/service/MessageService.java b/src/main/java/org/apache/rocketmq/console/service/MessageService.java index e56b4d8..af8d6fb 100644 --- a/src/main/java/org/apache/rocketmq/console/service/MessageService.java +++ b/src/main/java/org/apache/rocketmq/console/service/MessageService.java @@ -20,6 +20,8 @@ package org.apache.rocketmq.console.service; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.console.model.MessagePage; +import org.apache.rocketmq.console.model.request.MessageQuery; import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.console.model.MessageView; @@ -48,4 +50,10 @@ public interface MessageService { ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup, String clientId); + + MessagePage queryMessageByPage(MessageQuery query); + + + + } diff --git a/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java index 34d3994..a7961b4 100644 --- a/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java @@ -21,21 +21,19 @@ package org.apache.rocketmq.console.service.impl; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.List; -import java.util.Set; -import javax.annotation.Resource; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.Connection; @@ -43,7 +41,12 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.console.config.RMQConfigure; import org.apache.rocketmq.console.exception.ServiceException; +import org.apache.rocketmq.console.model.QueueOffsetInfo; import org.apache.rocketmq.console.model.MessageView; +import org.apache.rocketmq.console.model.MessagePage; +import org.apache.rocketmq.console.model.MessagePageTask; +import org.apache.rocketmq.console.model.MessageQueryByPage; +import org.apache.rocketmq.console.model.request.MessageQuery; import org.apache.rocketmq.console.service.MessageService; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.MQAdminExt; @@ -51,13 +54,31 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.util.Collections; +import java.util.List; +import java.util.Comparator; +import java.util.ArrayList; +import java.util.Set; +import java.util.Collection; +import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + @Service public class MessageServiceImpl implements MessageService { private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class); + private static final Cache> CACHE = CacheBuilder.newBuilder() + .maximumSize(10000) + .expireAfterWrite(60, TimeUnit.MINUTES) + .build(); + @Autowired private RMQConfigure configure; /** @@ -75,8 +96,7 @@ public class MessageServiceImpl implements MessageService { MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId); List messageTrackList = messageTrackDetail(messageExt); return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList); - } - catch (Exception e) { + } catch (Exception e) { throw new ServiceException(-1, String.format("Failed to query message by Id: %s", msgId)); } } @@ -90,8 +110,7 @@ public class MessageServiceImpl implements MessageService { return MessageView.fromMessageExt(messageExt); } }); - } - catch (Exception err) { + } catch (Exception err) { throw Throwables.propagate(err); } } @@ -101,9 +120,9 @@ public class MessageServiceImpl implements MessageService { boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); RPCHook rpcHook = null; if (isEnableAcl) { - rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey())); + rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); } - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook); + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); List messageViewList = Lists.newArrayList(); try { String subExpression = "*"; @@ -146,8 +165,7 @@ public class MessageServiceImpl implements MessageService { case OFFSET_ILLEGAL: break READQ; } - } - catch (Exception e) { + } catch (Exception e) { break; } } @@ -162,11 +180,9 @@ public class MessageServiceImpl implements MessageService { } }); return messageViewList; - } - catch (Exception e) { + } catch (Exception e) { throw Throwables.propagate(e); - } - finally { + } finally { consumer.shutdown(); } } @@ -175,8 +191,7 @@ public class MessageServiceImpl implements MessageService { public List messageTrackDetail(MessageExt msg) { try { return mqAdminExt.messageTrackDetail(msg); - } - catch (Exception e) { + } catch (Exception e) { logger.error("op=messageTrackDetailError", e); return Collections.emptyList(); } @@ -185,12 +200,11 @@ public class MessageServiceImpl implements MessageService { @Override public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup, - String clientId) { + String clientId) { if (StringUtils.isNotBlank(clientId)) { try { return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); - } - catch (Exception e) { + } catch (Exception e) { throw Throwables.propagate(e); } } @@ -204,12 +218,322 @@ public class MessageServiceImpl implements MessageService { logger.info("clientId={}", connection.getClientId()); return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); } - } - catch (Exception e) { + } catch (Exception e) { throw Throwables.propagate(e); } throw new IllegalStateException("NO CONSUMER"); } + @Override + public MessagePage queryMessageByPage(MessageQuery query) { + MessageQueryByPage queryByPage = new MessageQueryByPage( + query.getPageNum(), + query.getPageSize(), + query.getTopic(), + query.getBegin(), + query.getEnd()); + + List queueOffsetInfos = CACHE.getIfPresent(query.getTaskId()); + + if (queueOffsetInfos == null) { + query.setPageNum(1); + MessagePageTask task = this.queryFirstMessagePage(queryByPage); + String taskId = MessageClientIDSetter.createUniqID(); + CACHE.put(taskId, task.getQueueOffsetInfos()); + + return new MessagePage(task.getPage(), taskId); + } + Page messageViews = queryMessageByTaskPage(queryByPage, queueOffsetInfos); + return new MessagePage(messageViews, query.getTaskId()); + + } + + private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) { + boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); + RPCHook rpcHook = null; + if (isEnableAcl) { + rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); + } + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + + long total = 0; + List queueOffsetInfos = new ArrayList<>(); + + List messageViews = new ArrayList<>(); + + try { + consumer.start(); + Collection messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic()); + int idx = 0; + for (MessageQueue messageQueue : messageQueues) { + Long minOffset = consumer.searchOffset(messageQueue, query.getBegin()); + Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd()) + 1; + queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue)); + } + + // check first offset has message + // filter the begin time + for (QueueOffsetInfo queueOffset : queueOffsetInfos) { + Long start = queueOffset.getStart(); + boolean hasData = false; + boolean hasIllegalOffset = true; + while (hasIllegalOffset) { + PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", start, 32); + if (pullResult.getPullStatus() == PullStatus.FOUND) { + hasData = true; + List msgFoundList = pullResult.getMsgFoundList(); + for (MessageExt messageExt : msgFoundList) { + if (messageExt.getStoreTimestamp() < query.getBegin()) { + start++; + } else { + hasIllegalOffset = false; + break; + } + } + } else { + hasIllegalOffset = false; + } + } + if (!hasData) { + queueOffset.setEnd(queueOffset.getStart()); + } + queueOffset.setStart(start); + queueOffset.setStartOffset(start); + queueOffset.setEndOffset(start); + } + + // filter the end time + for (QueueOffsetInfo queueOffset : queueOffsetInfos) { + if (queueOffset.getStart().equals(queueOffset.getEnd())) { + continue; + } + long end = queueOffset.getEnd(); + long pullOffset = end; + int pullSize = 32; + boolean hasIllegalOffset = true; + while (hasIllegalOffset) { + + if (pullOffset - pullSize > queueOffset.getStart()) { + pullOffset = pullOffset - pullSize; + } else { + pullOffset = queueOffset.getStartOffset(); + pullSize = (int) (end - pullOffset); + } + PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", pullOffset, pullSize); + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List msgFoundList = pullResult.getMsgFoundList(); + for (int i = msgFoundList.size() - 1; i >= 0; i--) { + MessageExt messageExt = msgFoundList.get(i); + if (messageExt.getStoreTimestamp() < query.getBegin()) { + end--; + } else { + hasIllegalOffset = false; + break; + } + } + } else { + hasIllegalOffset = false; + } + if (pullOffset == queueOffset.getStartOffset()) { + break; + } + } + queueOffset.setEnd(end); + total += queueOffset.getEnd() - queueOffset.getStart(); + } + + long pageSize = total > query.getPageSize() ? query.getPageSize() : total; + + + // move startOffset + int next = moveStartOffset(queueOffsetInfos, query); + moveEndOffset(queueOffsetInfos, query, next); + + // find the first page of message + for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) { + Long start = queueOffsetInfo.getStartOffset(); + Long end = queueOffsetInfo.getEndOffset(); + long size = Math.min(end - start, pageSize); + if (size == 0) { + continue; + } + + while (size > 0) { + PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32); + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List poll = pullResult.getMsgFoundList(); + if (poll.size() == 0) { + break; + } + List collect = poll.stream() + .map(MessageView::fromMessageExt).collect(Collectors.toList()); + + for (MessageView view : collect) { + if (size > 0) { + messageViews.add(view); + size--; + } + } + } else { + break; + } + + } + } + PageImpl page = new PageImpl<>(messageViews, query.page(), total); + return new MessagePageTask(page, queueOffsetInfos); + } catch (Exception e) { + throw Throwables.propagate(e); + } finally { + consumer.shutdown(); + } + } + + private Page queryMessageByTaskPage(MessageQueryByPage query, List queueOffsetInfos) { + boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey()); + RPCHook rpcHook = null; + if (isEnableAcl) { + rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); + } + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + List messageViews = new ArrayList<>(); + + long offset = query.getPageNum() * query.getPageSize(); + + long total = 0; + try { + consumer.start(); + for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) { + long start = queueOffsetInfo.getStart(); + long end = queueOffsetInfo.getEnd(); + queueOffsetInfo.setStartOffset(start); + queueOffsetInfo.setEndOffset(start); + total += end - start; + } + if (total <= offset) { + return Page.empty(); + } + long pageSize = total - offset > query.getPageSize() ? query.getPageSize() : total - offset; + + int next = moveStartOffset(queueOffsetInfos, query); + moveEndOffset(queueOffsetInfos, query, next); + + for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) { + Long start = queueOffsetInfo.getStartOffset(); + Long end = queueOffsetInfo.getEndOffset(); + long size = Math.min(end - start, pageSize); + if (size == 0) { + continue; + } + + while (size > 0) { + PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32); + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List poll = pullResult.getMsgFoundList(); + if (poll.size() == 0) { + break; + } + List collect = poll.stream() + .map(MessageView::fromMessageExt).collect(Collectors.toList()); + + for (MessageView view : collect) { + if (size > 0) { + messageViews.add(view); + size--; + } + } + } else { + break; + } + + } + } + return new PageImpl<>(messageViews, query.page(), total); + } catch (Exception e) { + throw Throwables.propagate(e); + } finally { + consumer.shutdown(); + } + } + + private int moveStartOffset(List queueOffsets, MessageQueryByPage query) { + int size = queueOffsets.size(); + int next = 0; + long offset = query.getPageNum() * query.getPageSize(); + if (offset == 0) { + return next; + } + // sort by queueOffset size + List orderQueue = queueOffsets + .stream() + .sorted((o1, o2) -> { + long size1 = o1.getEnd() - o1.getStart(); + long size2 = o2.getEnd() - o2.getStart(); + if (size1 < size2) { + return -1; + } else if (size1 > size2) { + return 1; + } + return 0; + }).collect(Collectors.toList()); + + // Take the smallest one each time + for (int i = 0; i < size && offset >= (size - i); i++) { + long minSize = orderQueue.get(i).getEnd() - orderQueue.get(i).getStartOffset(); + if (minSize == 0) { + continue; + } + long reduce = minSize * (size - i); + if (reduce <= offset) { + offset -= reduce; + for (int j = i; j < size; j++) { + orderQueue.get(j).incStartOffset(minSize); + } + } else { + long addOffset = offset / (size - i); + offset -= addOffset * (size - i); + if (addOffset != 0) { + for (int j = i; j < size; j++) { + orderQueue.get(j).incStartOffset(addOffset); + } + } + } + } + for (QueueOffsetInfo info : orderQueue) { + QueueOffsetInfo queueOffsetInfo = queueOffsets.get(info.getIdx()); + queueOffsetInfo.setStartOffset(info.getStartOffset()); + queueOffsetInfo.setEndOffset(info.getEndOffset()); + } + + for (QueueOffsetInfo info : queueOffsets) { + if (offset == 0) { + break; + } + next = (next + 1) % size; + if (info.getStartOffset() < info.getEnd()) { + info.incStartOffset(); + --offset; + } + } + return next; + } + + private void moveEndOffset(List queueOffsets, MessageQueryByPage query, int next) { + int size = queueOffsets.size(); + for (int j = 0; j < query.getPageSize(); j++) { + QueueOffsetInfo nextQueueOffset = queueOffsets.get(next); + next = (next + 1) % size; + int start = next; + while (nextQueueOffset.getEndOffset() >= nextQueueOffset.getEnd()) { + nextQueueOffset = queueOffsets.get(next); + next = (next + 1) % size; + if (start == next) { + return; + } + } + nextQueueOffset.incEndOffset(); + } + } + } diff --git a/src/main/resources/static/src/message.js b/src/main/resources/static/src/message.js index 0c0aefb..d2de6d9 100644 --- a/src/main/resources/static/src/message.js +++ b/src/main/resources/static/src/message.js @@ -43,6 +43,8 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat $scope.timepickerEnd = moment().add(1,'hour').format('YYYY-MM-DD HH:mm'); $scope.timepickerOptions ={format: 'YYYY-MM-DD HH:mm', showClear: true}; + $scope.taskId = ""; + $scope.paginationConf = { currentPage: 1, totalItems: 0, @@ -51,10 +53,44 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat perPageOptions: [10], rememberPerPage: 'perPageItems', onChange: function () { - $scope.changeShowMessageList(this.currentPage,this.totalItems); + $scope.queryMessagePageByTopic() } }; + $scope.queryMessagePageByTopic = function () { + if ($scope.timepickerEnd < $scope.timepickerBegin) { + Notification.error({message: "endTime is later than beginTime!", delay: 2000}); + return + } + if( $scope.selectedTopic === [] || (typeof $scope.selectedTopic) == "object"){ + return + } + $http({ + method: "POST", + url: "message/queryMessagePageByTopic.query", + data: { + topic: $scope.selectedTopic, + begin: $scope.timepickerBegin.valueOf(), + end: $scope.timepickerEnd.valueOf(), + pageNum: $scope.paginationConf.currentPage, + pageSize: $scope.paginationConf.itemsPerPage, + taskId: $scope.taskId + } + }).success(function (resp) { + if (resp.status === 0) { + console.log(resp); + $scope.messageShowList = resp.data.page.content; + if(resp.data.page.first){ + $scope.paginationConf.currentPage = 1; + } + $scope.paginationConf.currentPage = resp.data.page.number + 1; + $scope.paginationConf.totalItems = resp.data.page.totalElements; + $scope.taskId = resp.data.taskId + }else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + } $scope.queryMessageByTopic = function () { console.log($scope.selectedTopic); @@ -153,7 +189,6 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat }); }; - $scope.changeShowMessageList = function (currentPage,totalItem) { var perPage = $scope.paginationConf.itemsPerPage; var from = (currentPage - 1) * perPage; @@ -161,6 +196,13 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat $scope.messageShowList = $scope.queryMessageByTopicResult.slice(from, to); $scope.paginationConf.totalItems = totalItem ; }; + + $scope.onChangeQueryCondition = function (){ + console.log("change") + $scope.taskId = ""; + $scope.paginationConf.currentPage = 1; + $scope.paginationConf.totalItems = 0; + } }]); module.controller('messageDetailViewDialogController',['$scope', 'ngDialog', '$http','Notification', function ($scope, ngDialog, $http,Notification) { diff --git a/src/main/resources/static/view/pages/message.html b/src/main/resources/static/view/pages/message.html index f8bf3c8..8cac242 100644 --- a/src/main/resources/static/view/pages/message.html +++ b/src/main/resources/static/view/pages/message.html @@ -21,7 +21,7 @@ -
Only Return 2000 Messages
+
Total {{paginationConf.totalItems}} Messages
@@ -32,6 +32,7 @@ @@ -40,7 +41,7 @@
- @@ -49,7 +50,7 @@
- @@ -58,7 +59,7 @@