mirror of
https://github.com/apache/rocketmq-dashboard.git
synced 2026-02-21 04:45:41 +08:00
* [ISSUE #30]Supports batch resending and batch exporting dlq messages. * Optimize the code Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
@@ -17,12 +17,16 @@
|
||||
package org.apache.rocketmq.dashboard.controller;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.dashboard.exception.ServiceException;
|
||||
import org.apache.rocketmq.dashboard.model.DlqMessageExcelModel;
|
||||
import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
|
||||
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
|
||||
import org.apache.rocketmq.dashboard.permisssion.Permission;
|
||||
import org.apache.rocketmq.dashboard.service.DlqMessageService;
|
||||
@@ -30,6 +34,7 @@ import org.apache.rocketmq.dashboard.util.ExcelUtil;
|
||||
import org.apache.rocketmq.tools.admin.MQAdminExt;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
@@ -39,6 +44,7 @@ import org.springframework.web.bind.annotation.ResponseBody;
|
||||
@Controller
|
||||
@RequestMapping("/dlqMessage")
|
||||
@Permission
|
||||
@Slf4j
|
||||
public class DlqMessageController {
|
||||
|
||||
@Resource
|
||||
@@ -70,4 +76,33 @@ public class DlqMessageController {
|
||||
throw new ServiceException(-1, String.format("export dlq message failed!"));
|
||||
}
|
||||
}
|
||||
|
||||
@PostMapping(value = "/batchResendDlqMessage.do")
|
||||
@ResponseBody
|
||||
public Object batchResendDlqMessage(@RequestBody List<DlqMessageRequest> dlqMessages) {
|
||||
return dlqMessageService.batchResendDlqMessage(dlqMessages);
|
||||
}
|
||||
|
||||
@PostMapping(value = "/batchExportDlqMessage.do")
|
||||
public void batchExportDlqMessage(HttpServletResponse response, @RequestBody List<DlqMessageRequest> dlqMessages) {
|
||||
List<DlqMessageExcelModel> dlqMessageExcelModelList = new ArrayList<>(dlqMessages.size());
|
||||
for (DlqMessageRequest dlqMessage : dlqMessages) {
|
||||
DlqMessageExcelModel excelModel = new DlqMessageExcelModel();
|
||||
try {
|
||||
String topic = MixAll.DLQ_GROUP_TOPIC_PREFIX + dlqMessage.getConsumerGroup();
|
||||
MessageExt messageExt = mqAdminExt.viewMessage(topic, dlqMessage.getMsgId());
|
||||
excelModel = new DlqMessageExcelModel(messageExt);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to query message by Id:{}", dlqMessage.getMsgId(), e);
|
||||
excelModel.setMsgId(dlqMessage.getMsgId());
|
||||
excelModel.setException(e.getMessage());
|
||||
}
|
||||
dlqMessageExcelModelList.add(excelModel);
|
||||
}
|
||||
try {
|
||||
ExcelUtil.writeExcel(response, dlqMessageExcelModelList, "dlqs", "dlqs", DlqMessageExcelModel.class);
|
||||
} catch (Exception e) {
|
||||
throw new ServiceException(-1, String.format("export dlq message failed!"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,9 +25,11 @@ import com.google.common.base.Charsets;
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class DlqMessageExcelModel extends BaseRowModel implements Serializable {
|
||||
|
||||
@ExcelProperty(value = "topic", index = 0)
|
||||
@@ -66,6 +68,10 @@ public class DlqMessageExcelModel extends BaseRowModel implements Serializable {
|
||||
@ColumnWidth(value = 15)
|
||||
private int bodyCRC;
|
||||
|
||||
@ExcelProperty(value = "exception", index = 9)
|
||||
@ColumnWidth(value = 30)
|
||||
private String exception;
|
||||
|
||||
public DlqMessageExcelModel(MessageExt messageExt) {
|
||||
this.topic = messageExt.getTopic();
|
||||
this.msgId = messageExt.getMsgId();
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.rocketmq.dashboard.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DlqMessageRequest {
|
||||
|
||||
private String topicName;
|
||||
|
||||
private String consumerGroup;
|
||||
|
||||
private String msgId;
|
||||
|
||||
private String clientId;
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.rocketmq.dashboard.model;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.rocketmq.common.protocol.body.CMResult;
|
||||
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
|
||||
|
||||
@Data
|
||||
public class DlqMessageResendResult {
|
||||
private CMResult consumeResult;
|
||||
private String remark;
|
||||
private String msgId;
|
||||
|
||||
public DlqMessageResendResult(ConsumeMessageDirectlyResult consumeMessageDirectlyResult, String msgId) {
|
||||
this.consumeResult = consumeMessageDirectlyResult.getConsumeResult();
|
||||
this.remark = consumeMessageDirectlyResult.getRemark();
|
||||
this.msgId = msgId;
|
||||
}
|
||||
}
|
||||
@@ -17,10 +17,15 @@
|
||||
|
||||
package org.apache.rocketmq.dashboard.service;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
|
||||
import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
|
||||
import org.apache.rocketmq.dashboard.model.MessagePage;
|
||||
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
|
||||
|
||||
public interface DlqMessageService {
|
||||
|
||||
MessagePage queryDlqMessageByPage(MessageQuery query);
|
||||
|
||||
List<DlqMessageResendResult> batchResendDlqMessage(List<DlqMessageRequest> dlqMessages);
|
||||
}
|
||||
|
||||
@@ -19,12 +19,16 @@ package org.apache.rocketmq.dashboard.service.impl;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.protocol.ResponseCode;
|
||||
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
|
||||
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
|
||||
import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
|
||||
import org.apache.rocketmq.dashboard.model.MessagePage;
|
||||
import org.apache.rocketmq.dashboard.model.MessageView;
|
||||
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
|
||||
@@ -65,4 +69,17 @@ public class DlqMessageServiceImpl implements DlqMessageService {
|
||||
}
|
||||
return messageService.queryMessageByPage(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DlqMessageResendResult> batchResendDlqMessage(List<DlqMessageRequest> dlqMessages) {
|
||||
List<DlqMessageResendResult> batchResendResults = new LinkedList<>();
|
||||
for (DlqMessageRequest dlqMessage : dlqMessages) {
|
||||
ConsumeMessageDirectlyResult result = messageService.consumeMessageDirectly(dlqMessage.getTopicName(),
|
||||
dlqMessage.getMsgId(), dlqMessage.getConsumerGroup(),
|
||||
dlqMessage.getClientId());
|
||||
DlqMessageResendResult resendResult = new DlqMessageResendResult(result, dlqMessage.getMsgId());
|
||||
batchResendResults.add(resendResult);
|
||||
}
|
||||
return batchResendResults;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user