diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/DlqMessageController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/DlqMessageController.java index 500040d..d32b1aa 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/DlqMessageController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/DlqMessageController.java @@ -17,12 +17,16 @@ package org.apache.rocketmq.dashboard.controller; import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.dashboard.exception.ServiceException; import org.apache.rocketmq.dashboard.model.DlqMessageExcelModel; +import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.request.MessageQuery; import org.apache.rocketmq.dashboard.permisssion.Permission; import org.apache.rocketmq.dashboard.service.DlqMessageService; @@ -30,6 +34,7 @@ import org.apache.rocketmq.dashboard.util.ExcelUtil; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -39,6 +44,7 @@ import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping("/dlqMessage") @Permission +@Slf4j public class DlqMessageController { @Resource @@ -70,4 +76,33 @@ public class DlqMessageController { throw new ServiceException(-1, String.format("export dlq message failed!")); } } + + @PostMapping(value = "/batchResendDlqMessage.do") + @ResponseBody + public Object batchResendDlqMessage(@RequestBody List dlqMessages) { + return dlqMessageService.batchResendDlqMessage(dlqMessages); + } + + @PostMapping(value = "/batchExportDlqMessage.do") + public void batchExportDlqMessage(HttpServletResponse response, @RequestBody List dlqMessages) { + List dlqMessageExcelModelList = new ArrayList<>(dlqMessages.size()); + for (DlqMessageRequest dlqMessage : dlqMessages) { + DlqMessageExcelModel excelModel = new DlqMessageExcelModel(); + try { + String topic = MixAll.DLQ_GROUP_TOPIC_PREFIX + dlqMessage.getConsumerGroup(); + MessageExt messageExt = mqAdminExt.viewMessage(topic, dlqMessage.getMsgId()); + excelModel = new DlqMessageExcelModel(messageExt); + } catch (Exception e) { + log.error("Failed to query message by Id:{}", dlqMessage.getMsgId(), e); + excelModel.setMsgId(dlqMessage.getMsgId()); + excelModel.setException(e.getMessage()); + } + dlqMessageExcelModelList.add(excelModel); + } + try { + ExcelUtil.writeExcel(response, dlqMessageExcelModelList, "dlqs", "dlqs", DlqMessageExcelModel.class); + } catch (Exception e) { + throw new ServiceException(-1, String.format("export dlq message failed!")); + } + } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageExcelModel.java b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageExcelModel.java index 2476a23..e010a1c 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageExcelModel.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageExcelModel.java @@ -25,9 +25,11 @@ import com.google.common.base.Charsets; import java.io.Serializable; import java.util.Date; import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.rocketmq.common.message.MessageExt; @Data +@NoArgsConstructor public class DlqMessageExcelModel extends BaseRowModel implements Serializable { @ExcelProperty(value = "topic", index = 0) @@ -66,6 +68,10 @@ public class DlqMessageExcelModel extends BaseRowModel implements Serializable { @ColumnWidth(value = 15) private int bodyCRC; + @ExcelProperty(value = "exception", index = 9) + @ColumnWidth(value = 30) + private String exception; + public DlqMessageExcelModel(MessageExt messageExt) { this.topic = messageExt.getTopic(); this.msgId = messageExt.getMsgId(); diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageRequest.java b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageRequest.java new file mode 100644 index 0000000..bd98dfc --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageRequest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.model; + +import lombok.Data; + +@Data +public class DlqMessageRequest { + + private String topicName; + + private String consumerGroup; + + private String msgId; + + private String clientId; +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java new file mode 100644 index 0000000..44bf55f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/model/DlqMessageResendResult.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.model; + +import lombok.Data; +import org.apache.rocketmq.common.protocol.body.CMResult; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; + +@Data +public class DlqMessageResendResult { + private CMResult consumeResult; + private String remark; + private String msgId; + + public DlqMessageResendResult(ConsumeMessageDirectlyResult consumeMessageDirectlyResult, String msgId) { + this.consumeResult = consumeMessageDirectlyResult.getConsumeResult(); + this.remark = consumeMessageDirectlyResult.getRemark(); + this.msgId = msgId; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/DlqMessageService.java b/src/main/java/org/apache/rocketmq/dashboard/service/DlqMessageService.java index 5cf9eb9..96adf92 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/DlqMessageService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/DlqMessageService.java @@ -17,10 +17,15 @@ package org.apache.rocketmq.dashboard.service; +import java.util.List; +import org.apache.rocketmq.dashboard.model.DlqMessageResendResult; +import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.request.MessageQuery; public interface DlqMessageService { MessagePage queryDlqMessageByPage(MessageQuery query); + + List batchResendDlqMessage(List dlqMessages); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java index 6fb822a..006f1c2 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/DlqMessageServiceImpl.java @@ -19,12 +19,16 @@ package org.apache.rocketmq.dashboard.service.impl; import com.google.common.base.Throwables; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.dashboard.model.DlqMessageResendResult; +import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.request.MessageQuery; @@ -65,4 +69,17 @@ public class DlqMessageServiceImpl implements DlqMessageService { } return messageService.queryMessageByPage(query); } + + @Override + public List batchResendDlqMessage(List dlqMessages) { + List batchResendResults = new LinkedList<>(); + for (DlqMessageRequest dlqMessage : dlqMessages) { + ConsumeMessageDirectlyResult result = messageService.consumeMessageDirectly(dlqMessage.getTopicName(), + dlqMessage.getMsgId(), dlqMessage.getConsumerGroup(), + dlqMessage.getClientId()); + DlqMessageResendResult resendResult = new DlqMessageResendResult(result, dlqMessage.getMsgId()); + batchResendResults.add(resendResult); + } + return batchResendResults; + } } diff --git a/src/main/resources/static/src/dlqMessage.js b/src/main/resources/static/src/dlqMessage.js index fd82db1..352e4cc 100644 --- a/src/main/resources/static/src/dlqMessage.js +++ b/src/main/resources/static/src/dlqMessage.js @@ -82,6 +82,9 @@ module.controller('dlqMessageController', ['$scope', 'ngDialog', '$http', 'Notif if ($scope.messageShowList.length == 0){ $("#noMsgTip").removeAttr("style"); } + for (const message of $scope.messageShowList) { + message.checked = false; + } console.log($scope.messageShowList); if (resp.data.page.first) { $scope.paginationConf.currentPage = 1; @@ -180,5 +183,109 @@ module.controller('dlqMessageController', ['$scope', 'ngDialog', '$http', 'Notif $scope.exportDlqMessage = function (msgId, consumerGroup) { window.location.href = "dlqMessage/exportDlqMessage.do?msgId=" + msgId + "&consumerGroup=" + consumerGroup; + }; + + $scope.selectedDlqMessage = []; + $scope.batchResendDlqMessage = function (consumerGroup) { + for (const message of $scope.messageCheckedList) { + const dlqMessage = {}; + dlqMessage.topic = message.properties.RETRY_TOPIC; + dlqMessage.msgId = message.properties.ORIGIN_MESSAGE_ID; + dlqMessage.consumerGroup = consumerGroup; + $scope.selectedDlqMessage.push(dlqMessage); + } + $http({ + method: "POST", + url: "dlqMessage/batchResendDlqMessage.do", + data: $scope.selectedDlqMessage + }).success(function (resp) { + $scope.selectedDlqMessage = []; + if (resp.status == 0) { + ngDialog.open({ + template: 'operationResultDialog', + data: { + result: resp.data + } + }); + } else { + ngDialog.open({ + template: 'operationResultDialog', + data: { + result: resp.errMsg + } + }); + } + }); + }; + + $scope.batchExportDlqMessage = function (consumerGroup) { + for (const message of $scope.messageCheckedList) { + const dlqMessage = {}; + dlqMessage.msgId = message.msgId; + dlqMessage.consumerGroup = consumerGroup; + $scope.selectedDlqMessage.push(dlqMessage); + } + $http({ + method: "POST", + url: "dlqMessage/batchExportDlqMessage.do", + data: $scope.selectedDlqMessage, + headers: { + 'Content-type': 'application/json' + }, + responseType: "arraybuffer" + }).success(function (resp) { + $scope.selectedDlqMessage = []; + const blob = new Blob([resp], {type: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}); + const objectUrl = URL.createObjectURL(blob); + const a = document.createElement('a'); + a.style.display = 'none'; + a.download = 'dlqs.xlsx'; + a.href = objectUrl; + a.click(); + document.body.removeChild(a) + }); + }; + + $scope.checkedAll = false; + $scope.messageCheckedList = []; + $scope.selectAll = function () { + $scope.messageCheckedList = []; + if ($scope.checkedAll == true) { + angular.forEach($scope.messageShowList, function (item, index) { + item.checked = true; + $scope.messageCheckedList.push(item); + }); + } else { + angular.forEach($scope.messageShowList, function (item, index) { + item.checked = false; + }); + } + checkBtn($scope.messageCheckedList) + console.log($scope.messageCheckedList) + } + + $scope.selectItem = function () { + var flag = true; + $scope.messageCheckedList = []; + angular.forEach($scope.messageShowList, function (item, index) { + if (item.checked) { + $scope.messageCheckedList.push(item); + } else { + flag = false; + } + }) + $scope.checkedAll = flag; + checkBtn($scope.messageCheckedList) + console.log($scope.messageCheckedList); + } + + function checkBtn(messageCheckList) { + if (messageCheckList.length == 0) { + $("#batchResendBtn").addClass("disabled"); + $("#batchExportBtn").addClass("disabled"); + } else { + $("#batchResendBtn").removeClass("disabled"); + $("#batchExportBtn").removeClass("disabled"); + } } }]); \ No newline at end of file diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index 48f73b4..7592f90 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -111,5 +111,7 @@ var en = { "TRACE_TOPIC":"TraceTopic", "SELECT_TRACE_TOPIC":"selectTraceTopic", "EXPORT": "export", - "NO_MATCH_RESULT": "no match result" + "NO_MATCH_RESULT": "no match result", + "BATCH_RESEND": "batchReSend", + "BATCH_EXPORT": "batchExport" } \ No newline at end of file diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index dad46d7..9779d91 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -112,5 +112,7 @@ var zh = { "TRACE_TOPIC":"消息轨迹主题", "SELECT_TRACE_TOPIC":"选择消息轨迹主题", "EXPORT": "导出", - "NO_MATCH_RESULT": "没有查到符合条件的结果" + "NO_MATCH_RESULT": "没有查到符合条件的结果", + "BATCH_RESEND": "批量重发", + "BATCH_EXPORT": "批量导出" } \ No newline at end of file diff --git a/src/main/resources/static/view/pages/dlqMessage.html b/src/main/resources/static/view/pages/dlqMessage.html index f2a68f8..90edb92 100644 --- a/src/main/resources/static/view/pages/dlqMessage.html +++ b/src/main/resources/static/view/pages/dlqMessage.html @@ -64,9 +64,22 @@ ng-click="queryDlqMessageByConsumerGroup()"> {{ 'SEARCH' | translate}} + + + @@ -77,6 +90,9 @@ + @@ -192,7 +208,7 @@
diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java index 4254299..767881a 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/controller/DlqMessageControllerTest.java @@ -18,10 +18,14 @@ package org.apache.rocketmq.dashboard.controller; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; +import java.util.List; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.CMResult; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.dashboard.model.MessagePage; import org.apache.rocketmq.dashboard.model.MessageView; import org.apache.rocketmq.dashboard.model.request.MessageQuery; @@ -132,6 +136,42 @@ public class DlqMessageControllerTest extends BaseControllerTest { } + @Test + public void testBatchResendDlqMessage() throws Exception { + final String url = "/dlqMessage/batchResendDlqMessage.do"; + List dlqMessages = MockObjectUtil.createDlqMessageRequest(); + { + ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult(); + result.setConsumeResult(CMResult.CR_SUCCESS); + when(messageService.consumeMessageDirectly(any(), any(), any(), any())).thenReturn(result); + } + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); + requestBuilder.content(JSON.toJSONString(dlqMessages)); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().isOk()) + .andExpect(jsonPath("$.data", hasSize(2))) + .andExpect(jsonPath("$.data[0].consumeResult").value("CR_SUCCESS")); + } + + @Test + public void testBatchExportDlqMessage() throws Exception { + final String url = "/dlqMessage/batchExportDlqMessage.do"; + { + when(mqAdminExt.viewMessage("%DLQ%group_test", "0A9A003F00002A9F0000000000000310")) + .thenThrow(new RuntimeException()); + when(mqAdminExt.viewMessage("%DLQ%group_test", "0A9A003F00002A9F0000000000000311")) + .thenReturn(MockObjectUtil.createMessageExt()); + } + List dlqMessages = MockObjectUtil.createDlqMessageRequest(); + requestBuilder = MockMvcRequestBuilders.post(url); + requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); + requestBuilder.content(JSON.toJSONString(dlqMessages)); + perform = mockMvc.perform(requestBuilder); + perform.andExpect(status().is(200)) + .andExpect(content().contentType("application/vnd.ms-excel")); + } + @Override protected Object getTestController() { return dlqMessageController; } diff --git a/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java b/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java index 89a6764..658a169 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java +++ b/src/test/java/org/apache/rocketmq/dashboard/util/MockObjectUtil.java @@ -57,6 +57,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.dashboard.model.DlqMessageRequest; import org.apache.rocketmq.remoting.protocol.LanguageCode; import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; @@ -298,4 +299,16 @@ public class MockObjectUtil { brokerStatsData.setStatsMinute(statsMinute); return brokerStatsData; } + + public static List createDlqMessageRequest() { + List dlqMessages = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + DlqMessageRequest dlqMessageRequest = new DlqMessageRequest(); + dlqMessageRequest.setConsumerGroup("group_test"); + dlqMessageRequest.setTopicName("topic_test"); + dlqMessageRequest.setMsgId("0A9A003F00002A9F000000000000031" + i); + dlqMessages.add(dlqMessageRequest); + } + return dlqMessages; + } }
Message ID Tag Key{{'NO_MATCH_RESULT' | translate}}
+ + {{item.msgId}} {{item.properties.TAGS}} {{item.properties.KEYS}}