[Enhancement] ACL can add rules in clusters (#340)

* [Enhancement] ACL can add rules in clusters and fix ISSUE #297

* rollback the yml change
This commit is contained in:
Crazylychee
2025-07-08 10:46:25 +08:00
committed by GitHub
parent 87cfa3e872
commit a4e02f472f
20 changed files with 1158 additions and 613 deletions

View File

@@ -94,8 +94,9 @@
## ACL2.0管理界面 ## ACL2.0管理界面
- 支持根据broker地址的acl规则的查询 - 支持根据集群名字或者broker地址的acl规则的查询
- acl规则的修改、新增、删除、查找 - acl规则的修改、新增、删除、查找
- 如果只是选取了集群名字那么查询的acl列表将会取交集如果选取了brokerName就会返回该broker的acl列表。
- 不再支持acl1.0 - 不再支持acl1.0
![image-20250706145313629](UserGuide_CN/image-20250706145313629.png) ![image-20250706145313629](UserGuide_CN/image-20250706145313629.png)

View File

@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
const appConfig = { const appConfig = {
apiBaseUrl: 'http://localhost:8082' // 请替换为你的实际 API Base URL apiBaseUrl: 'http://localhost:8082'
}; };
let _redirectHandler = null; let _redirectHandler = null;
@@ -74,34 +74,36 @@ const remoteApi = {
} }
}, },
listUsers: async (brokerAddress) => { listUsers: async (brokerName, clusterName) => {
const params = new URLSearchParams(); const params = new URLSearchParams();
if (brokerAddress) params.append('brokerAddress', brokerAddress); if (brokerName) params.append('brokerName', brokerName);
if (clusterName) params.append('clusterName', clusterName);
const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/users.query?${params.toString()}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/users.query?${params.toString()}`));
return await response.json(); return await response.json();
}, },
createUser: async (brokerAddress, userInfo) => { createUser: async (brokerName, userInfo, clusterName) => {
const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/createUser.do'), { const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/createUser.do'), {
method: 'POST', method: 'POST',
headers: {'Content-Type': 'application/json'}, headers: {'Content-Type': 'application/json'},
body: JSON.stringify({brokerAddress, userInfo}) body: JSON.stringify({brokerName, userInfo, clusterName})
});
return await response.json(); // 返回字符串消息
},
updateUser: async (brokerAddress, userInfo) => {
const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/updateUser.do'), {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({brokerAddress, userInfo})
}); });
return await response.json(); return await response.json();
}, },
deleteUser: async (brokerAddress, username) => { updateUser: async (brokerName, userInfo, clusterName) => {
const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/updateUser.do'), {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({brokerName, userInfo, clusterName})
});
return await response.json();
},
deleteUser: async (brokerName, username, clusterName) => {
const params = new URLSearchParams(); const params = new URLSearchParams();
if (brokerAddress) params.append('brokerAddress', brokerAddress); if (brokerName) params.append('brokerName', brokerName);
if (clusterName) params.append('clusterName', clusterName);
params.append('username', username); params.append('username', username);
const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/deleteUser.do?${params.toString()}`), { const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/deleteUser.do?${params.toString()}`), {
method: 'DELETE' method: 'DELETE'
@@ -109,38 +111,40 @@ const remoteApi = {
return await response.json(); return await response.json();
}, },
// --- ACL 权限相关 API --- listAcls: async (brokerName, searchParam, clusterName) => {
listAcls: async (brokerAddress, searchParam) => {
const params = new URLSearchParams(); const params = new URLSearchParams();
if (brokerAddress) params.append('brokerAddress', brokerAddress); if (brokerName) params.append('brokerName', brokerName);
if (clusterName) params.append('clusterName', clusterName);
if (searchParam) params.append('searchParam', searchParam); if (searchParam) params.append('searchParam', searchParam);
if (searchParam != null) console.log(1111)
const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/acls.query?${params.toString()}`)); const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/acls.query?${params.toString()}`));
return await response.json(); return await response.json();
}, },
createAcl: async (brokerAddress, subject, policies) => { createAcl: async (brokerName, subject, policies, clusterName) => {
const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/createAcl.do'), { const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/createAcl.do'), {
method: 'POST', method: 'POST',
headers: {'Content-Type': 'application/json'}, headers: {'Content-Type': 'application/json'},
body: JSON.stringify({brokerAddress, subject, policies}) body: JSON.stringify({brokerName, subject, policies, clusterName})
}); });
return await response.json(); return await response.json();
}, },
updateAcl: async (brokerAddress, subject, policies) => { updateAcl: async (brokerName, subject, policies, clusterName) => {
const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/updateAcl.do'), { const response = await remoteApi._fetch(remoteApi.buildUrl('/acl/updateAcl.do'), {
method: 'POST', method: 'POST',
headers: {'Content-Type': 'application/json'}, headers: {'Content-Type': 'application/json'},
body: JSON.stringify({brokerAddress, subject, policies}) body: JSON.stringify({brokerName, subject, policies, clusterName})
}); });
return await response.json(); return await response.json();
}, },
deleteAcl: async (brokerAddress, subject, resource) => { deleteAcl: async (brokerName, subject, resource, clusterName) => {
const params = new URLSearchParams(); const params = new URLSearchParams();
if (brokerAddress) params.append('brokerAddress', brokerAddress); if (brokerName) params.append('brokerAddress', brokerName);
params.append('subject', subject); params.append('subject', subject);
if (resource) params.append('resource', resource); if (resource) params.append('resource', resource);
if (clusterName) params.append('clusterName', clusterName);
const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/deleteAcl.do?${params.toString()}`), { const response = await remoteApi._fetch(remoteApi.buildUrl(`/acl/deleteAcl.do?${params.toString()}`), {
method: 'DELETE' method: 'DELETE'
}); });

View File

