Files

574 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 企微 API 封装服务
# =============================================================================
# 说明:封装所有与企微服务器的交互逻辑,包括:
# 1. access_token 管理(Redis 缓存 + 自动刷新)
# 2. 发送消息(文本/图片/文件)
# 3. 获取员工信息(通讯录 API)
# 4. 上传临时素材
# 5. OAuth2 授权换算用户身份
# =============================================================================
import json
import logging
from typing import Any, Dict, List, Optional
import httpx
import redis.asyncio as aioredis
from app.config import settings
logger = logging.getLogger(__name__)
class WecomService:
"""企微 API 调用服务。
封装所有与企微服务器的 HTTP 交互,提供异步方法。
access_token 通过 Redis 缓存管理,避免频繁调用获取接口。
Attributes:
redis: Redis 异步客户端(用于缓存 access_token
client: httpx 异步 HTTP 客户端
"""
def __init__(self, redis_client: Optional[aioredis.Redis] = None):
"""初始化企微服务。
Args:
redis_client: Redis 异步客户端实例(可为 None,本地开发时 Redis 不可用)
"""
self.redis = redis_client
# 创建 httpx 异步客户端
# timeout: 连接超时5秒,读取超时10秒
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=5.0, read=10.0, write=10.0, pool=5.0)
)
# 内存缓存(Redis 不可用时的降级方案)
self._token_cache: Optional[str] = None
# --------------------------------------------------------------------------
# access_token 管理
# --------------------------------------------------------------------------
async def get_access_token(self) -> str:
"""获取企微 access_token。
优先从 Redis 缓存获取,如果缓存不存在或即将过期则重新获取。
access_token 有效期 7200 秒,缓存 TTL 设为 6900 秒(提前 300 秒刷新)。
对应企微API:
GET https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRET
Returns:
str: access_token 字符串
Raises:
Exception: 获取 access_token 失败
"""
# Redis 缓存 key
cache_key = "wecom:access_token"
# 1. 尝试从 Redis 缓存获取
if self.redis:
try:
cached_token = await self.redis.get(cache_key)
if cached_token:
logger.debug("从缓存获取 access_token")
return cached_token.decode("utf-8")
except Exception as e:
logger.warning(f"Redis 读取失败(降级): {e}")
# 1b. 尝试从内存缓存获取
if self._token_cache:
logger.debug("从内存缓存获取 access_token")
return self._token_cache
# 2. 缓存未命中,调用企微 API 获取
logger.info("缓存未命中,调用企微API获取 access_token")
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {
"corpid": settings.wecom_corp_id,
"corpsecret": settings.wecom_secret,
}
try:
response = await self.client.get(url, params=params)
result = response.json()
# 检查企微API返回码
if result.get("errcode") != 0:
error_msg = result.get("errmsg", "未知错误")
logger.error(f"获取 access_token 失败: errcode={result.get('errcode')}, errmsg={error_msg}")
raise Exception(f"企微API错误: {error_msg}")
access_token = result["access_token"]
expires_in = result.get("expires_in", 7200)
# 3. 缓存到 RedisTTL = 有效期 - 300秒(提前刷新)
buffer_seconds = 300
cache_ttl = max(expires_in - buffer_seconds, 60) # 至少缓存 60 秒
if self.redis:
try:
await self.redis.setex(cache_key, cache_ttl, access_token)
except Exception as e:
logger.warning(f"Redis 写入失败(降级): {e}")
# 3b. 同时缓存到内存
self._token_cache = access_token
logger.info(f"access_token 获取成功,缓存 TTL={cache_ttl}")
return access_token
except httpx.HTTPError as e:
logger.error(f"获取 access_token 网络错误: {e}")
raise Exception(f"企微API网络错误: {e}") from e
# --------------------------------------------------------------------------
# 发送文本消息
# --------------------------------------------------------------------------
async def send_text_message(
self, user_id: str, content: str
) -> Dict[str, Any]:
"""向员工发送文本消息。
对应企微API:
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
请求体:
{
"touser": "UserID",
"msgtype": "text",
"agentid": 1000002,
"text": {"content": "消息内容"}
}
Args:
user_id: 员工的企微 UserID
content: 消息内容(纯文本)
Returns:
Dict[str, Any]: 企微API返回结果
"""
access_token = await self.get_access_token()
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
payload = {
"touser": user_id,
"msgtype": "text",
"agentid": int(settings.wecom_agent_id),
"text": {"content": content},
}
try:
response = await self.client.post(url, json=payload)
result = response.json()
if result.get("errcode") != 0:
logger.error(
f"发送文本消息失败: user_id={user_id}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
else:
logger.info(f"发送文本消息成功: user_id={user_id}")
return result
except httpx.HTTPError as e:
logger.error(f"发送文本消息网络错误: user_id={user_id}, error={e}")
raise Exception(f"发送消息网络错误: {e}") from e
# --------------------------------------------------------------------------
# 发送卡片消息
# --------------------------------------------------------------------------
async def send_card_message(
self,
user_id: str,
title: str,
description: str,
url: str = "",
btntxt: str = "详情",
) -> Dict[str, Any]:
"""向员工发送文本卡片消息。
对应企微API:
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
请求体:
{
"touser": "UserID",
"msgtype": "textcard",
"agentid": 1000002,
"textcard": {
"title": "标题",
"description": "描述",
"url": "链接",
"btntxt": "按钮文字"
}
}
Args:
user_id: 员工的企微 UserID
title: 卡片标题
description: 卡片描述
url: 卡片点击跳转链接
btntxt: 按钮文字(默认"详情"
Returns:
Dict[str, Any]: 企微API返回结果
"""
access_token = await self.get_access_token()
url_api = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
payload = {
"touser": user_id,
"msgtype": "textcard",
"agentid": int(settings.wecom_agent_id),
"textcard": {
"title": title,
"description": description,
"url": url,
"btntxt": btntxt,
},
}
try:
response = await self.client.post(url_api, json=payload)
result = response.json()
if result.get("errcode") != 0:
logger.error(
f"发送卡片消息失败: user_id={user_id}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
else:
logger.info(f"发送卡片消息成功: user_id={user_id}")
return result
except httpx.HTTPError as e:
logger.error(f"发送卡片消息网络错误: user_id={user_id}, error={e}")
raise Exception(f"发送消息网络错误: {e}") from e
# --------------------------------------------------------------------------
# 发送图片消息
# --------------------------------------------------------------------------
async def send_image_message(
self, user_id: str, media_id: str
) -> Dict[str, Any]:
"""向员工发送图片消息。
对应企微API:
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
请求体:
{
"touser": "UserID",
"msgtype": "image",
"agentid": 1000002,
"image": {"media_id": "MEDIA_ID"}
}
注意:发送图片前需要先通过 upload_temp_media 上传图片获取 media_id。
Args:
user_id: 员工的企微 UserID
media_id: 图片媒体ID(通过上传临时素材获取)
Returns:
Dict[str, Any]: 企微API返回结果
"""
access_token = await self.get_access_token()
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
payload = {
"touser": user_id,
"msgtype": "image",
"agentid": int(settings.wecom_agent_id),
"image": {"media_id": media_id},
}
try:
response = await self.client.post(url, json=payload)
result = response.json()
if result.get("errcode") != 0:
logger.error(
f"发送图片消息失败: user_id={user_id}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
else:
logger.info(f"发送图片消息成功: user_id={user_id}")
return result
except httpx.HTTPError as e:
logger.error(f"发送图片消息网络错误: user_id={user_id}, error={e}")
raise Exception(f"发送消息网络错误: {e}") from e
# --------------------------------------------------------------------------
# 发送文件消息
# --------------------------------------------------------------------------
async def send_file_message(
self, user_id: str, media_id: str
) -> Dict[str, Any]:
"""向员工发送文件消息。
对应企微API:
POST https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=TOKEN
请求体:
{
"touser": "UserID",
"msgtype": "file",
"agentid": 1000002,
"file": {"media_id": "MEDIA_ID"}
}
注意:发送文件前需要先通过 upload_temp_media 上传文件获取 media_id。
Args:
user_id: 员工的企微 UserID
media_id: 文件媒体ID(通过上传临时素材获取)
Returns:
Dict[str, Any]: 企微API返回结果
"""
access_token = await self.get_access_token()
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
payload = {
"touser": user_id,
"msgtype": "file",
"agentid": int(settings.wecom_agent_id),
"file": {"media_id": media_id},
}
try:
response = await self.client.post(url, json=payload)
result = response.json()
if result.get("errcode") != 0:
logger.error(
f"发送文件消息失败: user_id={user_id}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
else:
logger.info(f"发送文件消息成功: user_id={user_id}")
return result
except httpx.HTTPError as e:
logger.error(f"发送文件消息网络错误: user_id={user_id}, error={e}")
raise Exception(f"发送消息网络错误: {e}") from e
# --------------------------------------------------------------------------
# 获取员工通讯录信息
# --------------------------------------------------------------------------
async def get_user_info(self, user_id: str) -> Dict[str, Any]:
"""获取员工通讯录详细信息(用于 VIP 判断)。
对应企微API:
GET https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token=TOKEN&userid=USERID
返回数据包含:
- userid: 员工UserID
- name: 员工姓名
- department: 部门ID列表
- position: 岗位
- mobile: 手机号
- email: 邮箱
- status: 激活状态
需要企微通讯录只读权限。
Args:
user_id: 员工的企微 UserID
Returns:
Dict[str, Any]: 员工信息字典
Raises:
Exception: 获取失败
"""
access_token = await self.get_access_token()
url = "https://qyapi.weixin.qq.com/cgi-bin/user/get"
params = {
"access_token": access_token,
"userid": user_id,
}
try:
response = await self.client.get(url, params=params)
result = response.json()
if result.get("errcode", 0) != 0:
logger.error(
f"获取员工信息失败: user_id={user_id}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
raise Exception(f"获取员工信息失败: {result.get('errmsg')}")
logger.info(f"获取员工信息成功: user_id={user_id}, name={result.get('name', '')}")
return result
except httpx.HTTPError as e:
logger.error(f"获取员工信息网络错误: user_id={user_id}, error={e}")
raise Exception(f"获取员工信息网络错误: {e}") from e
# --------------------------------------------------------------------------
# 获取部门成员列表
# --------------------------------------------------------------------------
async def get_department_members(
self, department_id: int = 1, fetch_child: int = 1
) -> List[Dict[str, Any]]:
"""获取部门成员列表。
对应企微API:
GET https://qyapi.weixin.qq.com/cgi-bin/user/list?access_token=TOKEN&department_id=ID&fetch_child=1
Args:
department_id: 部门ID(默认1为根部门)
fetch_child: 是否递归获取子部门(1=是, 0=否)
Returns:
List[Dict[str, Any]]: 部门成员列表
Raises:
Exception: 获取失败
"""
access_token = await self.get_access_token()
url = "https://qyapi.weixin.qq.com/cgi-bin/user/list"
params = {
"access_token": access_token,
"department_id": department_id,
"fetch_child": fetch_child,
}
try:
response = await self.client.get(url, params=params)
result = response.json()
if result.get("errcode", 0) != 0:
logger.error(
f"获取部门成员失败: dept_id={department_id}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
raise Exception(f"获取部门成员失败: {result.get('errmsg')}")
userlist = result.get("userlist", [])
logger.info(f"获取部门成员成功: dept_id={department_id}, count={len(userlist)}")
return userlist
except httpx.HTTPError as e:
logger.error(f"获取部门成员网络错误: dept_id={department_id}, error={e}")
raise Exception(f"获取部门成员网络错误: {e}") from e
# --------------------------------------------------------------------------
# 上传临时素材
# --------------------------------------------------------------------------
async def upload_temp_media(
self, media_type: str, file_data: bytes, filename: str = "upload"
) -> str:
"""上传临时素材(图片/文件/语音),获取 media_id。
对应企微API:
POST https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token=TOKEN&type=TYPE
临时素材有效期 3 天,适用于发送图片/文件消息。
Args:
media_type: 媒体类型(image/file/voice
file_data: 文件二进制数据
filename: 文件名
Returns:
str: media_id(用于发送图片/文件消息时引用)
Raises:
Exception: 上传失败
"""
access_token = await self.get_access_token()
url = f"https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
try:
# 使用 multipart 上传文件
files = {"media": (filename, file_data)}
response = await self.client.post(url, files=files)
result = response.json()
if result.get("errcode", 0) != 0:
logger.error(
f"上传临时素材失败: type={media_type}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
raise Exception(f"上传临时素材失败: {result.get('errmsg')}")
media_id = result.get("media_id", "")
logger.info(f"上传临时素材成功: type={media_type}, media_id={media_id}")
return media_id
except httpx.HTTPError as e:
logger.error(f"上传临时素材网络错误: type={media_type}, error={e}")
raise Exception(f"上传临时素材网络错误: {e}") from e
# --------------------------------------------------------------------------
# OAuth2 授权换算用户身份
# --------------------------------------------------------------------------
async def get_oauth_user_info(self, code: str) -> Dict[str, str]:
"""通过 OAuth2 授权码换取员工身份信息。
对应企微API:
GET https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo?access_token=TOKEN&code=CODE
H5 页面通过企微 OAuth2 静默授权获取 code,后端用 code 换取员工 UserID。
适用于 H5 用户端身份识别。
Args:
code: 企微 OAuth2 授权码
Returns:
Dict[str, str]: 包含 userid 和 user_ticket 的字典
Raises:
Exception: 换取失败
"""
access_token = await self.get_access_token()
url = "https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo"
params = {
"access_token": access_token,
"code": code,
}
try:
response = await self.client.get(url, params=params)
result = response.json()
if result.get("errcode", 0) != 0:
logger.error(
f"OAuth2换取用户身份失败: code={code}, "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
raise Exception(f"OAuth2换取用户身份失败: {result.get('errmsg')}")
user_id = result.get("userid", "")
logger.info(f"OAuth2换取用户身份成功: userid={user_id}")
return {
"userid": user_id,
"user_ticket": result.get("user_ticket", ""),
}
except httpx.HTTPError as e:
logger.error(f"OAuth2换取用户身份网络错误: code={code}, error={e}")
raise Exception(f"OAuth2换取用户身份网络错误: {e}") from e
# --------------------------------------------------------------------------
# 关闭客户端
# --------------------------------------------------------------------------
async def close(self) -> None:
"""关闭 HTTP 客户端连接池。
应用关闭时调用,释放资源。
"""
await self.client.aclose()
logger.info("WecomService HTTP 客户端已关闭")