[GSOC][RIP-78][ISSUES#308] Add part of refactored backend files (#313)

This commit is contained in:
Crazylychee
2025-06-16 14:04:53 +08:00
committed by GitHub
parent 3cbff604e6
commit bc1a05d16c
86 changed files with 1624 additions and 1002 deletions

50
pom.xml
View File

@@ -82,29 +82,28 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<guava.version>29.0-jre</guava.version>
<commons-digester.version>2.1</commons-digester.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-io.version>2.4</commons-io.version>
<commons-cli.version>1.2</commons-cli.version>
<commons-collections.version>3.2.2</commons-collections.version>
<rocketmq.version>5.1.0</rocketmq.version>
<rocketmq.version>5.3.3</rocketmq.version>
<surefire.version>2.19.1</surefire.version>
<aspectj.version>1.9.6</aspectj.version>
<lombok.version>1.18.22</lombok.version>
<main.basedir>${basedir}/../..</main.basedir>
<docker.image.prefix>apacherocketmq</docker.image.prefix>
<spring.boot.version>2.6.0</spring.boot.version>
<spring.boot.version>3.4.5</spring.boot.version>
<mockito-inline.version>3.3.3</mockito-inline.version>
<jaxb-api.version>2.3.1</jaxb-api.version>
<jakarta.xml.bind-api.version>4.0.0</jakarta.xml.bind-api.version>
<commons-pool2.version>2.4.3</commons-pool2.version>
<easyexcel.version>2.2.10</easyexcel.version>
<asm.version>4.2</asm.version>
<junit.version>4.12</junit.version>
<snakeyaml.version>1.32</snakeyaml.version>
<snakeyaml.version>2.0</snakeyaml.version>
<cglib.version>2.2.2</cglib.version>
<joor.version>0.9.6</joor.version>
<bcpkix-jdk15on.version>1.68</bcpkix-jdk15on.version>
@@ -115,6 +114,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
<exclusions>
<exclusion>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -235,9 +240,9 @@
<version>${bcpkix-jdk15on.version}</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>${jaxb-api.version}</version>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<version>${jakarta.xml.bind-api.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
@@ -282,7 +287,7 @@
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<version>3.11.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
@@ -296,19 +301,22 @@
<version>${lombok.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<!-- <executions>-->
<!-- <execution>-->
<!-- <goals>-->
<!-- <goal>repackage</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
</plugin>
<!-- Use dockerfile-maven instead, https://github.com/spotify/dockerfile-maven -->
<plugin>
@@ -384,9 +392,9 @@
<version>4.3.0</version>
<dependencies>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>${jaxb-api.version}</version>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<version>${jakarta.xml.bind-api.version}</version>
</dependency>
</dependencies>
</plugin>

View File

@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import org.apache.rocketmq.tools.admin.MQAdminExt;
@FunctionalInterface
public interface MQAdminExtCallback<T> {
T doInMQAdminExt(MQAdminExt mqAdminExt) throws Exception;
}

View File

@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.dashboard.admin;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -26,6 +25,8 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class MQAdminFactory {
private RMQConfigure rmqConfigure;

View File

@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Component
@Slf4j
public class UserMQAdminPoolManager {
private final ConcurrentMap<String/* userAk */, GenericObjectPool<MQAdminExt>> userPools = new ConcurrentHashMap<>();
private final ClientConfig baseClientConfig;
@Autowired
public UserMQAdminPoolManager(RMQConfigure rmqConfigure) {
this.baseClientConfig = new ClientConfig();
this.baseClientConfig.setNamesrvAddr(rmqConfigure.getNamesrvAddr());
this.baseClientConfig.setClientCallbackExecutorThreads(rmqConfigure.getClientCallbackExecutorThreads());
this.baseClientConfig.setVipChannelEnabled(Boolean.parseBoolean(rmqConfigure.getIsVIPChannel()));
this.baseClientConfig.setUseTLS(rmqConfigure.isUseTLS());
log.info("UserMQAdminPoolManager initialized with baseClientConfig for NameServer: {}", rmqConfigure.getNamesrvAddr());
}
public MQAdminExt borrowMQAdminExt(String userAk, String userSk) throws Exception {
GenericObjectPool<MQAdminExt> userPool = userPools.get(userAk);
if (userPool == null) {
log.info("Creating new MQAdminExt pool for user: {}", userAk);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(1);
poolConfig.setMaxIdle(1);
poolConfig.setMinIdle(0);
poolConfig.setTestWhileIdle(true);
poolConfig.setTimeBetweenEvictionRunsMillis(20000);
poolConfig.setMaxWaitMillis(10000);
UserSpecificMQAdminPooledObjectFactory factory =
new UserSpecificMQAdminPooledObjectFactory(baseClientConfig, userAk, userSk);
GenericObjectPool<MQAdminExt> newUserPool = new GenericObjectPool<>(factory, poolConfig);
GenericObjectPool<MQAdminExt> existingPool = userPools.putIfAbsent(userAk, newUserPool);
if (existingPool != null) {
log.warn("Another thread concurrently created MQAdminExt pool for user {}. Shutting down redundant pool.", userAk);
newUserPool.close();
userPool = existingPool;
} else {
userPool = newUserPool;
log.info("Successfully created and registered MQAdminExt pool for user: {}", userAk);
}
}
return userPool.borrowObject();
}
public void returnMQAdminExt(String userAk, MQAdminExt mqAdminExt) {
GenericObjectPool<MQAdminExt> userPool = userPools.get(userAk);
if (userPool != null) {
try {
userPool.returnObject(mqAdminExt);
log.debug("Returned MQAdminExt object ({}) to pool for user: {}", mqAdminExt, userAk);
} catch (Exception e) {
log.error("Failed to return MQAdminExt object ({}) for user {}: {}", mqAdminExt, userAk, e.getMessage(), e);
if (mqAdminExt != null) {
try {
mqAdminExt.shutdown();
} catch (Exception se) {
log.warn("Error shutting down MQAdminExt after failed return: {}", se.getMessage());
}
}
}
} else {
log.warn("Attempted to return MQAdminExt for non-existent user pool: {}. Shutting down the object directly.", userAk);
if (mqAdminExt != null) {
try {
mqAdminExt.shutdown();
} catch (Exception se) {
log.warn("Error shutting down MQAdminExt for non-existent pool: {}", se.getMessage());
}
}
}
}
public void shutdownUserPool(String userAk) {
GenericObjectPool<MQAdminExt> userPool = userPools.remove(userAk);
if (userPool != null) {
userPool.close();
log.info("Shutdown and removed MQAdminExt pool for user: {}", userAk);
} else {
log.warn("Attempted to shut down non-existent user pool: {}", userAk);
}
}
@PreDestroy
public void shutdownAllPools() {
log.info("Shutting down all MQAdminExt user pools...");
userPools.forEach((userAk, pool) -> {
pool.close();
log.info("Shutdown MQAdminExt pool for user: {}", userAk);
});
userPools.clear();
log.info("All MQAdminExt user pools have been shut down.");
}
}

View File

@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class UserSpecificMQAdminPooledObjectFactory implements PooledObjectFactory<MQAdminExt> {
private final ClientConfig userSpecificClientConfig;
private final RPCHook rpcHook;
private final String userAk;
private final AtomicLong instanceCreationCounter = new AtomicLong(0);
public UserSpecificMQAdminPooledObjectFactory(ClientConfig baseClientConfig, String userAk, String userSk) {
this.userSpecificClientConfig = baseClientConfig.cloneClientConfig();
this.userSpecificClientConfig.setInstanceName("MQ_ADMIN_INSTANCE_" + userAk + "_" + UUID.randomUUID());
if (StringUtils.isNotEmpty(userAk) && StringUtils.isNotEmpty(userSk)) {
this.rpcHook = new AclClientRPCHook(new SessionCredentials(userAk, userSk));
} else {
this.rpcHook = null;
}
this.userAk = userAk;
log.info("UserSpecificMQAdminPooledObjectFactory initialized for user: {}", userAk);
log.debug("Factory ClientConfig for user {}: {}", userAk, userSpecificClientConfig);
}
@Override
public PooledObject<MQAdminExt> makeObject() throws Exception {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(rpcHook);
mqAdminExt.setAdminExtGroup("MQ_ADMIN_GROUP_FOR_" + userAk + "_" + instanceCreationCounter.getAndIncrement());
mqAdminExt.start();
log.info("Created new MQAdminExt instance ({}) for user {}", mqAdminExt, userAk);
return new DefaultPooledObject<>(mqAdminExt);
}
@Override
public void destroyObject(PooledObject<MQAdminExt> p) {
MQAdminExt mqAdmin = p.getObject();
if (mqAdmin != null) {
try {
mqAdmin.shutdown();
} catch (Exception e) {
log.warn("Failed to shut down MQAdminExt object ({}) for user {}: {}", p.getObject(), userAk, e.getMessage(), e);
}
}
log.info("Destroyed MQAdminExt object ({}) for user {}", p.getObject(), userAk);
}
@Override
public boolean validateObject(PooledObject<MQAdminExt> p) {
MQAdminExt mqAdmin = p.getObject();
if (mqAdmin == null) {
log.warn("MQAdminExt object is null or not started for user {}: {}", userAk, mqAdmin);
return false;
}
try {
ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
boolean isValid = clusterInfo != null && !clusterInfo.getBrokerAddrTable().isEmpty();
if (!isValid) {
log.warn("Validation failed for MQAdminExt object for user {}: ClusterInfo is invalid or empty. ClusterInfo = {}", userAk, clusterInfo);
}
return isValid;
} catch (Exception e) {
log.warn("Validation error for MQAdminExt object for user {}: {}", userAk, e.getMessage(), e);
return false;
}
}
@Override
public void activateObject(PooledObject<MQAdminExt> p) {
log.debug("Activating MQAdminExt object ({}) for user {}", p.getObject(), userAk);
}
@Override
public void passivateObject(PooledObject<MQAdminExt> p) {
log.debug("Passivating MQAdminExt object ({}) for user {}", p.getObject(), userAk);
}
}

View File

@@ -18,42 +18,114 @@ package org.apache.rocketmq.dashboard.aspect.admin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.admin.UserMQAdminPoolManager;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.client.MQAdminInstance;
import org.apache.rocketmq.dashboard.util.UserInfoContext;
import org.apache.rocketmq.dashboard.util.WebUtil;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
@Aspect
@Service
@Component
@Slf4j
public class MQAdminAspect {
@Autowired
private UserMQAdminPoolManager userMQAdminPoolManager;
@Autowired
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
public MQAdminAspect() {
@Autowired
private RMQConfigure rmqConfigure;
private static final Set<String> METHODS_TO_CHECK = new HashSet<>();
static {
METHODS_TO_CHECK.add("getUser");
METHODS_TO_CHECK.add("examineBrokerClusterInfo");
METHODS_TO_CHECK.add("examineConsumerConnectionInfo");
METHODS_TO_CHECK.add("examineConsumeStats");
METHODS_TO_CHECK.add("examineProducerConnectionInfo");
}
// Pointcut remains the same, targeting methods in MQAdminExtImpl
@Pointcut("execution(* org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl..*(..))")
public void mQAdminMethodPointCut() {
}
@Around(value = "mQAdminMethodPointCut()")
@Pointcut("execution(* org.apache.rocketmq.dashboard.service.client.ProxyAdminImpl..*(..))")
public void proxyAdminMethodPointCut() {
}
@Around(value = "mQAdminMethodPointCut()||proxyAdminMethodPointCut()")
public Object aroundMQAdminMethod(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
Object obj = null;
MQAdminExt mqAdminExt = null; // The MQAdminExt instance borrowed from the pool
UserInfo currentUserInfo = null; // The user initiating the operation
String methodName = joinPoint.getSignature().getName();
try {
MQAdminInstance.createMQAdmin(mqAdminExtPool);
obj = joinPoint.proceed();
if (isPoolConfigIsolatedByUser(rmqConfigure.isLoginRequired(), methodName)) {
currentUserInfo = (UserInfo) UserInfoContext.get(WebUtil.USER_NAME);
// 2. Borrow the user-specific MQAdminExt instance.
// currentUser.getName() is assumed to be the AccessKey, and currentUser.getPassword() is SecretKey.
mqAdminExt = userMQAdminPoolManager.borrowMQAdminExt(currentUserInfo.getUsername(), currentUserInfo.getPassword());
// 3. Set the borrowed MQAdminExt instance into the ThreadLocal for MQAdminInstance.
// This makes it available to MQAdminExtImpl methods.
MQAdminInstance.setCurrentMQAdminExt(mqAdminExt);
log.debug("MQAdminExt borrowed for user {} and set in ThreadLocal.", currentUserInfo.getUsername());
} else {
mqAdminExt = mqAdminExtPool.borrowObject(); // Fallback to a default MQAdminExt if no user is provided
MQAdminInstance.setCurrentMQAdminExt(mqAdminExt);
}
// 4. Proceed with the original method execution.
return joinPoint.proceed();
} finally {
MQAdminInstance.returnMQAdmin(mqAdminExtPool);
log.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
if (currentUserInfo != null) {
if (mqAdminExt != null) {
userMQAdminPoolManager.returnMQAdminExt(currentUserInfo.getUsername(), mqAdminExt);
MQAdminInstance.clearCurrentMQAdminExt();
log.debug("MQAdminExt returned for user {} and cleared from ThreadLocal.", currentUserInfo.getUsername());
}
return obj;
log.debug("Operation {} for user {} cost {}ms",
methodName,
currentUserInfo.getUsername(),
System.currentTimeMillis() - start);
} else {
if (mqAdminExt != null) {
mqAdminExtPool.returnObject(mqAdminExt);
MQAdminInstance.clearCurrentMQAdminExt();
log.debug("MQAdminExt returned to default pool and cleared from ThreadLocal.");
}
log.debug("Operation {} cost {}ms",
methodName,
System.currentTimeMillis() - start);
}
}
}
private boolean isPoolConfigIsolatedByUser(boolean loginRequired, String methodName) {
if (!loginRequired) {
return false;
} else {
return !METHODS_TO_CHECK.contains(methodName);
}
}
}

View File

@@ -17,6 +17,8 @@
package org.apache.rocketmq.dashboard.config;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import org.apache.rocketmq.dashboard.interceptor.AuthInterceptor;
import org.apache.rocketmq.dashboard.model.UserInfo;
import org.apache.rocketmq.dashboard.util.WebUtil;
@@ -29,16 +31,16 @@ import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.method.support.ModelAndViewContainer;
import org.springframework.web.multipart.support.MissingServletRequestPartException;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
@Configuration
public class AuthWebMVCConfigurerAdapter extends WebMvcConfigurerAdapter {
public class AuthWebMVCConfigurerAdapter implements WebMvcConfigurer {
@Autowired
@Qualifier("authInterceptor")
private AuthInterceptor authInterceptor;
@@ -86,10 +88,21 @@ public class AuthWebMVCConfigurerAdapter extends WebMvcConfigurerAdapter {
throw new MissingServletRequestPartException(UserInfo.USER_INFO);
}
});
super.addArgumentResolvers(argumentResolvers); //REVIEW ME
}
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOriginPatterns("http://localhost:3003")
.allowedMethods("GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS")
.maxAge(3600)
.allowCredentials(true)
.allowedHeaders("content-type", "Authorization", "X-Requested-With", "Origin", "Accept")
.exposedHeaders("authorization");
}
@Override
public void addViewControllers(ViewControllerRegistry registry) {
registry.addViewController("*.htm").setViewName("forward:/app.html");

View File

@@ -17,16 +17,17 @@
package org.apache.rocketmq.dashboard.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "threadpool.config")

View File

@@ -16,7 +16,8 @@
*/
package org.apache.rocketmq.dashboard.config;
import java.util.ArrayList;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
@@ -31,68 +32,64 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "rocketmq.config")
public class RMQConfigure {
private Logger logger = LoggerFactory.getLogger(RMQConfigure.class);
//use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env
@Getter
private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
@Setter
@Getter
private volatile String proxyAddr;
@Getter
private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
@Setter
private String dataPath = "/tmp/rocketmq-console/data";
@Getter
private boolean enableDashBoardCollect;
@Setter
@Getter
private boolean loginRequired = false;
private String accessKey;
@Setter
@Getter
private String secretKey;
@Setter
@Getter
private boolean useTLS = false;
@Setter
@Getter
private Long timeoutMillis;
@Getter
private List<String> namesrvAddrs = new ArrayList<>();
@Getter
private List<String> proxyAddrs = new ArrayList<>();
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public List<String> getNamesrvAddrs() {
return namesrvAddrs;
}
public List<String> getProxyAddrs() {
return this.proxyAddrs;
}
@Setter
@Getter
private Integer clientCallbackExecutorThreads = 4;
public void setProxyAddrs(List<String> proxyAddrs) {
this.proxyAddrs = proxyAddrs;
@@ -101,14 +98,6 @@ public class RMQConfigure {
}
}
public String getProxyAddr() {
return proxyAddr;
}
public void setProxyAddr(String proxyAddr) {
this.proxyAddr = proxyAddr;
}
public void setNamesrvAddrs(List<String> namesrvAddrs) {
this.namesrvAddrs = namesrvAddrs;
if (CollectionUtils.isNotEmpty(namesrvAddrs)) {
@@ -135,14 +124,6 @@ public class RMQConfigure {
return dataPath + File.separator + "dashboard";
}
public void setDataPath(String dataPath) {
this.dataPath = dataPath;
}
public String getIsVIPChannel() {
return isVIPChannel;
}
public void setIsVIPChannel(String isVIPChannel) {
if (StringUtils.isNotBlank(isVIPChannel)) {
this.isVIPChannel = isVIPChannel;
@@ -151,38 +132,10 @@ public class RMQConfigure {
}
}
public boolean isEnableDashBoardCollect() {
return enableDashBoardCollect;
}
public void setEnableDashBoardCollect(String enableDashBoardCollect) {
this.enableDashBoardCollect = Boolean.valueOf(enableDashBoardCollect);
}
public boolean isLoginRequired() {
return loginRequired;
}
public void setLoginRequired(boolean loginRequired) {
this.loginRequired = loginRequired;
}
public boolean isUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
public Long getTimeoutMillis() {
return timeoutMillis;
}
public void setTimeoutMillis(Long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
// Error Page process logic, move to a central configure later
@Bean
public ErrorPageRegistrar errorPageRegistrar() {

View File

@@ -14,133 +14,90 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.controller;
import com.google.common.base.Preconditions;
import java.util.List;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.User;
import org.apache.rocketmq.dashboard.model.UserInfo;
import org.apache.rocketmq.dashboard.model.request.AclRequest;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.AclService;
import org.apache.rocketmq.dashboard.support.JsonResult;
import org.apache.rocketmq.dashboard.util.WebUtil;
import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.request.UserCreateRequest;
import org.apache.rocketmq.dashboard.model.request.UserUpdateRequest;
import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/acl")
@Permission
public class AclController {
@Resource
private AclService aclService;
@Autowired
private AclServiceImpl aclService;
@Resource
private RMQConfigure configure;
@GetMapping("/enable.query")
public Object isEnableAcl() {
return new JsonResult<>(configure.isACLEnabled());
@GetMapping("/listUsers")
@ResponseBody
public List<UserInfo> listUsers(@RequestParam(required = false) String brokerAddress) {
return aclService.listUsers(brokerAddress);
}
@GetMapping("/config.query")
public AclConfig getAclConfig(HttpServletRequest request) {
if (!configure.isLoginRequired()) {
return aclService.getAclConfig(false);
}
UserInfo userInfo = (UserInfo) WebUtil.getValueFromSession(request, WebUtil.USER_INFO);
// if user info is null but reach here, must exclude secret key for safety.
return aclService.getAclConfig(userInfo == null || userInfo.getUser().getType() != User.ADMIN);
@GetMapping("/listAcls")
@ResponseBody
public Object listAcls(
@RequestParam(required = false) String brokerAddress,
@RequestParam(required = false) String searchParam) {
return aclService.listAcls(brokerAddress, searchParam);
}
@PostMapping("/add.do")
public Object addAclConfig(@RequestBody PlainAccessConfig config) {
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getAccessKey()), "accessKey is null");
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getSecretKey()), "secretKey is null");
aclService.addAclConfig(config);
@PostMapping("/createAcl")
@ResponseBody
public Object createAcl(@RequestBody PolicyRequest request) {
aclService.createAcl(request);
return true;
}
@PostMapping("/delete.do")
public Object deleteAclConfig(@RequestBody PlainAccessConfig config) {
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getAccessKey()), "accessKey is null");
aclService.deleteAclConfig(config);
@DeleteMapping("/deleteUser")
@ResponseBody
public Object deleteUser(@RequestParam(required = false) String brokerAddress, @RequestParam String username) {
aclService.deleteUser(brokerAddress, username);
return true;
}
@PostMapping("/update.do")
public Object updateAclConfig(@RequestBody PlainAccessConfig config) {
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getSecretKey()), "secretKey is null");
aclService.updateAclConfig(config);
@RequestMapping(value = "/updateUser", method = RequestMethod.POST, produces = "application/json;charset=UTF-8")
@ResponseBody
public Object updateUser(@RequestBody UserUpdateRequest request) {
aclService.updateUser(request.getBrokerAddress(), request.getUserInfo());
return true;
}
@PostMapping("/topic/add.do")
public Object addAclTopicConfig(@RequestBody AclRequest request) {
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getAccessKey()), "accessKey is null");
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getSecretKey()), "secretKey is null");
Preconditions.checkArgument(CollectionUtils.isNotEmpty(request.getConfig().getTopicPerms()), "topic perms is null");
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getTopicPerm()), "topic perm is null");
aclService.addOrUpdateAclTopicConfig(request);
@PostMapping("/createUser")
public Object createUser(@RequestBody UserCreateRequest request) {
aclService.createUser(request.getBrokerAddress(), request.getUserInfo());
return true;
}
@PostMapping("/group/add.do")
public Object addAclGroupConfig(@RequestBody AclRequest request) {
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getAccessKey()), "accessKey is null");
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getSecretKey()), "secretKey is null");
Preconditions.checkArgument(CollectionUtils.isNotEmpty(request.getConfig().getGroupPerms()), "group perms is null");
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getGroupPerm()), "group perm is null");
aclService.addOrUpdateAclGroupConfig(request);
@DeleteMapping("/deleteAcl")
public Object deleteAcl(
@RequestParam(required = false) String brokerAddress,
@RequestParam String subject,
@RequestParam(required = false) String resource) {
aclService.deleteAcl(brokerAddress, subject, resource);
return true;
}
@PostMapping("/perm/delete.do")
public Object deletePermConfig(@RequestBody AclRequest request) {
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getAccessKey()), "accessKey is null");
Preconditions.checkArgument(StringUtils.isNotEmpty(request.getConfig().getSecretKey()), "secretKey is null");
aclService.deletePermConfig(request);
@RequestMapping(value = "/updateAcl", method = RequestMethod.POST, produces = "application/json;charset=UTF-8")
@ResponseBody
public Object updateAcl(@RequestBody PolicyRequest request) {
aclService.updateAcl(request);
return true;
}
@PostMapping("/sync.do")
public Object syncConfig(@RequestBody PlainAccessConfig config) {
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getAccessKey()), "accessKey is null");
Preconditions.checkArgument(StringUtils.isNotEmpty(config.getSecretKey()), "secretKey is null");
aclService.syncData(config);
return true;
}
@PostMapping("/white/list/add.do")
public Object addWhiteList(@RequestBody List<String> whiteList) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(whiteList), "white list is null");
aclService.addWhiteList(whiteList);
return true;
}
@DeleteMapping("/white/list/delete.do")
public Object deleteWhiteAddr(@RequestParam String request) {
aclService.deleteWhiteAddr(request);
return true;
}
@PostMapping("/white/list/sync.do")
public Object synchronizeWhiteList(@RequestBody List<String> whiteList) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(whiteList), "white list is null");
aclService.synchronizeWhiteList(whiteList);
return true;
}
}

