574 lines
21 KiB
Python
574 lines
21 KiB
Python
# =============================================================================
|
||
# 企微IT智能服务台 — 企微 API 封装服务
|
||
# =============================================================================
|
||
# 说明:封装所有与企微服务器的交互逻辑,包括:
|
||
# 1. access_token 管理(Redis 缓存 + 自动刷新)
|
||
# 2. 发送消息(文本/图片/文件)
|
||
# 3. 获取员工信息(通讯录 API)
|
||
# 4. 上传临时素材
|
||
# 5. OAuth2 授权换算用户身份
|
||
# =============================================================================
|
||
|
||
import json
|
||
import logging
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
import httpx
|
||
import redis.asyncio as aioredis
|
||
|
||
from app.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class WecomService:
|
||
"""企微 API 调用服务。
|
||
|
||
封装所有与企微服务器的 HTTP 交互,提供异步方法。
|
||
access_token 通过 Redis 缓存管理,避免频繁调用获取接口。
|
||
|
||
Attributes:
|
||
redis: Redis 异步客户端(用于缓存 access_token)
|
||
client: httpx 异步 HTTP 客户端
|
||
"""
|
||
|
||
def __init__(self, redis_client: Optional[aioredis.Redis] = None):
|
||
"""初始化企微服务。
|
||
|
||
Args:
|
||
redis_client: Redis 异步客户端实例(可为 None,本地开发时 Redis 不可用)
|
||
"""
|
||
self.redis = redis_client
|
||
# 创建 httpx 异步客户端
|
||
# timeout: 连接超时5秒,读取超时10秒
|
||
self.client = httpx.AsyncClient(
|
||
timeout=httpx.Timeout(connect=5.0, read=10.0, write=10.0, pool=5.0)
|
||
)
|
||
# 内存缓存(Redis 不可用时的降级方案)
|
||
self._token_cache: Optional[str] = None
|
||
|
||
# --------------------------------------------------------------------------
|
||
# access_token 管理
|
||
# --------------------------------------------------------------------------
|
||
async def get_access_token(self) -> str:
|
||
"""获取企微 access_token。
|
||
|
||
优先从 Redis 缓存获取,如果缓存不存在或即将过期则重新获取。
|
||
access_token 有效期 7200 秒,缓存 TTL 设为 6900 秒(提前 300 秒刷新)。
|
||
|
||
对应企微API:
|
||
GET https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRET
|
||
|
||
Returns:
|
||
str: access_token 字符串
|
||
|
||
Raises:
|
||
Exception: 获取 access_token 失败
|
||
"""
|
||
# Redis 缓存 key
|
||
cache_key = "wecom:access_token"
|
||
|
||
# 1. 尝试从 Redis 缓存获取
|
||
if self.redis:
|
||
try:
|
||
cached_token = await self.redis.get(cache_key)
|
||
if cached_token:
|
||
logger.debug("从缓存获取 access_token")
|
||
return cached_token.decode("utf-8")
|
||
except Exception as e:
|
||
logger.warning(f"Redis 读取失败(降级): {e}")
|
||
|
||
# 1b. 尝试从内存缓存获取
|
||
if self._token_cache:
|
||
logger.debug("从内存缓存获取 access_token")
|
||
return self._token_cache
|
||
|
||
# 2. 缓存未命中,调用企微 API 获取
|
||
logger.info("缓存未命中,调用企微API获取 access_token")
|
||
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
|
||
params = {
|
||
"corpid": settings.wecom_corp_id,
|
||
"corpsecret": settings.wecom_secret,
|
||
}
|
||
|
||
try:
|
||
response = await self.client.get(url, params=params)
|
||
result = response.json()
|
||
|
||
# 检查企微API返回码
|
||
if result.get("errcode") != 0:
|
||
error_msg = result.get("errmsg", "未知错误")
|
||
logger.error(f"获取 access_token 失败: errcode={result.get('errcode')}, errmsg={error_msg}")
|
||
raise Exception(f"企微API错误: {error_msg}")
|
||
|
||
access_token = result["access_token"]
|
||
expires_in = result.get("expires_in", 7200)
|
||
|
||
# 3. 缓存到 Redis,TTL = 有效期 - 300秒(提前刷新)
|
||
buffer_seconds = 300
|
||
cache_ttl = max(expires_in - buffer_seconds, 60) # 至少缓存 60 秒
|
||
if self.redis:
|
||
try:
|
||
await self.redis.setex(cache_key, cache_ttl, access_token)
|
||
except Exception as e:
|
||
logger.warning(f"Redis 写入失败(降级): {e}")
|
||
|
||
# 3b. 同时缓存到内存
|
||
self._token_cache = access_token
|
||
|
||
logger.info(f"access_token 获取成功,缓存 TTL={cache_ttl}秒")
|
||
return access_token
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"获取 access_token 网络错误: {e}")
|
||
raise Exception(f"企微API网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 发送文本消息
|
||
# --------------------------------------------------------------------------
|
||
async def send_text_message(
|
||
self, user_id: str, content: str
|
||
) -> Dict[str, Any]:
|
||
"""向员工发送文本消息。
|
||
|
||
对应企微API:
|
||
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
|
||
|
||
请求体:
|
||
{
|
||
"touser": "UserID",
|
||
"msgtype": "text",
|
||
"agentid": 1000002,
|
||
"text": {"content": "消息内容"}
|
||
}
|
||
|
||
Args:
|
||
user_id: 员工的企微 UserID
|
||
content: 消息内容(纯文本)
|
||
|
||
Returns:
|
||
Dict[str, Any]: 企微API返回结果
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
|
||
|
||
payload = {
|
||
"touser": user_id,
|
||
"msgtype": "text",
|
||
"agentid": int(settings.wecom_agent_id),
|
||
"text": {"content": content},
|
||
}
|
||
|
||
try:
|
||
response = await self.client.post(url, json=payload)
|
||
result = response.json()
|
||
|
||
if result.get("errcode") != 0:
|
||
logger.error(
|
||
f"发送文本消息失败: user_id={user_id}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
else:
|
||
logger.info(f"发送文本消息成功: user_id={user_id}")
|
||
|
||
return result
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"发送文本消息网络错误: user_id={user_id}, error={e}")
|
||
raise Exception(f"发送消息网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 发送卡片消息
|
||
# --------------------------------------------------------------------------
|
||
async def send_card_message(
|
||
self,
|
||
user_id: str,
|
||
title: str,
|
||
description: str,
|
||
url: str = "",
|
||
btntxt: str = "详情",
|
||
) -> Dict[str, Any]:
|
||
"""向员工发送文本卡片消息。
|
||
|
||
对应企微API:
|
||
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
|
||
|
||
请求体:
|
||
{
|
||
"touser": "UserID",
|
||
"msgtype": "textcard",
|
||
"agentid": 1000002,
|
||
"textcard": {
|
||
"title": "标题",
|
||
"description": "描述",
|
||
"url": "链接",
|
||
"btntxt": "按钮文字"
|
||
}
|
||
}
|
||
|
||
Args:
|
||
user_id: 员工的企微 UserID
|
||
title: 卡片标题
|
||
description: 卡片描述
|
||
url: 卡片点击跳转链接
|
||
btntxt: 按钮文字(默认"详情")
|
||
|
||
Returns:
|
||
Dict[str, Any]: 企微API返回结果
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url_api = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
|
||
|
||
payload = {
|
||
"touser": user_id,
|
||
"msgtype": "textcard",
|
||
"agentid": int(settings.wecom_agent_id),
|
||
"textcard": {
|
||
"title": title,
|
||
"description": description,
|
||
"url": url,
|
||
"btntxt": btntxt,
|
||
},
|
||
}
|
||
|
||
try:
|
||
response = await self.client.post(url_api, json=payload)
|
||
result = response.json()
|
||
|
||
if result.get("errcode") != 0:
|
||
logger.error(
|
||
f"发送卡片消息失败: user_id={user_id}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
else:
|
||
logger.info(f"发送卡片消息成功: user_id={user_id}")
|
||
|
||
return result
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"发送卡片消息网络错误: user_id={user_id}, error={e}")
|
||
raise Exception(f"发送消息网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 发送图片消息
|
||
# --------------------------------------------------------------------------
|
||
async def send_image_message(
|
||
self, user_id: str, media_id: str
|
||
) -> Dict[str, Any]:
|
||
"""向员工发送图片消息。
|
||
|
||
对应企微API:
|
||
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
|
||
|
||
请求体:
|
||
{
|
||
"touser": "UserID",
|
||
"msgtype": "image",
|
||
"agentid": 1000002,
|
||
"image": {"media_id": "MEDIA_ID"}
|
||
}
|
||
|
||
注意:发送图片前需要先通过 upload_temp_media 上传图片获取 media_id。
|
||
|
||
Args:
|
||
user_id: 员工的企微 UserID
|
||
media_id: 图片媒体ID(通过上传临时素材获取)
|
||
|
||
Returns:
|
||
Dict[str, Any]: 企微API返回结果
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
|
||
|
||
payload = {
|
||
"touser": user_id,
|
||
"msgtype": "image",
|
||
"agentid": int(settings.wecom_agent_id),
|
||
"image": {"media_id": media_id},
|
||
}
|
||
|
||
try:
|
||
response = await self.client.post(url, json=payload)
|
||
result = response.json()
|
||
|
||
if result.get("errcode") != 0:
|
||
logger.error(
|
||
f"发送图片消息失败: user_id={user_id}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
else:
|
||
logger.info(f"发送图片消息成功: user_id={user_id}")
|
||
|
||
return result
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"发送图片消息网络错误: user_id={user_id}, error={e}")
|
||
raise Exception(f"发送消息网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 发送文件消息
|
||
# --------------------------------------------------------------------------
|
||
async def send_file_message(
|
||
self, user_id: str, media_id: str
|
||
) -> Dict[str, Any]:
|
||
"""向员工发送文件消息。
|
||
|
||
对应企微API:
|
||
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
|
||
|
||
请求体:
|
||
{
|
||
"touser": "UserID",
|
||
"msgtype": "file",
|
||
"agentid": 1000002,
|
||
"file": {"media_id": "MEDIA_ID"}
|
||
}
|
||
|
||
注意:发送文件前需要先通过 upload_temp_media 上传文件获取 media_id。
|
||
|
||
Args:
|
||
user_id: 员工的企微 UserID
|
||
media_id: 文件媒体ID(通过上传临时素材获取)
|
||
|
||
Returns:
|
||
Dict[str, Any]: 企微API返回结果
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
|
||
|
||
payload = {
|
||
"touser": user_id,
|
||
"msgtype": "file",
|
||
"agentid": int(settings.wecom_agent_id),
|
||
"file": {"media_id": media_id},
|
||
}
|
||
|
||
try:
|
||
response = await self.client.post(url, json=payload)
|
||
result = response.json()
|
||
|
||
if result.get("errcode") != 0:
|
||
logger.error(
|
||
f"发送文件消息失败: user_id={user_id}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
else:
|
||
logger.info(f"发送文件消息成功: user_id={user_id}")
|
||
|
||
return result
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"发送文件消息网络错误: user_id={user_id}, error={e}")
|
||
raise Exception(f"发送消息网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 获取员工通讯录信息
|
||
# --------------------------------------------------------------------------
|
||
async def get_user_info(self, user_id: str) -> Dict[str, Any]:
|
||
"""获取员工通讯录详细信息(用于 VIP 判断)。
|
||
|
||
对应企微API:
|
||
GET https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token=TOKEN&userid=USERID
|
||
|
||
返回数据包含:
|
||
- userid: 员工UserID
|
||
- name: 员工姓名
|
||
- department: 部门ID列表
|
||
- position: 岗位
|
||
- mobile: 手机号
|
||
- email: 邮箱
|
||
- status: 激活状态
|
||
|
||
需要企微通讯录只读权限。
|
||
|
||
Args:
|
||
user_id: 员工的企微 UserID
|
||
|
||
Returns:
|
||
Dict[str, Any]: 员工信息字典
|
||
|
||
Raises:
|
||
Exception: 获取失败
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = "https://qyapi.weixin.qq.com/cgi-bin/user/get"
|
||
params = {
|
||
"access_token": access_token,
|
||
"userid": user_id,
|
||
}
|
||
|
||
try:
|
||
response = await self.client.get(url, params=params)
|
||
result = response.json()
|
||
|
||
if result.get("errcode", 0) != 0:
|
||
logger.error(
|
||
f"获取员工信息失败: user_id={user_id}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
raise Exception(f"获取员工信息失败: {result.get('errmsg')}")
|
||
|
||
logger.info(f"获取员工信息成功: user_id={user_id}, name={result.get('name', '')}")
|
||
return result
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"获取员工信息网络错误: user_id={user_id}, error={e}")
|
||
raise Exception(f"获取员工信息网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 获取部门成员列表
|
||
# --------------------------------------------------------------------------
|
||
async def get_department_members(
|
||
self, department_id: int = 1, fetch_child: int = 1
|
||
) -> List[Dict[str, Any]]:
|
||
"""获取部门成员列表。
|
||
|
||
对应企微API:
|
||
GET https://qyapi.weixin.qq.com/cgi-bin/user/list?access_token=TOKEN&department_id=ID&fetch_child=1
|
||
|
||
Args:
|
||
department_id: 部门ID(默认1为根部门)
|
||
fetch_child: 是否递归获取子部门(1=是, 0=否)
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 部门成员列表
|
||
|
||
Raises:
|
||
Exception: 获取失败
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = "https://qyapi.weixin.qq.com/cgi-bin/user/list"
|
||
params = {
|
||
"access_token": access_token,
|
||
"department_id": department_id,
|
||
"fetch_child": fetch_child,
|
||
}
|
||
|
||
try:
|
||
response = await self.client.get(url, params=params)
|
||
result = response.json()
|
||
|
||
if result.get("errcode", 0) != 0:
|
||
logger.error(
|
||
f"获取部门成员失败: dept_id={department_id}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
raise Exception(f"获取部门成员失败: {result.get('errmsg')}")
|
||
|
||
userlist = result.get("userlist", [])
|
||
logger.info(f"获取部门成员成功: dept_id={department_id}, count={len(userlist)}")
|
||
return userlist
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"获取部门成员网络错误: dept_id={department_id}, error={e}")
|
||
raise Exception(f"获取部门成员网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 上传临时素材
|
||
# --------------------------------------------------------------------------
|
||
async def upload_temp_media(
|
||
self, media_type: str, file_data: bytes, filename: str = "upload"
|
||
) -> str:
|
||
"""上传临时素材(图片/文件/语音),获取 media_id。
|
||
|
||
对应企微API:
|
||
POST https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token=TOKEN&type=TYPE
|
||
|
||
临时素材有效期 3 天,适用于发送图片/文件消息。
|
||
|
||
Args:
|
||
media_type: 媒体类型(image/file/voice)
|
||
file_data: 文件二进制数据
|
||
filename: 文件名
|
||
|
||
Returns:
|
||
str: media_id(用于发送图片/文件消息时引用)
|
||
|
||
Raises:
|
||
Exception: 上传失败
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
|
||
|
||
try:
|
||
# 使用 multipart 上传文件
|
||
files = {"media": (filename, file_data)}
|
||
response = await self.client.post(url, files=files)
|
||
result = response.json()
|
||
|
||
if result.get("errcode", 0) != 0:
|
||
logger.error(
|
||
f"上传临时素材失败: type={media_type}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
raise Exception(f"上传临时素材失败: {result.get('errmsg')}")
|
||
|
||
media_id = result.get("media_id", "")
|
||
logger.info(f"上传临时素材成功: type={media_type}, media_id={media_id}")
|
||
return media_id
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"上传临时素材网络错误: type={media_type}, error={e}")
|
||
raise Exception(f"上传临时素材网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# OAuth2 授权换算用户身份
|
||
# --------------------------------------------------------------------------
|
||
async def get_oauth_user_info(self, code: str) -> Dict[str, str]:
|
||
"""通过 OAuth2 授权码换取员工身份信息。
|
||
|
||
对应企微API:
|
||
GET https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo?access_token=TOKEN&code=CODE
|
||
|
||
H5 页面通过企微 OAuth2 静默授权获取 code,后端用 code 换取员工 UserID。
|
||
适用于 H5 用户端身份识别。
|
||
|
||
Args:
|
||
code: 企微 OAuth2 授权码
|
||
|
||
Returns:
|
||
Dict[str, str]: 包含 userid 和 user_ticket 的字典
|
||
|
||
Raises:
|
||
Exception: 换取失败
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = "https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo"
|
||
params = {
|
||
"access_token": access_token,
|
||
"code": code,
|
||
}
|
||
|
||
try:
|
||
response = await self.client.get(url, params=params)
|
||
result = response.json()
|
||
|
||
if result.get("errcode", 0) != 0:
|
||
logger.error(
|
||
f"OAuth2换取用户身份失败: code={code}, "
|
||
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
|
||
)
|
||
raise Exception(f"OAuth2换取用户身份失败: {result.get('errmsg')}")
|
||
|
||
user_id = result.get("userid", "")
|
||
logger.info(f"OAuth2换取用户身份成功: userid={user_id}")
|
||
return {
|
||
"userid": user_id,
|
||
"user_ticket": result.get("user_ticket", ""),
|
||
}
|
||
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"OAuth2换取用户身份网络错误: code={code}, error={e}")
|
||
raise Exception(f"OAuth2换取用户身份网络错误: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 关闭客户端
|
||
# --------------------------------------------------------------------------
|
||
async def close(self) -> None:
|
||
"""关闭 HTTP 客户端连接池。
|
||
|
||
应用关闭时调用,释放资源。
|
||
"""
|
||
await self.client.aclose()
|
||
logger.info("WecomService HTTP 客户端已关闭")
|