@@ -20,13 +20,13 @@ import React, {useEffect, useState} from 'react';
const {Option} = Select; const {Option} = Select;
// Subject 类型枚举
const subjectTypes = [ const subjectTypes = [
{value: 'User', label: 'User'}, {value: 'User', label: 'User'},
]; ];
const SubjectInput = ({value, onChange, disabled}) => { const SubjectInput = ({value, onChange, disabled, t}) => {
// 解析传入的 value将其拆分为 type 和 name
const parseValue = (val) => { const parseValue = (val) => {
if (!val || typeof val !== 'string') { if (!val || typeof val !== 'string') {
return {type: subjectTypes[0].value, name: ''}; // 默认值 return {type: subjectTypes[0].value, name: ''}; // 默认值
@@ -35,27 +35,25 @@ const SubjectInput = ({value, onChange, disabled}) => {
if (parts.length === 2 && subjectTypes.some(t => t.value === parts[0])) { if (parts.length === 2 && subjectTypes.some(t => t.value === parts[0])) {
return {type: parts[0], name: parts[1]}; return {type: parts[0], name: parts[1]};
} }
return {type: subjectTypes[0].value, name: val}; // 如果格式不匹配,将整个值作为 name类型设为默认 return {type: subjectTypes[0].value, name: val};
}; };
const [currentType, setCurrentType] = useState(() => parseValue(value).type); const [currentType, setCurrentType] = useState(() => parseValue(value).type);
const [currentName, setCurrentName] = useState(() => parseValue(value).name); const [currentName, setCurrentName] = useState(() => parseValue(value).name);
// 当外部 value 变化时,更新内部状态
useEffect(() => { useEffect(() => {
const parsed = parseValue(value); const parsed = parseValue(value);
setCurrentType(parsed.type); setCurrentType(parsed.type);
setCurrentName(parsed.name); setCurrentName(parsed.name);
}, [value]); }, [value]);
// 当类型或名称变化时,通知 Form.Item
const triggerChange = (changedType, changedName) => { const triggerChange = (changedType, changedName) => {
if (onChange) { if (onChange) {
// 只有当名称不为空时才组合,否则只返回类型或空字符串
if (changedName) { if (changedName) {
onChange(`${changedType}:${changedName}`); onChange(`${changedType}:${changedName}`);
} else if (changedType) { // 如果只选择了类型,但名称为空,则不组合 } else if (changedType) {
onChange(''); // 或者根据需求返回 'User:' 等,但通常这种情况下不应该有值 onChange('');
} else { } else {
onChange(''); onChange('');
} }
@@ -91,7 +89,7 @@ const SubjectInput = ({value, onChange, disabled}) => {
style={{width: '70%'}} style={{width: '70%'}}
value={currentName} value={currentName}
onChange={onNameChange} onChange={onNameChange}
placeholder="请输入名称 (例如: yourUsername)" placeholder={t.PLEASE_INPUT_NAME}
disabled={disabled} disabled={disabled}
/> />
</Input.Group> </Input.Group>

View File

@@ -16,15 +16,15 @@
*/ */
import React, {useEffect, useState} from 'react'; import React, {useEffect, useState} from 'react';
import {Modal, Spin, Table} from 'antd'; import {Descriptions, Modal, Spin, Table, Tag, Tooltip} from 'antd';
import {remoteApi} from '../../api/remoteApi/remoteApi'; import {remoteApi} from '../../api/remoteApi/remoteApi';
import {useLanguage} from '../../i18n/LanguageContext'; import {useLanguage} from '../../i18n/LanguageContext';
const ClientInfoModal = ({visible, group, address, onCancel}) => { const ClientInfoModal = ({visible, group, address, onCancel}) => {
const {t} = useLanguage(); const {t} = useLanguage();
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(false);
const [connectionData, setConnectionData] = useState(null); const [connectionData, setConnectionData] = useState(null);
const [subscriptionData, setSubscriptionData] = useState(null);
useEffect(() => { useEffect(() => {
const fetchData = async () => { const fetchData = async () => {
@@ -33,10 +33,8 @@ const ClientInfoModal = ({visible, group, address, onCancel}) => {
setLoading(true); setLoading(true);
try { try {
const connResponse = await remoteApi.queryConsumerConnection(group, address); const connResponse = await remoteApi.queryConsumerConnection(group, address);
const topicResponse = await remoteApi.queryTopicByConsumer(group, address);
if (connResponse.status === 0) setConnectionData(connResponse.data); if (connResponse.status === 0) setConnectionData(connResponse.data);
if (topicResponse.status === 0) setSubscriptionData(topicResponse.data);
} finally { } finally {
setLoading(false); setLoading(false);
} }
@@ -46,53 +44,118 @@ const ClientInfoModal = ({visible, group, address, onCancel}) => {
}, [visible, group, address]); }, [visible, group, address]);
const connectionColumns = [ const connectionColumns = [
{title: 'ClientId', dataIndex: 'clientId'}, {
{title: 'ClientAddr', dataIndex: 'clientAddr'}, title: t.CLIENTID, dataIndex: 'clientId', key: 'clientId', width: 220, ellipsis: true,
{title: 'Language', dataIndex: 'language'}, render: (text) => (
{title: 'Version', dataIndex: 'versionDesc'}, <Tooltip title={text}>
{text}
</Tooltip>
)
},
{title: t.CLIENTADDR, dataIndex: 'clientAddr', key: 'clientAddr', width: 150, ellipsis: true},
{title: t.LANGUAGE, dataIndex: 'language', key: 'language', width: 100},
{title: t.VERSION, dataIndex: 'versionDesc', key: 'versionDesc', width: 100},
]; ];
const subscriptionColumns = [ const subscriptionColumns = [
{title: 'Topic', dataIndex: 'topic'}, {
{title: 'SubExpression', dataIndex: 'subString'}, title: t.TOPIC, dataIndex: 'topic', key: 'topic', width: 250, ellipsis: true,
render: (text) => (
<Tooltip title={text}>
{text}
</Tooltip>
)
},
{title: t.SUBSCRIPTION_EXPRESSION, dataIndex: 'subString', key: 'subString', width: 150, ellipsis: true},
{
title: t.EXPRESSION_TYPE, dataIndex: 'expressionType', key: 'expressionType', width: 120,
render: (text) => <Tag color="blue">{text}</Tag>
},
// --- Added Columns for TagsSet and CodeSet ---
{
title: t.TAGS_SET, // Ensure t.TAGS_SET is defined in your language file
dataIndex: 'tagsSet',
key: 'tagsSet',
width: 150,
render: (tags) => (
tags && tags.length > 0 ? (
<Tooltip title={tags.join(', ')}>
{tags.map((tag, index) => (
<Tag key={index} color="default">{tag}</Tag>
))}
</Tooltip>
) : 'N/A'
),
ellipsis: true,
},
{
title: t.CODE_SET, // Ensure t.CODE_SET is defined in your language file
dataIndex: 'codeSet',
key: 'codeSet',
width: 150,
render: (codes) => (
codes && codes.length > 0 ? (
<Tooltip title={codes.join(', ')}>
{codes.map((code, index) => (
<Tag key={index} color="default">{code}</Tag>
))}
</Tooltip>
) : 'N/A'
),
ellipsis: true,
},
// --- End of Added Columns ---
{title: t.SUB_VERSION, dataIndex: 'subVersion', key: 'subVersion', width: 150},
]; ];
const formattedSubscriptionData = connectionData?.subscriptionTable
? Object.keys(connectionData.subscriptionTable).map(key => ({
...connectionData.subscriptionTable[key],
key: key,
}))
: [];
return ( return (
<Modal <Modal
title={`[${group}]${t.CLIENT}`} title={`[${group}] ${t.CLIENT_INFORMATION}`}
visible={visible} visible={visible}
onCancel={onCancel} onCancel={onCancel}
footer={null} footer={null}
width={800} width={1200} // Increased width to accommodate more columns
> >
<Spin spinning={loading}> <Spin spinning={loading}>
{connectionData && ( {connectionData && (
<> <>
<Descriptions bordered column={2} title={t.CONNECTION_OVERVIEW} style={{marginBottom: 20}}>
<Descriptions.Item label={t.CONSUME_TYPE}>
<Tag color="green">{connectionData.consumeType}</Tag>
</Descriptions.Item>
<Descriptions.Item label={t.MESSAGE_MODEL}>
<Tag color="geekblue">{connectionData.messageModel}</Tag>
</Descriptions.Item>
<Descriptions.Item label={t.CONSUME_FROM_WHERE}>
<Tag color="purple">{connectionData.consumeFromWhere}</Tag>
</Descriptions.Item>
</Descriptions>
<h3>{t.CLIENT_CONNECTIONS}</h3>
<Table <Table
columns={connectionColumns} columns={connectionColumns}
dataSource={connectionData.connectionSet} dataSource={connectionData.connectionSet}
rowKey="clientId" rowKey="clientId"
pagination={false} pagination={false}
scroll={{x: 'max-content'}}
style={{marginBottom: 20}}
/> />
<h4>{t.SUBSCRIPTION}</h4>
<h3>{t.CLIENT_SUBSCRIPTIONS}</h3>
<Table <Table
columns={subscriptionColumns} columns={subscriptionColumns}
dataSource={ dataSource={formattedSubscriptionData}
subscriptionData?.subscriptionTable rowKey="key"
? Object.entries(subscriptionData.subscriptionTable).map(([topic, detail]) => ({
topic,
...detail,
}))
: []
}
rowKey="topic"
pagination={false} pagination={false}
locale={{ scroll={{x: 'max-content'}}
emptyText: loading ? <Spin size="small"/> : t.NO_DATA
}}
/> />
<p>ConsumeType: {connectionData.consumeType}</p>
<p>MessageModel: {connectionData.messageModel}</p>
</> </>
)} )}
</Spin> </Spin>

View File

@@ -43,34 +43,90 @@ const ConsumerDetailModal = ({visible, group, address, onCancel}) => {
fetchData(); fetchData();
}, [visible, group, address]); }, [visible, group, address]);
// Format timestamp to readable date
const formatTimestamp = (timestamp) => {
if (!timestamp || timestamp === 0) return '-';
return new Date(timestamp).toLocaleString();
};
// Group data by topic for better organization
const groupByTopic = (data) => {
const grouped = {};
data.forEach(item => {
if (!grouped[item.topic]) {
grouped[item.topic] = [];
}
grouped[item.topic].push(item);
});
return grouped;
};
const groupedDetails = groupByTopic(details);
const queueColumns = [ const queueColumns = [
{title: 'Broker', dataIndex: 'brokerName'}, {title: 'Broker', dataIndex: 'brokerName', width: 120},
{title: 'Queue', dataIndex: 'queueId'}, {title: 'Queue ID', dataIndex: 'queueId', width: 100},
{title: 'BrokerOffset', dataIndex: 'brokerOffset'}, {title: 'Broker Offset', dataIndex: 'brokerOffset', width: 120},
{title: 'ConsumerOffset', dataIndex: 'consumerOffset'}, {title: 'Consumer Offset', dataIndex: 'consumerOffset', width: 120},
{title: 'DiffTotal', dataIndex: 'diffTotal'}, {
{title: 'LastTimestamp', dataIndex: 'lastTimestamp'}, title: 'Lag (Diff)', dataIndex: 'diffTotal', width: 100,
render: (diff) => (
<span style={{color: diff > 0 ? '#f5222d' : '#52c41a'}}>
{diff}
</span>
)
},
{title: 'Client Info', dataIndex: 'clientInfo', width: 200},
{
title: 'Last Consume Time', dataIndex: 'lastTimestamp', width: 180,
render: (timestamp) => formatTimestamp(timestamp)
},
]; ];
return ( return (
<Modal <Modal
title={`[${group}]${t.CONSUME_DETAIL}`} title={
<span>Consumer Details - Group: <strong>{group}</strong> | Address: <strong>{address}</strong></span>}
visible={visible} visible={visible}
onCancel={onCancel} onCancel={onCancel}
footer={null} footer={null}
width={1200} width={1400}
style={{top: 20}}
> >
<Spin spinning={loading}> <Spin spinning={loading}>
{details.map((consumeDetail, index) => ( {Object.entries(groupedDetails).map(([topic, topicDetails]) => (
<div key={index}> <div key={topic} style={{marginBottom: 24}}>
<div style={{
background: '#f0f0f0',
padding: '8px 16px',
marginBottom: 8,
borderRadius: 4,
display: 'flex',
justifyContent: 'space-between',
alignItems: 'center'
}}>
<h3 style={{margin: 0}}>Topic: <strong>{topic}</strong></h3>
<div>
<span style={{marginRight: 16}}>Total Lag: <strong>{topicDetails[0].diffTotal}</strong></span>
<span>Last Consume Time: <strong>{formatTimestamp(topicDetails[0].lastTimestamp)}</strong></span>
</div>
</div>
{topicDetails.map((detail, index) => (
<div key={index} style={{marginBottom: 16}}>
<Table <Table
columns={queueColumns} columns={queueColumns}
dataSource={consumeDetail.queueStatInfoList} dataSource={detail.queueStatInfoList}
rowKey="queueId" rowKey={(record) => `${record.brokerName}-${record.queueId}`}
pagination={false} pagination={false}
size="small"
bordered
scroll={{x: 'max-content'}}
/> />
</div> </div>
))} ))}
</div>
))}
</Spin> </Spin>
</Modal> </Modal>
); );

View File

@@ -290,6 +290,23 @@ export const translations = {
"USERNAME_PLACEHOLDER": "用户名", "USERNAME_PLACEHOLDER": "用户名",
"PASSWORD_REQUIRED": "密码为必填项", "PASSWORD_REQUIRED": "密码为必填项",
"PASSWORD_PLACEHOLDER": "密码", "PASSWORD_PLACEHOLDER": "密码",
"PLEASE_INPUT_NAME":"请输入名称",
"PLEASE_SELECT_CLUSTER": "请选择集群",
"CLIENT_INFORMATION": "客户端信息",
"CONSUME_TYPE": "消费类型",
"MESSAGE_MODEL": "消息模型",
"CONSUME_FROM_WHERE": "从何处消费",
"CLIENT_CONNECTIONS": "客户端连接",
"CLIENT_SUBSCRIPTIONS": "客户端订阅",
"CONNECTION_OVERVIEW": "连接概览",
"CLIENTID": "客户端 ID",
"CLIENTADDR": "客户端地址",
"LANGUAGE": "语言",
"SUBSCRIPTION_EXPRESSION": "订阅表达式",
"EXPRESSION_TYPE": "表达式类型",
"SUB_VERSION": "订阅版本",
"CODE_SET": "代码集",
"TAGS_SET": "标签集"
}, },
en: { en: {
"DEFAULT": "Default", "DEFAULT": "Default",
@@ -558,6 +575,24 @@ export const translations = {
"USERNAME_PLACEHOLDER": "Username placeholder", "USERNAME_PLACEHOLDER": "Username placeholder",
"PASSWORD_REQUIRED": "Password is required", "PASSWORD_REQUIRED": "Password is required",
"PASSWORD_PLACEHOLDER": "Password placeholder", "PASSWORD_PLACEHOLDER": "Password placeholder",
"PLEASE_INPUT_NAME": "Please input name",
"PLEASE_SELECT_CLUSTER": "Please select cluster",
"SUBSCRIPTION": "Subscription",
"CLIENT_INFORMATION": "Client Information",
"CONSUME_TYPE": "Consume Type",
"MESSAGE_MODEL": "Message Model",
"CONSUME_FROM_WHERE": "Consume From Where",
"CLIENT_CONNECTIONS": "Client Connections",
"CLIENT_SUBSCRIPTIONS": "Client Subscriptions",
"CONNECTION_OVERVIEW": "Connection Overview",
"CLIENTID": "Client ID",
"CLIENTADDR": "Client Address",
"LANGUAGE": "Language",
"SUBSCRIPTION_EXPRESSION": "Subscription Expression",
"EXPRESSION_TYPE": "Expression Type",
"SUB_VERSION": "Sub Version",
"CODE_SET": "Code Set",
"TAGS_SET": "Tags Set"
} }

View File

@@ -15,30 +15,14 @@
* limitations under the License. * limitations under the License.
*/ */
import React, { useState, useEffect } from 'react'; import React, {useEffect, useState} from 'react';
import { import {Button, Form, Input, message, Modal, Popconfirm, Select, Space, Table, Tabs, Tag} from 'antd';
Table, import {DeleteOutlined, EditOutlined, EyeInvisibleOutlined, EyeOutlined} from '@ant-design/icons';
Button,
Input,
Tabs,
Modal,
Form,
message,
Space,
Tag,
Popconfirm,
Select
} from 'antd';
import {
EditOutlined,
DeleteOutlined,
EyeOutlined,
EyeInvisibleOutlined
} from '@ant-design/icons';
import {remoteApi} from "../../api/remoteApi/remoteApi"; import {remoteApi} from "../../api/remoteApi/remoteApi";
import ResourceInput from '../../components/acl/ResourceInput'; import ResourceInput from '../../components/acl/ResourceInput';
import SubjectInput from "../../components/acl/SubjectInput"; import SubjectInput from "../../components/acl/SubjectInput";
import {useLanguage} from "../../i18n/LanguageContext"; import {useLanguage} from "../../i18n/LanguageContext";
const {TabPane} = Tabs; const {TabPane} = Tabs;
const {Search} = Input; const {Search} = Input;
@@ -84,6 +68,8 @@ const Acl = () => {
// State for the address of the selected broker // State for the address of the selected broker
const [brokerAddress, setBrokerAddress] = useState(undefined); const [brokerAddress, setBrokerAddress] = useState(undefined);
const [searchValue, setSearchValue] = useState('');
// --- Data Fetching and Initial Setup --- // --- Data Fetching and Initial Setup ---
useEffect(() => { useEffect(() => {
const fetchData = async () => { const fetchData = async () => {
@@ -120,14 +106,14 @@ const Acl = () => {
} }
}; };
if (!clusterData) { if (!clusterData) {
fetchData(); setLoading(true);
fetchData().finally(() => setLoading(false));
} }
if (brokerAddress) { if (brokerAddress) {
// Call fetchUsers or fetchAcls based on activeTab initially
if (activeTab === 'users') { if (activeTab === 'users') {
fetchUsers(); fetchUsers().finally(() => setLoading(false));
} else { } else {
fetchAcls(); fetchAcls().finally(() => setLoading(false));
} }
} }
@@ -135,7 +121,6 @@ const Acl = () => {
useEffect(() => { useEffect(() => {
const userPermission = localStorage.getItem('userrole'); const userPermission = localStorage.getItem('userrole');
console.log(userPermission);
if (userPermission == 2) { if (userPermission == 2) {
setWriteOperationEnabled(false); setWriteOperationEnabled(false);
} else { } else {
@@ -174,12 +159,6 @@ const Acl = () => {
} }
}; };
// --- Log selected values for debugging (optional) ---
useEffect(() => {
console.log('Selected Cluster:', selectedCluster);
console.log('Selected Broker:', selectedBroker);
console.log('Broker Address:', brokerAddress);
}, [selectedCluster, selectedBroker, brokerAddress]);
const handleIpChange = value => { const handleIpChange = value => {
// 过滤掉重复的IP地址 // 过滤掉重复的IP地址
const uniqueIps = Array.from(new Set(value)); const uniqueIps = Array.from(new Set(value));
@@ -206,7 +185,7 @@ const Acl = () => {
const fetchUsers = async () => { const fetchUsers = async () => {
setLoading(true); setLoading(true);
try { try {
const result = await remoteApi.listUsers(brokerAddress); const result = await remoteApi.listUsers(selectedBroker, selectedCluster);
if (result && result.status === 0 && result.data) { if (result && result.status === 0 && result.data) {
const formattedUsers = result.data.map(user => ({ const formattedUsers = result.data.map(user => ({
...user, ...user,
@@ -225,10 +204,10 @@ const Acl = () => {
} }
}; };
const fetchAcls = async (value) => { const fetchAcls = async () => {
setLoading(true); setLoading(true);
try { try {
const result = await remoteApi.listAcls(brokerAddress, value); const result = await remoteApi.listAcls(selectedBroker, searchValue, selectedCluster);
if (result && result.status === 0) { if (result && result.status === 0) {
const formattedAcls = []; const formattedAcls = [];
@@ -245,7 +224,6 @@ const Acl = () => {
const resources = Array.isArray(entry.resource) ? entry.resource : (entry.resource ? [entry.resource] : []); const resources = Array.isArray(entry.resource) ? entry.resource : (entry.resource ? [entry.resource] : []);
resources.forEach((singleResource, resourceIndex) => { resources.forEach((singleResource, resourceIndex) => {
console.log(singleResource)
formattedAcls.push({ formattedAcls.push({
key: `acl-${aclIndex}-policy-${policyIndex}-entry-${entryIndex}-resource-${singleResource}`, key: `acl-${aclIndex}-policy-${policyIndex}-entry-${entryIndex}-resource-${singleResource}`,
subject: subject, subject: subject,
@@ -301,10 +279,10 @@ const Acl = () => {
const handleDeleteUser = async (username) => { const handleDeleteUser = async (username) => {
setLoading(true); setLoading(true);
try { try {
const result = await remoteApi.deleteUser(brokerAddress, username); const result = await remoteApi.deleteUser(selectedBroker, username, selectedCluster);
if (result.status === 0) { if (result.status === 0) {
messageApi.success(t.USER_DELETE_SUCCESS); messageApi.success(t.USER_DELETE_SUCCESS);
fetchUsers(brokerAddress); fetchUsers();
} else { } else {
messageApi.error(t.USER_DELETE_FAILED + result.errMsg); messageApi.error(t.USER_DELETE_FAILED + result.errMsg);
} }
@@ -330,14 +308,14 @@ const Acl = () => {
}; };
if (currentUser) { if (currentUser) {
result = await remoteApi.updateUser(brokerAddress, userInfoParam); result = await remoteApi.updateUser(selectedBroker, userInfoParam, selectedCluster);
if (result.status === 0) { if (result.status === 0) {
messageApi.success(t.USER_UPDATE_SUCCESS); messageApi.success(t.USER_UPDATE_SUCCESS);
} else { } else {
messageApi.error(result.errMsg); messageApi.error(result.errMsg);
} }
} else { } else {
result = await remoteApi.createUser(brokerAddress, userInfoParam); result = await remoteApi.createUser(selectedBroker, userInfoParam, selectedCluster);
if (result.status === 0) { if (result.status === 0) {
messageApi.success(t.USER_CREATE_SUCCESS); messageApi.success(t.USER_CREATE_SUCCESS);
} else { } else {
@@ -379,7 +357,7 @@ const Acl = () => {
const handleDeleteAcl = async (subject, resource) => { const handleDeleteAcl = async (subject, resource) => {
setLoading(true); setLoading(true);
try { try {
const result = await remoteApi.deleteAcl(brokerAddress, subject, resource); const result = await remoteApi.deleteAcl(selectedBroker, subject, resource, selectedCluster);
if (result.status === 0) { if (result.status === 0) {
messageApi.success(t.ACL_DELETE_SUCCESS); messageApi.success(t.ACL_DELETE_SUCCESS);
fetchAcls(); fetchAcls();
@@ -415,22 +393,21 @@ const Acl = () => {
]; ];
if (isUpdate) { // This condition seems reversed for update/create based on the current logic. if (isUpdate) { // This condition seems reversed for update/create based on the current logic.
result = await remoteApi.updateAcl(brokerAddress, values.subject, policiesParam); result = await remoteApi.updateAcl(selectedBroker, values.subject, policiesParam, selectedCluster);
if (result.status === 0) { if (result.status === 0) {
messageApi.success(t.ACL_UPDATE_SUCCESS); messageApi.success(t.ACL_UPDATE_SUCCESS);
setIsAclModalVisible(false); setIsAclModalVisible(false);
fetchAcls(brokerAddress); fetchAcls();
} else { } else {
messageApi.error(t.ACL_UPDATE_FAILED + result.errMsg); messageApi.error(t.ACL_UPDATE_FAILED + result.errMsg);
} }
setIsUpdate(false) setIsUpdate(false)
} else { } else {
result = await remoteApi.createAcl(brokerAddress, values.subject, policiesParam); result = await remoteApi.createAcl(selectedBroker, values.subject, policiesParam, selectedCluster);
console.log(result)
if (result.status === 0) { if (result.status === 0) {
messageApi.success(t.ACL_CREATE_SUCCESS); messageApi.success(t.ACL_CREATE_SUCCESS);
setIsAclModalVisible(false); setIsAclModalVisible(false);
fetchAcls(brokerAddress); fetchAcls();
} else { } else {
messageApi.error(t.ACL_CREATE_FAILED + result.errMsg); messageApi.error(t.ACL_CREATE_FAILED + result.errMsg);
} }
@@ -444,6 +421,10 @@ const Acl = () => {
} }
}; };
const handleInputChange = (e) => {
setSearchValue(e.target.value);
};
// --- Search Functionality --- // --- Search Functionality ---
const handleSearch = (value) => { const handleSearch = (value) => {
@@ -459,7 +440,7 @@ const Acl = () => {
setUserListData(filteredData); setUserListData(filteredData);
} }
} else { } else {
fetchAcls(value); fetchAcls();
} }
}; };
@@ -604,8 +585,9 @@ return (
style={{width: 200}} style={{width: 200}}
onChange={handleBrokerChange} onChange={handleBrokerChange}
value={selectedBroker} value={selectedBroker}
options={brokerNamesOptions} // Now dynamically updated options={brokerNamesOptions}
disabled={!selectedCluster} // Disable broker selection if no cluster is chosen disabled={!selectedCluster}
allowClear
/> />
</Form.Item> </Form.Item>
<Button type="primary" onClick={activeTab === 'users' ? fetchUsers : fetchAcls}> <Button type="primary" onClick={activeTab === 'users' ? fetchUsers : fetchAcls}>
@@ -626,6 +608,8 @@ return (
placeholder={t.SEARCH_PLACEHOLDER} placeholder={t.SEARCH_PLACEHOLDER}
allowClear allowClear
onSearch={handleSearch} onSearch={handleSearch}
value={searchValue}
onChange={handleInputChange}
style={{width: 300}} style={{width: 300}}
/> />
</div> </div>
@@ -730,7 +714,7 @@ return (
label={t.SUBJECT_LABEL} label={t.SUBJECT_LABEL}
rules={[{required: true, message: t.PLEASE_ENTER_SUBJECT}]} rules={[{required: true, message: t.PLEASE_ENTER_SUBJECT}]}
> >
<SubjectInput disabled={!!currentAcl}/> <SubjectInput disabled={!!currentAcl} t={t}/>
</Form.Item> </Form.Item>
<Form.Item <Form.Item

View File

@@ -98,12 +98,12 @@ const ConsumerGroupList = () => {
messageApi.error({title: t.ERROR, content: response.errMsg}); messageApi.error({title: t.ERROR, content: response.errMsg});
} }
} catch (error) { } catch (error) {
messageApi.error({title: t.ERROR, content: t.FAILED_TO_FETCH_DATA});
console.error("Error loading consumer groups:", error); console.error("Error loading consumer groups:", error);
messageApi.error({title: t.ERROR, content: t.FAILED_TO_FETCH_DATA});
} finally { } finally {
setLoading(false); setLoading(false);
} }
}, [t]); }, [t, proxyEnabled, selectedProxy, messageApi, setAllConsumerGroupList, remoteApi, setLoading]);
const filterByType = (str, type, version) => { const filterByType = (str, type, version) => {
if (filterSystem && type === "SYSTEM") return true; if (filterSystem && type === "SYSTEM") return true;
@@ -467,11 +467,17 @@ const ConsumerGroupList = () => {
{notificationContextHolder} {notificationContextHolder}
<div style={{padding: '20px'}}> <div style={{padding: '20px'}}>
<Spin spinning={loading} tip={t.LOADING}> <Spin spinning={loading} tip={t.LOADING}>
<div style={{ marginBottom: '20px', display: 'flex', justifyContent: 'space-between', alignItems: 'center' }}> <div style={{
marginBottom: '20px',
display: 'flex',
justifyContent: 'space-between',
alignItems: 'center'
}}>
{/* 左侧:筛选和操作按钮 */} {/* 左侧:筛选和操作按钮 */}
<div style={{display: 'flex', alignItems: 'center', gap: '15px', flexWrap: 'wrap'}}> <div style={{display: 'flex', alignItems: 'center', gap: '15px', flexWrap: 'wrap'}}>
<div style={{display: 'flex', alignItems: 'center'}}> <div style={{display: 'flex', alignItems: 'center'}}>
<label style={{ marginRight: '8px', whiteSpace: 'nowrap' }}>{t.SUBSCRIPTION_GROUP}:</label> <label
style={{marginRight: '8px', whiteSpace: 'nowrap'}}>{t.SUBSCRIPTION_GROUP}:</label>
<Input <Input
style={{width: '200px'}} style={{width: '200px'}}
value={filterStr} value={filterStr}

View File

@@ -33,10 +33,9 @@ const MessageQueryPage = () => {
const [form] = Form.useForm(); const [form] = Form.useForm();
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(false);
// Topic 查询状态
const [allTopicList, setAllTopicList] = useState([]); const [allTopicList, setAllTopicList] = useState([]);
const [selectedTopic, setSelectedTopic] = useState(null); const [selectedTopic, setSelectedTopic] = useState(null);
const [timepickerBegin, setTimepickerBegin] = useState(moment().subtract(1, 'hour')); // 默认一小时前 const [timepickerBegin, setTimepickerBegin] = useState(moment().subtract(1, 'hour'));
const [timepickerEnd, setTimepickerEnd] = useState(moment()); const [timepickerEnd, setTimepickerEnd] = useState(moment());
const [messageShowList, setMessageShowList] = useState([]); const [messageShowList, setMessageShowList] = useState([]);
const [paginationConf, setPaginationConf] = useState({ const [paginationConf, setPaginationConf] = useState({

View File

@@ -18,10 +18,10 @@
package org.apache.rocketmq.dashboard.controller; package org.apache.rocketmq.dashboard.controller;
import org.apache.rocketmq.dashboard.model.PolicyRequest; import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.UserInfoDto;
import org.apache.rocketmq.dashboard.model.request.UserCreateRequest; import org.apache.rocketmq.dashboard.model.request.UserCreateRequest;
import org.apache.rocketmq.dashboard.model.request.UserUpdateRequest; import org.apache.rocketmq.dashboard.model.request.UserUpdateRequest;
import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl; import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.DeleteMapping;
@@ -44,16 +44,18 @@ public class AclController {
@GetMapping("/users.query") @GetMapping("/users.query")
@ResponseBody @ResponseBody
public List<UserInfo> listUsers(@RequestParam(required = false) String brokerAddress) { public List<UserInfoDto> listUsers(@RequestParam(required = false) String brokerName,
return aclService.listUsers(brokerAddress); @RequestParam(required = false) String clusterName) {
return aclService.listUsers(clusterName, brokerName);
} }
@GetMapping("/acls.query") @GetMapping("/acls.query")
@ResponseBody @ResponseBody
public Object listAcls( public Object listAcls(
@RequestParam(required = false) String brokerAddress, @RequestParam(required = false) String brokerName,
@RequestParam(required = false) String searchParam) { @RequestParam(required = false) String searchParam,
return aclService.listAcls(brokerAddress, searchParam); @RequestParam(required = false) String clusterName) {
return aclService.listAcls(clusterName, brokerName, searchParam);
} }
@PostMapping("/createAcl.do") @PostMapping("/createAcl.do")
@@ -65,30 +67,35 @@ public class AclController {
@DeleteMapping("/deleteUser.do") @DeleteMapping("/deleteUser.do")
@ResponseBody @ResponseBody
public Object deleteUser(@RequestParam(required = false) String brokerAddress, @RequestParam String username) { public Object deleteUser(@RequestParam(required = false) String brokerName,
aclService.deleteUser(brokerAddress, username); @RequestParam String username,
@RequestParam(required = false) String clusterName) {
aclService.deleteUser(clusterName, brokerName, username);
return true; return true;
} }
@RequestMapping(value = "/updateUser.do", method = RequestMethod.POST, produces = "application/json;charset=UTF-8") @RequestMapping(value = "/updateUser.do", method = RequestMethod.POST, produces = "application/json;charset=UTF-8")
@ResponseBody @ResponseBody
public Object updateUser(@RequestBody UserUpdateRequest request) { public Object updateUser(@RequestBody UserUpdateRequest request) {
aclService.updateUser(request.getBrokerAddress(), request.getUserInfo()); aclService.updateUser(request.getClusterName(), request.getBrokerName(), request.getUserInfo());
return true; return true;
} }
@PostMapping("/createUser.do") @PostMapping("/createUser.do")
@ResponseBody
public Object createUser(@RequestBody UserCreateRequest request) { public Object createUser(@RequestBody UserCreateRequest request) {
aclService.createUser(request.getBrokerAddress(), request.getUserInfo()); aclService.createUser(request.getClusterName(), request.getBrokerName(), request.getUserInfo());
return true; return true;
} }
@DeleteMapping("/deleteAcl.do") @DeleteMapping("/deleteAcl.do")
@ResponseBody
public Object deleteAcl( public Object deleteAcl(
@RequestParam(required = false) String brokerAddress, @RequestParam(required = false) String brokerName,
@RequestParam(required = false) String clusterName,
@RequestParam String subject, @RequestParam String subject,
@RequestParam(required = false) String resource) { @RequestParam(required = false) String resource) {
aclService.deleteAcl(brokerAddress, subject, resource); aclService.deleteAcl(clusterName, brokerName, subject, resource);
return true; return true;
} }

View File

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public class AclInfo {
private String subject;
private List<PolicyInfo> policies;
public static AclInfo of(String subject, List<String> resources, List<String> actions,
List<String> sourceIps,
String decision) {
AclInfo aclInfo = new AclInfo();
aclInfo.setSubject(subject);
PolicyInfo policyInfo = PolicyInfo.of(resources, actions, sourceIps, decision);
aclInfo.setPolicies(Collections.singletonList(policyInfo));
return aclInfo;
}
public static class PolicyInfo {
private String policyType;
private List<PolicyEntryInfo> entries;
public static PolicyInfo of(List<String> resources, List<String> actions,
List<String> sourceIps, String decision) {
PolicyInfo policyInfo = new PolicyInfo();
List<PolicyEntryInfo> entries = resources.stream()
.map(resource -> PolicyEntryInfo.of(resource, actions, sourceIps, decision))
.collect(Collectors.toList());
policyInfo.setEntries(entries);
return policyInfo;
}
public String getPolicyType() {
return policyType;
}
public void setPolicyType(String policyType) {
this.policyType = policyType;
}
public List<PolicyEntryInfo> getEntries() {
return entries;
}
public void setEntries(List<PolicyEntryInfo> entries) {
this.entries = entries;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PolicyInfo that = (PolicyInfo) o;
return Objects.equals(policyType, that.policyType) &&
Objects.equals(entries, that.entries);
}
@Override
public int hashCode() {
return Objects.hash(policyType, entries);
}
}
public static class PolicyEntryInfo {
private String resource;
private List<String> actions;
private List<String> sourceIps;
private String decision;
public static PolicyEntryInfo of(String resource, List<String> actions, List<String> sourceIps,
String decision) {
PolicyEntryInfo policyEntryInfo = new PolicyEntryInfo();
policyEntryInfo.setResource(resource);
policyEntryInfo.setActions(actions);
policyEntryInfo.setSourceIps(sourceIps);
policyEntryInfo.setDecision(decision);
return policyEntryInfo;
}
public String getResource() {
return resource;
}
public void setResource(String resource) {
this.resource = resource;
}
public List<String> getActions() {
return actions;
}
public void setActions(List<String> actions) {
this.actions = actions;
}
public List<String> getSourceIps() {
return sourceIps;
}
public void setSourceIps(List<String> sourceIps) {
this.sourceIps = sourceIps;
}
public String getDecision() {
return decision;
}
public void setDecision(String decision) {
this.decision = decision;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PolicyEntryInfo that = (PolicyEntryInfo) o;
return Objects.equals(resource, that.resource) &&
Objects.equals(actions, that.actions) &&
Objects.equals(sourceIps, that.sourceIps) &&
Objects.equals(decision, that.decision);
}
@Override
public int hashCode() {
return Objects.hash(resource, actions, sourceIps, decision);
}
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
public List<PolicyInfo> getPolicies() {
return policies;
}
public void setPolicies(List<PolicyInfo> policies) {
this.policies = policies;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AclInfo aclInfo = (AclInfo) o;
return Objects.equals(subject, aclInfo.subject) &&
Objects.equals(policies, aclInfo.policies);
}
@Override
public int hashCode() {
return Objects.hash(subject, policies);
}
public void copyFrom(org.apache.rocketmq.remoting.protocol.body.AclInfo source) {
this.subject = source.getSubject();
if (source.getPolicies() != null) {
List<PolicyInfo> copiedPolicies = new ArrayList<>();
for (org.apache.rocketmq.remoting.protocol.body.AclInfo.PolicyInfo policy : source.getPolicies()) {
PolicyInfo copiedPolicy = new PolicyInfo();
copiedPolicy.setPolicyType(policy.getPolicyType());
if (policy.getEntries() != null) {
List<PolicyEntryInfo> copiedEntries = new ArrayList<>();
for (org.apache.rocketmq.remoting.protocol.body.AclInfo.PolicyEntryInfo entry : policy.getEntries()) {
PolicyEntryInfo copiedEntry = new PolicyEntryInfo();
copiedEntry.setResource(entry.getResource());
copiedEntry.setActions(new ArrayList<>(entry.getActions()));
copiedEntry.setSourceIps(new ArrayList<>(entry.getSourceIps()));
copiedEntry.setDecision(entry.getDecision());
copiedEntries.add(copiedEntry);
}
copiedPolicy.setEntries(copiedEntries);
}
copiedPolicies.add(copiedPolicy);
}
this.setPolicies(copiedPolicies);
} else {
this.setPolicies(null);
}
}
}

View File

@@ -25,7 +25,8 @@ import java.util.List;
@Getter @Getter
@Setter @Setter
public class PolicyRequest { public class PolicyRequest {
private String brokerAddress; private String clusterName;
private String brokerName;
private String subject; private String subject;
private List<Policy> policies; private List<Policy> policies;
} }

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserInfoDto {
private String username;
private String password;
private String userType;
private String userStatus;
public UserInfoDto setUserInfo(UserInfo userInfo) {
this.username = userInfo.getUsername();
this.password = userInfo.getPassword();
this.userType = userInfo.getUserType();
this.userStatus = userInfo.getUserStatus();
return this;
}
}

View File

@@ -23,6 +23,7 @@ import lombok.Setter;
@Setter @Setter
@Getter @Getter
public class UserCreateRequest { public class UserCreateRequest {
private String brokerAddress; private String clusterName;
private String brokerName;
private UserInfoParam userInfo; private UserInfoParam userInfo;
} }

View File

@@ -23,6 +23,7 @@ import lombok.Setter;
@Getter @Getter
@Setter @Setter
public class UserUpdateRequest { public class UserUpdateRequest {
private String brokerAddress; private String clusterName;
private String brokerName;
private UserInfoParam userInfo; private UserInfoParam userInfo;
} }

View File

@@ -20,24 +20,21 @@ package org.apache.rocketmq.dashboard.service;
import org.apache.rocketmq.dashboard.model.PolicyRequest; import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.request.UserInfoParam; import org.apache.rocketmq.dashboard.model.request.UserInfoParam;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import java.util.List;
public interface AclService { public interface AclService {
List<UserInfo> listUsers(String brokerAddress); Object listUsers(String clusterName, String brokerAddress);
Object listAcls(String brokerAddress, String searchParam); Object listAcls(String clusterName,String brokerAddress, String searchParam);
List<String> createAcl(PolicyRequest policyRequest); Object createAcl(PolicyRequest policyRequest);
void deleteUser(String brokerAddress, String username); void deleteUser(String clusterName,String brokerAddress, String username);
void updateUser(String brokerAddress, UserInfoParam userParam); void updateUser(String clusterName,String brokerAddress, UserInfoParam userParam);
void createUser(String brokerAddress, UserInfoParam userParam); void createUser(String clusterName,String brokerAddress, UserInfoParam userParam);
void deleteAcl(String brokerAddress, String subject, String resource); void deleteAcl(String clusterName,String brokerAddress, String subject, String resource);
void updateAcl(PolicyRequest policyRequest); void updateAcl(PolicyRequest policyRequest);
} }

View File

@@ -21,9 +21,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.dashboard.model.Entry; import org.apache.rocketmq.dashboard.model.Entry;
import org.apache.rocketmq.dashboard.model.Policy; import org.apache.rocketmq.dashboard.model.Policy;
import org.apache.rocketmq.dashboard.model.PolicyRequest; import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.UserInfoDto;
import org.apache.rocketmq.dashboard.model.request.UserInfoParam; import org.apache.rocketmq.dashboard.model.request.UserInfoParam;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.AclService; import org.apache.rocketmq.dashboard.service.AclService;
import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.remoting.protocol.body.AclInfo; import org.apache.rocketmq.remoting.protocol.body.AclInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -37,7 +41,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
@Service @Service
public class AclServiceImpl implements AclService { public class AclServiceImpl extends AbstractCommonService implements AclService {
private Logger logger = LoggerFactory.getLogger(AclServiceImpl.class); private Logger logger = LoggerFactory.getLogger(AclServiceImpl.class);
@@ -45,60 +49,117 @@ public class AclServiceImpl implements AclService {
@Autowired @Autowired
private MQAdminExt mqAdminExt; private MQAdminExt mqAdminExt;
@Autowired
private ClusterInfoService clusterInfoService;
@Override @Override
public List<UserInfo> listUsers(String brokerAddress) { public List<UserInfoDto> listUsers(String clusterName, String brokerName) {
List<String> brokerAddrList = getBrokerAddressList(clusterName, brokerName);
Set<UserInfoDto> commonUsers = new HashSet<>();
final boolean[] firstIteration = {true};
brokerAddrList.forEach(address -> {
List<UserInfo> userList; List<UserInfo> userList;
try { try {
userList = mqAdminExt.listUser(brokerAddress, ""); userList = mqAdminExt.listUser(address, "");
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Failed to list users from broker: {}", brokerAddress, ex); logger.error("Failed to list users from broker: {}", address, ex);
throw new RuntimeException("Failed to list users", ex); throw new RuntimeException("Failed to list users", ex);
} }
if (userList == null || userList.isEmpty()) {
logger.warn("No users found for broker: {}", brokerAddress); List<UserInfoDto> userListDtos = new ArrayList<>();
return new ArrayList<>(); userList.forEach(user -> {
UserInfoDto userInfoDto = new UserInfoDto();
userListDtos.add(userInfoDto.setUserInfo(user));
});
if (!userList.isEmpty()) {
Set<UserInfoDto> currentUsers = new HashSet<>(userListDtos);
if (firstIteration[0]) {
commonUsers.addAll(userListDtos);
firstIteration[0] = false;
} else {
commonUsers.retainAll(currentUsers);
} }
return userList; } else {
logger.warn("No users found for broker: {}", address);
}
});
return new ArrayList<>(commonUsers);
} }
@Override @Override
public Object listAcls(String brokerAddress, String searchParam) { public Object listAcls(String clusterName, String brokerName, String searchParam) {
List<AclInfo> aclList; List<String> brokerAddrList = getBrokerAddressList(clusterName, brokerName);
Set<org.apache.rocketmq.dashboard.model.AclInfo> commonAcls = new HashSet<>();
final boolean[] firstIteration = {true};
ObjectMapper mapper = new ObjectMapper(); // Initialize ObjectMapper once
brokerAddrList.forEach(address -> {
List<AclInfo> aclListForBroker;
try { try {
String user = searchParam != null ? searchParam : ""; String user = searchParam != null ? searchParam : "";
String res = searchParam != null ? searchParam : ""; String res = searchParam != null ? searchParam : "";
aclList = mqAdminExt.listAcl(brokerAddress, user, ""); // Combine results from both listAcl calls for a single broker
if (aclList == null) { List<AclInfo> byUser = mqAdminExt.listAcl(address, user, "");
aclList = new ArrayList<>(); List<AclInfo> byRes = mqAdminExt.listAcl(address, "", res);
}
List<AclInfo> resAclList = mqAdminExt.listAcl(brokerAddress, "", res);
if (resAclList != null) {
aclList.addAll(resAclList);
}
} catch (Exception ex) {
logger.error("Failed to list ACLs from broker: {}", brokerAddress, ex);
throw new RuntimeException("Failed to list ACLs", ex);
}
ObjectMapper mapper = new ObjectMapper();
Set<String> uniqueAclStrings = new HashSet<>();
List<AclInfo> resultAclList = new ArrayList<>();
for (AclInfo acl : aclList) { aclListForBroker = new ArrayList<>();
if (byUser != null) {
aclListForBroker.addAll(byUser);
}
if (byRes != null) {
aclListForBroker.addAll(byRes);
}
// Deduplicate ACLs for the current broker to ensure accurate intersection
Set<AclInfo> uniqueAclsForBroker = new HashSet<>();
Set<String> uniqueAclStringsForBroker = new HashSet<>();
for (AclInfo acl : aclListForBroker) {
try { try {
String aclString = mapper.writeValueAsString(acl); String aclString = mapper.writeValueAsString(acl);
if (uniqueAclStrings.add(aclString)) { if (uniqueAclStringsForBroker.add(aclString)) {
resultAclList.add(acl); uniqueAclsForBroker.add(acl);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Error serializing AclInfo", e); logger.error("Error serializing AclInfo for broker {}: {}", address, e.getMessage());
} }
} }
return resultAclList; aclListForBroker = new ArrayList<>(uniqueAclsForBroker);
} catch (Exception ex) {
logger.error("Failed to list ACLs from broker: {}", address, ex);
throw new RuntimeException("Failed to list ACLs", ex);
}
List<org.apache.rocketmq.dashboard.model.AclInfo> aclInfoList = new ArrayList<>();
aclListForBroker.forEach(acl -> {
org.apache.rocketmq.dashboard.model.AclInfo aclInfo = new org.apache.rocketmq.dashboard.model.AclInfo();
aclInfo.copyFrom(acl);
aclInfoList.add(aclInfo);
});
if (!aclListForBroker.isEmpty()) {
Set<org.apache.rocketmq.dashboard.model.AclInfo> currentAcls = new HashSet<>(aclInfoList);
if (firstIteration[0]) {
commonAcls.addAll(currentAcls);
firstIteration[0] = false;
} else {
commonAcls.retainAll(currentAcls);
}
} else {
logger.warn("No ACLs found for broker: {}", address);
if (firstIteration[0]) {
firstIteration[0] = false;
} else {
commonAcls.clear(); // If any broker has no ACLs, the common set will be empty
}
}
});
return new ArrayList<>(commonAcls);
} }
@Override @Override
public List<String> createAcl(PolicyRequest policyRequest) { public Object createAcl(PolicyRequest policyRequest) {
List<String> successfulResources = new ArrayList<>(); List<String> successfulResources = new ArrayList<>();
if (policyRequest == null || policyRequest.getPolicies() == null || policyRequest.getPolicies().isEmpty()) { if (policyRequest == null || policyRequest.getPolicies() == null || policyRequest.getPolicies().isEmpty()) {
@@ -107,11 +168,13 @@ public class AclServiceImpl implements AclService {
} }
String subject = policyRequest.getSubject(); String subject = policyRequest.getSubject();
if (subject == null || subject.isEmpty()) { if (subject == null || subject.isEmpty()) {
throw new IllegalArgumentException("Subject cannot be null or empty."); throw new IllegalArgumentException("Subject cannot be null or empty.");
} }
// Get the broker address list for creating ACLs on all relevant brokers
List<String> brokerAddrList = getBrokerAddressList(policyRequest.getClusterName(), policyRequest.getBrokerName());
for (Policy policy : policyRequest.getPolicies()) { for (Policy policy : policyRequest.getPolicies()) {
if (policy.getEntries() != null && !policy.getEntries().isEmpty()) { if (policy.getEntries() != null && !policy.getEntries().isEmpty()) {
for (Entry entry : policy.getEntries()) { for (Entry entry : policy.getEntries()) {
@@ -136,90 +199,110 @@ public class AclServiceImpl implements AclService {
aclInfo.setPolicies(aclPolicies); aclInfo.setPolicies(aclPolicies);
aclInfo.setSubject(subject); aclInfo.setSubject(subject);
for (String brokerAddress : brokerAddrList) {
try { try {
logger.info("Attempting to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, policyRequest.getBrokerAddress()); logger.info("Attempting to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress);
mqAdminExt.createAcl(policyRequest.getBrokerAddress(), aclInfo); mqAdminExt.createAcl(brokerAddress, aclInfo);
successfulResources.add(resource); logger.info("Successfully created ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress);
logger.info("Successfully created ACL for subject: {}, resource: {}", subject, resource);
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Failed to create ACL for subject: {}, resource: {} on broker: {}", subject, resource, policyRequest.getBrokerAddress(), ex); throw new RuntimeException("Failed to create ACL on broker " + brokerAddress + ex.getMessage());
throw new RuntimeException("Failed to create ACL", ex);
} }
} }
} }
} }
} }
} }
return successfulResources; }
return true;
} }
@Override @Override
public void deleteUser(String brokerAddress, String username) { public void deleteUser(String clusterName, String brokerName, String username) {
List<String> brokerAddrList = getBrokerAddressList(clusterName, brokerName);
for (String address : brokerAddrList) {
try { try {
mqAdminExt.deleteUser(brokerAddress, username); mqAdminExt.deleteUser(address, username);
logger.info("Successfully deleted user: {} from broker: {}", username, address);
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Failed to delete user: {} from broker: {}", username, brokerAddress, ex); logger.error("Failed to delete user: {} from broker: {}", username, address, ex);
throw new RuntimeException("Failed to delete user", ex); throw new RuntimeException("Failed to delete user on broker " + address + ex.getMessage());
}
} }
} }
@Override @Override
public void updateUser(String brokerAddress, UserInfoParam userParam) { public void updateUser(String clusterName, String brokerName, UserInfoParam userParam) {
UserInfo user = new UserInfo(); UserInfo user = new UserInfo();
user.setUsername(userParam.getUsername()); user.setUsername(userParam.getUsername());
user.setPassword(userParam.getPassword()); user.setPassword(userParam.getPassword());
user.setUserStatus(userParam.getUserStatus()); user.setUserStatus(userParam.getUserStatus());
user.setUserType(userParam.getUserType()); user.setUserType(userParam.getUserType());
List<String> brokerAddrList = getBrokerAddressList(clusterName, brokerName);
for (String address : brokerAddrList) {
try { try {
mqAdminExt.updateUser(brokerAddress, user); mqAdminExt.updateUser(address, user);
logger.info("Successfully updated user: {} on broker: {}", userParam.getUsername(), address);
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Failed to update user: {} on broker: {}", userParam.getUsername(), brokerAddress, ex); logger.error("Failed to update user: {} on broker: {}", userParam.getUsername(), address, ex);
throw new RuntimeException("Failed to update user", ex); throw new RuntimeException("Failed to update user on broker " + address + ex.getMessage());
}
} }
} }
@Override @Override
public void createUser(String brokerAddress, UserInfoParam userParam) { public void createUser(String clusterName, String brokerName, UserInfoParam userParam) {
UserInfo user = new UserInfo(); UserInfo user = new UserInfo();
user.setUsername(userParam.getUsername()); user.setUsername(userParam.getUsername());
user.setPassword(userParam.getPassword()); user.setPassword(userParam.getPassword());
user.setUserStatus(userParam.getUserStatus()); user.setUserStatus(userParam.getUserStatus());
user.setUserType(userParam.getUserType()); user.setUserType(userParam.getUserType());
List<String> brokerAddrList = getBrokerAddressList(clusterName, brokerName);
for (String address : brokerAddrList) {
try { try {
mqAdminExt.createUser(brokerAddress, user); mqAdminExt.createUser(address, user);
logger.info("Successfully created user: {} on broker: {}", userParam.getUsername(), address);
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Failed to create user: {} on broker: {}", userParam.getUsername(), brokerAddress, ex); logger.error("Failed to create user: {} on broker: {}", userParam.getUsername(), address, ex);
throw new RuntimeException("Failed to create user", ex); throw new RuntimeException("Failed to create user on broker " + address + ex.getMessage());
}
} }
} }
@Override @Override
public void deleteAcl(String brokerAddress, String subject, String resource) { public void deleteAcl(String clusterName, String brokerName, String subject, String resource) {
try { List<String> brokerAddrList = getBrokerAddressList(clusterName, brokerName);
String res = resource != null ? resource : ""; String res = resource != null ? resource : "";
mqAdminExt.deleteAcl(brokerAddress, subject, res);
for (String address : brokerAddrList) {
try {
mqAdminExt.deleteAcl(address, subject, res);
logger.info("Successfully deleted ACL for subject: {} and resource: {} on broker: {}", subject, resource, address);
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Failed to delete ACL for subject: {} and resource: {} on broker: {}", subject, resource, brokerAddress, ex); logger.error("Failed to delete ACL for subject: {} and resource: {} on broker: {}", subject, resource, address, ex);
throw new RuntimeException("Failed to delete ACL", ex); throw new RuntimeException("Failed to delete ACL on broker " + address + ex.getMessage());
}
} }
} }
@Override @Override
public void updateAcl(PolicyRequest policyRequest) { public void updateAcl(PolicyRequest policyRequest) {
if (policyRequest == null || policyRequest.getPolicies() == null || policyRequest.getPolicies().isEmpty()) { if (policyRequest == null || policyRequest.getPolicies() == null || policyRequest.getPolicies().isEmpty()) {
logger.warn("Policy request is null or policies list is empty. No ACLs to update."); logger.warn("Policy request is null or policies list is empty. No ACLs to update.");
return;
} }
assert policyRequest != null;
String brokerAddress = policyRequest.getBrokerAddress();
String subject = policyRequest.getSubject(); String subject = policyRequest.getSubject();
if (subject == null || subject.isEmpty()) { if (subject == null || subject.isEmpty()) {
throw new IllegalArgumentException("Subject cannot be null or empty."); throw new IllegalArgumentException("Subject cannot be null or empty.");
} }
List<String> brokerAddrList = getBrokerAddressList(policyRequest.getClusterName(), policyRequest.getBrokerName());
for (Policy policy : policyRequest.getPolicies()) { for (Policy policy : policyRequest.getPolicies()) {
if (policy.getEntries() != null && !policy.getEntries().isEmpty()) { if (policy.getEntries() != null && !policy.getEntries().isEmpty()) {
for (Entry entry : policy.getEntries()) { for (Entry entry : policy.getEntries()) {
@@ -244,11 +327,15 @@ public class AclServiceImpl implements AclService {
aclInfo.setPolicies(aclPolicies); aclInfo.setPolicies(aclPolicies);
aclInfo.setSubject(subject); aclInfo.setSubject(subject);
for (String brokerAddress : brokerAddrList) {
try { try {
mqAdminExt.updateAcl(brokerAddress, aclInfo); mqAdminExt.updateAcl(brokerAddress, aclInfo);
logger.info("Successfully updated ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress);
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Failed to update ACL for subject: {} on broker: {}", subject, brokerAddress, ex); logger.error("Failed to update ACL for subject: {}, resource: {} on broker: {}", subject, resource, brokerAddress, ex);
throw new RuntimeException("Failed to update ACL", ex); throw new RuntimeException("Failed to update ACL on broker " + brokerAddress + ex.getMessage());
}
}
} }
} }
} }
@@ -256,6 +343,36 @@ public class AclServiceImpl implements AclService {
} }
} }
public List<String> getBrokerAddressList(String clusterName, String brokerName) {
ClusterInfo clusterInfo = clusterInfoService.get();
List<String> brokerAddressList = new ArrayList<>();
if (brokerName != null) {
for (String brokerNameKey : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
new ArrayList<>(), List.of(brokerName))) {
clusterInfo.getBrokerAddrTable()
.get(brokerNameKey)
.getBrokerAddrs()
.forEach((Long key, String value) -> brokerAddressList.add(value));
}
} else {
if (clusterName == null || clusterName.isEmpty()) {
logger.warn("Cluster name is null or empty. Cannot retrieve broker addresses.");
throw new IllegalArgumentException("Cluster name cannot be null or empty.");
}
if (clusterInfo == null || clusterInfo.getBrokerAddrTable() == null || clusterInfo.getBrokerAddrTable().isEmpty()) {
logger.warn("Cluster information is not available or has no broker addresses.");
throw new RuntimeException("Cluster information is not available or has no broker addresses.");
}
for (String brokerNameKey : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
List.of(clusterName), new ArrayList<>())) {
clusterInfo.getBrokerAddrTable()
.get(brokerNameKey)
.getBrokerAddrs()
.forEach((Long key, String value) -> brokerAddressList.add(value));
}
}
return brokerAddressList;
} }
} }

View File

@@ -210,7 +210,15 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
if (SYSTEM_GROUP_SET.contains(consumerGroup)) { if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
consumeInfo.setSubGroupType("SYSTEM"); consumeInfo.setSubGroupType("SYSTEM");
} else { } else {
try {
consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly() ? "FIFO" : "NORMAL"); consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly() ? "FIFO" : "NORMAL");
} catch (NullPointerException e) {
logger.warn("SubscriptionGroupConfig not found for consumer group: {}", consumerGroup);
boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup)
.stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig)
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
consumeInfo.setSubGroupType(isFifoType ? "FIFO" : "NORMAL");
}
} }
consumeInfo.setUpdateTime(new Date()); consumeInfo.setUpdateTime(new Date());
groupConsumeInfoList.add(consumeInfo); groupConsumeInfoList.add(consumeInfo);
@@ -275,17 +283,24 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override @Override
public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) { public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
ConsumeStats consumeStats; List<ConsumeStats> consumeStatses = new ArrayList<>();
String topic = null; String topic = null;
try { try {
String[] addresses = address.split(","); String[] addresses = address.split(",");
String addr = addresses[0]; for (String addr : addresses) {
consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000); consumeStatses.add(mqAdminExt.examineConsumeStats(addr, groupName, null, 3000));
}
} catch (Exception e) { } catch (Exception e) {
Throwables.throwIfUnchecked(e); Throwables.throwIfUnchecked(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return toTopicConsumerInfoList(topic, consumeStats, groupName); List<TopicConsumerInfo> res = new ArrayList<>();
consumeStatses.forEach(consumeStats -> {
if (consumeStats != null && consumeStats.getOffsetTable() != null && !consumeStats.getOffsetTable().isEmpty()) {
res.addAll(toTopicConsumerInfoList(topic, consumeStats, groupName));
}
});
return res;
} }
@Override @Override

View File

@@ -16,15 +16,16 @@
*/ */
package org.apache.rocketmq.dashboard.service.provider; package org.apache.rocketmq.dashboard.service.provider;
import org.apache.rocketmq.dashboard.service.ClusterInfoService; import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service @Service
public class UserInfoProviderImpl implements UserInfoProvider { public class UserInfoProviderImpl implements UserInfoProvider {

View File

@@ -17,32 +17,33 @@
package org.apache.rocketmq.dashboard.controller; package org.apache.rocketmq.dashboard.controller;
import com.google.gson.Gson;
import org.apache.rocketmq.auth.authentication.enums.UserStatus; import org.apache.rocketmq.auth.authentication.enums.UserStatus;
import org.apache.rocketmq.auth.authentication.enums.UserType; import org.apache.rocketmq.auth.authentication.enums.UserType;
import org.apache.rocketmq.auth.authorization.enums.Decision; import org.apache.rocketmq.dashboard.model.UserInfoDto;
import org.apache.rocketmq.dashboard.model.Policy;
import org.apache.rocketmq.dashboard.model.PolicyRequest;
import org.apache.rocketmq.dashboard.model.request.UserCreateRequest; import org.apache.rocketmq.dashboard.model.request.UserCreateRequest;
import org.apache.rocketmq.dashboard.model.request.UserInfoParam; import org.apache.rocketmq.dashboard.model.request.UserInfoParam;
import org.apache.rocketmq.dashboard.model.request.UserUpdateRequest; import org.apache.rocketmq.dashboard.model.request.UserUpdateRequest;
import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl; import org.apache.rocketmq.dashboard.service.impl.AclServiceImpl;
import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler; import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler;
import org.apache.rocketmq.remoting.protocol.body.AclInfo;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public class AclControllerTest extends BaseControllerTest { public class AclControllerTest extends BaseControllerTest {
@@ -52,189 +53,193 @@ public class AclControllerTest extends BaseControllerTest {
@InjectMocks @InjectMocks
private AclController aclController; private AclController aclController;
private final Gson gson = new Gson();
@Before @Before
public void init() { public void init() {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
mockMvc = MockMvcBuilders.standaloneSetup(aclController).setControllerAdvice(GlobalExceptionHandler.class).build(); mockMvc = MockMvcBuilders.standaloneSetup(aclController).setControllerAdvice(GlobalExceptionHandler.class).build();
} }
@Test @Test
public void testListUsers() { public void testListUsers() throws Exception {
// Prepare test data // Prepare test data
String brokerAddress = "localhost:10911"; String clusterName = "test-cluster";
List<UserInfo> expectedUsers = Arrays.asList( String brokerName = "localhost:10911";
UserInfo.of("user1", "password1", "super"), List<UserInfoDto> expectedUsers = Arrays.asList(
UserInfo.of("user2", "password2", "super") new UserInfoDto("user1", "password1", "super","enable"),
new UserInfoDto("user2", "password2", "super","enable")
); );
// Mock service behavior // Mock service behavior
when(aclService.listUsers(brokerAddress)).thenReturn(expectedUsers); when(aclService.listUsers(clusterName, brokerName)).thenReturn(expectedUsers);
// Call controller method // Call controller method via MockMVC
List<UserInfo> result = aclController.listUsers(brokerAddress); mockMvc.perform(MockMvcRequestBuilders.get("/acl/users.query")
.param("clusterName", clusterName)
.param("brokerName", brokerName))
.andExpect(status().isOk())
.andExpect(result -> {
List<UserInfoDto> actualUsers = gson.fromJson(result.getResponse().getContentAsString(), List.class);
// Due to Gson's deserialization of List to LinkedTreeMap, direct assertEquals on List<UserInfo> won't work easily.
// A more robust comparison would involve iterating or using a custom matcher if UserInfoDto doesn't override equals/hashCode.
// For simplicity, let's assume UserInfoDto has proper equals/hashCode for now or convert to JSON string for comparison.
assertEquals(gson.toJson(expectedUsers), result.getResponse().getContentAsString());
});
// Verify // Verify
assertEquals(expectedUsers, result); verify(aclService, times(1)).listUsers(clusterName, brokerName);
verify(aclService, times(1)).listUsers(brokerAddress);
} }
@Test @Test
public void testListUsersWithoutBrokerAddress() { public void testListUsersWithoutBrokerAddressAndClusterName() throws Exception {
// Prepare test data // Prepare test data
List<UserInfo> expectedUsers = Arrays.asList( List<UserInfoDto> expectedUsers = Arrays.asList(
UserInfo.of("user1", "password1", "super") new UserInfoDto("user2", "password2", "super","enable")
); );
// Mock service behavior // Mock service behavior
when(aclService.listUsers(null)).thenReturn(expectedUsers); when(aclService.listUsers(null, null)).thenReturn(expectedUsers);
// Call controller method
List<UserInfo> result = aclController.listUsers(null);
// Verify
assertEquals(expectedUsers, result);
verify(aclService, times(1)).listUsers(null);
}
@Test // Call controller method via MockMVC
public void testListAcls() { mockMvc.perform(MockMvcRequestBuilders.get("/acl/users.query"))
// Prepare test data .andExpect(status().isOk())
String brokerAddress = "localhost:9092"; .andExpect(result -> assertEquals(gson.toJson(expectedUsers), result.getResponse().getContentAsString()));
String searchParam = "user1";
Object expectedAcls = Arrays.asList(
AclInfo.of("user1", List.of("READ", "test"), List.of("TOPIC:test"), List.of("localhost:10911"), Decision.ALLOW.getName())
);
// Mock service behavior
when(aclService.listAcls(brokerAddress, searchParam)).thenReturn(expectedAcls);
// Call controller method
Object result = aclController.listAcls(brokerAddress, searchParam);
// Verify // Verify
assertEquals(expectedAcls, result); verify(aclService, times(1)).listUsers(null, null);
verify(aclService, times(1)).listAcls(brokerAddress, searchParam);
} }
@Test
public void testCreateAcl() {
// Prepare test data
PolicyRequest request = new PolicyRequest();
request.setBrokerAddress("localhost:9092");
request.setSubject("user1");
request.setPolicies(List.of(
new Policy()
));
// Call controller method
Object result = aclController.createAcl(request);
// Verify
assertEquals(true, result);
verify(aclService, times(1)).createAcl(request);
}
@Test @Test
public void testDeleteUser() { public void testDeleteUser() throws Exception {
// Prepare test data // Prepare test data
String brokerAddress = "localhost:9092"; String clusterName = "test-cluster";
String brokerName = "localhost:9092";
String username = "user1"; String username = "user1";
// Call controller method // Mock service behavior (void method)
Object result = aclController.deleteUser(brokerAddress, username); doNothing().when(aclService).deleteUser(clusterName, brokerName, username);
// Call controller method via MockMVC
mockMvc.perform(delete("/acl/deleteUser.do")
.param("clusterName", clusterName)
.param("brokerName", brokerName)
.param("username", username))
.andExpect(status().isOk())
.andExpect(result -> assertEquals("true", result.getResponse().getContentAsString()));
// Verify // Verify
assertEquals(true, result); verify(aclService, times(1)).deleteUser(clusterName, brokerName, username);
verify(aclService, times(1)).deleteUser(brokerAddress, username);
} }
@Test @Test
public void testDeleteUserWithoutBrokerAddress() { public void testDeleteUserWithoutBrokerAddressAndClusterName() throws Exception {
// Prepare test data // Prepare test data
String username = "user1"; String username = "user1";
// Call controller method // Mock service behavior (void method)
Object result = aclController.deleteUser(null, username); doNothing().when(aclService).deleteUser(null, null, username);
// Call controller method via MockMVC
mockMvc.perform(delete("/acl/deleteUser.do")
.param("username", username))
.andExpect(status().isOk())
.andExpect(result -> assertEquals("true", result.getResponse().getContentAsString()));
// Verify // Verify
assertEquals(true, result); verify(aclService, times(1)).deleteUser(null, null, username);
verify(aclService, times(1)).deleteUser(null, username);
} }
@Test @Test
public void testUpdateUser() { public void testUpdateUser() throws Exception {
// Prepare test data // Prepare test data
UserUpdateRequest request = new UserUpdateRequest(); UserUpdateRequest request = new UserUpdateRequest();
request.setBrokerAddress("localhost:9092"); request.setClusterName("test-cluster");
request.setBrokerName("localhost:9092");
request.setUserInfo(new UserInfoParam("user1", "newPassword", UserStatus.ENABLE.getName(), UserType.SUPER.getName())); request.setUserInfo(new UserInfoParam("user1", "newPassword", UserStatus.ENABLE.getName(), UserType.SUPER.getName()));
// Call controller method // Mock service behavior (void method)
Object result = aclController.updateUser(request); doNothing().when(aclService).updateUser(request.getClusterName(), request.getBrokerName(), request.getUserInfo());
// Call controller method via MockMVC
mockMvc.perform(MockMvcRequestBuilders.post("/acl/updateUser.do")
.contentType(MediaType.APPLICATION_JSON)
.content(gson.toJson(request)))
.andExpect(status().isOk())
.andExpect(result -> assertEquals("true", result.getResponse().getContentAsString()));
// Verify // Verify
assertEquals(true, result); verify(aclService, times(1)).updateUser(request.getClusterName(), request.getBrokerName(), request.getUserInfo());
verify(aclService, times(1)).updateUser(request.getBrokerAddress(), request.getUserInfo());
} }
@Test @Test
public void testCreateUser() { public void testCreateUser() throws Exception {
// Prepare test data // Prepare test data
UserCreateRequest request = new UserCreateRequest(); UserCreateRequest request = new UserCreateRequest();
request.setBrokerAddress("localhost:9092"); request.setClusterName("test-cluster");
request.setBrokerName("localhost:9092");
request.setUserInfo(new UserInfoParam("user1", "newPassword", UserStatus.ENABLE.getName(), UserType.SUPER.getName())); request.setUserInfo(new UserInfoParam("user1", "newPassword", UserStatus.ENABLE.getName(), UserType.SUPER.getName()));
// Call controller method // Mock service behavior (void method)
Object result = aclController.createUser(request); doNothing().when(aclService).createUser(request.getClusterName(), request.getBrokerName(), request.getUserInfo());
// Call controller method via MockMVC
mockMvc.perform(MockMvcRequestBuilders.post("/acl/createUser.do")
.contentType(MediaType.APPLICATION_JSON)
.content(gson.toJson(request)))
.andExpect(status().isOk())
.andExpect(result -> assertEquals("true", result.getResponse().getContentAsString()));
// Verify // Verify
assertEquals(true, result); verify(aclService, times(1)).createUser(request.getClusterName(), request.getBrokerName(), request.getUserInfo());
verify(aclService, times(1)).createUser(request.getBrokerAddress(), request.getUserInfo());
} }
@Test @Test
public void testDeleteAcl() { public void testDeleteAcl() throws Exception {
// Prepare test data // Prepare test data
String brokerAddress = "localhost:9092"; String clusterName = "test-cluster";
String brokerName = "localhost:9092";
String subject = "user1"; String subject = "user1";
String resource = "TOPIC:test"; String resource = "TOPIC:test";
// Call controller method // Mock service behavior (void method)
Object result = aclController.deleteAcl(brokerAddress, subject, resource); doNothing().when(aclService).deleteAcl(clusterName, brokerName, subject, resource);
// Call controller method via MockMVC
mockMvc.perform(delete("/acl/deleteAcl.do")
.param("clusterName", clusterName)
.param("brokerName", brokerName)
.param("subject", subject)
.param("resource", resource))
.andExpect(status().isOk())
.andExpect(result -> assertEquals("true", result.getResponse().getContentAsString()));
// Verify // Verify
assertEquals(true, result); verify(aclService, times(1)).deleteAcl(clusterName, brokerName, subject, resource);
verify(aclService, times(1)).deleteAcl(brokerAddress, subject, resource);
} }
@Test @Test
public void testDeleteAclWithoutBrokerAddressAndResource() { public void testDeleteAclWithoutBrokerAddressAndResourceAndClusterName() throws Exception {
// Prepare test data // Prepare test data
String subject = "user1"; String subject = "user1";
// Call controller method // Mock service behavior (void method)
Object result = aclController.deleteAcl(null, subject, null); doNothing().when(aclService).deleteAcl(null, null, subject, null);
// Call controller method via MockMVC
mockMvc.perform(delete("/acl/deleteAcl.do")
.param("subject", subject))
.andExpect(status().isOk())
.andExpect(result -> assertEquals("true", result.getResponse().getContentAsString()));
// Verify // Verify
assertEquals(true, result); verify(aclService, times(1)).deleteAcl(null, null, subject, null);
verify(aclService, times(1)).deleteAcl(null, subject, null);
} }
@Test
public void testUpdateAcl() {
// Prepare test data
PolicyRequest request = new PolicyRequest();
request.setBrokerAddress("localhost:9092");
request.setSubject("user1");
request.setPolicies(List.of(
new Policy()
));
// Call controller method
Object result = aclController.updateAcl(request);
// Verify
assertEquals(true, result);
verify(aclService, times(1)).updateAcl(request);
}
@Override @Override
protected Object getTestController() { protected Object getTestController() {