153 lines
4.8 KiB
Python
153 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
为用户分配角色
|
||
|
||
运行方式:
|
||
cd backend
|
||
python scripts/assign_role.py <employee_id> <role_name>
|
||
|
||
示例:
|
||
python scripts/assign_role.py zhangsan agent
|
||
python scripts/assign_role.py lisi admin
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
# 添加 backend 目录到 Python 路径
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from sqlalchemy import create_engine, select
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.config import settings
|
||
from app.models import Role, UserRole
|
||
|
||
|
||
def assign_role(employee_id: str, role_name: str):
|
||
"""为指定用户分配角色"""
|
||
|
||
# 本地开发使用 aiosqlite 异步驱动,脚本是同步的,需要替换
|
||
db_url = settings.database_url.replace("sqlite+aiosqlite://", "sqlite://")
|
||
engine = create_engine(db_url)
|
||
|
||
with Session(engine) as session:
|
||
# 1. 查找角色
|
||
role = session.execute(select(Role).where(Role.name == role_name)).scalars().first()
|
||
if not role:
|
||
print(f"[FAIL] 角色 '{role_name}' 不存在")
|
||
print("可用角色: user, agent, admin")
|
||
return False
|
||
|
||
# 2. 检查是否已有该角色
|
||
existing = session.execute(
|
||
select(UserRole).where(
|
||
UserRole.employee_id == employee_id,
|
||
UserRole.role_id == role.id,
|
||
)
|
||
).scalars().first()
|
||
|
||
if existing:
|
||
print(f"[WARN] 用户 {employee_id} 已拥有角色 {role_name}")
|
||
return True
|
||
|
||
# 3. 分配角色
|
||
user_role = UserRole(
|
||
id=str(uuid.uuid4()),
|
||
employee_id=employee_id,
|
||
role_id=role.id,
|
||
source="manual", # 手动分配
|
||
assigned_at=datetime.now(),
|
||
)
|
||
session.add(user_role)
|
||
session.commit()
|
||
|
||
print(f"[OK] 已为用户 {employee_id} 分配角色 {role.display_name} ({role_name})")
|
||
return True
|
||
|
||
|
||
def remove_role(employee_id: str, role_name: str):
|
||
"""移除用户的指定角色"""
|
||
|
||
db_url = settings.database_url.replace("sqlite+aiosqlite://", "sqlite://")
|
||
engine = create_engine(db_url)
|
||
|
||
with Session(engine) as session:
|
||
# 查找角色
|
||
role = session.execute(select(Role).where(Role.name == role_name)).scalars().first()
|
||
if not role:
|
||
print(f"[FAIL] 角色 '{role_name}' 不存在")
|
||
return False
|
||
|
||
# 查找用户角色关联
|
||
user_role = session.execute(
|
||
select(UserRole).where(
|
||
UserRole.employee_id == employee_id,
|
||
UserRole.role_id == role.id,
|
||
)
|
||
).scalars().first()
|
||
|
||
if not user_role:
|
||
print(f"[WARN] 用户 {employee_id} 未拥有角色 {role_name}")
|
||
return True
|
||
|
||
# 移除角色
|
||
session.delete(user_role)
|
||
session.commit()
|
||
|
||
print(f"[OK] 已移除用户 {employee_id} 的角色 {role.display_name} ({role_name})")
|
||
return True
|
||
|
||
|
||
def list_user_roles(employee_id: str):
|
||
"""列出用户的所有角色"""
|
||
|
||
db_url = settings.database_url.replace("sqlite+aiosqlite://", "sqlite://")
|
||
engine = create_engine(db_url)
|
||
|
||
with Session(engine) as session:
|
||
# 查询用户的所有角色
|
||
user_roles = session.execute(
|
||
select(UserRole, Role)
|
||
.join(Role, UserRole.role_id == Role.id)
|
||
.where(UserRole.employee_id == employee_id)
|
||
).all()
|
||
|
||
if not user_roles:
|
||
print(f"用户 {employee_id} 暂无分配角色(默认为 user)")
|
||
return
|
||
|
||
print(f"用户 {employee_id} 的角色列表:")
|
||
for user_role, role in user_roles:
|
||
print(f" - {role.name}: {role.display_name} (分配方式: {user_role.source})")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
if len(sys.argv) < 2:
|
||
print("用法:")
|
||
print(" 分配角色: python assign_role.py <employee_id> <role_name>")
|
||
print(" 移除角色: python assign_role.py <employee_id> <role_name> --remove")
|
||
print(" 查看角色: python assign_role.py <employee_id> --list")
|
||
print("")
|
||
print("示例:")
|
||
print(" python assign_role.py zhangsan agent")
|
||
print(" python assign_role.py lisi admin")
|
||
print(" python assign_role.py zhangsan --list")
|
||
sys.exit(1)
|
||
|
||
employee_id = sys.argv[1]
|
||
|
||
if "--list" in sys.argv:
|
||
list_user_roles(employee_id)
|
||
elif "--remove" in sys.argv and len(sys.argv) >= 4:
|
||
role_name = sys.argv[2]
|
||
remove_role(employee_id, role_name)
|
||
elif len(sys.argv) >= 3 and not sys.argv[2].startswith("--"):
|
||
role_name = sys.argv[2]
|
||
assign_role(employee_id, role_name)
|
||
else:
|
||
print("[FAIL] 参数错误,请查看用法")
|
||
sys.exit(1)
|