Files

249 lines
7.8 KiB
Python

"""
从 IT支持知识库.docx 提取结构化内容,导入到 quick_reply_templates 表。
映射规则:
- Heading 1 → 文档一级分类(用于确定 category 字段)
- Heading 2 → 文档二级子分类(合并到 title 前缀)
- Heading 3 → 快速回复模板标题(title 字段)
- Normal → 模板内容(content 字段,多段合并)
Category 映射:
办公电脑 → 硬件
软件工具 → 软件
办公设备 → 硬件
办公网络 → 网络
终端安全 → 安全
资产管理 → 通用
其他业务 → 通用
"""
import uuid
import sqlite3
from datetime import datetime, timezone
from docx import Document
# =========================================================================
# 配置
# =========================================================================
DOCX_PATH = r"C:\Users\simon\Downloads\IT支持知识库2026-4-24.docx"
DB_PATH = r"C:\Users\simon\wecom_it_smart_desk\backend\it_smart_desk.db"
# Heading 1 → quick_reply category 映射
CATEGORY_MAP = {
"办公电脑": "硬件",
"软件工具": "软件",
"办公设备": "硬件",
"办公网络": "网络",
"终端安全": "安全", # 终端安全涉及账号/密码/安全策略,用"安全"
"资产管理": "通用",
"其他业务": "通用",
}
def extract_items(doc):
"""从文档中提取所有 Heading 3 条目,包含完整的层级上下文。
遍历流程:
1. 记录当前的 Heading 1、Heading 2(建立层级上下文)
2. 遇到 Heading 3 → 开始收集该条目下的所有 Normal 段落
3. 遇到下个 Heading 3 或 Heading 2/Heading 1 → 条目结束,存储
Returns:
List[dict]: 每个条目含 h1/h2/h3/content 字段
"""
items = []
current_h1 = None
current_h2 = None
current_item = None # 当前正在收集的条目 {h1, h2, h3, content_lines}
for para in doc.paragraphs:
text = para.text.strip()
if not text:
continue
style = para.style.name if para.style else ""
# Heading 1 → 更新一级分类,结束当前条目
if style == "Heading 1":
current_h1 = text
if current_item and current_item["content_lines"]:
items.append(finalize_item(current_item))
current_item = None
continue
# Heading 2 → 更新二级分类,结束当前条目
if style == "Heading 2":
current_h2 = text
if current_item and current_item["content_lines"]:
items.append(finalize_item(current_item))
current_item = None
continue
# Heading 3 → 新条目开始,保存上一个,创建新的
if style == "Heading 3":
if current_item and current_item["content_lines"]:
items.append(finalize_item(current_item))
current_item = {
"h1": current_h1,
"h2": current_h2,
"h3": text,
"content_lines": [],
}
continue
# Normal / Normal (Web) 等 → 条目内容
if current_item:
current_item["content_lines"].append(text)
# 最后一个条目
if current_item and current_item["content_lines"]:
items.append(finalize_item(current_item))
return items
def finalize_item(item):
"""将 content_lines 合并为单个 content 字符串,并做格式化处理。"""
# 合并内容,用换行分隔多段
content = "\n".join(item["content_lines"])
# 清理多余空白
content = content.strip()
item["content"] = content
del item["content_lines"]
return item
def map_category(h1):
"""将文档一级分类映射到快速回复的 category 字段。"""
for key, cat in CATEGORY_MAP.items():
if key in h1 if h1 else False:
return cat
return "通用"
def to_title(item):
"""生成模板标题:Heading 3 本身作为标题。
如果 Heading 3 文字太长(>128 字符),截断。
"""
title = item["h3"]
if len(title) > 128:
title = title[:125] + "..."
return title
def check_existing(conn):
"""检查是否已有数据,避免重复导入。"""
count = conn.execute("SELECT COUNT(*) FROM quick_reply_templates").fetchone()[0]
return count
def import_items(conn, items):
"""将条目批量插入 quick_reply_templates 表。"""
now = datetime.now(timezone.utc).isoformat()
inserted = 0
skipped = 0
for i, item in enumerate(items):
category = map_category(item["h1"])
title = to_title(item)
content = item["content"]
# 跳过内容过短的条目(可能是误抓的标题)
if len(content) < 10:
skipped += 1
continue
# 检查是否重复(同标题+同分类)
existing = conn.execute(
"SELECT id FROM quick_reply_templates WHERE title = ? AND category = ?",
(title, category)
).fetchone()
if existing:
skipped += 1
continue
template_id = str(uuid.uuid4())
sort_order = i # 保持文档原始顺序
conn.execute(
"""INSERT INTO quick_reply_templates
(id, category, title, content, variables, sort_order, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
template_id,
category,
title,
content,
"[]", # variables: 空列表(JSON 字符串)
sort_order,
now,
now,
)
)
inserted += 1
conn.commit()
return inserted, skipped
# =========================================================================
# 主流程
# =========================================================================
if __name__ == "__main__":
print("=" * 60)
print(" IT 支持知识库 → 快速回复模板 导入工具")
print("=" * 60)
# 1. 读取文档
print(f"\n[1/4] 读取文档: {DOCX_PATH}")
doc = Document(DOCX_PATH)
# 2. 提取条目
print("[2/4] 提取结构化条目...")
items = extract_items(doc)
print(f" → 共提取 {len(items)} 个 Heading 3 条目")
# 统计分类分布
cat_counts = {}
for item in items:
cat = map_category(item["h1"])
cat_counts[cat] = cat_counts.get(cat, 0) + 1
print(f" → 分类分布: {dict(sorted(cat_counts.items()))}")
# 3. 连接数据库
print(f"\n[3/4] 连接数据库: {DB_PATH}")
conn = sqlite3.connect(DB_PATH)
existing = check_existing(conn)
if existing > 0:
print(f" ⚠ 数据库中已有 {existing} 条记录,将跳过重复标题。")
# 4. 批量导入
print("[4/4] 导入数据...")
inserted, skipped = import_items(conn, items)
# 统计
total = conn.execute("SELECT COUNT(*) FROM quick_reply_templates").fetchone()[0]
by_cat = conn.execute(
"SELECT category, COUNT(*) FROM quick_reply_templates GROUP BY category ORDER BY category"
).fetchall()
print(f"\n{'=' * 60}")
print(f" 导入完成!")
print(f" → 新增: {inserted}")
print(f" → 跳过(重复/内容过短): {skipped}")
print(f" → 数据库总计: {total}")
print(f"\n 按分类统计:")
for cat, cnt in by_cat:
print(f" {cat}: {cnt}")
print(f"{'=' * 60}")
# 展示前 5 条样例
print("\n 【导入样例】(前 5 条)")
samples = conn.execute(
"SELECT category, title, substr(content, 1, 80) FROM quick_reply_templates ORDER BY sort_order LIMIT 5"
).fetchall()
for cat, title, snippet in samples:
print(f" [{cat}] {title}")
print(f" {snippet}...")
print()
conn.close()