138 lines
4.3 KiB
Python
138 lines
4.3 KiB
Python
"""
|
||
任务二企业微信消息推送模块
|
||
用于发送同步失败通知
|
||
"""
|
||
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import List, Dict
|
||
from datetime import datetime
|
||
|
||
# 将项目根目录添加到 Python 路径
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
from src.wework_notifier import WeWorkNotifier
|
||
from src2.logger import get_task2_logger
|
||
|
||
|
||
def send_sync_failure_notification(access_token: str, agentid: str,
|
||
receivers: str, failed_records: List[Dict]) -> bool:
|
||
"""
|
||
发送同步失败通知
|
||
|
||
Args:
|
||
access_token: 企业微信access_token
|
||
agentid: 应用ID
|
||
receivers: 接收人列表
|
||
failed_records: 失败记录列表,每条记录包含:
|
||
- sheet_title: 子表标题
|
||
- record_id: 记录ID
|
||
- tapd_link: TAPD链接
|
||
- error_message: 失败原因
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
if not failed_records:
|
||
return True
|
||
|
||
# 构造消息内容
|
||
content = _build_sync_failure_message(failed_records)
|
||
|
||
# 使用任务一的推送器发送消息
|
||
notifier = WeWorkNotifier(access_token, agentid, receivers, logger=get_task2_logger())
|
||
return notifier._send_text_message(content)
|
||
|
||
|
||
def _build_sync_failure_message(failed_records: List[Dict]) -> str:
|
||
"""
|
||
构造同步失败消息内容(支持多表格、多子表分组)
|
||
|
||
Args:
|
||
failed_records: 失败记录列表,每条记录可包含:
|
||
- doc_index: 表格序号(可选)
|
||
- docid_short: 表格ID简写(可选)
|
||
- sheet_title: 子表标题
|
||
- record_id: 记录ID
|
||
- tapd_link: TAPD链接
|
||
- error_message: 失败原因
|
||
|
||
Returns:
|
||
str: 格式化的消息内容
|
||
"""
|
||
# 按表格和子表分组
|
||
records_by_doc = {}
|
||
for record in failed_records:
|
||
doc_index = record.get('doc_index', 1)
|
||
docid_short = record.get('docid_short', '')
|
||
doc_key = (doc_index, docid_short)
|
||
|
||
if doc_key not in records_by_doc:
|
||
records_by_doc[doc_key] = {}
|
||
|
||
sheet_title = record.get('sheet_title', '未知子表')
|
||
if sheet_title not in records_by_doc[doc_key]:
|
||
records_by_doc[doc_key][sheet_title] = []
|
||
records_by_doc[doc_key][sheet_title].append(record)
|
||
|
||
# 消息头部
|
||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
total_count = len(failed_records)
|
||
doc_count = len(records_by_doc)
|
||
|
||
lines = [
|
||
"【autoTAPD 同步失败通知】",
|
||
f"时间: {timestamp}",
|
||
f"失败数量: {total_count} 条",
|
||
]
|
||
|
||
# 如果有多个表格,显示表格数量
|
||
if doc_count > 1:
|
||
lines.append(f"涉及表格: {doc_count} 个")
|
||
|
||
lines.extend([
|
||
"",
|
||
"以下记录同步失败,请检查:",
|
||
"=" * 40
|
||
])
|
||
|
||
# 按表格和子表分组显示失败记录
|
||
global_idx = 1
|
||
for (doc_index, docid_short), sheets in sorted(records_by_doc.items()):
|
||
# 如果有多个表格,显示表格标识
|
||
if doc_count > 1:
|
||
doc_label = f"表格{doc_index}"
|
||
if docid_short:
|
||
doc_label += f" ({docid_short})"
|
||
lines.append(f"\n{'#'*20}")
|
||
lines.append(f"# {doc_label}")
|
||
lines.append(f"{'#'*20}")
|
||
|
||
for sheet_title, sheet_records in sheets.items():
|
||
lines.append(f"\n【子表:{sheet_title}】")
|
||
lines.append("")
|
||
|
||
for record in sheet_records:
|
||
record_id = record.get('record_id', '未知')
|
||
tapd_link = record.get('tapd_link', '(无链接)')
|
||
error_message = record.get('error_message', '未知错误')
|
||
|
||
lines.append(f"[{global_idx}] 记录ID: {record_id}")
|
||
lines.append(f"TAPD链接: {tapd_link}")
|
||
lines.append(f"失败原因: {error_message}")
|
||
lines.append("")
|
||
|
||
global_idx += 1
|
||
|
||
lines.append("=" * 40)
|
||
lines.append("系统将在下次同步时自动重试失败记录。")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print("=== 任务二推送模块 ===")
|
||
print("此模块提供同步失败通知功能")
|
||
print("请通过 sync_service 或 main.py 调用")
|