""" 从 IT支持知识库.docx 提取结构化内容,导入到 quick_reply_templates 表。 映射规则: - Heading 1 → 文档一级分类(用于确定 category 字段) - Heading 2 → 文档二级子分类(合并到 title 前缀) - Heading 3 → 快速回复模板标题(title 字段) - Normal → 模板内容(content 字段,多段合并) Category 映射: 办公电脑 → 硬件 软件工具 → 软件 办公设备 → 硬件 办公网络 → 网络 终端安全 → 安全 资产管理 → 通用 其他业务 → 通用 """ import uuid import sqlite3 from datetime import datetime, timezone from docx import Document # ========================================================================= # 配置 # ========================================================================= DOCX_PATH = r"C:\Users\simon\Downloads\IT支持知识库2026-4-24.docx" DB_PATH = r"C:\Users\simon\wecom_it_smart_desk\backend\it_smart_desk.db" # Heading 1 → quick_reply category 映射 CATEGORY_MAP = { "办公电脑": "硬件", "软件工具": "软件", "办公设备": "硬件", "办公网络": "网络", "终端安全": "安全", # 终端安全涉及账号/密码/安全策略,用"安全" "资产管理": "通用", "其他业务": "通用", } def extract_items(doc): """从文档中提取所有 Heading 3 条目,包含完整的层级上下文。 遍历流程: 1. 记录当前的 Heading 1、Heading 2(建立层级上下文) 2. 遇到 Heading 3 → 开始收集该条目下的所有 Normal 段落 3. 遇到下个 Heading 3 或 Heading 2/Heading 1 → 条目结束,存储 Returns: List[dict]: 每个条目含 h1/h2/h3/content 字段 """ items = [] current_h1 = None current_h2 = None current_item = None # 当前正在收集的条目 {h1, h2, h3, content_lines} for para in doc.paragraphs: text = para.text.strip() if not text: continue style = para.style.name if para.style else "" # Heading 1 → 更新一级分类,结束当前条目 if style == "Heading 1": current_h1 = text if current_item and current_item["content_lines"]: items.append(finalize_item(current_item)) current_item = None continue # Heading 2 → 更新二级分类,结束当前条目 if style == "Heading 2": current_h2 = text if current_item and current_item["content_lines"]: items.append(finalize_item(current_item)) current_item = None continue # Heading 3 → 新条目开始,保存上一个,创建新的 if style == "Heading 3": if current_item and current_item["content_lines"]: items.append(finalize_item(current_item)) current_item = { "h1": current_h1, "h2": current_h2, "h3": text, "content_lines": [], } continue # Normal / Normal (Web) 等 → 条目内容 if current_item: current_item["content_lines"].append(text) # 最后一个条目 if current_item and current_item["content_lines"]: items.append(finalize_item(current_item)) return items def finalize_item(item): """将 content_lines 合并为单个 content 字符串,并做格式化处理。""" # 合并内容,用换行分隔多段 content = "\n".join(item["content_lines"]) # 清理多余空白 content = content.strip() item["content"] = content del item["content_lines"] return item def map_category(h1): """将文档一级分类映射到快速回复的 category 字段。""" for key, cat in CATEGORY_MAP.items(): if key in h1 if h1 else False: return cat return "通用" def to_title(item): """生成模板标题:Heading 3 本身作为标题。 如果 Heading 3 文字太长(>128 字符),截断。 """ title = item["h3"] if len(title) > 128: title = title[:125] + "..." return title def check_existing(conn): """检查是否已有数据,避免重复导入。""" count = conn.execute("SELECT COUNT(*) FROM quick_reply_templates").fetchone()[0] return count def import_items(conn, items): """将条目批量插入 quick_reply_templates 表。""" now = datetime.now(timezone.utc).isoformat() inserted = 0 skipped = 0 for i, item in enumerate(items): category = map_category(item["h1"]) title = to_title(item) content = item["content"] # 跳过内容过短的条目(可能是误抓的标题) if len(content) < 10: skipped += 1 continue # 检查是否重复(同标题+同分类) existing = conn.execute( "SELECT id FROM quick_reply_templates WHERE title = ? AND category = ?", (title, category) ).fetchone() if existing: skipped += 1 continue template_id = str(uuid.uuid4()) sort_order = i # 保持文档原始顺序 conn.execute( """INSERT INTO quick_reply_templates (id, category, title, content, variables, sort_order, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( template_id, category, title, content, "[]", # variables: 空列表(JSON 字符串) sort_order, now, now, ) ) inserted += 1 conn.commit() return inserted, skipped # ========================================================================= # 主流程 # ========================================================================= if __name__ == "__main__": print("=" * 60) print(" IT 支持知识库 → 快速回复模板 导入工具") print("=" * 60) # 1. 读取文档 print(f"\n[1/4] 读取文档: {DOCX_PATH}") doc = Document(DOCX_PATH) # 2. 提取条目 print("[2/4] 提取结构化条目...") items = extract_items(doc) print(f" → 共提取 {len(items)} 个 Heading 3 条目") # 统计分类分布 cat_counts = {} for item in items: cat = map_category(item["h1"]) cat_counts[cat] = cat_counts.get(cat, 0) + 1 print(f" → 分类分布: {dict(sorted(cat_counts.items()))}") # 3. 连接数据库 print(f"\n[3/4] 连接数据库: {DB_PATH}") conn = sqlite3.connect(DB_PATH) existing = check_existing(conn) if existing > 0: print(f" ⚠ 数据库中已有 {existing} 条记录,将跳过重复标题。") # 4. 批量导入 print("[4/4] 导入数据...") inserted, skipped = import_items(conn, items) # 统计 total = conn.execute("SELECT COUNT(*) FROM quick_reply_templates").fetchone()[0] by_cat = conn.execute( "SELECT category, COUNT(*) FROM quick_reply_templates GROUP BY category ORDER BY category" ).fetchall() print(f"\n{'=' * 60}") print(f" 导入完成!") print(f" → 新增: {inserted} 条") print(f" → 跳过(重复/内容过短): {skipped} 条") print(f" → 数据库总计: {total} 条") print(f"\n 按分类统计:") for cat, cnt in by_cat: print(f" {cat}: {cnt}") print(f"{'=' * 60}") # 展示前 5 条样例 print("\n 【导入样例】(前 5 条)") samples = conn.execute( "SELECT category, title, substr(content, 1, 80) FROM quick_reply_templates ORDER BY sort_order LIMIT 5" ).fetchall() for cat, title, snippet in samples: print(f" [{cat}] {title}") print(f" {snippet}...") print() conn.close()