[ISSUE #30]Added DLQ message management (#31)

* [ISSUE #30]Added DLQ message management

* remove the specific namesrvAddr in application.properties.

Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
This commit is contained in:
zhangjidi2016
2021-10-31 11:29:55 +08:00
committed by GitHub
parent d5fed12773
commit 4b2b61e394
16 changed files with 936 additions and 20 deletions

View File

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.controller;
import com.google.common.collect.Lists;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.DlqMessageExcelModel;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.DlqMessageService;
import org.apache.rocketmq.dashboard.util.ExcelUtil;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping("/dlqMessage")
@Permission
public class DlqMessageController {
@Resource
private DlqMessageService dlqMessageService;
@Resource
private MQAdminExt mqAdminExt;
@RequestMapping(value = "/queryDlqMessageByConsumerGroup.query", method = RequestMethod.POST)
@ResponseBody
public Object queryDlqMessageByConsumerGroup(@RequestBody MessageQuery query) {
return dlqMessageService.queryDlqMessageByPage(query);
}
@GetMapping(value = "/exportDlqMessage.do")
public void exportDlqMessage(HttpServletResponse response, @RequestParam String consumerGroup,
@RequestParam String msgId) {
MessageExt messageExt = null;
try {
String topic = MixAll.DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
messageExt = mqAdminExt.viewMessage(topic, msgId);
} catch (Exception e) {
throw new ServiceException(-1, String.format("Failed to query message by Id: %s", msgId));
}
DlqMessageExcelModel excelModel = new DlqMessageExcelModel(messageExt);
try {
ExcelUtil.writeExcel(response, Lists.newArrayList(excelModel), "dlq", "dlq", DlqMessageExcelModel.class);
} catch (Exception e) {
throw new ServiceException(-1, String.format("export dlq message failed!"));
}
}
}

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model;
import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.annotation.write.style.ColumnWidth;
import com.alibaba.excel.metadata.BaseRowModel;
import com.alibaba.excel.util.DateUtils;
import com.google.common.base.Charsets;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
import org.apache.rocketmq.common.message.MessageExt;
@Data
public class DlqMessageExcelModel extends BaseRowModel implements Serializable {
@ExcelProperty(value = "topic", index = 0)
@ColumnWidth(value = 15)
private String topic;
@ExcelProperty(value = "msgId", index = 1)
@ColumnWidth(value = 15)
private String msgId;
@ExcelProperty(value = "bornHost", index = 2)
@ColumnWidth(value = 15)
private String bornHost;
@ExcelProperty(value = "bornTimestamp", index = 3)
@ColumnWidth(value = 25)
private String bornTimestamp;
@ExcelProperty(value = "storeTimestamp", index = 4)
@ColumnWidth(value = 25)
private String storeTimestamp;
@ExcelProperty(value = "reconsumeTimes", index = 5)
@ColumnWidth(value = 25)
private int reconsumeTimes;
@ExcelProperty(value = "properties", index = 6)
@ColumnWidth(value = 20)
private String properties;
@ExcelProperty(value = "messageBody", index = 7)
@ColumnWidth(value = 20)
private String messageBody;
@ExcelProperty(value = "bodyCRC", index = 8)
@ColumnWidth(value = 15)
private int bodyCRC;
public DlqMessageExcelModel(MessageExt messageExt) {
this.topic = messageExt.getTopic();
this.msgId = messageExt.getMsgId();
this.bornHost = messageExt.getBornHostString();
this.bornTimestamp = DateUtils.format(new Date(messageExt.getBornTimestamp()), DateUtils.DATE_FORMAT_19);
this.storeTimestamp = DateUtils.format(new Date(messageExt.getStoreTimestamp()), DateUtils.DATE_FORMAT_19);
this.reconsumeTimes = messageExt.getReconsumeTimes();
this.properties = messageExt.getProperties().toString();
this.messageBody = new String(messageExt.getBody(), Charsets.UTF_8);
this.bodyCRC = messageExt.getBodyCRC();
}
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
public interface DlqMessageService {
MessagePage queryDlqMessageByPage(MessageQuery query);
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.service.DlqMessageService;
import org.apache.rocketmq.dashboard.service.MessageService;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class DlqMessageServiceImpl implements DlqMessageService {
@Resource
private MQAdminExt mqAdminExt;
@Resource
private MessageService messageService;
@Override
public MessagePage queryDlqMessageByPage(MessageQuery query) {
List<MessageView> messageViews = new ArrayList<>();
PageRequest page = PageRequest.of(query.getPageNum(), query.getPageSize());
String topic = query.getTopic();
try {
mqAdminExt.examineTopicRouteInfo(topic);
} catch (MQClientException e) {
// If the %DLQ%Group does not exist, the message returns null
if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
&& e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
return new MessagePage(new PageImpl<>(messageViews, page, 0), query.getTaskId());
} else {
throw Throwables.propagate(e);
}
} catch (Exception e) {
throw Throwables.propagate(e);
}
return messageService.queryMessageByPage(query);
}
}

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.util;
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.support.ExcelTypeEnum;
import com.alibaba.excel.write.metadata.style.WriteCellStyle;
import com.alibaba.excel.write.metadata.style.WriteFont;
import com.alibaba.excel.write.style.HorizontalCellStyleStrategy;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.apache.poi.ss.usermodel.HorizontalAlignment;
public class ExcelUtil {
public static void writeExcel(HttpServletResponse response, List<? extends Object> data, String fileName,
String sheetName, Class clazz) throws Exception {
WriteCellStyle headWriteCellStyle = new WriteCellStyle();
WriteFont writeFont = new WriteFont();
writeFont.setFontHeightInPoints((short)12);
writeFont.setFontName("Microsoft YaHei UI");
headWriteCellStyle.setWriteFont(writeFont);
headWriteCellStyle.setHorizontalAlignment(HorizontalAlignment.CENTER);
WriteCellStyle contentWriteCellStyle = new WriteCellStyle();
contentWriteCellStyle.setWriteFont(writeFont);
contentWriteCellStyle.setHorizontalAlignment(HorizontalAlignment.CENTER);
HorizontalCellStyleStrategy horizontalCellStyleStrategy = new HorizontalCellStyleStrategy(headWriteCellStyle, contentWriteCellStyle);
EasyExcel.write(getOutputStream(fileName, response), clazz)
.excelType(ExcelTypeEnum.XLSX).sheet(sheetName).registerWriteHandler(horizontalCellStyleStrategy).doWrite(data);
}
private static OutputStream getOutputStream(String fileName, HttpServletResponse response) throws Exception {
fileName = URLEncoder.encode(fileName, "UTF-8");
response.setContentType("application/vnd.ms-excel");
response.setCharacterEncoding("utf8");
response.setHeader("Content-Disposition", "attachment;filename=" + fileName + ".xlsx");
return response.getOutputStream();
}
}