Support query message by page (#688)

This commit is contained in:
Demogorgon314
2021-03-11 18:24:11 +08:00
committed by GitHub
parent be476bc146
commit c440b443a5
11 changed files with 831 additions and 39 deletions

View File

@@ -58,7 +58,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<guava.version>16.0.1</guava.version>
<guava.version>29.0-jre</guava.version>
<commons-digester.version>2.1</commons-digester.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-io.version>2.4</commons-io.version>
@@ -82,6 +82,11 @@
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@@ -16,24 +16,28 @@
*/
package org.apache.rocketmq.console.controller;
import com.google.common.collect.Maps;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.console.model.MessagePage;
import org.apache.rocketmq.console.model.MessageView;
import org.apache.rocketmq.console.model.request.MessageQuery;
import org.apache.rocketmq.console.service.MessageService;
import org.apache.rocketmq.console.util.JsonUtil;
import com.google.common.collect.Maps;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping("/message")
@@ -52,6 +56,12 @@ public class MessageController {
return messageViewMap;
}
@PostMapping("/queryMessagePageByTopic.query")
@ResponseBody
public MessagePage queryMessagePageByTopic(@RequestBody MessageQuery query) {
return messageService.queryMessageByPage(query);
}
@RequestMapping(value = "/queryMessageByTopicAndKey.query", method = RequestMethod.GET)
@ResponseBody
public Object queryMessageByTopicAndKey(@RequestParam String topic, @RequestParam String key) {
@@ -61,15 +71,15 @@ public class MessageController {
@RequestMapping(value = "/queryMessageByTopic.query", method = RequestMethod.GET)
@ResponseBody
public Object queryMessageByTopic(@RequestParam String topic, @RequestParam long begin,
@RequestParam long end) {
@RequestParam long end) {
return messageService.queryMessageByTopic(topic, begin, end);
}
@RequestMapping(value = "/consumeMessageDirectly.do", method = RequestMethod.POST)
@ResponseBody
public Object consumeMessageDirectly(@RequestParam String topic, @RequestParam String consumerGroup,
@RequestParam String msgId,
@RequestParam(required = false) String clientId) {
@RequestParam String msgId,
@RequestParam(required = false) String clientId) {
logger.info("msgId={} consumerGroup={} clientId={}", msgId, consumerGroup, clientId);
ConsumeMessageDirectlyResult consumeMessageDirectlyResult = messageService.consumeMessageDirectly(topic, msgId, consumerGroup, clientId);
logger.info("consumeMessageDirectlyResult={}", JsonUtil.obj2String(consumeMessageDirectlyResult));

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model;
import org.springframework.data.domain.Page;
public class MessagePage {
private Page<MessageView> page;
private String taskId;
public MessagePage(Page<MessageView> page, String taskId) {
this.page = page;
this.taskId = taskId;
}
public Page<MessageView> getPage() {
return page;
}
public void setPage(Page<MessageView> page) {
this.page = page;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public String toString() {
return "MessagePage{" +
"page=" + page +
", taskId='" + taskId + '\'' +
'}';
}
}

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model;
import org.springframework.data.domain.Page;
import java.util.List;
public class MessagePageTask {
private Page<MessageView> page;
private List<QueueOffsetInfo> queueOffsetInfos;
public MessagePageTask(Page<MessageView> page, List<QueueOffsetInfo> queueOffsetInfos) {
this.page = page;
this.queueOffsetInfos = queueOffsetInfos;
}
public Page<MessageView> getPage() {
return page;
}
public void setPage(Page<MessageView> page) {
this.page = page;
}
public List<QueueOffsetInfo> getQueueOffsetInfos() {
return queueOffsetInfos;
}
public void setQueueOffsetInfos(List<QueueOffsetInfo> queueOffsetInfos) {
this.queueOffsetInfos = queueOffsetInfos;
}
@Override
public String toString() {
return "MessagePageTask{" +
"page=" + page +
", queueOffsetInfos=" + queueOffsetInfos +
'}';
}
}

View File

@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model;
import org.springframework.data.domain.PageRequest;
public class MessageQueryByPage {
public static final int DEFAULT_PAGE = 0;
public static final int MIN_PAGE_SIZE = 10;
public static final int MAX_PAGE_SIZE = 100;
/**
* current page num
*/
private int pageNum;
private int pageSize;
private String topic;
private long begin;
private long end;
public MessageQueryByPage(int pageNum, int pageSize, String topic, long begin, long end) {
this.pageNum = pageNum;
this.pageSize = pageSize;
this.topic = topic;
this.begin = begin;
this.end = end;
}
public void setPageNum(int pageNum) {
this.pageNum = pageNum;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public long getBegin() {
return begin;
}
public void setBegin(long begin) {
this.begin = begin;
}
public long getEnd() {
return end;
}
public void setEnd(long end) {
this.end = end;
}
public int getPageNum() {
return pageNum <= 0 ? DEFAULT_PAGE : pageNum - 1;
}
public int getPageSize() {
if (pageSize <= 1) {
return MIN_PAGE_SIZE;
} else if (pageSize > MAX_PAGE_SIZE) {
return MAX_PAGE_SIZE;
}
return this.pageSize;
}
public PageRequest page() {
return PageRequest.of(this.getPageNum(), this.getPageSize());
}
@Override
public String toString() {
return "MessageQueryByPage{" +
"pageNum=" + pageNum +
", pageSize=" + pageSize +
", topic='" + topic + '\'' +
", begin=" + begin +
", end=" + end +
'}';
}
}

View File

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model;
import org.apache.rocketmq.common.message.MessageQueue;
public class QueueOffsetInfo {
private Integer idx;
private Long start;
private Long end;
private Long startOffset;
private Long endOffset;
private MessageQueue messageQueues;
public QueueOffsetInfo() {
}
public QueueOffsetInfo(Integer idx, Long start, Long end, Long startOffset, Long endOffset, MessageQueue messageQueues) {
this.idx = idx;
this.start = start;
this.end = end;
this.startOffset = startOffset;
this.endOffset = endOffset;
this.messageQueues = messageQueues;
}
public Integer getIdx() {
return idx;
}
public void setIdx(Integer idx) {
this.idx = idx;
}
public Long getStart() {
return start;
}
public void setStart(Long start) {
this.start = start;
}
public Long getEnd() {
return end;
}
public void setEnd(Long end) {
this.end = end;
}
public Long getStartOffset() {
return startOffset;
}
public void setStartOffset(Long startOffset) {
this.startOffset = startOffset;
}
public Long getEndOffset() {
return endOffset;
}
public void setEndOffset(Long endOffset) {
this.endOffset = endOffset;
}
public MessageQueue getMessageQueues() {
return messageQueues;
}
public void setMessageQueues(MessageQueue messageQueues) {
this.messageQueues = messageQueues;
}
public void incStartOffset() {
this.startOffset++;
this.endOffset++;
}
public void incEndOffset() {
this.endOffset++;
}
public void incStartOffset(long size) {
this.startOffset += size;
this.endOffset += size;
}
}

View File

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.model.request;
public class MessageQuery {
/**
* current page num
*/
private int pageNum;
private int pageSize;
private String topic;
private String taskId;
private long begin;
private long end;
public int getPageNum() {
return pageNum;
}
public void setPageNum(int pageNum) {
this.pageNum = pageNum;
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public long getBegin() {
return begin;
}
public void setBegin(long begin) {
this.begin = begin;
}
public long getEnd() {
return end;
}
public void setEnd(long end) {
this.end = end;
}
}

View File

@@ -20,6 +20,8 @@ package org.apache.rocketmq.console.service;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.console.model.MessagePage;
import org.apache.rocketmq.console.model.request.MessageQuery;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.console.model.MessageView;
@@ -48,4 +50,10 @@ public interface MessageService {
ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
String clientId);
MessagePage queryMessageByPage(MessageQuery query);
}

View File

@@ -21,21 +21,19 @@ package org.apache.rocketmq.console.service.impl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
@@ -43,7 +41,12 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.exception.ServiceException;
import org.apache.rocketmq.console.model.QueueOffsetInfo;
import org.apache.rocketmq.console.model.MessageView;
import org.apache.rocketmq.console.model.MessagePage;
import org.apache.rocketmq.console.model.MessagePageTask;
import org.apache.rocketmq.console.model.MessageQueryByPage;
import org.apache.rocketmq.console.model.request.MessageQuery;
import org.apache.rocketmq.console.service.MessageService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -51,13 +54,31 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.Set;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
public class MessageServiceImpl implements MessageService {
private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
private static final Cache<String, List<QueueOffsetInfo>> CACHE = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(60, TimeUnit.MINUTES)
.build();
@Autowired
private RMQConfigure configure;
/**
@@ -75,8 +96,7 @@ public class MessageServiceImpl implements MessageService {
MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId);
List<MessageTrack> messageTrackList = messageTrackDetail(messageExt);
return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList);
}
catch (Exception e) {
} catch (Exception e) {
throw new ServiceException(-1, String.format("Failed to query message by Id: %s", msgId));
}
}
@@ -90,8 +110,7 @@ public class MessageServiceImpl implements MessageService {
return MessageView.fromMessageExt(messageExt);
}
});
}
catch (Exception err) {
} catch (Exception err) {
throw Throwables.propagate(err);
}
}
@@ -101,9 +120,9 @@ public class MessageServiceImpl implements MessageService {
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
RPCHook rpcHook = null;
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey()));
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook);
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
List<MessageView> messageViewList = Lists.newArrayList();
try {
String subExpression = "*";
@@ -146,8 +165,7 @@ public class MessageServiceImpl implements MessageService {
case OFFSET_ILLEGAL:
break READQ;
}
}
catch (Exception e) {
} catch (Exception e) {
break;
}
}
@@ -162,11 +180,9 @@ public class MessageServiceImpl implements MessageService {
}
});
return messageViewList;
}
catch (Exception e) {
} catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
} finally {
consumer.shutdown();
}
}
@@ -175,8 +191,7 @@ public class MessageServiceImpl implements MessageService {
public List<MessageTrack> messageTrackDetail(MessageExt msg) {
try {
return mqAdminExt.messageTrackDetail(msg);
}
catch (Exception e) {
} catch (Exception e) {
logger.error("op=messageTrackDetailError", e);
return Collections.emptyList();
}
@@ -185,12 +200,11 @@ public class MessageServiceImpl implements MessageService {
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
String clientId) {
String clientId) {
if (StringUtils.isNotBlank(clientId)) {
try {
return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
}
catch (Exception e) {
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
@@ -204,12 +218,322 @@ public class MessageServiceImpl implements MessageService {
logger.info("clientId={}", connection.getClientId());
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
}
}
catch (Exception e) {
} catch (Exception e) {
throw Throwables.propagate(e);
}
throw new IllegalStateException("NO CONSUMER");
}
@Override
public MessagePage queryMessageByPage(MessageQuery query) {
MessageQueryByPage queryByPage = new MessageQueryByPage(
query.getPageNum(),
query.getPageSize(),
query.getTopic(),
query.getBegin(),
query.getEnd());
List<QueueOffsetInfo> queueOffsetInfos = CACHE.getIfPresent(query.getTaskId());
if (queueOffsetInfos == null) {
query.setPageNum(1);
MessagePageTask task = this.queryFirstMessagePage(queryByPage);
String taskId = MessageClientIDSetter.createUniqID();
CACHE.put(taskId, task.getQueueOffsetInfos());
return new MessagePage(task.getPage(), taskId);
}
Page<MessageView> messageViews = queryMessageByTaskPage(queryByPage, queueOffsetInfos);
return new MessagePage(messageViews, query.getTaskId());
}
private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
RPCHook rpcHook = null;
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
long total = 0;
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
List<MessageView> messageViews = new ArrayList<>();
try {
consumer.start();
Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());
int idx = 0;
for (MessageQueue messageQueue : messageQueues) {
Long minOffset = consumer.searchOffset(messageQueue, query.getBegin());
Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd()) + 1;
queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue));
}
// check first offset has message
// filter the begin time
for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
Long start = queueOffset.getStart();
boolean hasData = false;
boolean hasIllegalOffset = true;
while (hasIllegalOffset) {
PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", start, 32);
if (pullResult.getPullStatus() == PullStatus.FOUND) {
hasData = true;
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
for (MessageExt messageExt : msgFoundList) {
if (messageExt.getStoreTimestamp() < query.getBegin()) {
start++;
} else {
hasIllegalOffset = false;
break;
}
}
} else {
hasIllegalOffset = false;
}
}
if (!hasData) {
queueOffset.setEnd(queueOffset.getStart());
}
queueOffset.setStart(start);
queueOffset.setStartOffset(start);
queueOffset.setEndOffset(start);
}
// filter the end time
for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
if (queueOffset.getStart().equals(queueOffset.getEnd())) {
continue;
}
long end = queueOffset.getEnd();
long pullOffset = end;
int pullSize = 32;
boolean hasIllegalOffset = true;
while (hasIllegalOffset) {
if (pullOffset - pullSize > queueOffset.getStart()) {
pullOffset = pullOffset - pullSize;
} else {
pullOffset = queueOffset.getStartOffset();
pullSize = (int) (end - pullOffset);
}
PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", pullOffset, pullSize);
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
for (int i = msgFoundList.size() - 1; i >= 0; i--) {
MessageExt messageExt = msgFoundList.get(i);
if (messageExt.getStoreTimestamp() < query.getBegin()) {
end--;
} else {
hasIllegalOffset = false;
break;
}
}
} else {
hasIllegalOffset = false;
}
if (pullOffset == queueOffset.getStartOffset()) {
break;
}
}
queueOffset.setEnd(end);
total += queueOffset.getEnd() - queueOffset.getStart();
}
long pageSize = total > query.getPageSize() ? query.getPageSize() : total;
// move startOffset
int next = moveStartOffset(queueOffsetInfos, query);
moveEndOffset(queueOffsetInfos, query, next);
// find the first page of message
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
Long start = queueOffsetInfo.getStartOffset();
Long end = queueOffsetInfo.getEndOffset();
long size = Math.min(end - start, pageSize);
if (size == 0) {
continue;
}
while (size > 0) {
PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> poll = pullResult.getMsgFoundList();
if (poll.size() == 0) {
break;
}
List<MessageView> collect = poll.stream()
.map(MessageView::fromMessageExt).collect(Collectors.toList());
for (MessageView view : collect) {
if (size > 0) {
messageViews.add(view);
size--;
}
}
} else {
break;
}
}
}
PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total);
return new MessagePageTask(page, queueOffsetInfos);
} catch (Exception e) {
throw Throwables.propagate(e);
} finally {
consumer.shutdown();
}
}
private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
RPCHook rpcHook = null;
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
List<MessageView> messageViews = new ArrayList<>();
long offset = query.getPageNum() * query.getPageSize();
long total = 0;
try {
consumer.start();
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
long start = queueOffsetInfo.getStart();
long end = queueOffsetInfo.getEnd();
queueOffsetInfo.setStartOffset(start);
queueOffsetInfo.setEndOffset(start);
total += end - start;
}
if (total <= offset) {
return Page.empty();
}
long pageSize = total - offset > query.getPageSize() ? query.getPageSize() : total - offset;
int next = moveStartOffset(queueOffsetInfos, query);
moveEndOffset(queueOffsetInfos, query, next);
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
Long start = queueOffsetInfo.getStartOffset();
Long end = queueOffsetInfo.getEndOffset();
long size = Math.min(end - start, pageSize);
if (size == 0) {
continue;
}
while (size > 0) {
PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> poll = pullResult.getMsgFoundList();
if (poll.size() == 0) {
break;
}
List<MessageView> collect = poll.stream()
.map(MessageView::fromMessageExt).collect(Collectors.toList());
for (MessageView view : collect) {
if (size > 0) {
messageViews.add(view);
size--;
}
}
} else {
break;
}
}
}
return new PageImpl<>(messageViews, query.page(), total);
} catch (Exception e) {
throw Throwables.propagate(e);
} finally {
consumer.shutdown();
}
}
private int moveStartOffset(List<QueueOffsetInfo> queueOffsets, MessageQueryByPage query) {
int size = queueOffsets.size();
int next = 0;
long offset = query.getPageNum() * query.getPageSize();
if (offset == 0) {
return next;
}
// sort by queueOffset size
List<QueueOffsetInfo> orderQueue = queueOffsets
.stream()
.sorted((o1, o2) -> {
long size1 = o1.getEnd() - o1.getStart();
long size2 = o2.getEnd() - o2.getStart();
if (size1 < size2) {
return -1;
} else if (size1 > size2) {
return 1;
}
return 0;
}).collect(Collectors.toList());
// Take the smallest one each time
for (int i = 0; i < size && offset >= (size - i); i++) {
long minSize = orderQueue.get(i).getEnd() - orderQueue.get(i).getStartOffset();
if (minSize == 0) {
continue;
}
long reduce = minSize * (size - i);
if (reduce <= offset) {
offset -= reduce;
for (int j = i; j < size; j++) {
orderQueue.get(j).incStartOffset(minSize);
}
} else {
long addOffset = offset / (size - i);
offset -= addOffset * (size - i);
if (addOffset != 0) {
for (int j = i; j < size; j++) {
orderQueue.get(j).incStartOffset(addOffset);
}
}
}
}
for (QueueOffsetInfo info : orderQueue) {
QueueOffsetInfo queueOffsetInfo = queueOffsets.get(info.getIdx());
queueOffsetInfo.setStartOffset(info.getStartOffset());
queueOffsetInfo.setEndOffset(info.getEndOffset());
}
for (QueueOffsetInfo info : queueOffsets) {
if (offset == 0) {
break;
}
next = (next + 1) % size;
if (info.getStartOffset() < info.getEnd()) {
info.incStartOffset();
--offset;
}
}
return next;
}
private void moveEndOffset(List<QueueOffsetInfo> queueOffsets, MessageQueryByPage query, int next) {
int size = queueOffsets.size();
for (int j = 0; j < query.getPageSize(); j++) {
QueueOffsetInfo nextQueueOffset = queueOffsets.get(next);
next = (next + 1) % size;
int start = next;
while (nextQueueOffset.getEndOffset() >= nextQueueOffset.getEnd()) {
nextQueueOffset = queueOffsets.get(next);
next = (next + 1) % size;
if (start == next) {
return;
}
}
nextQueueOffset.incEndOffset();
}
}
}

View File

@@ -43,6 +43,8 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat
$scope.timepickerEnd = moment().add(1,'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerOptions ={format: 'YYYY-MM-DD HH:mm', showClear: true};
$scope.taskId = "";
$scope.paginationConf = {
currentPage: 1,
totalItems: 0,
@@ -51,10 +53,44 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat
perPageOptions: [10],
rememberPerPage: 'perPageItems',
onChange: function () {
$scope.changeShowMessageList(this.currentPage,this.totalItems);
$scope.queryMessagePageByTopic()
}
};
$scope.queryMessagePageByTopic = function () {
if ($scope.timepickerEnd < $scope.timepickerBegin) {
Notification.error({message: "endTime is later than beginTime!", delay: 2000});
return
}
if( $scope.selectedTopic === [] || (typeof $scope.selectedTopic) == "object"){
return
}
$http({
method: "POST",
url: "message/queryMessagePageByTopic.query",
data: {
topic: $scope.selectedTopic,
begin: $scope.timepickerBegin.valueOf(),
end: $scope.timepickerEnd.valueOf(),
pageNum: $scope.paginationConf.currentPage,
pageSize: $scope.paginationConf.itemsPerPage,
taskId: $scope.taskId
}
}).success(function (resp) {
if (resp.status === 0) {
console.log(resp);
$scope.messageShowList = resp.data.page.content;
if(resp.data.page.first){
$scope.paginationConf.currentPage = 1;
}
$scope.paginationConf.currentPage = resp.data.page.number + 1;
$scope.paginationConf.totalItems = resp.data.page.totalElements;
$scope.taskId = resp.data.taskId
}else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});
}
$scope.queryMessageByTopic = function () {
console.log($scope.selectedTopic);
@@ -153,7 +189,6 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat
});
};
$scope.changeShowMessageList = function (currentPage,totalItem) {
var perPage = $scope.paginationConf.itemsPerPage;
var from = (currentPage - 1) * perPage;
@@ -161,6 +196,13 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http','Notificat
$scope.messageShowList = $scope.queryMessageByTopicResult.slice(from, to);
$scope.paginationConf.totalItems = totalItem ;
};
$scope.onChangeQueryCondition = function (){
console.log("change")
$scope.taskId = "";
$scope.paginationConf.currentPage = 1;
$scope.paginationConf.totalItems = 0;
}
}]);
module.controller('messageDetailViewDialogController',['$scope', 'ngDialog', '$http','Notification', function ($scope, ngDialog, $http,Notification) {

View File

@@ -21,7 +21,7 @@
<md-tabs md-dynamic-height="" md-border-bottom="">
<md-tab label="Topic">
<md-content class="md-padding" style="min-height:600px">
<h5 class="md-display-5">Only Return 2000 Messages</h5>
<h5 class="md-display-5">Total {{paginationConf.totalItems}} Messages</h5>
<div class="row">
<form class="form-inline pull-left col-sm-12">
<div class="form-group">
@@ -32,6 +32,7 @@
<select name="mySelectTopic" chosen
ng-model="selectedTopic"
ng-options="item for item in allTopicList"
ng-change="onChangeQueryCondition()"
required>
<option value=""></option>
</select>
@@ -40,7 +41,7 @@
<div class="form-group ">
<label>{{'BEGIN' | translate}}:</label>
<div class="input-group">
<input class="form-control" datetimepicker ng-model="timepickerBegin"
<input class="form-control" datetimepicker ng-change="onChangeQueryCondition()" ng-model="timepickerBegin"
options="timepickerOptions"/>
<span class="input-group-addon"><span
class="glyphicon glyphicon-calendar"></span></span>
@@ -49,7 +50,7 @@
<div class="form-group">
<label>{{'END' | translate}}:</label>
<div class="input-group">
<input class="form-control" datetimepicker ng-model="timepickerEnd"
<input class="form-control" datetimepicker ng-change="onChangeQueryCondition()" ng-model="timepickerEnd"
options="timepickerOptions"/>
<span class="input-group-addon"><span
class="glyphicon glyphicon-calendar"></span></span>
@@ -58,7 +59,7 @@
<button id="searchAppsButton" type="button"
class="btn btn-raised btn-sm btn-primary"
data-toggle="modal"
ng-click="queryMessageByTopic()">
ng-click="queryMessagePageByTopic()">
<span class="glyphicon glyphicon-search"></span>{{ 'SEARCH' | translate}}
</button>
</form>