G41_TAPD_BUG_SYNC/src2/sync_service.py
2026-01-13 15:59:40 +08:00

606 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
任务二同步服务模块
整合链接解析、TAPD查询、表格回写实现完整的同步流程
功能:
1. 单次同步流程
2. 执行统计
3. 错误处理
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.logger import get_task2_logger
from src2.tapd_api import TAPDStoryApi, TERMINAL_STATUSES
from src2.smartsheet_sync import SmartSheetSync, REQUIRED_FIELDS
class SyncService:
"""TAPD状态同步服务支持多表格同步"""
def __init__(self, config_manager: Task2ConfigManager = None,
access_token: str = None, test_mode: bool = False):
"""
初始化同步服务
Args:
config_manager: 配置管理器如果为None则自动创建
access_token: 企业微信access_token如果为None则自动获取
test_mode: 是否启用测试模式
"""
self.test_mode = test_mode
self.logger = get_task2_logger()
# 初始化配置管理器
if config_manager is None:
self.config_manager = Task2ConfigManager()
else:
self.config_manager = config_manager
# 获取配置
self.config = self.config_manager.get_all_config()
self.workspace_id = self.config['tapd']['workspace_id']
# 获取所有docid列表
self.docid_list = self.config['smartsheet']['docid_list']
print(f" 配置了 {len(self.docid_list)} 个智能表格")
# 获取access_token
if access_token is None:
token_manager = TokenManager()
self.access_token = token_manager.get_token()
else:
self.access_token = access_token
# 初始化TAPD API所有表格共用
self.tapd_api = TAPDStoryApi(self.workspace_id, test_mode=test_mode)
# 获取计划字段映射(每次同步时实时获取,所有表格共用)
print(f" 正在获取计划字段映射...")
try:
self.plan_mapping = self.tapd_api.get_plan_mapping()
print(f" ✓ 计划字段映射获取完成,共 {len(self.plan_mapping)} 个选项")
except Exception as e:
print(f" ⚠ 计划字段映射获取失败: {e},将使用空映射")
self.plan_mapping = {}
print(f" ✓ 同步服务初始化完成")
def sync_once(self) -> Dict[str, Any]:
"""
执行一次完整的同步流程(遍历所有配置的智能表格)
Returns:
Dict: 同步结果统计
"""
result = {
"success": False,
"start_time": datetime.now().isoformat(),
"end_time": None,
"docs_total": len(self.docid_list),
"docs_success": 0,
"docs_failed": 0,
"sheets_processed": 0,
"sheets_skipped": 0,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"error_message": None,
"doc_results": [] # 每个表格的详细结果
}
all_failed_records = [] # 汇总所有表格的失败记录
# 遍历所有配置的智能表格
for doc_index, docid in enumerate(self.docid_list, 1):
doc_result = self._sync_single_doc(docid, doc_index, len(self.docid_list))
result["doc_results"].append(doc_result)
if doc_result["success"]:
result["docs_success"] += 1
# 累加统计数据
result["sheets_processed"] += doc_result["sheets_processed"]
result["sheets_skipped"] += doc_result["sheets_skipped"]
result["total_records"] += doc_result["total_records"]
result["records_with_link"] += doc_result["records_with_link"]
result["records_synced"] += doc_result["records_synced"]
result["records_updated"] += doc_result["records_updated"]
result["records_failed"] += doc_result["records_failed"]
# 收集失败记录
all_failed_records.extend(doc_result.get("all_failed_records", []))
else:
result["docs_failed"] += 1
# 判断整体是否成功(至少有一个表格成功)
result["success"] = result["docs_success"] > 0
# 发送失败通知(汇总所有表格的失败记录)
if all_failed_records:
self._send_failure_notification(all_failed_records)
result["end_time"] = datetime.now().isoformat()
return result
def _sync_single_doc(self, docid: str, doc_index: int, total_docs: int) -> Dict[str, Any]:
"""
同步单个智能表格
Args:
docid: 智能表格文档ID
doc_index: 当前表格序号从1开始
total_docs: 表格总数
Returns:
Dict: 单个表格的同步结果
"""
# 显示docid的前16个字符便于识别
display_id = docid[:16] + "..." if len(docid) > 16 else docid
print(f"\n{'#'*70}")
print(f"# [表格 {doc_index}/{total_docs}] docid: {display_id}")
print(f"{'#'*70}")
doc_result = {
"docid": docid,
"doc_index": doc_index,
"success": False,
"sheets_processed": 0,
"sheets_skipped": 0,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"error_message": None,
"sheet_results": [],
"all_failed_records": []
}
try:
# 为当前表格创建SmartSheetSync实例
smartsheet = SmartSheetSync(self.access_token, docid, test_mode=self.test_mode)
self.current_smartsheet = smartsheet # 供_process_sheet等方法使用
# 获取所有子表
print("\n正在获取子表列表...")
sheets = smartsheet.api.get_sheet_list()
print(f" ✓ 找到 {len(sheets)} 个子表")
# 处理每个子表
for sheet in sheets:
sheet_id = sheet.get('sheet_id', '')
sheet_title = sheet.get('title', '未命名')
sheet_result = self._process_sheet(sheet_id, sheet_title, doc_index)
doc_result["sheet_results"].append(sheet_result)
if sheet_result["skipped"]:
doc_result["sheets_skipped"] += 1
else:
doc_result["sheets_processed"] += 1
doc_result["total_records"] += sheet_result["total_records"]
doc_result["records_with_link"] += sheet_result["records_with_link"]
doc_result["records_synced"] += sheet_result["records_synced"]
doc_result["records_updated"] += sheet_result["records_updated"]
doc_result["records_failed"] += sheet_result["records_failed"]
# 收集失败记录(添加表格标识)
for failed in sheet_result.get("failed_records", []):
failed["doc_index"] = doc_index
failed["docid_short"] = display_id
doc_result["all_failed_records"].append(failed)
doc_result["success"] = True
print(f"\n✓ [表格 {doc_index}/{total_docs}] 同步完成")
except Exception as e:
doc_result["error_message"] = str(e)
print(f"\n✗ [表格 {doc_index}/{total_docs}] 同步失败: {e}")
if self.test_mode:
import traceback
traceback.print_exc()
return doc_result
def _process_sheet(self, sheet_id: str, sheet_title: str, doc_index: int = 1) -> Dict[str, Any]:
"""
处理单个子表的同步
Args:
sheet_id: 子表ID
sheet_title: 子表标题
doc_index: 表格序号(用于日志显示)
Returns:
Dict: 子表处理结果
"""
print(f"\n{'='*60}")
print(f"处理子表: {sheet_title} (表格{doc_index})")
print(f"{'='*60}")
sheet_result = {
"sheet_id": sheet_id,
"sheet_title": sheet_title,
"skipped": False,
"skip_reason": None,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"details": [],
"failed_records": [] # 收集失败记录的详细信息
}
try:
# 1. 获取字段信息并检查必要字段
fields = self.current_smartsheet.api.get_fields(sheet_id)
all_present, missing_fields, field_mapping = self.current_smartsheet.check_required_fields(fields)
if not all_present:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
print(f" ⚠ 跳过此子表: {sheet_result['skip_reason']}")
return sheet_result
# 2. 获取所有记录(只获取一次,供新记录同步和持续同步共用)
print(f"正在获取所有记录...")
all_records = self.current_smartsheet.get_all_records(sheet_id)
# 3. 获取包含TAPD链接的新记录同步状态为空
records_with_link = self.current_smartsheet.get_records_with_tapd_link(
sheet_id, all_records=all_records
)
sheet_result["records_with_link"] = len(records_with_link)
# 4. 处理新记录
if records_with_link:
print(f"\n--- 新记录同步 ---")
success_records = [] # 成功同步的记录
failed_record_ids = [] # 失败的记录ID列表
for record_info in records_with_link:
record_result = self._process_record(record_info)
sheet_result["details"].append(record_result)
if record_result["success"]:
sheet_result["records_synced"] += 1
if record_result["update_record"]:
success_records.append(record_result["update_record"])
sheet_result["records_updated"] += 1
else:
sheet_result["records_failed"] += 1
failed_record_ids.append(record_info["record_id"])
# 收集失败记录的详细信息用于推送
failed_record = {
"sheet_title": sheet_title,
"record_id": record_info["record_id"],
"tapd_link": record_info.get("tapd_link", "(无链接)"),
"error_message": record_result.get("error_message", "未知错误")
}
sheet_result["failed_records"].append(failed_record)
# 4. 批量回写成功记录
if success_records:
print(f"\n正在回写 {len(success_records)} 条成功记录...")
try:
self.current_smartsheet.batch_update_records(sheet_id, success_records)
print(f" ✓ 成功记录回写完成")
except Exception as e:
print(f" ✗ 成功记录回写失败: {e}")
# 5. 批量回写失败记录
if failed_record_ids:
print(f"\n正在回写 {len(failed_record_ids)} 条失败记录的状态...")
try:
failed_updates = [
self.current_smartsheet.build_update_record(
record_id=record_id,
sync_status="失败"
)
for record_id in failed_record_ids
]
self.current_smartsheet.batch_update_records(sheet_id, failed_updates)
print(f" ✓ 失败记录状态回写完成")
except Exception as e:
print(f" ✗ 失败记录状态回写失败: {e}")
else:
print(f" 没有新记录需要同步")
# 7. 持续同步:更新已同步记录的最新状态
print(f"\n--- 持续同步 ---")
update_result = self._process_synced_records_update(sheet_id, all_records=all_records)
sheet_result["continuous_sync"] = update_result
sheet_result["total_records"] = len(records_with_link)
except Exception as e:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"处理异常: {str(e)}"
print(f" ✗ 处理子表异常: {e}")
return sheet_result
def _process_record(self, record_info: Dict) -> Dict[str, Any]:
"""
处理单条记录的同步
Args:
record_info: 记录信息(包含解析结果)
Returns:
Dict: 记录处理结果
"""
record_result = {
"record_id": record_info["record_id"],
"tapd_link": record_info["tapd_link"],
"success": False,
"update_record": None,
"error_message": None,
"story_info": None
}
# 检查链接解析结果
if not record_info["parse_success"]:
record_result["error_message"] = record_info.get("parse_error", "链接解析失败")
return record_result
story_id = record_info["story_id"]
try:
# 查询TAPD获取需求信息
story_info = self.tapd_api.get_story(story_id)
record_result["story_info"] = story_info
# 提取需要同步的字段
status = story_info.get('status', '')
owner = story_info.get('owner', '')
begin_date = story_info.get('begin', '')
due_date = story_info.get('due', '')
# 提取计划字段并转换为中文名称
plan_id = story_info.get('custom_plan_field_1', '')
plan_name = self.tapd_api.map_plan_id_to_name(plan_id)
# 获取当前字段值,判断是否需要更新
current_values = self.current_smartsheet.get_current_field_values(record_info["record"])
needs_update = self._check_needs_update(
current_values, status, owner, begin_date, due_date, plan_name
)
# 构造更新记录(包含业务字段 + 同步状态=成功)
# 即使业务字段没有变化,也要写入同步状态
update_record = self.current_smartsheet.build_update_record(
record_id=record_info["record_id"],
status=status,
owner=owner,
begin_date=begin_date,
due_date=due_date,
plan=plan_name,
sync_status="成功"
)
record_result["update_record"] = update_record
record_result["success"] = True
except Exception as e:
record_result["error_message"] = str(e)
return record_result
def _check_needs_update(self, current_values: Dict,
status: str, owner: str,
begin_date: str, due_date: str,
plan: str = "") -> bool:
"""
检查是否需要更新记录
Args:
current_values: 当前字段值
status: 新状态
owner: 新处理人
begin_date: 新开始日期
due_date: 新结束日期
plan: 新计划
Returns:
bool: 是否需要更新
"""
# 提取当前值的文本内容
def extract_text(value):
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, list) and len(value) > 0:
first = value[0]
if isinstance(first, dict):
return first.get('text', '')
return str(first)
if isinstance(value, dict):
return value.get('text', '')
return str(value)
current_status = extract_text(current_values.get('TAPD状态'))
current_owner = extract_text(current_values.get('处理人'))
current_begin = extract_text(current_values.get('TAPD预计开始日期'))
current_due = extract_text(current_values.get('TAPD预计完成日期'))
current_plan = extract_text(current_values.get('计划'))
# 比较是否有变化
if current_status != status:
return True
if current_owner != owner:
return True
if current_begin != begin_date:
return True
if current_due != due_date:
return True
if current_plan != plan:
return True
return False
def _process_synced_records_update(self, sheet_id: str,
all_records: List[Dict] = None) -> Dict[str, Any]:
"""
处理已同步记录的状态更新(持续同步)
Args:
sheet_id: 子表ID
all_records: 可选,已获取的所有记录列表
Returns:
Dict: 更新结果统计
"""
result = {
"checked": 0,
"updated": 0,
"failed": 0,
}
# 获取需要持续同步的记录
records = self.current_smartsheet.get_synced_records_for_update(
sheet_id, TERMINAL_STATUSES, all_records=all_records
)
if not records:
print(f" 没有需要持续同步的记录")
return result
result["checked"] = len(records)
updates = []
for record_info in records:
try:
# 查询TAPD最新状态
story_info = self.tapd_api.get_story(record_info["story_id"])
# 提取最新字段值
new_status = story_info.get('status', '')
new_owner = story_info.get('owner', '')
new_begin = story_info.get('begin', '')
new_due = story_info.get('due', '')
plan_id = story_info.get('custom_plan_field_1', '')
new_plan = self.tapd_api.map_plan_id_to_name(plan_id)
# 获取当前值并比较
current = self.current_smartsheet.get_current_field_values(record_info["record"])
needs_update = self._check_needs_update(
current, new_status, new_owner, new_begin, new_due, new_plan
)
if needs_update:
update_record = self.current_smartsheet.build_update_record(
record_id=record_info["record_id"],
status=new_status,
owner=new_owner,
begin_date=new_begin,
due_date=new_due,
plan=new_plan
)
updates.append(update_record)
except Exception as e:
result["failed"] += 1
if self.test_mode:
print(f" ✗ 记录 {record_info['record_id']} 更新失败: {e}")
# 批量更新
if updates:
print(f" 正在更新 {len(updates)} 条记录...")
self.current_smartsheet.batch_update_records(sheet_id, updates)
result["updated"] = len(updates)
print(f" ✓ 持续同步更新完成")
else:
print(f" ✓ 所有记录状态均未变化")
return result
def _send_failure_notification(self, failed_records: List[Dict]) -> None:
"""
发送同步失败通知
Args:
failed_records: 失败记录列表
"""
try:
# 获取企业微信配置
wework_config = self.config.get('wework')
if not wework_config:
print(" 未配置企业微信推送,跳过失败通知")
return
agentid = wework_config.get('agentid')
receivers = wework_config.get('receivers')
if not agentid or not receivers:
print(" ⚠ 企业微信推送配置不完整,跳过失败通知")
return
# 发送推送通知
print(f"\n正在发送同步失败通知...")
from src2.notifier import send_sync_failure_notification
success = send_sync_failure_notification(
self.access_token,
agentid,
receivers,
failed_records
)
if success:
print(f" ✓ 失败通知已发送")
else:
print(f" ✗ 失败通知发送失败")
except Exception as e:
print(f" ✗ 发送失败通知时出错: {e}")
if self.test_mode:
import traceback
traceback.print_exc()
def run_once(config_manager: Task2ConfigManager = None,
access_token: str = None,
test_mode: bool = False) -> Dict[str, Any]:
"""
执行一次同步流程(供调度器调用)
Args:
config_manager: 配置管理器
access_token: access_token
test_mode: 测试模式
Returns:
Dict: 同步结果
"""
service = SyncService(
config_manager=config_manager,
access_token=access_token,
test_mode=test_mode
)
return service.sync_once()
if __name__ == "__main__":
print("=== 任务二同步服务测试 ===\n")
print("请使用 main.py 或 scheduler.py 运行同步服务")
print("或使用 test_phase4.py 进行测试")