# ============================================================================= # 企微IT智能服务台 — RBAC 细粒度权限服务 (v0.7.1 task #86) # ============================================================================= # 设计: 5 角色 × 4 资源 × 4 操作 × 3 数据范围 # # 角色: # 1. user — 普通员工(默认, 无管理权限) # 2. agent — 坐席(处理会话) # 3. team_lead — 团队主管(团队管理 + 报告) # 4. auditor — 审计员(只读跨部门) # 5. admin — 超级管理员(全权限) # # 资源 (resource): # 1. conversation — 会话 # 2. agent — 坐席 # 3. system_config — 系统配置 # 4. audit_log — 审计日志 # # 操作 (action): # 1. read — 查看 # 2. create — 创建 # 3. update — 修改 # 4. delete — 删除 # # 数据范围 (scope): # 1. own — 自己的(agent 只能看自己接的会话) # 2. department — 部门的 # 3. all — 全部(管理员 / 审计员) # # 权限字符串格式: "resource:action:scope" # 例: "conversation:read:all" # 通配符: "*:*:all" 表示全权限(仅 admin) # ============================================================================= import logging from typing import Dict, FrozenSet, List, Set, Tuple logger = logging.getLogger(__name__) # 5 角色的权限矩阵 # 格式: role_name -> Set[(resource, action, scope)] ROLE_PERMISSIONS: Dict[str, Set[Tuple[str, str, str]]] = { # 普通员工 — 仅创建自己的会话 "user": { ("conversation", "create", "own"), ("conversation", "read", "own"), }, # 坐席 — 处理分配给自己的会话,可读所有未分配的 "agent": { ("conversation", "read", "own"), ("conversation", "read", "all"), # 看所有未分配的会话(坐席工作台需要) ("conversation", "update", "own"), ("conversation", "create", "all"), }, # 团队主管 — 坐席权限 + 看本部门 + 管本部门坐席 "team_lead": { ("conversation", "read", "department"), ("conversation", "update", "department"), ("conversation", "create", "all"), ("agent", "read", "department"), ("agent", "update", "department"), # 改本部门坐席状态 }, # 审计员 — 只读,跨部门 "auditor": { ("conversation", "read", "all"), ("agent", "read", "all"), ("system_config", "read", "all"), ("audit_log", "read", "all"), }, # 超级管理员 — 全权限 "admin": { ("*", "*", "all"), # 通配符,表示所有 (resource, action, all) }, } # 角色元数据(显示名 + 描述) ROLE_METADATA: Dict[str, Dict[str, str]] = { "user": { "display_name": "普通员工", "description": "提交工单、查看自己的会话", "is_default": "true", }, "agent": { "display_name": "IT 坐席", "description": "处理分配给自己的会话,可读所有未分配会话", "is_default": "false", }, "team_lead": { "display_name": "团队主管", "description": "管理本部门坐席,看本部门所有会话", "is_default": "false", }, "auditor": { "display_name": "审计员", "description": "只读跨部门数据,合规审计专用", "is_default": "false", }, "admin": { "display_name": "超级管理员", "description": "全权限,需 MFA 二次验证执行高危操作", "is_default": "false", }, } def permissions_to_strings(perms: Set[Tuple[str, str, str]]) -> List[str]: """把权限元组集合转字符串列表(用于存 JSON)。""" return [f"{r}:{a}:{s}" for (r, a, s) in sorted(perms)] def strings_to_permissions(items: List[str]) -> Set[Tuple[str, str, str]]: """把字符串列表(从 JSON 读)转回元组集合。""" result = set() for item in items or []: parts = item.split(":") if len(parts) == 3: result.add((parts[0], parts[1], parts[2])) return result def check_permission( user_roles: List[str], user_permissions: Dict[str, List[str]], required_resource: str, required_action: str, required_scope: str = "own", ) -> bool: """检查用户是否拥有所需权限(细粒度)。 规则: 1. 用户所有角色中,任一角色的 permissions 包含所需权限 → 通过 2. admin 角色拥有 *:*:all → 永远通过 3. scope 比较: own < department < all (更高的 scope 满足更低的) 例: 用户有 department 权限, 申请 own → 通过 用户有 all 权限, 申请 department → 通过 Args: user_roles: 用户的角色列表(角色名) user_permissions: {role_name: [perm_string]} 角色权限字典 required_resource: 所需资源 required_action: 所需操作 required_scope: 所需数据范围(own/department/all) Returns: bool: 是否通过 """ SCOPE_RANK = {"own": 1, "department": 2, "all": 3} required_rank = SCOPE_RANK.get(required_scope, 1) for role in user_roles: perms = strings_to_permissions(user_permissions.get(role, [])) for (r, a, s) in perms: # 1. admin 通配符 if r == "*" and a == "*" and s == "all": return True # 2. 资源/操作必须精确匹配(通配符不向下展开,避免误授权) if r != required_resource or a != required_action: continue # 3. scope 满足"≥"即可(更高的 scope 满足更低的) actual_rank = SCOPE_RANK.get(s, 0) if actual_rank >= required_rank: return True return False def get_role_default_permissions(role_name: str) -> List[str]: """获取角色的默认权限列表(用于种子数据初始化)。""" perms = ROLE_PERMISSIONS.get(role_name, set()) return permissions_to_strings(perms) # 资源/操作/范围的合法值(用于前端下拉框 + 后端校验) VALID_RESOURCES = ["conversation", "agent", "system_config", "audit_log"] VALID_ACTIONS = ["read", "create", "update", "delete"] VALID_SCOPES = ["own", "department", "all"] def validate_permission_string(perm: str) -> bool: """校验权限字符串格式是否合法。 例: "conversation:read:all" → True "foo:bar:baz" → False """ parts = perm.split(":") if len(parts) != 3: return False r, a, s = parts # 资源: 支持通配符 * 或合法值 if r != "*" and r not in VALID_RESOURCES: return False # 操作: 支持通配符 * 或合法值 if a != "*" and a not in VALID_ACTIONS: return False # 范围: 不支持通配符,必须是合法值 if s not in VALID_SCOPES: return False return True