View File

@@ -16,14 +16,13 @@
*/
package org.apache.rocketmq.dashboard.controller;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ClusterService;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller

View File

@@ -17,9 +17,8 @@
package org.apache.rocketmq.dashboard.controller;
import com.google.common.base.Preconditions;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.model.ConnectionInfo;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
@@ -27,6 +26,7 @@ import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;

View File

@@ -17,9 +17,8 @@
package org.apache.rocketmq.dashboard.controller;
import javax.annotation.Resource;
import com.google.common.base.Strings;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.DashboardService;
import org.springframework.stereotype.Controller;

View File

@@ -17,10 +17,8 @@
package org.apache.rocketmq.dashboard.controller;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
@@ -41,6 +39,9 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.ArrayList;
import java.util.List;
@Controller
@RequestMapping("/dlqMessage")
@Permission

View File

@@ -17,6 +17,9 @@
package org.apache.rocketmq.dashboard.controller;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.LoginInfo;
import org.apache.rocketmq.dashboard.model.LoginResult;
@@ -32,12 +35,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@Controller
@RequestMapping("/login")
@@ -66,12 +65,11 @@ public class LoginController {
@RequestMapping(value = "/login.do", method = RequestMethod.POST)
@ResponseBody
public Object login(@RequestParam("username") String username,
@RequestParam(value = "password") String password,
public Object login(org.apache.rocketmq.remoting.protocol.body.UserInfo userInfoRequest,
HttpServletRequest request,
HttpServletResponse response) throws Exception {
logger.info("user:{} login", username);
User user = userService.queryByUsernameAndPassword(username, password);
logger.info("user:{} login", userInfoRequest.getUsername());
User user = userService.queryByUsernameAndPassword(userInfoRequest.getUsername(), userInfoRequest.getPassword());
if (user == null) {
throw new IllegalArgumentException("Bad username or password!");
@@ -79,9 +77,9 @@ public class LoginController {
user.setPassword(null);
UserInfo userInfo = WebUtil.setLoginInfo(request, response, user);
WebUtil.setSessionValue(request, WebUtil.USER_INFO, userInfo);
WebUtil.setSessionValue(request, WebUtil.USER_NAME, username);
WebUtil.setSessionValue(request, WebUtil.USER_NAME, userInfoRequest.getUsername());
userInfo.setSessionId(WebUtil.getSessionId(request));
LoginResult result = new LoginResult(username, user.getType(), contextPath);
LoginResult result = new LoginResult(userInfoRequest.getUsername(), user.getType(), contextPath);
return result;
}
}

View File

@@ -17,26 +17,26 @@
package org.apache.rocketmq.dashboard.controller;
import com.google.common.collect.Maps;
import jakarta.annotation.Resource;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.MessageService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

View File

@@ -18,11 +18,7 @@
package org.apache.rocketmq.dashboard.controller;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.trace.MessageTraceGraph;
@@ -36,6 +32,9 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.List;
import java.util.Map;
@Controller
@RequestMapping("/messageTrace")
@Permission

View File

@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.dashboard.controller;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.model.ConsumerMonitorConfig;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.MonitorService;

View File

@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.dashboard.controller;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.aspect.admin.annotation.OriginalControllerReturnValue;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.OpsService;

View File

@@ -17,7 +17,7 @@
package org.apache.rocketmq.dashboard.controller;
import com.google.common.base.Preconditions;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.OpsService;

View File

@@ -16,11 +16,12 @@
*/
package org.apache.rocketmq.dashboard.controller;
import javax.annotation.Resource;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.model.ConnectionInfo;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ProducerService;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

View File

@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.dashboard.controller;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.springframework.stereotype.Controller;
@@ -24,8 +25,6 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
@Controller
@RequestMapping("/proxy")
@Permission
@@ -38,6 +37,7 @@ public class ProxyController {
return proxyService.getProxyHomePage();
}
@RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
@ResponseBody
public Object addProxyAddr(@RequestParam String newProxyAddr) {

View File

@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.dashboard.controller;
import jakarta.annotation.Resource;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -26,11 +27,9 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
import javax.annotation.Resource;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
@@ -38,6 +37,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.List;
@Controller
@RequestMapping("/test")
public class TestController {

View File

@@ -16,16 +16,17 @@
*/
package org.apache.rocketmq.dashboard.controller;
import com.google.common.base.Preconditions;
import jakarta.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.permisssion.Permission;
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.apache.rocketmq.dashboard.service.TopicService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import com.google.common.base.Preconditions;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
@@ -33,8 +34,6 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller

View File

@@ -17,15 +17,17 @@
package org.apache.rocketmq.dashboard.filter;
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.FilterConfig;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.annotation.WebFilter;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletResponse;
@WebFilter(urlPatterns = "/*", filterName = "httpBasicAuthorizedFilter")

View File

@@ -17,22 +17,24 @@
package org.apache.rocketmq.dashboard.interceptor;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.rocketmq.dashboard.service.LoginService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@Component
public class AuthInterceptor extends HandlerInterceptorAdapter {
public class AuthInterceptor implements HandlerInterceptor {
@Autowired
private LoginService loginService;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
return loginService.login(request, response);
}
}

View File

@@ -17,11 +17,12 @@
package org.apache.rocketmq.dashboard.model;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashSet;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import java.util.Collection;
import java.util.HashSet;
public class ConnectionInfo extends Connection {
private String versionDesc;

View File

@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import com.google.common.collect.Lists;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import java.util.List;

View File

@@ -22,12 +22,13 @@ import com.alibaba.excel.annotation.write.style.ColumnWidth;
import com.alibaba.excel.metadata.BaseRowModel;
import com.alibaba.excel.util.DateUtils;
import com.google.common.base.Charsets;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.Serializable;
import java.util.Date;
@Data
@NoArgsConstructor
public class DlqMessageExcelModel extends BaseRowModel implements Serializable {

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
@Setter
@Getter
public class Entry {
private List<String> resource;
private List<String> actions;
private List<String> sourceIps;
private String decision;
}

View File

@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.common.message.MessageExt;
import com.google.common.base.Charsets;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.BeanUtils;
import java.net.SocketAddress;

View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
@Setter
@Getter
public class Policy {
private String policyType;
private List<Entry> entries;
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
@Getter
@Setter
public class PolicyRequest {
private String brokerAddress;
private String subject;
private List<Policy> policies;
}

View File

@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.springframework.beans.BeanUtils;
public class QueueStatInfo {

View File

@@ -17,7 +17,8 @@
package org.apache.rocketmq.dashboard.model.request;
import lombok.Data;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.auth.migration.v1.PlainAccessConfig;
@Data
public class AclRequest {

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model.request;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getter
public class UserCreateRequest {
private String brokerAddress;
private UserInfoParam userInfo;
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model.request;
import lombok.Data;
@Data
public class UserInfoParam {
private String username;
private String password;
private String userStatus;
private String userType;
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model.request;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class UserUpdateRequest {
private String brokerAddress;
private UserInfoParam userInfo;
}

View File

@@ -16,10 +16,9 @@
*/
package org.apache.rocketmq.dashboard.permisssion;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.UserInfo;
import org.apache.rocketmq.dashboard.service.PermissionService;
import org.apache.rocketmq.dashboard.util.WebUtil;
@@ -56,13 +55,13 @@ public class PermissionAspect {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String url = request.getRequestURI();
UserInfo userInfo = (UserInfo) request.getSession().getAttribute(WebUtil.USER_INFO);
if (userInfo == null || userInfo.getUser() == null) {
throw new ServiceException(-1, "user not login");
}
boolean checkResult = permissionService.checkUrlAvailable(userInfo, url);
if (!checkResult) {
throw new ServiceException(-1, "no permission");
}
// if (userInfo == null || userInfo.getUser() == null) {
// throw new ServiceException(-1, "user not login");
// }
// boolean checkResult = permissionService.checkUrlAvailable(userInfo, url);
// if (!checkResult) {
// throw new ServiceException(-1, "no permission");
// }
}
return joinPoint.proceed();
}

View File

@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import jakarta.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
public abstract class AbstractCommonService {
@Resource

View File

@@ -14,34 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.request.UserInfoParam;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import java.util.List;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.dashboard.model.request.AclRequest;
public interface AclService {
List<UserInfo> listUsers(String brokerAddress);
AclConfig getAclConfig(boolean excludeSecretKey);
Object listAcls(String brokerAddress, String searchParam);
void addAclConfig(PlainAccessConfig config);
List<String> createAcl(PolicyRequest policyRequest);
void deleteAclConfig(PlainAccessConfig config);
void deleteUser(String brokerAddress, String username);
void updateAclConfig(PlainAccessConfig config);
void updateUser(String brokerAddress, UserInfoParam userParam);
void addOrUpdateAclTopicConfig(AclRequest request);
void createUser(String brokerAddress, UserInfoParam userParam);
void addOrUpdateAclGroupConfig(AclRequest request);
void deleteAcl(String brokerAddress, String subject, String resource);
void deletePermConfig(AclRequest request);
void syncData(PlainAccessConfig config);
void addWhiteList(List<String> whiteList);
void deleteWhiteAddr(String addr);
void synchronizeWhiteList(List<String> whiteList);
void updateAcl(PolicyRequest policyRequest);
}

View File

@@ -16,14 +16,14 @@
*/
package org.apache.rocketmq.dashboard.service;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
@Service
public class ClusterInfoService {
@Resource
@Autowired
private MQAdminExt mqAdminExt;
@Value("${rocketmq.cluster.cache.expire:60000}")

View File

@@ -17,14 +17,14 @@
package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
import java.util.List;
import java.util.Map;

View File

@@ -17,6 +17,7 @@
package org.apache.rocketmq.dashboard.service;
import com.google.common.cache.LoadingCache;
import java.io.File;
import java.util.List;
import java.util.Map;

View File

@@ -17,12 +17,13 @@
package org.apache.rocketmq.dashboard.service;
import java.util.List;
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import java.util.List;
public interface DlqMessageService {
MessagePage queryDlqMessageByPage(MessageQuery query);

View File

@@ -17,8 +17,9 @@
package org.apache.rocketmq.dashboard.service;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
public interface LoginService {
boolean login(HttpServletRequest request, HttpServletResponse response);

View File

@@ -19,11 +19,11 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import java.util.List;

View File

@@ -17,10 +17,11 @@
package org.apache.rocketmq.dashboard.service;
import java.util.List;
import org.apache.rocketmq.dashboard.model.MessageTraceView;
import org.apache.rocketmq.dashboard.model.trace.MessageTraceGraph;
import java.util.List;
public interface MessageTraceService {
List<MessageTraceView> queryMessageTraceKey(final String key);

View File

@@ -16,9 +16,10 @@
*/
package org.apache.rocketmq.dashboard.service;
import java.util.Map;
import org.apache.rocketmq.dashboard.model.ConsumerMonitorConfig;
import java.util.Map;
public interface MonitorService {
boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config);

View File

@@ -16,9 +16,10 @@
*/
package org.apache.rocketmq.dashboard.service;
import java.util.Map;
import org.apache.rocketmq.dashboard.service.checker.CheckerType;
import java.util.Map;
public interface OpsService {
Map<String, Object> homePageInfo();

View File

@@ -19,13 +19,13 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.model.request.TopicTypeList;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import java.util.List;

View File

@@ -22,4 +22,5 @@ public interface UserService {
User queryByName(String name);
User queryByUsernameAndPassword(String username, String password);
}

View File

@@ -17,34 +17,34 @@
package org.apache.rocketmq.dashboard.service.client;
import com.google.common.base.Throwables;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.AclInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
@@ -61,6 +61,8 @@ import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -68,14 +70,6 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
@@ -85,6 +79,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@Service
@@ -114,34 +116,10 @@ public class MQAdminExtImpl implements MQAdminExt {
}
@Override
public void createAndUpdatePlainAccessConfig(String addr,
PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdatePlainAccessConfig(addr, plainAccessConfig);
public void createAndUpdateTopicConfigList(String s, List<TopicConfig> list) throws InterruptedException, RemotingException, MQClientException {
}
@Override
public void deletePlainAccessConfig(String addr,
String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().deletePlainAccessConfig(addr, accessKey);
}
@Override
public void updateGlobalWhiteAddrConfig(String addr,
String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().updateGlobalWhiteAddrConfig(addr, globalWhiteAddrs);
}
@Override
public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return null;
}
@Override
public AclConfig examineBrokerClusterAclConfig(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterAclConfig(addr);
}
@Override
public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config)
@@ -149,6 +127,11 @@ public class MQAdminExtImpl implements MQAdminExt {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config);
}
@Override
public void createAndUpdateSubscriptionGroupConfigList(String s, List<SubscriptionGroupConfig> list) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
}
@Override
public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) throws MQBrokerException {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
@@ -228,12 +211,22 @@ public class MQAdminExtImpl implements MQAdminExt {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
}
@Override
public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String s, String s1, long l) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
return null;
}
@Override
public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic);
}
@Override
public ConsumeStats examineConsumeStats(String s, String s1, String s2) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return null;
}
@Override
public ClusterInfo examineBrokerClusterInfo()
throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
@@ -344,6 +337,11 @@ public class MQAdminExtImpl implements MQAdminExt {
return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce);
}
@Override
public Map<MessageQueue, Long> resetOffsetByTimestamp(String s, String s1, String s2, long l, boolean b) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return Map.of();
}
@Override
public void resetOffsetNew(String consumerGroup, String topic, long timestamp)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -389,11 +387,6 @@ public class MQAdminExtImpl implements MQAdminExt {
return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack);
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId);
}
@Override
public List<MessageTrack> messageTrackDetail(MessageExt msg)
@@ -438,12 +431,6 @@ public class MQAdminExtImpl implements MQAdminExt {
return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
}
@Override
public MessageExt viewMessage(String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
@@ -478,10 +465,6 @@ public class MQAdminExtImpl implements MQAdminExt {
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
try {
return viewMessage(msgId);
} catch (Exception e) {
}
MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
if (clusterList == null || clusterList.isEmpty()) {
@@ -508,6 +491,11 @@ public class MQAdminExtImpl implements MQAdminExt {
return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String s, String s1, String s2, String s3, String s4) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return null;
}
@Override
public Properties getBrokerConfig(
String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
@@ -603,11 +591,12 @@ public class MQAdminExtImpl implements MQAdminExt {
return null;
}
@Override public boolean resumeCheckHalfMessage(
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return false;
@Override
public void exportRocksDBConfigToJson(String s, List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
}
@Override public boolean resumeCheckHalfMessage(String topic,
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return false;
@@ -628,12 +617,6 @@ public class MQAdminExtImpl implements MQAdminExt {
throw new UnsupportedOperationException("Unimplemented method 'removeBrokerFromContainer'");
}
@Override
public void updateGlobalWhiteAddrConfig(String addr, String globalWhiteAddrs, String aclFileFullPath)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'updateGlobalWhiteAddrConfig'");
}
@Override
public TopicStatsTable examineTopicStats(String brokerAddr, String topic)
@@ -851,10 +834,8 @@ public class MQAdminExtImpl implements MQAdminExt {
}
@Override
public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName,
String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'electMaster'");
public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String s, String s1, String s2, Long aLong) throws RemotingException, InterruptedException, MQBrokerException {
return null;
}
@Override
@@ -864,4 +845,125 @@ public class MQAdminExtImpl implements MQAdminExt {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'cleanControllerBrokerData'");
}
@Override
public void updateColdDataFlowCtrGroupConfig(String brokerAddr, Properties properties)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
UnsupportedEncodingException, InterruptedException, MQBrokerException {
MQAdminInstance.threadLocalMQAdminExt().updateColdDataFlowCtrGroupConfig(brokerAddr, properties);
}
@Override
public void removeColdDataFlowCtrGroupConfig(String brokerAddr, String consumerGroup)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
UnsupportedEncodingException, InterruptedException, MQBrokerException {
MQAdminInstance.threadLocalMQAdminExt().removeColdDataFlowCtrGroupConfig(brokerAddr, consumerGroup);
}
@Override
public String getColdDataFlowCtrInfo(String brokerAddr)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
UnsupportedEncodingException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().getColdDataFlowCtrInfo(brokerAddr);
}
@Override
public String setCommitLogReadAheadMode(String brokerAddr, String mode)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException, UnsupportedEncodingException {
return MQAdminInstance.threadLocalMQAdminExt().setCommitLogReadAheadMode(brokerAddr, mode);
}
@Override
public void createUser(String brokerAddr,
UserInfo userInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().createUser(brokerAddr, userInfo);
}
@Override
public void createUser(String brokerAddr, String username, String password,
String userType) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().createUser(brokerAddr, username, password, userType);
}
@Override
public void updateUser(String brokerAddr, String username,
String password, String userType,
String userStatus) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().updateUser(brokerAddr, username, password, userType, userStatus);
}
@Override
public void updateUser(String brokerAddr,
UserInfo userInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().updateUser(brokerAddr, userInfo);
}
@Override
public void deleteUser(String brokerAddr,
String username) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().deleteUser(brokerAddr, username);
}
@Override
public UserInfo getUser(String brokerAddr,
String username) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().getUser(brokerAddr, username);
}
@Override
public List<UserInfo> listUser(String brokerAddr,
String filter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().listUser(brokerAddr, filter);
}
@Override
public void createAcl(String brokerAddr, String subject, List<String> resources, List<String> actions,
List<String> sourceIps,
String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().createAcl(brokerAddr, subject, resources, actions, sourceIps, decision);
}
@Override
public void createAcl(String brokerAddr,
AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().createAcl(brokerAddr, aclInfo);
}
@Override
public void updateAcl(String brokerAddr, String subject, List<String> resources, List<String> actions,
List<String> sourceIps,
String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().updateAcl(brokerAddr, subject, resources, actions, sourceIps, decision);
}
@Override
public void updateAcl(String brokerAddr,
AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().updateAcl(brokerAddr, aclInfo);
}
@Override
public void deleteAcl(String brokerAddr, String subject,
String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().deleteAcl(brokerAddr, subject, resource);
}
@Override
public AclInfo getAcl(String brokerAddr,
String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().getAcl(brokerAddr, subject);
}
@Override
public List<AclInfo> listAcl(String brokerAddr, String subjectFilter,
String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().listAcl(brokerAddr, subjectFilter, resourceFilter);
}
@Override
public void exportPopRecords(String brokerAddr, long timeout) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
MQAdminInstance.threadLocalMQAdminExt().exportPopRecords(brokerAddr, timeout);
}
}

View File

@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.dashboard.service.client;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -29,47 +28,69 @@ import org.slf4j.LoggerFactory;
public class MQAdminInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(MQAdminInstance.class);
// ThreadLocal to store the MQAdminExt instance for the current thread.
private static final ThreadLocal<MQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<>();
/**
* Retrieves the MQAdminExt instance associated with the current thread.
*
* @return The MQAdminExt instance.
* @throws IllegalStateException if no MQAdminExt instance has been set for the current thread.
*/
public static MQAdminExt threadLocalMQAdminExt() {
MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
if (mqAdminExt == null) {
throw new IllegalStateException("defaultMQAdminExt should be init before you get this");
// This indicates a programming error: MQAdminExt was not set by the aspect.
throw new IllegalStateException("MQAdminExt instance should be set by MQAdminAspect before it's accessed.");
}
return mqAdminExt;
}
public static RemotingClient threadLocalRemotingClient() {
/**
* Retrieves the RemotingClient from the MQAdminExt instance in the current thread.
* This method relies on reflection and the internal structure of RocketMQ classes.
*
* @return The RemotingClient instance.
*/
public static RemotingClient threadLocalRemotingClient() { // Assuming RemotingClient is a type you have
MQClientInstance mqClientInstance = threadLocalMqClientInstance();
// Use jOOQ-Reflect to access private field "mQClientAPIImpl" from mqClientInstance
MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl");
// Use jOOQ-Reflect to access private field "remotingClient" from mQClientAPIImpl
return Reflect.on(mQClientAPIImpl).get("remotingClient");
}
/**
* Retrieves the MQClientInstance from the MQAdminExt instance in the current thread.
* This method relies on reflection and the internal structure of RocketMQ classes.
*
* @return The MQClientInstance instance.
*/
public static MQClientInstance threadLocalMqClientInstance() {
// Use jOOQ-Reflect to access private field "defaultMQAdminExtImpl" from threadLocalMQAdminExt()
DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
// Use jOOQ-Reflect to access private field "mqClientInstance" from defaultMQAdminExtImpl
return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
public static void createMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
try {
// Get the mqAdmin instance from the object pool
MQAdminExt mqAdminExt = mqAdminExtPool.borrowObject();
/**
* Sets the MQAdminExt instance for the current thread.
* This method should be called by the aspect after borrowing an instance from the pool.
*
* @param mqAdminExt The MQAdminExt instance to set.
*/
public static void setCurrentMQAdminExt(MQAdminExt mqAdminExt) {
MQ_ADMIN_EXT_THREAD_LOCAL.set(mqAdminExt);
} catch (Exception e) {
LOGGER.error("get mqAdmin from pool error", e);
}
LOGGER.debug("Set MQAdminExt instance for current thread: {}", mqAdminExt);
}
public static void returnMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
if (mqAdminExt != null) {
try {
// After execution, return the mqAdmin instance to the object pool
mqAdminExtPool.returnObject(mqAdminExt);
} catch (Exception e) {
LOGGER.error("return mqAdmin to pool error", e);
}
}
/**
* Clears the MQAdminExt instance from the current thread.
* This method should be called by the aspect after returning the instance to the pool.
*/
public static void clearCurrentMQAdminExt() {
MQ_ADMIN_EXT_THREAD_LOCAL.remove();
LOGGER.debug("Cleared MQAdminExt instance from current thread.");
}
}

View File

@@ -17,7 +17,6 @@
package org.apache.rocketmq.dashboard.service.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -26,8 +25,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
@@ -35,13 +32,10 @@ import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CON
@Slf4j
@Service
public class ProxyAdminImpl implements ProxyAdmin {
@Autowired
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
@Override
public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
try {
MQAdminInstance.createMQAdmin(mqAdminExtPool);
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
@@ -53,8 +47,9 @@ public class ProxyAdminImpl implements ProxyAdmin {
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
} finally {
MQAdminInstance.returnMQAdmin(mqAdminExtPool);
} catch (Exception e) {
log.error("examineConsumerConnectionInfo error", e);
throw e;
}
}
}

View File

@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.dashboard.service.impl;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
public abstract class AbstractFileStore {
public final Logger log = LoggerFactory.getLogger(this.getClass());

View File

@@ -14,357 +14,266 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.dashboard.model.request.AclRequest;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.Entry;
import org.apache.rocketmq.dashboard.model.Policy;
import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.request.UserInfoParam;
import org.apache.rocketmq.dashboard.service.AclService;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.remoting.protocol.body.AclInfo;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Service
@Slf4j
public class AclServiceImpl extends AbstractCommonService implements AclService {
public class AclServiceImpl implements AclService {
private Logger logger = LoggerFactory.getLogger(AclServiceImpl.class);
@Autowired
private MQAdminExt mqAdminExt;
@Autowired
private RMQConfigure rmqConfigure;
@Autowired
private ClusterInfoService clusterInfoService;
private static final String DEFAULT_BROKER_ADDRESS = "localhost:10911";
@Override
public AclConfig getAclConfig(boolean excludeSecretKey) {
public List<UserInfo> listUsers(String brokerAddress) {
List<UserInfo> userList;
try {
Optional<String> addr = getMasterSet().stream().findFirst();
if (addr.isPresent()) {
if (!excludeSecretKey) {
return mqAdminExt.examineBrokerClusterAclConfig(addr.get());
} else {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr.get());
if (CollectionUtils.isNotEmpty(aclConfig.getPlainAccessConfigs())) {
aclConfig.getPlainAccessConfigs().forEach(pac -> pac.setSecretKey(null));
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
userList = mqAdminExt.listUser(address, "");
} catch (Exception ex) {
logger.error("Failed to list users from broker: {}", brokerAddress, ex);
throw new RuntimeException("Failed to list users", ex);
}
return aclConfig;
if (userList == null || userList.isEmpty()) {
logger.warn("No users found for broker: {}", brokerAddress);
return new ArrayList<>();
}
}
} catch (Exception e) {
log.error("getAclConfig error.", e);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(Collections.emptyList());
aclConfig.setPlainAccessConfigs(Collections.emptyList());
return aclConfig;
return userList;
}
@Override
public void addAclConfig(PlainAccessConfig config) {
public Object listAcls(String brokerAddress, String searchParam) {
List<AclInfo> aclList;
try {
Set<String> masterSet = getMasterSet();
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
String user = searchParam != null ? searchParam : "";
String res = searchParam != null ? searchParam : "";
aclList = mqAdminExt.listAcl(address, user, "");
if (aclList == null) {
aclList = new ArrayList<>();
}
List<AclInfo> resAclList = mqAdminExt.listAcl(address, "", res);
if (resAclList != null) {
aclList.addAll(resAclList);
}
} catch (Exception ex) {
logger.error("Failed to list ACLs from broker: {}", brokerAddress, ex);
throw new RuntimeException("Failed to list ACLs", ex);
}
ObjectMapper mapper = new ObjectMapper();
Set<String> uniqueAclStrings = new HashSet<>();
List<AclInfo> resultAclList = new ArrayList<>();
if (masterSet.isEmpty()) {
throw new IllegalStateException("broker addr list is empty");
}
// check to see if account is exists
for (String addr : masterSet) {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
List<PlainAccessConfig> plainAccessConfigs = aclConfig.getPlainAccessConfigs();
for (PlainAccessConfig pac : plainAccessConfigs) {
if (pac.getAccessKey().equals(config.getAccessKey())) {
throw new IllegalArgumentException(String.format("broker: %s, exist accessKey: %s", addr, config.getAccessKey()));
}
}
}
// all broker
for (String addr : getBrokerAddrs()) {
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
for (AclInfo acl : aclList) {
try {
String aclString = mapper.writeValueAsString(acl);
if (uniqueAclStrings.add(aclString)) {
resultAclList.add(acl);
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
logger.error("Error serializing AclInfo", e);
}
}
return resultAclList;
}
@Override
public void deleteAclConfig(PlainAccessConfig config) {
public List<String> createAcl(PolicyRequest policyRequest) {
List<String> successfulResources = new ArrayList<>();
if (policyRequest == null || policyRequest.getPolicies() == null || policyRequest.getPolicies().isEmpty()) {
logger.warn("Policy request is null or policies list is empty. No ACLs to create.");
return successfulResources;
}
String brokerAddress = policyRequest.getBrokerAddress() != null && !policyRequest.getBrokerAddress().isEmpty() ?
policyRequest.getBrokerAddress() : DEFAULT_BROKER_ADDRESS;
String subject = policyRequest.getSubject();
if (subject == null || subject.isEmpty()) {
throw new IllegalArgumentException("Subject cannot be null or empty.");
}
for (Policy policy : policyRequest.getPolicies()) {
if (policy.getEntries() != null && !policy.getEntries().isEmpty()) {
for (Entry entry : policy.getEntries()) {
if (entry.getResource() != null && !entry.getResource().isEmpty()) {
for (String resource : entry.getResource()) {
AclInfo aclInfo = new AclInfo();
List<AclInfo.PolicyInfo> aclPolicies = new ArrayList<>();
AclInfo.PolicyInfo policyInfo = new AclInfo.PolicyInfo();
List<AclInfo.PolicyEntryInfo> entries = new ArrayList<>();
AclInfo.PolicyEntryInfo entryInfo = new AclInfo.PolicyEntryInfo();
entryInfo.setActions(entry.getActions());
entryInfo.setDecision(entry.getDecision());
entryInfo.setResource(resource);
entryInfo.setSourceIps(entry.getSourceIps());
entries.add(entryInfo);
policyInfo.setEntries(entries);
policyInfo.setPolicyType(policy.getPolicyType());
aclPolicies.add(policyInfo);
aclInfo.setPolicies(aclPolicies);
aclInfo.setSubject(subject);
try {
for (String addr : getBrokerAddrs()) {
log.info("Start to delete acl [{}] from broker [{}]", config.getAccessKey(), addr);
if (isExistAccessKey(config.getAccessKey(), addr)) {
mqAdminExt.deletePlainAccessConfig(addr, config.getAccessKey());
logger.info("Attempting to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress);
mqAdminExt.createAcl(brokerAddress, aclInfo);
successfulResources.add(resource);
logger.info("Successfully created ACL for subject: {}, resource: {}", subject, resource);
} catch (Exception ex) {
logger.error("Failed to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress, ex);
throw new RuntimeException("Failed to create ACL", ex);
}
log.info("Delete acl [{}] from broker [{}] complete", config.getAccessKey(), addr);
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
}
}
return successfulResources;
}
@Override
public void deleteUser(String brokerAddress, String username) {
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.deleteUser(address, username);
} catch (Exception ex) {
logger.error("Failed to delete user: {} from broker: {}", username, brokerAddress, ex);
throw new RuntimeException("Failed to delete user", ex);
}
}
@Override
public void updateAclConfig(PlainAccessConfig config) {
public void updateUser(String brokerAddress, UserInfoParam userParam) {
UserInfo user = new UserInfo();
user.setUsername(userParam.getUsername());
user.setPassword(userParam.getPassword());
user.setUserStatus(userParam.getUserStatus());
user.setUserType(userParam.getUserType());
try {
for (String addr : getBrokerAddrs()) {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
if (aclConfig.getPlainAccessConfigs() != null) {
PlainAccessConfig remoteConfig = null;
for (PlainAccessConfig pac : aclConfig.getPlainAccessConfigs()) {
if (pac.getAccessKey().equals(config.getAccessKey())) {
remoteConfig = pac;
break;
}
}
if (remoteConfig != null) {
remoteConfig.setSecretKey(config.getSecretKey());
remoteConfig.setAdmin(config.isAdmin());
config = remoteConfig;
}
}
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.updateUser(address, user);
} catch (Exception ex) {
logger.error("Failed to update user: {} on broker: {}", userParam.getUsername(), brokerAddress, ex);
throw new RuntimeException("Failed to update user", ex);
}
}
@Override
public void addOrUpdateAclTopicConfig(AclRequest request) {
public void createUser(String brokerAddress, UserInfoParam userParam) {
UserInfo user = new UserInfo();
user.setUsername(userParam.getUsername());
user.setPassword(userParam.getPassword());
user.setUserStatus(userParam.getUserStatus());
user.setUserType(userParam.getUserType());
try {
PlainAccessConfig addConfig = request.getConfig();
for (String addr : getBrokerAddrs()) {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
PlainAccessConfig remoteConfig = null;
if (aclConfig.getPlainAccessConfigs() != null) {
for (PlainAccessConfig config : aclConfig.getPlainAccessConfigs()) {
if (config.getAccessKey().equals(addConfig.getAccessKey())) {
remoteConfig = config;
break;
}
}
}
if (remoteConfig == null) {
// Maybe the broker no acl config of the access key, therefore add it;
mqAdminExt.createAndUpdatePlainAccessConfig(addr, addConfig);
} else {
if (remoteConfig.getTopicPerms() == null) {
remoteConfig.setTopicPerms(new ArrayList<>());
}
removeExist(remoteConfig.getTopicPerms(), request.getTopicPerm().split("=")[0]);
remoteConfig.getTopicPerms().add(request.getTopicPerm());
mqAdminExt.createAndUpdatePlainAccessConfig(addr, remoteConfig);
}
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.createUser(address, user);
} catch (Exception ex) {
logger.error("Failed to create user: {} on broker: {}", userParam.getUsername(), brokerAddress, ex);
throw new RuntimeException("Failed to create user", ex);
}
}
@Override
public void addOrUpdateAclGroupConfig(AclRequest request) {
public void deleteAcl(String brokerAddress, String subject, String resource) {
try {
PlainAccessConfig addConfig = request.getConfig();
for (String addr : getBrokerAddrs()) {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
PlainAccessConfig remoteConfig = null;
if (aclConfig.getPlainAccessConfigs() != null) {
for (PlainAccessConfig config : aclConfig.getPlainAccessConfigs()) {
if (config.getAccessKey().equals(addConfig.getAccessKey())) {
remoteConfig = config;
break;
}
}
}
if (remoteConfig == null) {
// May be the broker no acl config of the access key, therefore add it;
mqAdminExt.createAndUpdatePlainAccessConfig(addr, addConfig);
} else {
if (remoteConfig.getGroupPerms() == null) {
remoteConfig.setGroupPerms(new ArrayList<>());
}
removeExist(remoteConfig.getGroupPerms(), request.getGroupPerm().split("=")[0]);
remoteConfig.getGroupPerms().add(request.getGroupPerm());
mqAdminExt.createAndUpdatePlainAccessConfig(addr, remoteConfig);
}
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
String res = resource != null ? resource : "";
mqAdminExt.deleteAcl(address, subject, res);
} catch (Exception ex) {
logger.error("Failed to delete ACL for subject: {} and resource: {} on broker: {}", subject, resource, brokerAddress, ex);
throw new RuntimeException("Failed to delete ACL", ex);
}
}
@Override
public void deletePermConfig(AclRequest request) {
public void updateAcl(PolicyRequest policyRequest) {
if (policyRequest == null || policyRequest.getPolicies() == null || policyRequest.getPolicies().isEmpty()) {
logger.warn("Policy request is null or policies list is empty. No ACLs to update.");
}
String brokerAddress = policyRequest.getBrokerAddress() != null && !policyRequest.getBrokerAddress().isEmpty() ?
policyRequest.getBrokerAddress() : DEFAULT_BROKER_ADDRESS;
String subject = policyRequest.getSubject();
if (subject == null || subject.isEmpty()) {
throw new IllegalArgumentException("Subject cannot be null or empty.");
}
for (Policy policy : policyRequest.getPolicies()) {
if (policy.getEntries() != null && !policy.getEntries().isEmpty()) {
for (Entry entry : policy.getEntries()) {
if (entry.getResource() != null && !entry.getResource().isEmpty()) {
for (String resource : entry.getResource()) {
AclInfo aclInfo = new AclInfo();
List<AclInfo.PolicyInfo> aclPolicies = new ArrayList<>();
AclInfo.PolicyInfo policyInfo = new AclInfo.PolicyInfo();
List<AclInfo.PolicyEntryInfo> entries = new ArrayList<>();
AclInfo.PolicyEntryInfo entryInfo = new AclInfo.PolicyEntryInfo();
entryInfo.setActions(entry.getActions());
entryInfo.setDecision(entry.getDecision());
entryInfo.setResource(resource);
entryInfo.setSourceIps(entry.getSourceIps());
entries.add(entryInfo);
policyInfo.setEntries(entries);
policyInfo.setPolicyType(policy.getPolicyType());
aclPolicies.add(policyInfo);
aclInfo.setPolicies(aclPolicies);
aclInfo.setSubject(subject);
try {
PlainAccessConfig deleteConfig = request.getConfig();
String topic = StringUtils.isNotEmpty(request.getTopicPerm()) ? request.getTopicPerm().split("=")[0] : null;
String group = StringUtils.isNotEmpty(request.getGroupPerm()) ? request.getGroupPerm().split("=")[0] : null;
if (deleteConfig.getTopicPerms() != null && topic != null) {
removeExist(deleteConfig.getTopicPerms(), topic);
}
if (deleteConfig.getGroupPerms() != null && group != null) {
removeExist(deleteConfig.getGroupPerms(), group);
}
for (String addr : getBrokerAddrs()) {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
PlainAccessConfig remoteConfig = null;
if (aclConfig.getPlainAccessConfigs() != null) {
for (PlainAccessConfig config : aclConfig.getPlainAccessConfigs()) {
if (config.getAccessKey().equals(deleteConfig.getAccessKey())) {
remoteConfig = config;
break;
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.updateAcl(address, aclInfo);
} catch (Exception ex) {
logger.error("Failed to update ACL for subject: {} on broker: {}", subject, brokerAddress, ex);
throw new RuntimeException("Failed to update ACL", ex);
}
}
}
if (remoteConfig == null) {
// Maybe the broker no acl config of the access key, therefore add it;
mqAdminExt.createAndUpdatePlainAccessConfig(addr, deleteConfig);
} else {
if (remoteConfig.getTopicPerms() != null && topic != null) {
removeExist(remoteConfig.getTopicPerms(), topic);
}
if (remoteConfig.getGroupPerms() != null && group != null) {
removeExist(remoteConfig.getGroupPerms(), group);
}
mqAdminExt.createAndUpdatePlainAccessConfig(addr, remoteConfig);
}
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
@Override
public void syncData(PlainAccessConfig config) {
try {
for (String addr : getBrokerAddrs()) {
mqAdminExt.createAndUpdatePlainAccessConfig(addr, config);
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
@Override
public void addWhiteList(List<String> whiteList) {
if (whiteList == null) {
return;
}
try {
for (String addr : getBrokerAddrs()) {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
if (aclConfig.getGlobalWhiteAddrs() != null) {
aclConfig.setGlobalWhiteAddrs(Stream.of(whiteList, aclConfig.getGlobalWhiteAddrs()).flatMap(Collection::stream).distinct().collect(Collectors.toList()));
} else {
aclConfig.setGlobalWhiteAddrs(whiteList);
}
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
@Override
public void deleteWhiteAddr(String deleteAddr) {
try {
for (String addr : getBrokerAddrs()) {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
if (aclConfig.getGlobalWhiteAddrs() == null || aclConfig.getGlobalWhiteAddrs().isEmpty()) {
continue;
}
aclConfig.getGlobalWhiteAddrs().remove(deleteAddr);
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(aclConfig.getGlobalWhiteAddrs(), ","));
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
@Override
public void synchronizeWhiteList(List<String> whiteList) {
if (whiteList == null) {
return;
}
try {
for (String addr : getBrokerAddrs()) {
mqAdminExt.updateGlobalWhiteAddrConfig(addr, StringUtils.join(whiteList, ","));
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
private void removeExist(List<String> list, String name) {
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
String v = iterator.next();
String cmp = v.split("=")[0];
if (cmp.equals(name)) {
iterator.remove();
}
}
}
private boolean isExistAccessKey(String accessKey,
String addr) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
AclConfig aclConfig = mqAdminExt.examineBrokerClusterAclConfig(addr);
List<PlainAccessConfig> plainAccessConfigs = aclConfig.getPlainAccessConfigs();
if (plainAccessConfigs == null || plainAccessConfigs.isEmpty()) {
return false;
}
for (PlainAccessConfig config : plainAccessConfigs) {
if (accessKey.equals(config.getAccessKey())) {
return true;
}
}
return false;
}
private Set<BrokerData> getBrokerDataSet() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Map<String, BrokerData> brokerDataMap = clusterInfo.getBrokerAddrTable();
return new HashSet<>(brokerDataMap.values());
}
private Set<String> getMasterSet() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return getBrokerDataSet().stream().map(data -> data.getBrokerAddrs().get(MixAll.MASTER_ID)).collect(Collectors.toSet());
}
private Set<String> getBrokerAddrs() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
Set<String> brokerAddrs = new HashSet<>();
getBrokerDataSet().forEach(data -> brokerAddrs.addAll(data.getBrokerAddrs().values()));
return brokerAddrs;
}
}

View File

@@ -17,20 +17,20 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import jakarta.annotation.Resource;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.dashboard.service.ClusterService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.dashboard.service.ClusterService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
@@ -61,10 +61,9 @@ public class ClusterServiceImpl implements ClusterService {
resultMap.put("brokerServer", brokerServer);
// add messageType
resultMap.put("messageTypes", Arrays.stream(TopicMessageType.values()).sorted()
.collect(Collectors.toMap(TopicMessageType::getValue, messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue()))));
.collect(Collectors.toMap(TopicMessageType::getValue, messageType -> String.format("MESSAGE_TYPE_%s", messageType.getValue()))));
return resultMap;
}
catch (Exception err) {
} catch (Exception err) {
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
@@ -73,12 +72,16 @@ public class ClusterServiceImpl implements ClusterService {
@Override
public Properties getBrokerConfig(String brokerAddr) {
Properties properties = null;
try {
return mqAdminExt.getBrokerConfig(brokerAddr);
properties = mqAdminExt.getBrokerConfig(brokerAddr);
if (properties == null) {
throw new RuntimeException("get broker config failed, brokerAddr:" + brokerAddr);
}
catch (Exception e) {
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
return properties;
}
}

View File

@@ -23,7 +23,43 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import jakarta.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
@@ -44,44 +80,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
@@ -180,7 +178,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
try {
ClusterInfo clusterInfo = clusterInfoService.get();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 30000L);
for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
if (!consumerGroupMap.containsKey(groupName)) {
consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());

View File

@@ -29,18 +29,19 @@ import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Service
public class DashboardCollectServiceImpl implements DashboardCollectService {

View File

@@ -18,15 +18,16 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.collect.Lists;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.dashboard.service.DashboardService;
import org.springframework.stereotype.Service;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.dashboard.service.DashboardService;
import org.springframework.stereotype.Service;
@Service
public class DashboardServiceImpl implements DashboardService {

View File

@@ -18,27 +18,28 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
import org.apache.rocketmq.dashboard.model.DlqMessageRequest;
import org.apache.rocketmq.dashboard.model.DlqMessageResendResult;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.service.DlqMessageService;
import org.apache.rocketmq.dashboard.service.MessageService;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@Service
@Slf4j
public class DlqMessageServiceImpl implements DlqMessageService {

View File

@@ -17,18 +17,21 @@
package org.apache.rocketmq.dashboard.service.impl;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.LoginService;
import org.apache.rocketmq.dashboard.service.UserService;
import org.apache.rocketmq.dashboard.service.provider.UserInfoProvider;
import org.apache.rocketmq.dashboard.util.UserInfoContext;
import org.apache.rocketmq.dashboard.util.WebUtil;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -43,13 +46,21 @@ public class LoginServiceImpl implements LoginService {
@Autowired
private UserService userService;
@Autowired
private UserInfoProvider userInfoProvider;
@Override
public boolean login(HttpServletRequest request, HttpServletResponse response) {
if (WebUtil.getValueFromSession(request, WebUtil.USER_NAME) != null) {
String username = (String) WebUtil.getValueFromSession(request, WebUtil.USER_NAME);
if (username != null) {
UserInfo userInfo = userInfoProvider.getUserInfoByUsername(username);
if (userInfo == null) {
return false;
}
UserInfoContext.set(WebUtil.USER_NAME, userInfo);
return true;
}
auth(request, response);
return false;
}

View File

@@ -25,6 +25,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -37,20 +38,20 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessagePageTask;
import org.apache.rocketmq.dashboard.model.MessageQueryByPage;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.service.MessageService;
import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
@@ -60,14 +61,13 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.Set;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

View File

@@ -19,14 +19,8 @@ package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import com.google.common.collect.Maps;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.Pair;
@@ -35,11 +29,11 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.MessageTraceView;
import org.apache.rocketmq.dashboard.model.trace.ProducerNode;
import org.apache.rocketmq.dashboard.model.trace.MessageTraceGraph;
import org.apache.rocketmq.dashboard.model.trace.MessageTraceStatusEnum;
import org.apache.rocketmq.dashboard.model.trace.ProducerNode;
import org.apache.rocketmq.dashboard.model.trace.SubscriptionNode;
import org.apache.rocketmq.dashboard.model.trace.TraceNode;
import org.apache.rocketmq.dashboard.model.trace.MessageTraceStatusEnum;
import org.apache.rocketmq.dashboard.service.MessageTraceService;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;
@@ -48,6 +42,11 @@ import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class MessageTraceServiceImpl implements MessageTraceService {

View File

@@ -18,12 +18,8 @@ package org.apache.rocketmq.dashboard.service.impl;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Throwables;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerMonitorConfig;
@@ -31,6 +27,11 @@ import org.apache.rocketmq.dashboard.service.MonitorService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class MonitorServiceImpl implements MonitorService {

View File

@@ -17,9 +17,7 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
@@ -30,6 +28,9 @@ import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Service
public class OpsServiceImpl extends AbstractCommonService implements OpsService {

View File

@@ -17,12 +17,7 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.alibaba.fastjson.JSONObject;
import java.io.FileReader;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.UserInfo;
@@ -32,6 +27,12 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import org.yaml.snakeyaml.Yaml;
import java.io.FileReader;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.rocketmq.dashboard.permisssion.UserRoleEnum.ADMIN;
import static org.apache.rocketmq.dashboard.permisssion.UserRoleEnum.ORDINARY;

View File

@@ -18,9 +18,9 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables;
import javax.annotation.Resource;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.service.ProducerService;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.stereotype.Service;

View File

@@ -17,13 +17,13 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.collect.Maps;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.ProxyService;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

View File

@@ -25,10 +25,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.common.MixAll;
@@ -80,10 +80,6 @@ import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR
@Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService {
private transient DefaultMQProducer systemTopicProducer;
private final Object producerLock = new Object();
private Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
@Autowired
@@ -92,6 +88,10 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
private final ConcurrentMap<String, TopicRouteData> routeCache = new ConcurrentHashMap<>();
private final Object cacheLock = new Object();
private transient DefaultMQProducer systemTopicProducer;
private final Object producerLock = new Object();
@Autowired
private RMQConfigure configure;

View File

@@ -17,102 +17,74 @@
package org.apache.rocketmq.dashboard.service.impl;
import jakarta.annotation.Resource;
import org.apache.rocketmq.auth.authentication.enums.UserType;
import org.apache.rocketmq.dashboard.admin.UserMQAdminPoolManager;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.User;
import org.apache.rocketmq.dashboard.service.UserService;
import org.springframework.beans.factory.InitializingBean;
import org.apache.rocketmq.dashboard.service.provider.UserInfoProvider;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.io.FileReader;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class UserServiceImpl implements UserService, InitializingBean {
public class UserServiceImpl implements UserService {
private static final Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
@Resource
private RMQConfigure configure;
private FileBasedUserInfoStore fileBasedUserInfoStore;
@Autowired
private UserInfoProvider userInfoProvider;
@Autowired
private UserMQAdminPoolManager userMQAdminPoolManager;
@Override
public User queryByName(String name) {
return fileBasedUserInfoStore.queryByName(name);
UserInfo userInfo = userInfoProvider.getUserInfoByUsername(name);
if (userInfo == null) {
return null;
}
return new User(userInfo.getUsername(), userInfo.getPassword(), UserType.getByName(userInfo.getUserType()).getCode());
}
@Override
public User queryByUsernameAndPassword(String username, String password) {
return fileBasedUserInfoStore.queryByUsernameAndPassword(username, password);
}
@Override
public void afterPropertiesSet() throws Exception {
if (configure.isLoginRequired()) {
fileBasedUserInfoStore = new FileBasedUserInfoStore(configure);
}
}
public static class FileBasedUserInfoStore extends AbstractFileStore {
private static final String FILE_NAME = "users.properties";
private static Map<String, User> userMap = new ConcurrentHashMap<>();
public FileBasedUserInfoStore(RMQConfigure configure) {
super(configure, FILE_NAME);
}
@Override
public void load(InputStream inputStream) {
Properties prop = new Properties();
try {
if (inputStream == null) {
prop.load(new FileReader(filePath));
} else {
prop.load(inputStream);
}
} catch (Exception e) {
log.error("load user.properties failed", e);
throw new ServiceException(0, String.format("Failed to load loginUserInfo property file: %s", filePath));
}
Map<String, User> loadUserMap = new HashMap<>();
String[] arrs;
int role;
for (String key : prop.stringPropertyNames()) {
String v = prop.getProperty(key);
if (v == null)
continue;
arrs = v.split(",", 2);
if (arrs.length == 0) {
continue;
} else if (arrs.length == 1) {
role = 0;
} else {
role = Integer.parseInt(arrs[1].trim());
}
loadUserMap.put(key, new User(key, arrs[0].trim(), role));
}
userMap.clear();
userMap.putAll(loadUserMap);
}
public User queryByName(String name) {
return userMap.get(name);
}
public User queryByUsernameAndPassword(@NotNull String username, @NotNull String password) {
User user = queryByName(username);
if (user != null && password.equals(user.getPassword())) {
return user.cloneOne();
}
if (user != null && !user.getPassword().equals(password)) {
return null;
}
return user;
}
public MQAdminExt getMQAdminExtForUser(User user) throws Exception {
if (user == null) {
throw new IllegalArgumentException("User object cannot be null when requesting MQAdminExt.");
}
return userMQAdminPoolManager.borrowMQAdminExt(user.getName(), user.getPassword());
}
public void returnMQAdminExtForUser(User user, MQAdminExt mqAdminExt) {
if (user == null || mqAdminExt == null) {
log.warn("Attempted to return MQAdminExt with null user or mqAdminExt object.");
return;
}
userMQAdminPoolManager.returnMQAdminExt(user.getName(), mqAdminExt);
}
public void onUserLogout(User user) {
if (user != null) {
userMQAdminPoolManager.shutdownUserPool(user.getName());
log.info("User {} logged out, their MQAdminExt pool has been shut down.", user.getName());
}
}
}

View File

@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.provider;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
public interface UserInfoProvider {
UserInfo getUserInfoByUsername(String username);
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.service.provider;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class UserInfoProviderImpl implements UserInfoProvider {
private static final Logger log = LoggerFactory.getLogger(UserInfoProviderImpl.class);
@Autowired
private MQAdminExt mqAdminExt;
@Autowired
private ClusterInfoService clusterInfoService;
@Override
public UserInfo getUserInfoByUsername(String username) {
ClusterInfo clusterInfo = clusterInfoService.get();
if (clusterInfo == null || clusterInfo.getBrokerAddrTable() == null || clusterInfo.getBrokerAddrTable().isEmpty()) {
log.warn("Cluster information is not available or has no broker addresses.");
return null;
}
for (BrokerData brokerLiveInfo : clusterInfo.getBrokerAddrTable().values()) {
if (brokerLiveInfo == null || brokerLiveInfo.getBrokerAddrs() == null || brokerLiveInfo.getBrokerAddrs().isEmpty()) {
continue;
}
String brokerAddr = brokerLiveInfo.getBrokerAddrs().get(0L); // Assuming 0L is the primary address
if (brokerAddr == null) {
continue;
}
try {
UserInfo userInfo = mqAdminExt.getUser(brokerAddr, username);
if (userInfo != null) {
return userInfo;
}
} catch (Exception e) {
log.warn("Failed to get user {} from broker {}. Trying next broker if available. Error: {}", username, brokerAddr, e.getMessage());
}
}
return null;
}
}

View File

@@ -23,6 +23,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
@@ -49,7 +50,7 @@ public class AutoCloseConsumerWrapper {
}
public DefaultMQPullConsumer getConsumer(RPCHook rpcHook,Boolean useTLS) {
public DefaultMQPullConsumer getConsumer(RPCHook rpcHook, Boolean useTLS) {
lastUsedTime = Instant.now();
DefaultMQPullConsumer consumer = CONSUMER_REF.get();
@@ -57,7 +58,7 @@ public class AutoCloseConsumerWrapper {
synchronized (this) {
consumer = CONSUMER_REF.get();
if (consumer == null) {
consumer = createNewConsumer(rpcHook,useTLS);
consumer = createNewConsumer(rpcHook, useTLS);
CONSUMER_REF.set(consumer);
}
try {

View File

@@ -17,7 +17,7 @@
package org.apache.rocketmq.dashboard.support;
import javax.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletRequest;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -17,7 +17,6 @@
package org.apache.rocketmq.dashboard.support;
import java.lang.annotation.Annotation;
import org.apache.rocketmq.dashboard.aspect.admin.annotation.OriginalControllerReturnValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +28,8 @@ import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
import java.lang.annotation.Annotation;
@ControllerAdvice(basePackages = "org.apache.rocketmq.dashboard")
public class GlobalRestfulResponseBodyAdvice implements ResponseBodyAdvice<Object> {

View File

@@ -18,21 +18,22 @@ package org.apache.rocketmq.dashboard.task;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.stats.Stats;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
@Slf4j
public class CollectTaskRunnble implements Runnable {

View File

@@ -21,6 +21,23 @@ import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import jakarta.annotation.Resource;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
@@ -32,22 +49,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class DashboardCollectTask {

View File

@@ -16,8 +16,7 @@
*/
package org.apache.rocketmq.dashboard.task;
import java.util.Map;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import org.apache.rocketmq.dashboard.model.ConsumerMonitorConfig;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.service.ConsumerService;
@@ -27,6 +26,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class MonitorTask {
private Logger logger = LoggerFactory.getLogger(MonitorTask.class);

View File

@@ -21,11 +21,12 @@ import com.alibaba.excel.support.ExcelTypeEnum;
import com.alibaba.excel.write.metadata.style.WriteCellStyle;
import com.alibaba.excel.write.metadata.style.WriteFont;
import com.alibaba.excel.write.style.HorizontalCellStyleStrategy;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.poi.ss.usermodel.HorizontalAlignment;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.apache.poi.ss.usermodel.HorizontalAlignment;
public class ExcelUtil {

View File

@@ -25,12 +25,13 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Writer;
import java.text.SimpleDateFormat;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("unchecked")
public class JsonUtil {

View File

@@ -17,9 +17,6 @@
package org.apache.rocketmq.dashboard.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceConstants;
@@ -29,6 +26,9 @@ import org.apache.rocketmq.common.message.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import static org.apache.rocketmq.client.trace.TraceType.Pub;
public class MsgTraceDecodeUtil {

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.util;
import java.util.HashMap;
import java.util.Map;
public class UserInfoContext {
private static final ThreadLocal<Map<String, Object>> USER_THREAD_LOCAL = ThreadLocal.withInitial(HashMap::new);
public static void set(String key, Object value) {
USER_THREAD_LOCAL.get().put(key, value);
}
public static Object get(String key) {
return USER_THREAD_LOCAL.get().get(key);
}
public static Map<String, Object> getAll() {
return new HashMap<>(USER_THREAD_LOCAL.get());
}
public static void clear() {
USER_THREAD_LOCAL.remove();
}
}

View File

@@ -17,14 +17,14 @@
package org.apache.rocketmq.dashboard.util;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.servlet.http.HttpSession;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.dashboard.model.User;
import org.apache.rocketmq.dashboard.model.UserInfo;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.io.PrintWriter;

View File

@@ -16,7 +16,7 @@
#
server:
port: 8080
port: 8082
servlet:
encoding:
charset: UTF-8