[ISSUES #315]: Add acl2.0 cluster support

This commit is contained in:
Crazylychee
2025-06-16 14:31:18 +08:00
committed by GitHub
parent bc1a05d16c
commit e81dceb6ae
3 changed files with 217 additions and 77 deletions

View File

@@ -0,0 +1,35 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<link rel="shortcut icon" href="./favicon.ico" type="image/x-icon" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="description" content="" />
<meta name="keywords" content="" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<title>RocketMQ Dashboard</title>
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>
<!-- React App will mount here -->
<div id="root"></div>
</body>
</html>

View File

@@ -65,7 +65,110 @@ const Acl = () => {
const ipRegex =
/^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$|^((?:[0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4}|(?:[0-9A-Fa-f]{1,4}:){6}[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,1}|(?:[0-9A-Fa-f]{1,4}:){5}[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,2}|(?:[0-9A-Fa-f]{1,4}:){4}[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,3}|(?:[0-9A-Fa-f]{1,4}:){3}[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,4}|(?:[0-9A-Fa-f]{1,4}:){2}[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,5}|(?:[0-9A-Fa-f]{1,4}:){1}[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,6}|(?::(?::[0-9A-Fa-f]{1,4}){1,7}|::))(\/(?:12[0-7]|1[0-1][0-9]|[1-9]?[0-9]))?$/;
// 支持 IPv4 和 IPv6包括 CIDR 表示法
// State to store the entire clusterInfo object for easy access
const [clusterData, setClusterData] = useState(null);
// State for the list of available cluster names for the dropdown
const [clusterNamesOptions, setClusterNamesOptions] = useState([]);
// State for the currently selected cluster name
const [selectedCluster, setSelectedCluster] = useState(undefined);
// State for the list of available broker names for the dropdown (depends on selectedCluster)
const [brokerNamesOptions, setBrokerNamesOptions] = useState([]);
// State for the currently selected broker name
const [selectedBroker, setSelectedBroker] = useState(undefined);
// State for the address of the selected broker
const [brokerAddress, setBrokerAddress] = useState(undefined);
// --- Data Fetching and Initial Setup ---
useEffect(() => {
const fetchData = async () => {
const clusterResponse = await remoteApi.getClusterList();
if (clusterResponse.status === 0 && clusterResponse.data) {
const { clusterInfo } = clusterResponse.data;
setClusterData(clusterInfo); // Store the entire clusterInfo
// Populate cluster names for the first dropdown
const clusterNames = Object.keys(clusterInfo?.clusterAddrTable || {});
setClusterNamesOptions(clusterNames.map(name => ({ label: name, value: name })));
// Set initial selections if clusters are available
if (clusterNames.length > 0) {
const defaultCluster = clusterNames[0];
setSelectedCluster(defaultCluster);
// Manually trigger broker list update for the default cluster
updateBrokerOptions(defaultCluster, clusterInfo);
// Set default broker and its address if available
const brokersInDefaultCluster = clusterInfo.clusterAddrTable[defaultCluster] || [];
if (brokersInDefaultCluster.length > 0) {
const defaultBroker = brokersInDefaultCluster[0];
setSelectedBroker(defaultBroker);
// Get the address from brokerAddrTable using the defaultBroker name
const addr = clusterInfo.brokerAddrTable?.[defaultBroker]?.brokerAddrs?.["0"];
setBrokerAddress(addr);
}
}
} else {
console.error('Failed to fetch cluster list:', clusterResponse.errMsg);
}
};
if(!clusterData){
fetchData();
}
if(brokerAddress){
// Call fetchUsers or fetchAcls based on activeTab initially
if (activeTab === 'users') {
fetchUsers();
} else {
fetchAcls();
}
}
}, [activeTab]); // Dependencies for useEffect
// --- Helper function to update broker options based on selected cluster ---
const updateBrokerOptions = (clusterName, info = clusterData) => {
if (!info || !info.clusterAddrTable) {
setBrokerNamesOptions([]);
return;
}
const brokersInCluster = info.clusterAddrTable[clusterName] || [];
setBrokerNamesOptions(brokersInCluster.map(broker => ({ label: broker, value: broker })));
};
// --- Event Handlers ---
const handleClusterChange = (value) => {
setSelectedCluster(value);
setSelectedBroker(undefined); // Reset broker selection
setBrokerAddress(undefined); // Reset broker address
// Update the broker options based on the newly selected cluster
updateBrokerOptions(value);
};
const handleBrokerChange = (value) => {
setSelectedBroker(value);
// Find the corresponding broker address from clusterData
if (clusterData && clusterData.brokerAddrTable && clusterData.brokerAddrTable[value]) {
const addr = clusterData.brokerAddrTable[value].brokerAddrs?.["0"];
setBrokerAddress(addr);
} else {
setBrokerAddress(undefined);
}
};
// --- Log selected values for debugging (optional) ---
useEffect(() => {
console.log('Selected Cluster:', selectedCluster);
console.log('Selected Broker:', selectedBroker);
console.log('Broker Address:', brokerAddress);
}, [selectedCluster, selectedBroker, brokerAddress]);
const handleIpChange = value => {
// 过滤掉重复的IP地址
const uniqueIps = Array.from(new Set(value));
@@ -92,7 +195,7 @@ const Acl = () => {
const fetchUsers = async () => {
setLoading(true);
try {
const result = await remoteApi.listUsers();
const result = await remoteApi.listUsers(brokerAddress);
if (result && result.status === 0 && result.data) {
const formattedUsers = result.data.map(user => ({
...user,
@@ -114,7 +217,7 @@ const Acl = () => {
const fetchAcls = async (value) => {
setLoading(true);
try {
const result = await remoteApi.listAcls(null, value);
const result = await remoteApi.listAcls(brokerAddress, value);
if (result && result.status === 0) {
const formattedAcls = [];
@@ -162,13 +265,6 @@ const Acl = () => {
}
};
useEffect(() => {
if (activeTab === 'users') {
fetchUsers();
} else {
fetchAcls();
}
}, [activeTab]);
// --- User Management Logic ---
@@ -194,10 +290,10 @@ const Acl = () => {
const handleDeleteUser = async (username) => {
setLoading(true);
try {
const result = await remoteApi.deleteUser(null, username);
const result = await remoteApi.deleteUser(brokerAddress, username);
if (result.status === 0) {
messageApi.success(t.USER_DELETE_SUCCESS);
fetchUsers();
fetchUsers(brokerAddress);
} else {
messageApi.error(t.USER_DELETE_FAILED + result.errMsg);
}
@@ -223,14 +319,14 @@ const Acl = () => {
};
if (currentUser) {
result = await remoteApi.updateUser(null, userInfoParam);
result = await remoteApi.updateUser(brokerAddress, userInfoParam);
if (result.status === 0) {
messageApi.success(t.USER_UPDATE_SUCCESS);
} else {
messageApi.error(result.errMsg);
}
} else {
result = await remoteApi.createUser(null, userInfoParam);
result = await remoteApi.createUser(brokerAddress, userInfoParam);
if (result.status === 0) {
messageApi.success(t.USER_CREATE_SUCCESS);
} else {
@@ -272,7 +368,7 @@ const Acl = () => {
const handleDeleteAcl = async (subject, resource) => {
setLoading(true);
try {
const result = await remoteApi.deleteAcl(null, subject, resource);
const result = await remoteApi.deleteAcl(brokerAddress, subject, resource);
if (result.status === 0) {
messageApi.success(t.ACL_DELETE_SUCCESS);
fetchAcls();
@@ -308,22 +404,22 @@ const Acl = () => {
];
if (isUpdate) { // This condition seems reversed for update/create based on the current logic.
result = await remoteApi.updateAcl(null, values.subject, policiesParam);
result = await remoteApi.updateAcl(brokerAddress, values.subject, policiesParam);
if (result.status === 0) {
messageApi.success(t.ACL_UPDATE_SUCCESS);
setIsAclModalVisible(false);
fetchAcls();
fetchAcls(brokerAddress);
} else {
messageApi.error(t.ACL_UPDATE_FAILED+result.errMsg);
}
setIsUpdate(false)
} else {
result = await remoteApi.createAcl(null, values.subject, policiesParam);
result = await remoteApi.createAcl(brokerAddress, values.subject, policiesParam);
console.log(result)
if (result.status === 0) {
messageApi.success(t.ACL_CREATE_SUCCESS);
setIsAclModalVisible(false);
fetchAcls();
fetchAcls(brokerAddress);
} else {
messageApi.error(t.ACL_CREATE_FAILED+result.errMsg);
}
@@ -474,14 +570,40 @@ const Acl = () => {
return (
<>
{msgContextHolder}
<div style={{ padding: 24 }}>
<div style={{padding: 24}}>
<h2>{t.ACL_MANAGEMENT}</h2>
<div style={{ marginBottom: 16, display: 'flex', gap: 16 }}>
<Form.Item label={t.PLEASE_SELECT_CLUSTER} style={{ marginBottom: 0 }}>
<Select
placeholder={t.PLEASE_SELECT_CLUSTER}
style={{ width: 200 }}
onChange={handleClusterChange}
value={selectedCluster}
options={clusterNamesOptions}
/>
</Form.Item>
<Form.Item label={t.PLEASE_SELECT_BROKER} style={{ marginBottom: 0 }}>
<Select
placeholder={t.PLEASE_SELECT_BROKER}
style={{ width: 200 }}
onChange={handleBrokerChange}
value={selectedBroker}
options={brokerNamesOptions} // Now dynamically updated
disabled={!selectedCluster} // Disable broker selection if no cluster is chosen
/>
</Form.Item>
<Button type="primary" onClick={activeTab === 'users' ? fetchUsers : fetchAcls}>
{t.CONFIRM}
</Button>
</div>
<Tabs activeKey={activeTab} onChange={setActiveTab}>
<TabPane tab={t.ACL_USERS} key="users" />
<TabPane tab={t.ACL_PERMISSIONS} key="acls" />
<TabPane tab={t.ACL_USERS} key="users"/>
<TabPane tab={t.ACL_PERMISSIONS} key="acls"/>
</Tabs>
<div style={{ marginBottom: 16, display: 'flex', justifyContent: 'space-between' }}>
<div style={{marginBottom: 16, display: 'flex', justifyContent: 'space-between'}}>
<Button type="primary" onClick={activeTab === 'users' ? handleAddUser : handleAddAcl}>
{activeTab === 'users' ? t.ADD_USER : t.ADD_ACL_PERMISSION}
</Button>
@@ -489,7 +611,7 @@ return (
placeholder={t.SEARCH_PLACEHOLDER}
allowClear
onSearch={handleSearch}
style={{ width: 300 }}
style={{width: 300}}
/>
</div>
@@ -498,7 +620,7 @@ return (
columns={userColumns}
dataSource={userListData}
loading={loading}
pagination={{ pageSize: 10 }}
pagination={{pageSize: 10}}
rowKey="username"
/>
)}
@@ -508,7 +630,7 @@ return (
columns={aclColumns}
dataSource={aclListData}
loading={loading}
pagination={{ pageSize: 10 }}
pagination={{pageSize: 10}}
rowKey="key"
/>
)}
@@ -533,31 +655,31 @@ return (
form={userForm}
layout="vertical"
name="user_form"
initialValues={{ userStatus: 'enable' }}
initialValues={{userStatus: 'enable'}}
>
<Form.Item
name="username"
label={t.USERNAME}
rules={[{ required: true, message: t.PLEASE_ENTER_USERNAME }]}
rules={[{required: true, message: t.PLEASE_ENTER_USERNAME}]}
>
<Input disabled={!!currentUser} />
<Input disabled={!!currentUser}/>
</Form.Item>
<Form.Item
name="password"
label={t.PASSWORD}
rules={[{ required: !currentUser, message: t.PLEASE_ENTER_PASSWORD }]}
rules={[{required: !currentUser, message: t.PLEASE_ENTER_PASSWORD}]}
>
<Input.Password
placeholder={t.PASSWORD}
iconRender={visible => (visible ? <EyeOutlined /> : <EyeInvisibleOutlined />)}
iconRender={visible => (visible ? <EyeOutlined/> : <EyeInvisibleOutlined/>)}
/>
</Form.Item>
<Form.Item
name="userType"
label={t.USER_TYPE}
rules={[{ required: true, message: t.PLEASE_SELECT_USER_TYPE }]}
rules={[{required: true, message: t.PLEASE_SELECT_USER_TYPE}]}
>
<Select mode="single" placeholder="Super, Normal" style={{ width: '100%' }}>
<Select mode="single" placeholder="Super, Normal" style={{width: '100%'}}>
<Select.Option value="Super">Super</Select.Option>
<Select.Option value="Normal">Normal</Select.Option>
</Select>
@@ -565,9 +687,9 @@ return (
<Form.Item
name="userStatus"
label={t.USER_STATUS}
rules={[{ required: true, message: t.PLEASE_SELECT_USER_STATUS }]}
rules={[{required: true, message: t.PLEASE_SELECT_USER_STATUS}]}
>
<Select mode="single" placeholder="enable, disable" style={{ width: '100%' }}>
<Select mode="single" placeholder="enable, disable" style={{width: '100%'}}>
<Select.Option value="enable">enable</Select.Option>
<Select.Option value="disable">disable</Select.Option>
</Select>
@@ -591,17 +713,17 @@ return (
<Form.Item
name="subject"
label={t.SUBJECT_LABEL}
rules={[{ required: true, message: t.PLEASE_ENTER_SUBJECT }]}
rules={[{required: true, message: t.PLEASE_ENTER_SUBJECT}]}
>
<SubjectInput disabled={!!currentAcl} />
<SubjectInput disabled={!!currentAcl}/>
</Form.Item>
<Form.Item
name="policyType"
label={t.POLICY_TYPE}
rules={[{ required: true, message: t.PLEASE_ENTER_POLICY_TYPE }]}
rules={[{required: true, message: t.PLEASE_ENTER_POLICY_TYPE}]}
>
<Select mode="single" disabled={isUpdate} placeholder="policyType" style={{ width: '100%' }}>
<Select mode="single" disabled={isUpdate} placeholder="policyType" style={{width: '100%'}}>
<Select.Option value="Custom">Custom</Select.Option>
<Select.Option value="Default">Default</Select.Option>
</Select>
@@ -610,12 +732,12 @@ return (
<Form.Item
name="resource"
label={t.RESOURCE}
rules={[{ required: true, message: t.PLEASE_ADD_RESOURCE }]}
rules={[{required: true, message: t.PLEASE_ADD_RESOURCE}]}
>
{isUpdate ? (
<Input disabled={isUpdate} />
<Input disabled={isUpdate}/>
) : (
<ResourceInput />
<ResourceInput/>
)}
</Form.Item>
@@ -623,7 +745,7 @@ return (
name="actions"
label={t.OPERATION_TYPE}
>
<Select mode="multiple" placeholder="action" style={{ width: '100%' }}>
<Select mode="multiple" placeholder="action" style={{width: '100%'}}>
<Select.Option value="All">All</Select.Option>
<Select.Option value="Pub">Pub</Select.Option>
<Select.Option value="Sub">Sub</Select.Option>
@@ -645,7 +767,7 @@ return (
>
<Select
mode="tags"
style={{ width: '100%' }}
style={{width: '100%'}}
placeholder={t.ENTER_IP_HINT}
onChange={handleIpChange}
onDeselect={handleIpDeselect}
@@ -660,9 +782,9 @@ return (
<Form.Item
name="decision"
label={t.DECISION}
rules={[{ required: true, message: t.PLEASE_ENTER_DECISION }]}
rules={[{required: true, message: t.PLEASE_ENTER_DECISION}]}
>
<Select mode="single" placeholder="Allow, Deny" style={{ width: '100%' }}>
<Select mode="single" placeholder="Allow, Deny" style={{width: '100%'}}>
<Select.Option value="Allow">Allow</Select.Option>
<Select.Option value="Deny">Deny</Select.Option>
</Select>
@@ -671,6 +793,7 @@ return (
</Modal>
</div>
</>
);}
);
}
export default Acl;

View File

@@ -18,13 +18,11 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.Entry;
import org.apache.rocketmq.dashboard.model.Policy;
import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.request.UserInfoParam;
import org.apache.rocketmq.dashboard.service.AclService;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.remoting.protocol.body.AclInfo;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -47,20 +45,12 @@ public class AclServiceImpl implements AclService {
@Autowired
private MQAdminExt mqAdminExt;
@Autowired
private RMQConfigure rmqConfigure;
@Autowired
private ClusterInfoService clusterInfoService;
private static final String DEFAULT_BROKER_ADDRESS = "localhost:10911";
@Override
public List<UserInfo> listUsers(String brokerAddress) {
List<UserInfo> userList;
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
userList = mqAdminExt.listUser(address, "");
userList = mqAdminExt.listUser(brokerAddress, "");
} catch (Exception ex) {
logger.error("Failed to list users from broker: {}", brokerAddress, ex);
throw new RuntimeException("Failed to list users", ex);
@@ -76,14 +66,13 @@ public class AclServiceImpl implements AclService {
public Object listAcls(String brokerAddress, String searchParam) {
List<AclInfo> aclList;
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
String user = searchParam != null ? searchParam : "";
String res = searchParam != null ? searchParam : "";
aclList = mqAdminExt.listAcl(address, user, "");
aclList = mqAdminExt.listAcl(brokerAddress, user, "");
if (aclList == null) {
aclList = new ArrayList<>();
}
List<AclInfo> resAclList = mqAdminExt.listAcl(address, "", res);
List<AclInfo> resAclList = mqAdminExt.listAcl(brokerAddress, "", res);
if (resAclList != null) {
aclList.addAll(resAclList);
}
@@ -117,8 +106,6 @@ public class AclServiceImpl implements AclService {
return successfulResources;
}
String brokerAddress = policyRequest.getBrokerAddress() != null && !policyRequest.getBrokerAddress().isEmpty() ?
policyRequest.getBrokerAddress() : DEFAULT_BROKER_ADDRESS;
String subject = policyRequest.getSubject();
if (subject == null || subject.isEmpty()) {
@@ -150,12 +137,12 @@ public class AclServiceImpl implements AclService {
aclInfo.setSubject(subject);
try {
logger.info("Attempting to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress);
mqAdminExt.createAcl(brokerAddress, aclInfo);
logger.info("Attempting to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, policyRequest.getBrokerAddress());
mqAdminExt.createAcl(policyRequest.getBrokerAddress(), aclInfo);
successfulResources.add(resource);
logger.info("Successfully created ACL for subject: {}, resource: {}", subject, resource);
} catch (Exception ex) {
logger.error("Failed to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress, ex);
logger.error("Failed to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, policyRequest.getBrokerAddress(), ex);
throw new RuntimeException("Failed to create ACL", ex);
}
}
@@ -169,8 +156,7 @@ public class AclServiceImpl implements AclService {
@Override
public void deleteUser(String brokerAddress, String username) {
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.deleteUser(address, username);
mqAdminExt.deleteUser(brokerAddress, username);
} catch (Exception ex) {
logger.error("Failed to delete user: {} from broker: {}", username, brokerAddress, ex);
throw new RuntimeException("Failed to delete user", ex);
@@ -186,8 +172,7 @@ public class AclServiceImpl implements AclService {
user.setUserType(userParam.getUserType());
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.updateUser(address, user);
mqAdminExt.updateUser(brokerAddress, user);
} catch (Exception ex) {
logger.error("Failed to update user: {} on broker: {}", userParam.getUsername(), brokerAddress, ex);
throw new RuntimeException("Failed to update user", ex);
@@ -202,8 +187,7 @@ public class AclServiceImpl implements AclService {
user.setUserStatus(userParam.getUserStatus());
user.setUserType(userParam.getUserType());
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.createUser(address, user);
mqAdminExt.createUser(brokerAddress, user);
} catch (Exception ex) {
logger.error("Failed to create user: {} on broker: {}", userParam.getUsername(), brokerAddress, ex);
throw new RuntimeException("Failed to create user", ex);
@@ -213,9 +197,8 @@ public class AclServiceImpl implements AclService {
@Override
public void deleteAcl(String brokerAddress, String subject, String resource) {
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
String res = resource != null ? resource : "";
mqAdminExt.deleteAcl(address, subject, res);
mqAdminExt.deleteAcl(brokerAddress, subject, res);
} catch (Exception ex) {
logger.error("Failed to delete ACL for subject: {} and resource: {} on broker: {}", subject, resource, brokerAddress, ex);
throw new RuntimeException("Failed to delete ACL", ex);
@@ -229,8 +212,8 @@ public class AclServiceImpl implements AclService {
logger.warn("Policy request is null or policies list is empty. No ACLs to update.");
}
String brokerAddress = policyRequest.getBrokerAddress() != null && !policyRequest.getBrokerAddress().isEmpty() ?
policyRequest.getBrokerAddress() : DEFAULT_BROKER_ADDRESS;
assert policyRequest != null;
String brokerAddress = policyRequest.getBrokerAddress();
String subject = policyRequest.getSubject();
if (subject == null || subject.isEmpty()) {
@@ -262,8 +245,7 @@ public class AclServiceImpl implements AclService {
aclInfo.setSubject(subject);
try {
String address = brokerAddress != null && !brokerAddress.isEmpty() ? brokerAddress : DEFAULT_BROKER_ADDRESS;
mqAdminExt.updateAcl(address, aclInfo);
mqAdminExt.updateAcl(brokerAddress, aclInfo);
} catch (Exception ex) {
logger.error("Failed to update ACL for subject: {} on broker: {}", subject, brokerAddress, ex);
throw new RuntimeException("Failed to update ACL", ex);