diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/AuthWebMVCConfigurerAdapter.java b/src/main/java/org/apache/rocketmq/dashboard/config/AuthWebMVCConfigurerAdapter.java index 3a28d70..100d7df 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/config/AuthWebMVCConfigurerAdapter.java +++ b/src/main/java/org/apache/rocketmq/dashboard/config/AuthWebMVCConfigurerAdapter.java @@ -24,6 +24,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Configuration; import org.springframework.core.MethodParameter; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.web.bind.support.WebDataBinderFactory; import org.springframework.web.context.request.NativeWebRequest; import org.springframework.web.method.support.HandlerMethodArgumentResolver; @@ -62,7 +64,8 @@ public class AuthWebMVCConfigurerAdapter extends WebMvcConfigurerAdapter { "/producer/**", "/test/**", "/topic/**", - "/acl/**"); + "/acl/**", + "/connect/**"); } } @@ -94,4 +97,9 @@ public class AuthWebMVCConfigurerAdapter extends WebMvcConfigurerAdapter { public void addViewControllers(ViewControllerRegistry registry) { registry.addViewController("*.htm").setViewName("forward:/app.html"); } + + @Override + public void configureMessageConverters(List> converters) { + converters.removeIf(e -> e.getClass().isAssignableFrom(StringHttpMessageConverter.class)); + } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ConnectController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ConnectController.java new file mode 100644 index 0000000..45511cd --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ConnectController.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.controller; + +import org.apache.rocketmq.dashboard.permisssion.Permission; +import org.apache.rocketmq.dashboard.service.ConnectService; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.annotation.Resource; +import java.util.Map; + +@Controller +@RequestMapping("/connect") +@Permission +public class ConnectController { + + @Resource + private ConnectService connectService; + + @RequestMapping(value = "/WorkerConnectors.query", method = RequestMethod.GET) + @ResponseBody + public Object workerConnectorList() { + return connectService.queryWorkerConnectorList(); + } + + @RequestMapping(value = "/worker.query", method = RequestMethod.GET) + @ResponseBody + public Object workerList() { + return connectService.queryWorkerList(); + } + + @RequestMapping(value = "/workerTasks.query", method = RequestMethod.GET) + @ResponseBody + public Object workerTaskList() { + return connectService.queryWorkerTaskList(); + } + + + @RequestMapping(value = "/createConnector.do", method = RequestMethod.POST) + @ResponseBody + public String createConnector(@RequestBody Map workerConnector) { + return connectService.createConnector(workerConnector); + } + + @RequestMapping(value = "/stopConnector.do", method = RequestMethod.GET) + @ResponseBody + public String stopConnector(@RequestParam(value = "name") String connectorName) { + return connectService.stopConnector(connectorName); + } + + @RequestMapping(value = "/stopAllConnectors.do", method = RequestMethod.GET) + @ResponseBody + public String stopAllConnectors() { + return connectService.stopAllConnectors(); + } + + @RequestMapping(value = "/reloadConnector.do", method = RequestMethod.GET) + @ResponseBody + public String reloadConnector() { + return connectService.reloadAllConnectors(); + } + + @RequestMapping(value = "/connectorStatus.query", method = RequestMethod.GET) + @ResponseBody + public Object getConnectorStatus(@RequestParam(value = "name") String connectorName) { + + String connectorStatus = connectService.getConnectorStatus(connectorName); + return connectorStatus; + } + + @RequestMapping(value = "/allocatedConnectors.query", method = RequestMethod.GET) + @ResponseBody + public void allocatedConnectors(@RequestParam(value = "ipAddr") String ipAddr) { + } + + @RequestMapping(value = "/allocatedTasks.query", method = RequestMethod.GET) + @ResponseBody + public void allocatedTasks(@RequestParam(value = "ipAddr") String ipAddr) { + } + + + +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerConnector.java b/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerConnector.java new file mode 100644 index 0000000..9270406 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerConnector.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.model.connect; + +import java.util.Map; + +public class WorkerConnector { + + private String connectorName; + + private String connectTopicname; + + private String connectorClass; + + private String namesrvAddr; + + private String workerPort; + + private Long updateTimestamp; + + private Map properties; + + + public WorkerConnector() { + } + + public WorkerConnector(String connectorName, String connectTopicname, String connectorClass, String namesrvAddr, String workerPort, Long updateTimestamp, Map properties) { + this.connectorName = connectorName; + this.connectTopicname = connectTopicname; + this.connectorClass = connectorClass; + this.namesrvAddr = namesrvAddr; + this.workerPort = workerPort; + this.updateTimestamp = updateTimestamp; + this.properties = properties; + } + + public String getConnectorName() { + return connectorName; + } + + public void setConnectorName(String connectorName) { + this.connectorName = connectorName; + } + + public String getConnectTopicname() { + return connectTopicname; + } + + public void setConnectTopicname(String connectTopicname) { + this.connectTopicname = connectTopicname; + } + + public String getConnectorClass() { + return connectorClass; + } + + public void setConnectorClass(String connectorClass) { + this.connectorClass = connectorClass; + } + + public Long getUpdateTimestamp() { + return updateTimestamp; + } + + public void setUpdateTimestamp(Long updateTimestamp) { + this.updateTimestamp = updateTimestamp; + } + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + public String getWorkerPort() { + return workerPort; + } + + public void setWorkerPort(String workerPort) { + this.workerPort = workerPort; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public void set(String key, String value) { + properties.put(key, value); + } + + + @Override + public String toString() { + return "WorkerConnector{" + + "connectorName='" + connectorName + '\'' + + ", connectTopicname='" + connectTopicname + '\'' + + ", connectorClass='" + connectorClass + '\'' + + ", updateTimestamp=" + updateTimestamp + + ", properties=" + properties + + '}'; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerInfo.java new file mode 100644 index 0000000..52a0f2d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerInfo.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.model.connect; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class WorkerInfo { + private static final Logger log = LoggerFactory.getLogger(WorkerInfo.class); + + private String ipAddr; + + private String namesrvAddr; + + private List allocatedConnectors; + + private List allocatedTasks; + + public WorkerInfo(String ipAddr, String namesrvAddr, List allocatedConnectors, List allocatedTasks) { + this.ipAddr = ipAddr; + this.namesrvAddr = namesrvAddr; + this.allocatedConnectors = allocatedConnectors; + this.allocatedTasks = allocatedTasks; + } + + public WorkerInfo() { + } + + public static Logger getLog() { + return log; + } + + public String getIpAddr() { + return ipAddr; + } + + public void setIpAddr(String ipAddr) { + this.ipAddr = ipAddr; + } + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + public List getAllocatedConnectors() { + return allocatedConnectors; + } + + public void setAllocatedConnectors(List allocatedConnectors) { + this.allocatedConnectors = allocatedConnectors; + } + + public List getAllocatedTasks() { + return allocatedTasks; + } + + public void setAllocatedTasks(List allocatedTasks) { + this.allocatedTasks = allocatedTasks; + } + + @Override + public String toString() { + return "WorkerInfo{" + + "ipAddr='" + ipAddr + '\'' + + ", namesrvAddr='" + namesrvAddr + '\'' + + ", workingConnectors=" + allocatedConnectors + + ", existingTasks=" + allocatedTasks + + '}'; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerTask.java b/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerTask.java new file mode 100644 index 0000000..53f662b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/model/connect/WorkerTask.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.model.connect; + +import java.util.Map; + +public class WorkerTask { + + private String connectorName; + + private String connectTopicname; + + private String taskClass; + + private String connectorClass; + + private Long updateTimestamp; + + private Map properties; + + public WorkerTask() { + } + + public WorkerTask(String connectorName, String connectTopicName, String taskClass, String connectorClass, Long updateTimestamp, Map properties) { + this.connectorName = connectorName; + this.connectTopicname = connectTopicName; + this.taskClass = taskClass; + this.connectorClass = connectorClass; + this.updateTimestamp = updateTimestamp; + this.properties = properties; + } + + public String getConnectorName() { + return connectorName; + } + + public void setConnectorName(String connectorName) { + this.connectorName = connectorName; + } + + public String getConnectTopicname() { + return connectTopicname; + } + + public void setConnectTopicname(String connectTopicname) { + this.connectTopicname = connectTopicname; + } + + public String getTaskClass() { + return taskClass; + } + + public void setTaskClass(String taskClass) { + this.taskClass = taskClass; + } + + public String getConnectorClass() { + return connectorClass; + } + + public void setConnectorClass(String connectorClass) { + this.connectorClass = connectorClass; + } + + public Long getUpdateTimestamp() { + return updateTimestamp; + } + + public void setUpdateTimestamp(Long updateTimestamp) { + this.updateTimestamp = updateTimestamp; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + @Override + public String toString() { + return "WorkerTask{" + + "connectorName='" + connectorName + '\'' + + ", connectTopicName='" + connectTopicname + '\'' + + ", taskClass='" + taskClass + '\'' + + ", connectorClass='" + connectorClass + '\'' + + ", updateTimestamp=" + updateTimestamp + + ", properties=" + properties + + '}'; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ConnectService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ConnectService.java new file mode 100644 index 0000000..cc21579 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConnectService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service; + + +import org.apache.rocketmq.dashboard.model.connect.WorkerConnector; +import org.apache.rocketmq.dashboard.model.connect.WorkerInfo; +import org.apache.rocketmq.dashboard.model.connect.WorkerTask; + +import java.util.List; +import java.util.Map; + +public interface ConnectService { + + List queryWorkerConnectorList(); + + List queryWorkerList(); + + List queryWorkerTaskList(); + + + String stopConnector(String connectorName); + + String stopAllConnectors(); + + String reloadAllConnectors(); + + String createConnector(Map workerConnector); + + + String getAllocatedConnectors(String ipAddr); + + String getAllocatedTasks(String ipAddr); + + String getConnectorStatus(String connectorName); + +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConnectServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConnectServiceImpl.java new file mode 100644 index 0000000..10bc997 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConnectServiceImpl.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.apache.rocketmq.dashboard.model.connect.WorkerConnector; +import org.apache.rocketmq.dashboard.model.connect.WorkerInfo; +import org.apache.rocketmq.dashboard.model.connect.WorkerTask; +import org.apache.rocketmq.dashboard.service.ConnectService; +import org.apache.rocketmq.dashboard.util.HttpRequestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +@Service +public class ConnectServiceImpl implements ConnectService { + private static final Logger log = LoggerFactory.getLogger(ConnectServiceImpl.class); + + @Value("${rocketmq.config.connectAPIAddr}") + private static String ipAddr; + + @Override + public List queryWorkerList() { + String status = HttpRequestUtil.requestString("/getClusterInfo"); + JSONArray jsonArray = JSON.parseArray(status); + List workerInfoList = new ArrayList<>(); + for (Iterator iterator = jsonArray.iterator(); iterator.hasNext(); ) { + WorkerInfo workerInfo = new WorkerInfo(); + String next = (String) iterator.next(); + int seperator = next.indexOf("@", 0); + String worker = next.substring(0, seperator); + String namesrv = next.substring(seperator + 1, next.length()); + + workerInfo.setIpAddr(worker); + workerInfo.setNamesrvAddr(namesrv); + + workerInfoList.add(workerInfo); + } + return workerInfoList; + } + + @Override + public List queryWorkerConnectorList() { + String status = HttpRequestUtil.requestString("/getConfigInfo"); + JSONObject jsonObject = JSON.parseObject(status).getJSONObject("connectorConfigs"); + List workerConnectorList = new ArrayList<>(); + for (Map.Entry entry : jsonObject.entrySet()) { + JSONObject properties = ((JSONObject) entry.getValue()).getJSONObject("properties"); + properties.put("connector-name", entry.getKey()); + + WorkerConnector workerConnector = JSON.parseObject(properties.toJSONString(), WorkerConnector.class); + + Map finalProperties = (Map) JSONObject.parse(properties.toJSONString()); + workerConnector.setProperties(finalProperties); + + workerConnectorList.add(workerConnector); + } + return workerConnectorList; + } + + @Override + public List queryWorkerTaskList() { + String status = HttpRequestUtil.requestString("/getConfigInfo"); + JSONObject jsonObject = JSON.parseObject(status).getJSONObject("taskConfigs"); + List workerTaskList = new ArrayList<>(); + for (Map.Entry entry : jsonObject.entrySet()) { + JSONObject properties = ((JSONArray) entry.getValue()).getJSONObject(0).getJSONObject("properties"); + properties.put("connector-name", entry.getKey()); + + WorkerTask workerTask = JSON.parseObject(properties.toJSONString(), WorkerTask.class); + + Map finalProperties = (Map) JSONObject.parse(properties.toJSONString()); + workerTask.setProperties(finalProperties); + + workerTaskList.add(workerTask); + } + return workerTaskList; + } + + + @Override + public String getConnectorStatus(String connectorName) { + return HttpRequestUtil.requestString("/connectors/" + connectorName + "/status"); + } + + @Override + public String createConnector(Map workerConnector) { + String url = null; + JSONObject jsonObject = null; + String ipAddr = workerConnector.get("clusterAddr").trim(); + String workerPort = workerConnector.get("workerPort").trim(); + String connectorName = workerConnector.get("connectorName").trim(); + url = "http://" + ipAddr + ":" + workerPort + "/connectors/" + connectorName; + + String properties = workerConnector.get("Properties").trim(); + if (properties != null) { + jsonObject = JSONObject.parseObject(properties); + } + + try { + jsonObject.put("connect-topicname", workerConnector.get("connectTopicname").trim()); + jsonObject.put("connector-class", workerConnector.get("connectorClass").trim()); + } catch (Exception e) { + throw new RuntimeException("BAD PARAMS, " + e); + } + + return HttpRequestUtil.postRequest(url, jsonObject); + } + + @Override + public String stopConnector(String connectorName) { + return HttpRequestUtil.requestString("/connectors/" + connectorName + "/stop"); + } + + @Override + public String stopAllConnectors() { + return HttpRequestUtil.requestString("/connectors/stopAll"); + } + + @Override + public String reloadAllConnectors() { + return HttpRequestUtil.requestString("/plugin/reload"); + } + + @Override + public String getAllocatedConnectors(String ipAddr) { + String status = HttpRequestUtil.requestString("/getAllocatedConnectors"); + + return null; + } + + @Override + public String getAllocatedTasks(String ipAddr) { + String status = HttpRequestUtil.requestString("/getAllocatedTasks"); + + return null; + } + + +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/util/HttpRequestUtil.java b/src/main/java/org/apache/rocketmq/dashboard/util/HttpRequestUtil.java new file mode 100644 index 0000000..c3dd921 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/util/HttpRequestUtil.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.util; + +import com.alibaba.fastjson.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; + + +@Component +public class HttpRequestUtil { + + private static final Logger log = LoggerFactory.getLogger(HttpRequestUtil.class); + + private static String ipAddr; + + @Value("${rocketmq.config.connectAPIAddr}") + public void setIpAddr(String ipAddr) { + HttpRequestUtil.ipAddr = ipAddr; + } + + public static String requestString(String urlSuffix) { + String urlString = getUrl(urlSuffix); + + return requestMsg(urlString); + } + + public static T requestJSON(String urlSuffix, Class targetClass) { + String urlString = getUrl(urlSuffix); + String responseByte = requestMsg(urlString); + + return JsonUtil.string2Obj(responseByte, targetClass); + } + + public static String getUrl(String urlSuffix) { + if (ipAddr == null || ipAddr.length() == 0) { + throw new RuntimeException("Failed to get url! Please edit it in application.properties!"); + } + + return "http://" + ipAddr + urlSuffix; + } + + public static String requestMsg(String urlString) { + + HttpURLConnection conn = null; + InputStream is = null; + + try { + + URL mURL = new URL(urlString); + conn = (HttpURLConnection) mURL.openConnection(); + + conn.setRequestMethod("GET"); + conn.setReadTimeout(5000); + conn.setConnectTimeout(10000); + + int responseCode = conn.getResponseCode(); + if (responseCode == 200) { + + is = conn.getInputStream(); + String state = getStringFromInputStream(is); + return state; + } else { + log.info("Failed to request! Code: " + responseCode); + + } + + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + return null; + } + + private static String getStringFromInputStream(InputStream is) + throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + + byte[] buffer = new byte[1024]; + int len = -1; + + while ((len = is.read(buffer)) != -1) { + os.write(buffer, 0, len); + } + is.close(); + String state = os.toString(); + os.close(); + return state; + } + + public static String postRequest(String url, JSONObject requestParam) { + + PrintWriter out = null; + BufferedReader in = null; + String result = ""; + try { + URL realUrl = new URL(url); + URLConnection conn = realUrl.openConnection(); + + conn.setDoOutput(true); + conn.setDoInput(true); + + conn.setReadTimeout(5000); + conn.setConnectTimeout(5000); + + conn.setRequestProperty("Content-type", "application/json;charset=UTF-8"); + + out = new PrintWriter(conn.getOutputStream()); + out.print(requestParam); + out.flush(); + + in = new BufferedReader( + new InputStreamReader(conn.getInputStream())); + + String line; + while ((line = in.readLine()) != null) { + line = new String(line.getBytes(), "utf-8"); + result += line; + } + } catch (Exception e) { + throw new RuntimeException("Error connecting to: " + url + "," + e); + } finally { + try { + if (out != null) { + out.close(); + } + if (in != null) { + in.close(); + } + } catch (IOException ex) { + throw new RuntimeException("Error closing connection"); + } + } + return result; + + + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0ab405e..8afc032 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -60,7 +60,8 @@ rocketmq: # set the accessKey and secretKey if you used acl accessKey: # if version > 4.4.0 secretKey: # if version > 4.4.0 - + # set if use rocketmq-connect + connectAPIAddr: 127.0.0.1:8082 threadpool: config: coreSize: 10 diff --git a/src/main/resources/role-permission.yml b/src/main/resources/role-permission.yml index 9676b39..7825c1c 100644 --- a/src/main/resources/role-permission.yml +++ b/src/main/resources/role-permission.yml @@ -38,3 +38,5 @@ rolePerms: - /dlqMessage/exportDlqMessage.do - /dlqMessage/batchResendDlqMessage.do - /acl/*.query + - /connect/*.query + - /connect/*.do diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index c2bf349..b1be32a 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -114,5 +114,6 @@ + diff --git a/src/main/resources/static/src/app.js b/src/main/resources/static/src/app.js index a7ca1be..b1c663b 100644 --- a/src/main/resources/static/src/app.js +++ b/src/main/resources/static/src/app.js @@ -216,6 +216,9 @@ app.config(['$routeProvider', '$httpProvider','$cookiesProvider','getDictNamePro }).when('/acl', { templateUrl: 'view/pages/acl.html', controller: 'aclController' + }).when('/connect', { + templateUrl: 'view/pages/connect.html', + controller: 'connectController' }).when('/404', { templateUrl: 'view/pages/404.html' }).otherwise('/404'); diff --git a/src/main/resources/static/src/connect.js b/src/main/resources/static/src/connect.js new file mode 100644 index 0000000..f0ae13f --- /dev/null +++ b/src/main/resources/static/src/connect.js @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var module = app; + +module.controller('connectController', ['$scope', 'ngDialog', '$http', 'Notification', function ($scope, ngDialog, $http, Notification) { + $scope.allTopicList = []; + $scope.selectedTopic = []; + + $scope.connectorList = []; + $scope.workerTaskList = []; + $scope.allWorkerList = []; + + $scope.respConnectors = []; + $scope.respTasks = []; + $scope.respWorkers = []; + + $scope.connectorPaginationConf = { + currentPage: 1, + totalItems: 0, + itemsPerPage: 10, + pagesLength: 15, + perPageOptions: [10], + rememberPerPage: 'perPageItems', + onChange: function () { + $scope.queryWorkerConnectorList(this.currentPage, this.totalItems); + } + }; + + $scope.taskPaginationConf = { + currentPage: 1, + totalItems: 0, + itemsPerPage: 10, + pagesLength: 15, + perPageOptions: [10], + rememberPerPage: 'perPageItems', + onChange: function () { + $scope.queryWorkerTaskList(this.currentPage, this.totalItems); + } + }; + + $scope.workerPaginationConf = { + currentPage: 1, + totalItems: 0, + itemsPerPage: 10, + pagesLength: 15, + perPageOptions: [10], + rememberPerPage: 'perPageItems', + onChange: function () { + $scope.queryWorkerList(this.currentPage, this.totalItems); + } + }; + + $scope.queryWorkerConnectorList = function (currentPage, totalItem) { + var perPage = $scope.connectorPaginationConf.itemsPerPage; + var from = (currentPage - 1) * perPage; + var to = (from + perPage) > totalItem ? totalItem : from + perPage; + $scope.connectorList = $scope.respConnectors.slice(from, to); + $scope.connectorPaginationConf.totalItems = totalItem; + + }; + + $scope.queryWorkerTaskList = function (currentPage, totalItem) { + var perPage = $scope.taskPaginationConf.itemsPerPage; + var from = (currentPage - 1) * perPage; + var to = (from + perPage) > totalItem ? totalItem : from + perPage; + $scope.workerTaskList = $scope.respTasks.slice(from, to); + $scope.taskPaginationConf.totalItems = totalItem; + + }; + + $scope.queryWorkerList = function (currentPage, totalItem) { + var perPage = $scope.workerPaginationConf.itemsPerPage; + var from = (currentPage - 1) * perPage; + var to = (from + perPage) > totalItem ? totalItem : from + perPage; + $scope.allWorkerList = $scope.respWorkers.slice(from, to); + $scope.workerPaginationConf.totalItems = totalItem; + + }; + + function queryTopicName() { + $http({ + method: "GET", + url: "topic/list.query", + }).success(function (resp) { + if (resp.status == 0) { + $scope.allTopicList = resp.data.topicList.sort(); + console.log($scope.allTopicList); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + } + + + + $scope.showWorkerTaskList = function () { + $http({ + method: "GET", + url: "connect/workerTasks.query", + }).success(function (resp) { + if (resp.status === 0) { + console.log(resp); + + $scope.respTasks = resp.data; + $scope.queryWorkerTaskList($scope.taskPaginationConf.currentPage, $scope.respTasks.length); + + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + + $scope.showWorkerConnectorList = function () { + $http({ + method: "GET", + url: "connect/WorkerConnectors.query", + }).success(function (resp) { + if (resp.status === 0) { + console.log(resp); + $scope.respConnectors = resp.data; + + $scope.queryWorkerConnectorList($scope.connectorPaginationConf.currentPage, $scope.respConnectors.length); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + + $scope.showWorkerList = function () { + $http({ + method: "GET", + url: "connect/worker.query", + }).success(function (resp) { + if (resp.status === 0) { + console.log(resp); + + $scope.respWorkers = resp.data; + $scope.queryWorkerList($scope.workerPaginationConf.currentPage, $scope.respWorkers.length); + + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + + + $scope.queryConnectorStatus = function (name) { + $http({ + method: "GET", + url: "connect/connectorStatus.query", + params: { + name: name + } + }).success(function (resp) { + if (resp.data === "running") { + Notification.info(resp.data); + } else { + Notification.error("not running"); + } + console.log("Connector: " + name + ", Status:" + resp.data); + }) + }; + + + $scope.stopThisConnector = function (name) { + $http({ + method: "GET", + url: "connect/stopConnector.do", + params: { + name: name + } + }).success(function (resp) { + if (resp.status == 0) { + if (resp.data === "success") { + Notification.info({message: "success!", delay: 2000}); + } else { + Notification.error(resp.data); + } + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + $scope.showWorkerConnectorList(); + }) + }; + + $scope.stopAllConnectors = function () { + $http({ + method: "GET", + url: "connect/stopAllConnectors.do" + }).success(function (resp) { + if (resp.status == 0) { + if (resp.data === "success") { + Notification.info({message: "success!", delay: 2000}); + } else { + Notification.error(resp.data); + } + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + $scope.queryWorkerConnectorList(); + }) + }; + + $scope.reloadAllConnectors = function () { + $http({ + method: "GET", + url: "connect/reloadConnector.do" + }).success(function (resp) { + if (resp.status == 0) { + if (resp.data === "success") { + Notification.info({message: "success!", delay: 2000}); + } else { + Notification.error(resp.data); + } + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }) + + }; + + + $scope.connectorDetail = function (item) { + ngDialog.open({ + template: 'ConnectorViewDialog', + data: item + }); + }; + + $scope.taskDetail = function (item) { + ngDialog.open({ + template: 'TaskViewDialog', + data: item + }); + }; + + $scope.workerDetail = function (item) { + ngDialog.open({ + template: 'WorkerViewDialog', + data: item + }); + }; + + + $scope.openCreationDialog = function () { + $scope.showWorkerList(); + queryTopicName(); + ngDialog.open({ + template: 'connectorCreationDialog', + scope: $scope + + }); + }; + + + $scope.postConnectorRequest = function (connectRequestItem) { + console.log(connectRequestItem); + connectRequestItem.clusterAddr = connectRequestItem.clusterAddr.ipAddr; + var request = JSON.parse(JSON.stringify(connectRequestItem)); + console.log(request); + $http({ + method: "POST", + url: "connect/createConnector.do", + data: request + }).success(function (resp) { + if (resp.status == 0) { + if (resp.data === "success") { + Notification.info({message: "success!", delay: 2000}); + $scope.queryWorkerConnectorList(); + ngDialog.close(this); + } else { + Notification.error(resp.data); + } + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + } + +}]); \ No newline at end of file diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index f9a4e3c..67922c3 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -32,16 +32,16 @@ var en = { "CLUSTER": "Cluster", "CLUSTER_DETAIL": "Cluster Detail", "TOPIC": "Topic", - "SUBSCRIPTION_GROUP":"SubscriptionGroup", - "PRODUCER_GROUP":"ProducerGroup", - "CONSUMER":"Consumer", - "PRODUCER":"Producer", - "MESSAGE":"Message", - "MESSAGE_DETAIL":"Message Detail", - "RESEND_MESSAGE":"Resend Message", - "VIEW_EXCEPTION":"View Exception", - "MESSAGETRACE":"MessageTrace", - "DLQ_MESSAGE":"DLQMessage", + "SUBSCRIPTION_GROUP": "SubscriptionGroup", + "PRODUCER_GROUP": "ProducerGroup", + "CONSUMER": "Consumer", + "PRODUCER": "Producer", + "MESSAGE": "Message", + "MESSAGE_DETAIL": "Message Detail", + "RESEND_MESSAGE": "Resend Message", + "VIEW_EXCEPTION": "View Exception", + "MESSAGETRACE": "MessageTrace", + "DLQ_MESSAGE": "DLQMessage", "COMMIT": "Commit", "OPERATION": "Operation", "ADD": "Add", @@ -58,70 +58,78 @@ var en = { "NORMAL": "NORMAL", "RETRY": "RETRY", "DLQ": "DLQ", - "QUANTITY":"Quantity", - "TYPE":"Type", - "MODE":"Mode", - "DELAY":"Delay", - "DASHBOARD":"Dashboard", - "CONSUME_DETAIL":"CONSUME DETAIL", - "CLIENT":"CLIENT", - "LAST_CONSUME_TIME":"LastConsumeTime", - "TIME":"Time", - "RESET":"RESET", - "DATE":"Date", - "NO_DATA":"NO DATA", - "SEARCH":"Search", - "BEGIN":"Begin", - "END":"End", - "TOPIC_CHANGE":"Topic Change", - "SEND":"Send", - "SUBSCRIPTION_CHANGE":"Subscription Change", - "QUEUE":"Queue", - "MIN_OFFSET":"minOffset", - "MAX_OFFSET":"maxOffset", - "LAST_UPDATE_TIME_STAMP":"lastUpdateTimeStamp", - "QUEUE_DATAS":"queueDatas", - "READ_QUEUE_NUMS":"readQueueNums", - "WRITE_QUEUE_NUMS":"writeQueueNums", - "PERM":"perm", - "TAG":"Tag", - "KEY":"Key", - "MESSAGE_BODY":"Message Body", - "TOPIC_NAME":"topicName", - "ORDER":"order", - "CONSUMER_CLIENT":"consumerClient", - "BROKER_OFFSET":"brokerOffset", - "CONSUMER_OFFSET":"consumerOffset", - "DIFF_TOTAL":"diffTotal", - "LAST_TIME_STAMP":"lastTimeStamp", - "RESET_OFFSET":"resetOffset", - "CLUSTER_NAME":"clusterName", - "OPS":"OPS", - "AUTO_REFRESH":"AUTO_REFRESH", - "REFRESH":"REFRESH", - "LOGOUT":"Logout", - "LOGIN":"Login", - "USER_NAME":"Username", - "PASSWORD":"Password", - "NO_DATA":"Don't have ", - "SYSTEM":"SYSTEM", - "WELCOME":"Hi, welcome using RocketMQ Dashboard", - "ENABLE_MESSAGE_TRACE":"Enable Message Trace", - "MESSAGE_TRACE_DETAIL":"Message Trace Detail", - "TRACE_TOPIC":"TraceTopic", - "SELECT_TRACE_TOPIC":"selectTraceTopic", + "QUANTITY": "Quantity", + "TYPE": "Type", + "MODE": "Mode", + "DELAY": "Delay", + "DASHBOARD": "Dashboard", + "CONSUME_DETAIL": "CONSUME DETAIL", + "CLIENT": "CLIENT", + "LAST_CONSUME_TIME": "LastConsumeTime", + "TIME": "Time", + "RESET": "RESET", + "DATE": "Date", + "NO_DATA": "NO DATA", + "SEARCH": "Search", + "BEGIN": "Begin", + "END": "End", + "TOPIC_CHANGE": "Topic Change", + "SEND": "Send", + "SUBSCRIPTION_CHANGE": "Subscription Change", + "QUEUE": "Queue", + "MIN_OFFSET": "minOffset", + "MAX_OFFSET": "maxOffset", + "LAST_UPDATE_TIME_STAMP": "lastUpdateTimeStamp", + "QUEUE_DATAS": "queueDatas", + "READ_QUEUE_NUMS": "readQueueNums", + "WRITE_QUEUE_NUMS": "writeQueueNums", + "PERM": "perm", + "TAG": "Tag", + "KEY": "Key", + "MESSAGE_BODY": "Message Body", + "TOPIC_NAME": "topicName", + "ORDER": "order", + "CONSUMER_CLIENT": "consumerClient", + "BROKER_OFFSET": "brokerOffset", + "CONSUMER_OFFSET": "consumerOffset", + "DIFF_TOTAL": "diffTotal", + "LAST_TIME_STAMP": "lastTimeStamp", + "RESET_OFFSET": "resetOffset", + "CLUSTER_NAME": "clusterName", + "OPS": "OPS", + "AUTO_REFRESH": "AUTO_REFRESH", + "REFRESH": "REFRESH", + "LOGOUT": "Logout", + "LOGIN": "Login", + "USER_NAME": "Username", + "PASSWORD": "Password", + "NO_DATA": "Don't have ", + "SYSTEM": "SYSTEM", + "WELCOME": "Hi, welcome using RocketMQ Dashboard", + "ENABLE_MESSAGE_TRACE": "Enable Message Trace", + "MESSAGE_TRACE_DETAIL": "Message Trace Detail", + "TRACE_TOPIC": "TraceTopic", + "SELECT_TRACE_TOPIC": "selectTraceTopic", "EXPORT": "export", "NO_MATCH_RESULT": "no match result", "BATCH_RESEND": "batchReSend", "BATCH_EXPORT": "batchExport", - "WHITE_LIST":"White List", - "ACCOUNT_INFO":"Account Info", - "IS_ADMIN":"Is Admin", - "DEFAULT_TOPIC_PERM":"Default Topic Permission", - "DEFAULT_GROUP_PERM":"Default Group Permission", - "TOPIC_PERM":"Topic Permission", - "GROUP_PERM":"Group Permission", - "SYNCHRONIZE":"Synchronize Data", - "SHOW":"Show", - "HIDE":"Hide" + "WHITE_LIST": "White List", + "ACCOUNT_INFO": "Account Info", + "IS_ADMIN": "Is Admin", + "DEFAULT_TOPIC_PERM": "Default Topic Permission", + "DEFAULT_GROUP_PERM": "Default Group Permission", + "TOPIC_PERM": "Topic Permission", + "GROUP_PERM": "Group Permission", + "SYNCHRONIZE": "Synchronize Data", + "SHOW": "Show", + "HIDE": "Hide", + "CONNECT": "Connect", + "CREATE_CONNECTOR": "Create Connector", + "RELOAD_ALL_CONNECTORS": "Reload All Connectors", + "STOP_ALL_CONNECTORS": "Stop All Connectors", + "CONNECTOR_DETAIL": "Connector Detail", + "STOP": "Stop", + "TASK_DETAIL": "Task Detail", + "DETAIL": "Detail" } diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index b6fa589..78ae755 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -124,5 +124,13 @@ var zh = { "GROUP_PERM":"消费组权限", "SYNCHRONIZE":"同步", "SHOW":"显示", - "HIDE":"隐藏" + "HIDE":"隐藏", + "CONNECT":"连接", + "CREATE_CONNECTOR":"创建连接器", + "RELOAD_ALL_CONNECTORS": "重新载入连接器", + "STOP_ALL_CONNECTORS": "停止所有连接器", + "CONNECTOR_DETAIL": "连接器详情", + "STOP": "停止", + "TASK_DETAIL": "任务详情", + "DETAIL": "详情" } \ No newline at end of file diff --git a/src/main/resources/static/view/layout/_header.html b/src/main/resources/static/view/layout/_header.html index f448541..6604c86 100644 --- a/src/main/resources/static/view/layout/_header.html +++ b/src/main/resources/static/view/layout/_header.html @@ -36,6 +36,7 @@
  • {{'MESSAGE' | translate}}
  • {{'DLQ_MESSAGE' | translate}}
  • {{'MESSAGETRACE' | translate}}
  • +
  • {{'CONNECT' | translate}}
  • Acl