857 lines
32 KiB
Python
857 lines
32 KiB
Python
"""
|
||
任务二同步服务模块
|
||
整合链接解析、TAPD查询、表格回写,实现完整的同步流程
|
||
|
||
功能:
|
||
1. 单次同步流程
|
||
2. 执行统计
|
||
3. 错误处理
|
||
"""
|
||
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Dict, List, Any, Optional
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
# 将项目根目录添加到 Python 路径
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
from src.token_manager import TokenManager
|
||
from src2.config import Task2ConfigManager
|
||
from src2.logger import get_task2_logger
|
||
from src2.tapd_api import TAPDStoryApi, TERMINAL_STATUSES, StoryNotFoundException
|
||
from src2.smartsheet_sync import SmartSheetSync, REQUIRED_FIELDS
|
||
|
||
|
||
class CacheEntry:
|
||
"""TAPD查询缓存条目"""
|
||
def __init__(self, success: bool, data: Optional[Dict] = None,
|
||
error: Optional[Exception] = None):
|
||
self.success = success
|
||
self.data = data
|
||
self.error = error
|
||
self.timestamp = datetime.now()
|
||
|
||
|
||
class SyncService:
|
||
"""TAPD状态同步服务(支持多表格同步)"""
|
||
|
||
def __init__(self, config_manager: Task2ConfigManager = None,
|
||
access_token: str = None, test_mode: bool = False):
|
||
"""
|
||
初始化同步服务
|
||
|
||
Args:
|
||
config_manager: 配置管理器,如果为None则自动创建
|
||
access_token: 企业微信access_token,如果为None则自动获取
|
||
test_mode: 是否启用测试模式
|
||
"""
|
||
self.test_mode = test_mode
|
||
self.logger = get_task2_logger()
|
||
|
||
# 初始化配置管理器
|
||
if config_manager is None:
|
||
self.config_manager = Task2ConfigManager()
|
||
else:
|
||
self.config_manager = config_manager
|
||
|
||
# 获取配置
|
||
self.config = self.config_manager.get_all_config()
|
||
self.workspace_id = self.config['tapd']['workspace_id']
|
||
|
||
# 获取所有docid列表
|
||
self.docid_list = self.config['smartsheet']['docid_list']
|
||
print(f" 配置了 {len(self.docid_list)} 个智能表格")
|
||
|
||
# 获取access_token
|
||
if access_token is None:
|
||
token_manager = TokenManager(logger=self.logger)
|
||
self.access_token = token_manager.get_token()
|
||
else:
|
||
self.access_token = access_token
|
||
|
||
# 初始化TAPD API(所有表格共用)
|
||
self.tapd_api = TAPDStoryApi(self.workspace_id, test_mode=test_mode)
|
||
|
||
# 获取计划字段映射(每次同步时实时获取,所有表格共用)
|
||
print(f" 正在获取计划字段映射...")
|
||
try:
|
||
self.plan_mapping = self.tapd_api.get_plan_mapping()
|
||
print(f" ✓ 计划字段映射获取完成,共 {len(self.plan_mapping)} 个选项")
|
||
except Exception as e:
|
||
print(f" ⚠ 计划字段映射获取失败: {e},将使用空映射")
|
||
self.plan_mapping = {}
|
||
|
||
# TAPD查询缓存(在sync_once中初始化)
|
||
self._story_cache: Dict[str, CacheEntry] = {}
|
||
self._cache_stats = {
|
||
"total_queries": 0,
|
||
"cache_hits": 0,
|
||
"cache_misses": 0,
|
||
"api_calls": 0,
|
||
"cached_failures": 0
|
||
}
|
||
|
||
print(f" ✓ 同步服务初始化完成")
|
||
|
||
def _get_beijing_time_str(self) -> str:
|
||
"""
|
||
获取北京时间格式字符串(MM-dd HH:mm)
|
||
|
||
Returns:
|
||
str: 格式化的北京时间字符串,例如 "01-14 15:30"
|
||
"""
|
||
# 北京时间是 UTC+8
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
beijing_time = datetime.now(beijing_tz)
|
||
return beijing_time.strftime("%m-%d %H:%M")
|
||
|
||
def sync_once(self) -> Dict[str, Any]:
|
||
"""
|
||
执行一次完整的同步流程(遍历所有配置的智能表格)
|
||
|
||
Returns:
|
||
Dict: 同步结果统计
|
||
"""
|
||
# 初始化本次同步的缓存
|
||
self._story_cache = {}
|
||
self._cache_stats = {
|
||
"total_queries": 0,
|
||
"cache_hits": 0,
|
||
"cache_misses": 0,
|
||
"api_calls": 0,
|
||
"cached_failures": 0
|
||
}
|
||
|
||
result = {
|
||
"success": False,
|
||
"start_time": datetime.now().isoformat(),
|
||
"end_time": None,
|
||
"docs_total": len(self.docid_list),
|
||
"docs_success": 0,
|
||
"docs_failed": 0,
|
||
"sheets_processed": 0,
|
||
"sheets_skipped": 0,
|
||
"total_records": 0,
|
||
"records_with_link": 0,
|
||
"records_synced": 0,
|
||
"records_updated": 0,
|
||
"records_failed": 0,
|
||
"error_message": None,
|
||
"doc_results": [] # 每个表格的详细结果
|
||
}
|
||
|
||
all_failed_records = [] # 汇总所有表格的失败记录
|
||
|
||
# 遍历所有配置的智能表格
|
||
for doc_index, docid in enumerate(self.docid_list, 1):
|
||
doc_result = self._sync_single_doc(docid, doc_index, len(self.docid_list))
|
||
result["doc_results"].append(doc_result)
|
||
|
||
if doc_result["success"]:
|
||
result["docs_success"] += 1
|
||
# 累加统计数据
|
||
result["sheets_processed"] += doc_result["sheets_processed"]
|
||
result["sheets_skipped"] += doc_result["sheets_skipped"]
|
||
result["total_records"] += doc_result["total_records"]
|
||
result["records_with_link"] += doc_result["records_with_link"]
|
||
result["records_synced"] += doc_result["records_synced"]
|
||
result["records_updated"] += doc_result["records_updated"]
|
||
result["records_failed"] += doc_result["records_failed"]
|
||
# 收集失败记录
|
||
all_failed_records.extend(doc_result.get("all_failed_records", []))
|
||
else:
|
||
result["docs_failed"] += 1
|
||
|
||
# 判断整体是否成功(至少有一个表格成功)
|
||
result["success"] = result["docs_success"] > 0
|
||
|
||
# 发送失败通知(汇总所有表格的失败记录)
|
||
if all_failed_records:
|
||
self._send_failure_notification(all_failed_records)
|
||
|
||
# 记录缓存统计
|
||
self._log_cache_statistics()
|
||
result["cache_stats"] = self._cache_stats.copy()
|
||
|
||
result["end_time"] = datetime.now().isoformat()
|
||
return result
|
||
|
||
def _sync_single_doc(self, docid: str, doc_index: int, total_docs: int) -> Dict[str, Any]:
|
||
"""
|
||
同步单个智能表格
|
||
|
||
Args:
|
||
docid: 智能表格文档ID
|
||
doc_index: 当前表格序号(从1开始)
|
||
total_docs: 表格总数
|
||
|
||
Returns:
|
||
Dict: 单个表格的同步结果
|
||
"""
|
||
# 显示docid的前16个字符便于识别
|
||
display_id = docid[:16] + "..." if len(docid) > 16 else docid
|
||
|
||
print(f"\n{'#'*70}")
|
||
print(f"# [表格 {doc_index}/{total_docs}] docid: {display_id}")
|
||
print(f"{'#'*70}")
|
||
|
||
doc_result = {
|
||
"docid": docid,
|
||
"doc_index": doc_index,
|
||
"success": False,
|
||
"sheets_processed": 0,
|
||
"sheets_skipped": 0,
|
||
"total_records": 0,
|
||
"records_with_link": 0,
|
||
"records_synced": 0,
|
||
"records_updated": 0,
|
||
"records_failed": 0,
|
||
"error_message": None,
|
||
"sheet_results": [],
|
||
"all_failed_records": []
|
||
}
|
||
|
||
try:
|
||
# 为当前表格创建SmartSheetSync实例
|
||
smartsheet = SmartSheetSync(self.access_token, docid, test_mode=self.test_mode)
|
||
self.current_smartsheet = smartsheet # 供_process_sheet等方法使用
|
||
|
||
# 获取所有子表
|
||
print("\n正在获取子表列表...")
|
||
sheets = smartsheet.api.get_sheet_list()
|
||
print(f" ✓ 找到 {len(sheets)} 个子表")
|
||
|
||
# 处理每个子表
|
||
for sheet in sheets:
|
||
sheet_id = sheet.get('sheet_id', '')
|
||
sheet_title = sheet.get('title', '未命名')
|
||
|
||
sheet_result = self._process_sheet(sheet_id, sheet_title, doc_index)
|
||
doc_result["sheet_results"].append(sheet_result)
|
||
|
||
if sheet_result["skipped"]:
|
||
doc_result["sheets_skipped"] += 1
|
||
else:
|
||
doc_result["sheets_processed"] += 1
|
||
doc_result["total_records"] += sheet_result["total_records"]
|
||
doc_result["records_with_link"] += sheet_result["records_with_link"]
|
||
doc_result["records_synced"] += sheet_result["records_synced"]
|
||
doc_result["records_updated"] += sheet_result["records_updated"]
|
||
doc_result["records_failed"] += sheet_result["records_failed"]
|
||
|
||
# 收集失败记录(添加表格标识)
|
||
for failed in sheet_result.get("failed_records", []):
|
||
failed["doc_index"] = doc_index
|
||
failed["docid_short"] = display_id
|
||
doc_result["all_failed_records"].append(failed)
|
||
|
||
doc_result["success"] = True
|
||
print(f"\n✓ [表格 {doc_index}/{total_docs}] 同步完成")
|
||
|
||
except Exception as e:
|
||
doc_result["error_message"] = str(e)
|
||
print(f"\n✗ [表格 {doc_index}/{total_docs}] 同步失败: {e}")
|
||
|
||
if self.test_mode:
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
return doc_result
|
||
|
||
def _process_sheet(self, sheet_id: str, sheet_title: str, doc_index: int = 1) -> Dict[str, Any]:
|
||
"""
|
||
处理单个子表的同步
|
||
|
||
Args:
|
||
sheet_id: 子表ID
|
||
sheet_title: 子表标题
|
||
doc_index: 表格序号(用于日志显示)
|
||
|
||
Returns:
|
||
Dict: 子表处理结果
|
||
"""
|
||
print(f"\n{'='*60}")
|
||
print(f"处理子表: {sheet_title} (表格{doc_index})")
|
||
print(f"{'='*60}")
|
||
|
||
sheet_result = {
|
||
"sheet_id": sheet_id,
|
||
"sheet_title": sheet_title,
|
||
"skipped": False,
|
||
"skip_reason": None,
|
||
"total_records": 0,
|
||
"records_with_link": 0,
|
||
"records_synced": 0,
|
||
"records_updated": 0,
|
||
"records_failed": 0,
|
||
"details": [],
|
||
"failed_records": [] # 收集失败记录的详细信息
|
||
}
|
||
|
||
try:
|
||
# 1. 获取字段信息并检查必要字段
|
||
fields = self.current_smartsheet.api.get_fields(sheet_id)
|
||
all_present, missing_fields, field_mapping = self.current_smartsheet.check_required_fields(fields)
|
||
|
||
if not all_present:
|
||
sheet_result["skipped"] = True
|
||
sheet_result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
|
||
print(f" ⚠ 跳过此子表: {sheet_result['skip_reason']}")
|
||
return sheet_result
|
||
|
||
# 2. 获取所有记录(只获取一次,供新记录同步和持续同步共用)
|
||
print(f"正在获取所有记录...")
|
||
all_records = self.current_smartsheet.get_all_records(sheet_id)
|
||
|
||
# 3. 获取包含TAPD链接的新记录(同步状态为空)
|
||
records_with_link = self.current_smartsheet.get_records_with_tapd_link(
|
||
sheet_id, all_records=all_records
|
||
)
|
||
sheet_result["records_with_link"] = len(records_with_link)
|
||
|
||
# 4. 处理新记录
|
||
if records_with_link:
|
||
print(f"\n--- 新记录同步 ---")
|
||
success_records = [] # 成功同步的记录
|
||
failed_records_info = [] # 失败的记录信息列表(包含ID和状态)
|
||
|
||
for record_info in records_with_link:
|
||
record_result = self._process_record(record_info)
|
||
sheet_result["details"].append(record_result)
|
||
|
||
if record_result["success"]:
|
||
sheet_result["records_synced"] += 1
|
||
|
||
if record_result["update_record"]:
|
||
success_records.append(record_result["update_record"])
|
||
sheet_result["records_updated"] += 1
|
||
else:
|
||
sheet_result["records_failed"] += 1
|
||
|
||
# 收集失败记录的ID和对应的同步状态
|
||
failed_records_info.append({
|
||
"record_id": record_info["record_id"],
|
||
"sync_status": record_result.get("sync_status", "⚠️ 同步失败,请联系PM")
|
||
})
|
||
|
||
# 收集失败记录的详细信息用于推送
|
||
failed_record = {
|
||
"sheet_title": sheet_title,
|
||
"record_id": record_info["record_id"],
|
||
"tapd_link": record_info.get("tapd_link", "(无链接)"),
|
||
"error_message": record_result.get("error_message", "未知错误")
|
||
}
|
||
sheet_result["failed_records"].append(failed_record)
|
||
|
||
# 4. 批量回写成功记录
|
||
if success_records:
|
||
print(f"\n正在回写 {len(success_records)} 条成功记录...")
|
||
try:
|
||
self.current_smartsheet.batch_update_records(sheet_id, success_records)
|
||
print(f" ✓ 成功记录回写完成")
|
||
except Exception as e:
|
||
print(f" ✗ 成功记录回写失败: {e}")
|
||
|
||
# 5. 批量回写失败记录(根据不同失败原因回写不同状态)
|
||
if failed_records_info:
|
||
print(f"\n正在回写 {len(failed_records_info)} 条失败记录的状态...")
|
||
try:
|
||
failed_updates = [
|
||
self.current_smartsheet.build_update_record(
|
||
record_id=info["record_id"],
|
||
sync_status=info["sync_status"]
|
||
)
|
||
for info in failed_records_info
|
||
]
|
||
self.current_smartsheet.batch_update_records(sheet_id, failed_updates)
|
||
print(f" ✓ 失败记录状态回写完成")
|
||
except Exception as e:
|
||
print(f" ✗ 失败记录状态回写失败: {e}")
|
||
else:
|
||
print(f" ℹ 没有新记录需要同步")
|
||
|
||
# 7. 持续同步:更新已同步记录的最新状态
|
||
print(f"\n--- 持续同步 ---")
|
||
update_result = self._process_synced_records_update(sheet_id, all_records=all_records)
|
||
sheet_result["continuous_sync"] = update_result
|
||
|
||
sheet_result["total_records"] = len(records_with_link)
|
||
|
||
except Exception as e:
|
||
sheet_result["skipped"] = True
|
||
sheet_result["skip_reason"] = f"处理异常: {str(e)}"
|
||
print(f" ✗ 处理子表异常: {e}")
|
||
|
||
return sheet_result
|
||
|
||
def _get_story_with_cache(self, story_id: str) -> Dict:
|
||
"""
|
||
带缓存的获取需求详情
|
||
|
||
Args:
|
||
story_id: 需求ID
|
||
|
||
Returns:
|
||
Dict: 需求详细信息
|
||
|
||
Raises:
|
||
StoryNotFoundException: 需求不存在
|
||
Exception: 其他API错误
|
||
"""
|
||
self._cache_stats["total_queries"] += 1
|
||
|
||
# 检查缓存
|
||
if story_id in self._story_cache:
|
||
cache_entry = self._story_cache[story_id]
|
||
self._cache_stats["cache_hits"] += 1
|
||
|
||
if cache_entry.success:
|
||
# 缓存命中 - 成功记录
|
||
if self.test_mode:
|
||
print(f" [缓存命中] story_id={story_id}")
|
||
return cache_entry.data
|
||
else:
|
||
# 缓存命中 - 失败记录
|
||
self._cache_stats["cached_failures"] += 1
|
||
if self.test_mode:
|
||
print(f" [缓存命中-失败] story_id={story_id}")
|
||
raise cache_entry.error
|
||
|
||
# 缓存未命中,调用API
|
||
self._cache_stats["cache_misses"] += 1
|
||
self._cache_stats["api_calls"] += 1
|
||
|
||
if self.test_mode:
|
||
print(f" [API调用] story_id={story_id}")
|
||
|
||
try:
|
||
story_info = self.tapd_api.get_story(story_id)
|
||
# 缓存成功结果
|
||
self._story_cache[story_id] = CacheEntry(success=True, data=story_info)
|
||
return story_info
|
||
except StoryNotFoundException as e:
|
||
# 缓存确定性失败(单号无效)
|
||
self._story_cache[story_id] = CacheEntry(success=False, error=e)
|
||
raise
|
||
except Exception as e:
|
||
# 不缓存非确定性失败(网络错误、API限流等)
|
||
raise
|
||
|
||
def _process_record(self, record_info: Dict) -> Dict[str, Any]:
|
||
"""
|
||
处理单条记录的同步
|
||
|
||
Args:
|
||
record_info: 记录信息(包含解析结果)
|
||
|
||
Returns:
|
||
Dict: 记录处理结果
|
||
"""
|
||
record_result = {
|
||
"record_id": record_info["record_id"],
|
||
"tapd_link": record_info["tapd_link"],
|
||
"success": False,
|
||
"update_record": None,
|
||
"error_message": None,
|
||
"story_info": None,
|
||
"sync_status": None # 用于失败记录的状态回写
|
||
}
|
||
|
||
# 检查链接解析结果
|
||
if not record_info["parse_success"]:
|
||
record_result["error_message"] = record_info.get("parse_error", "链接解析失败")
|
||
record_result["sync_status"] = "❌ 单号无效"
|
||
# 记录链接解析失败日志
|
||
self.logger.log_api_call(
|
||
api_type="smartsheet",
|
||
operation="link_parse_failure",
|
||
request_data={
|
||
"record_id": record_info["record_id"],
|
||
"tapd_link": record_info["tapd_link"]
|
||
},
|
||
response_data={},
|
||
success=False,
|
||
error_message=record_result["error_message"]
|
||
)
|
||
return record_result
|
||
|
||
story_id = record_info["story_id"]
|
||
|
||
try:
|
||
# 查询TAPD获取需求信息(使用缓存)
|
||
story_info = self._get_story_with_cache(story_id)
|
||
record_result["story_info"] = story_info
|
||
|
||
# 提取需要同步的字段(None 转为空字符串)
|
||
status = story_info.get('status') or ''
|
||
owner = story_info.get('owner') or ''
|
||
begin_date = story_info.get('begin') or ''
|
||
due_date = story_info.get('due') or ''
|
||
|
||
# 提取发布计划字段并转换为中文名称
|
||
plan_id = story_info.get('release_id') or ''
|
||
plan_name = self.tapd_api.map_plan_id_to_name(plan_id)
|
||
|
||
# 获取当前字段值,判断是否需要更新
|
||
current_values = self.current_smartsheet.get_current_field_values(record_info["record"])
|
||
|
||
needs_update = self._check_needs_update(
|
||
current_values, status, owner, begin_date, due_date, plan_name
|
||
)
|
||
|
||
# 生成同步成功状态(包含时间戳)
|
||
time_str = self._get_beijing_time_str()
|
||
sync_status_success = f"✅ 同步成功 {time_str}"
|
||
|
||
# 构造更新记录(包含业务字段 + 同步状态=成功+时间戳)
|
||
# 即使业务字段没有变化,也要写入同步状态
|
||
update_record = self.current_smartsheet.build_update_record(
|
||
record_id=record_info["record_id"],
|
||
status=status,
|
||
owner=owner,
|
||
begin_date=begin_date,
|
||
due_date=due_date,
|
||
plan=plan_name,
|
||
sync_status=sync_status_success
|
||
)
|
||
record_result["update_record"] = update_record
|
||
|
||
record_result["success"] = True
|
||
|
||
except StoryNotFoundException as e:
|
||
# 单号无效(TAPD中未找到该需求)
|
||
record_result["error_message"] = str(e)
|
||
record_result["sync_status"] = "❌ 单号无效"
|
||
# 记录TAPD查询失败日志
|
||
self.logger.log_api_call(
|
||
api_type="tapd",
|
||
operation="sync_record_failure",
|
||
request_data={
|
||
"record_id": record_info["record_id"],
|
||
"story_id": story_id
|
||
},
|
||
response_data={},
|
||
success=False,
|
||
error_message=record_result["error_message"]
|
||
)
|
||
|
||
except Exception as e:
|
||
# 其他异常(API调用失败等)
|
||
record_result["error_message"] = str(e)
|
||
record_result["sync_status"] = "⚠️ 同步失败,请联系PM"
|
||
# 记录TAPD查询失败日志
|
||
self.logger.log_api_call(
|
||
api_type="tapd",
|
||
operation="sync_record_failure",
|
||
request_data={
|
||
"record_id": record_info["record_id"],
|
||
"story_id": story_id
|
||
},
|
||
response_data={},
|
||
success=False,
|
||
error_message=record_result["error_message"]
|
||
)
|
||
|
||
return record_result
|
||
|
||
def _check_needs_update(self, current_values: Dict,
|
||
status: str, owner: str,
|
||
begin_date: str, due_date: str,
|
||
plan: str = "") -> bool:
|
||
"""
|
||
检查是否需要更新记录
|
||
|
||
Args:
|
||
current_values: 当前字段值
|
||
status: 新状态
|
||
owner: 新处理人
|
||
begin_date: 新开始日期
|
||
due_date: 新结束日期
|
||
plan: 新计划
|
||
|
||
Returns:
|
||
bool: 是否需要更新
|
||
"""
|
||
# 提取当前值的文本内容
|
||
def extract_text(value):
|
||
if value is None:
|
||
return ""
|
||
if isinstance(value, str):
|
||
return value
|
||
if isinstance(value, list) and len(value) > 0:
|
||
first = value[0]
|
||
if isinstance(first, dict):
|
||
return first.get('text', '')
|
||
return str(first)
|
||
if isinstance(value, dict):
|
||
return value.get('text', '')
|
||
return str(value)
|
||
|
||
current_status = extract_text(current_values.get('TAPD状态'))
|
||
current_owner = extract_text(current_values.get('处理人'))
|
||
current_begin = extract_text(current_values.get('TAPD预计开始日期'))
|
||
current_due = extract_text(current_values.get('TAPD预计完成日期'))
|
||
current_plan = extract_text(current_values.get('计划'))
|
||
|
||
# 比较是否有变化
|
||
if current_status != status:
|
||
return True
|
||
if current_owner != owner:
|
||
return True
|
||
if current_begin != begin_date:
|
||
return True
|
||
if current_due != due_date:
|
||
return True
|
||
if current_plan != plan:
|
||
return True
|
||
|
||
return False
|
||
|
||
def _process_synced_records_update(self, sheet_id: str,
|
||
all_records: List[Dict] = None) -> Dict[str, Any]:
|
||
"""
|
||
处理已同步记录的状态更新(持续同步)
|
||
|
||
Args:
|
||
sheet_id: 子表ID
|
||
all_records: 可选,已获取的所有记录列表
|
||
|
||
Returns:
|
||
Dict: 更新结果统计
|
||
"""
|
||
result = {
|
||
"checked": 0,
|
||
"updated": 0,
|
||
"failed": 0,
|
||
}
|
||
|
||
# 获取需要持续同步的记录
|
||
records = self.current_smartsheet.get_synced_records_for_update(
|
||
sheet_id, TERMINAL_STATUSES, all_records=all_records
|
||
)
|
||
|
||
if not records:
|
||
print(f" ℹ 没有需要持续同步的记录")
|
||
return result
|
||
|
||
result["checked"] = len(records)
|
||
updates = []
|
||
|
||
for record_info in records:
|
||
try:
|
||
# 查询TAPD最新状态(使用缓存)
|
||
story_info = self._get_story_with_cache(record_info["story_id"])
|
||
|
||
# 提取最新字段值(None 转为空字符串)
|
||
new_status = story_info.get('status') or ''
|
||
new_owner = story_info.get('owner') or ''
|
||
new_begin = story_info.get('begin') or ''
|
||
new_due = story_info.get('due') or ''
|
||
plan_id = story_info.get('release_id') or ''
|
||
new_plan = self.tapd_api.map_plan_id_to_name(plan_id)
|
||
|
||
# 获取当前值并比较
|
||
current = self.current_smartsheet.get_current_field_values(record_info["record"])
|
||
|
||
# 调试:打印当前值和新值(含类型)
|
||
print(f"\n [DEBUG] 记录 {record_info['record_id']} 字段比较:")
|
||
print(f" 结束日期: 当前='{current.get('TAPD预计完成日期')}' (type={type(current.get('TAPD预计完成日期'))}) vs 新='{new_due}' (type={type(new_due)})")
|
||
|
||
needs_update = self._check_needs_update(
|
||
current, new_status, new_owner, new_begin, new_due, new_plan
|
||
)
|
||
print(f" needs_update = {needs_update}")
|
||
|
||
if needs_update:
|
||
# 生成同步成功状态(包含时间戳)
|
||
time_str = self._get_beijing_time_str()
|
||
sync_status_success = f"✅ 同步成功 {time_str}"
|
||
|
||
update_record = self.current_smartsheet.build_update_record(
|
||
record_id=record_info["record_id"],
|
||
status=new_status,
|
||
owner=new_owner,
|
||
begin_date=new_begin,
|
||
due_date=new_due,
|
||
plan=new_plan,
|
||
sync_status=sync_status_success
|
||
)
|
||
updates.append(update_record)
|
||
|
||
except Exception as e:
|
||
result["failed"] += 1
|
||
error_msg = str(e)
|
||
# 记录持续同步失败日志
|
||
self.logger.log_api_call(
|
||
api_type="tapd",
|
||
operation="continuous_sync_failure",
|
||
request_data={
|
||
"record_id": record_info["record_id"],
|
||
"story_id": record_info["story_id"]
|
||
},
|
||
response_data={},
|
||
success=False,
|
||
error_message=error_msg
|
||
)
|
||
if self.test_mode:
|
||
print(f" ✗ 记录 {record_info['record_id']} 更新失败: {e}")
|
||
|
||
# 批量更新
|
||
if updates:
|
||
print(f" 正在更新 {len(updates)} 条记录...")
|
||
self.current_smartsheet.batch_update_records(sheet_id, updates)
|
||
result["updated"] = len(updates)
|
||
print(f" ✓ 持续同步更新完成")
|
||
else:
|
||
print(f" ✓ 所有记录状态均未变化")
|
||
|
||
return result
|
||
|
||
def _log_cache_statistics(self):
|
||
"""记录缓存统计信息到日志"""
|
||
stats = self._cache_stats
|
||
|
||
if stats["total_queries"] == 0:
|
||
return
|
||
|
||
hit_rate = (stats["cache_hits"] / stats["total_queries"]) * 100
|
||
|
||
print(f"\n{'='*60}")
|
||
print(f"TAPD查询缓存统计")
|
||
print(f"{'='*60}")
|
||
print(f" 总查询次数: {stats['total_queries']}")
|
||
print(f" 缓存命中: {stats['cache_hits']} ({hit_rate:.1f}%)")
|
||
print(f" 缓存未命中: {stats['cache_misses']}")
|
||
print(f" 实际API调用: {stats['api_calls']}")
|
||
print(f" 缓存失败记录命中: {stats['cached_failures']}")
|
||
print(f" 缓存条目数: {len(self._story_cache)}")
|
||
print(f"{'='*60}\n")
|
||
|
||
# 记录到日志文件
|
||
self.logger.log_api_call(
|
||
api_type="tapd",
|
||
operation="cache_statistics",
|
||
request_data={},
|
||
response_data=stats,
|
||
success=True,
|
||
error_message=None
|
||
)
|
||
|
||
def _send_failure_notification(self, failed_records: List[Dict]) -> None:
|
||
"""
|
||
发送同步失败通知
|
||
|
||
Args:
|
||
failed_records: 失败记录列表
|
||
"""
|
||
try:
|
||
# 获取企业微信配置
|
||
wework_config = self.config.get('wework')
|
||
|
||
if not wework_config:
|
||
print(" ℹ 未配置企业微信推送,跳过失败通知")
|
||
return
|
||
|
||
agentid = wework_config.get('agentid')
|
||
receivers = wework_config.get('receivers')
|
||
|
||
if not agentid or not receivers:
|
||
print(" ⚠ 企业微信推送配置不完整,跳过失败通知")
|
||
return
|
||
|
||
# 发送推送通知
|
||
print(f"\n正在发送同步失败通知...")
|
||
from src2.notifier import send_sync_failure_notification
|
||
|
||
success = send_sync_failure_notification(
|
||
self.access_token,
|
||
agentid,
|
||
receivers,
|
||
failed_records
|
||
)
|
||
|
||
if success:
|
||
print(f" ✓ 失败通知已发送")
|
||
else:
|
||
print(f" ✗ 失败通知发送失败")
|
||
|
||
except Exception as e:
|
||
print(f" ✗ 发送失败通知时出错: {e}")
|
||
if self.test_mode:
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
|
||
def run_once(config_manager: Task2ConfigManager = None,
|
||
access_token: str = None,
|
||
test_mode: bool = False) -> Dict[str, Any]:
|
||
"""
|
||
执行一次同步流程(供调度器调用)
|
||
|
||
Args:
|
||
config_manager: 配置管理器
|
||
access_token: access_token
|
||
test_mode: 测试模式
|
||
|
||
Returns:
|
||
Dict: 同步结果
|
||
"""
|
||
service = SyncService(
|
||
config_manager=config_manager,
|
||
access_token=access_token,
|
||
test_mode=test_mode
|
||
)
|
||
|
||
logger = service.logger
|
||
started_sync_here = False
|
||
if not logger.get_active_sync_id():
|
||
logger.start_sync(
|
||
trigger="task2_run_once_manual",
|
||
metadata={
|
||
"entry": "src2/sync_service.py:run_once",
|
||
"test_mode": test_mode,
|
||
},
|
||
)
|
||
started_sync_here = True
|
||
|
||
try:
|
||
result = service.sync_once()
|
||
if started_sync_here:
|
||
logger.end_sync_with_stats(
|
||
stats={
|
||
"docs_total": result.get("docs_total", 0),
|
||
"docs_success": result.get("docs_success", 0),
|
||
"docs_failed": result.get("docs_failed", 0),
|
||
"sheets_processed": result.get("sheets_processed", 0),
|
||
"sheets_skipped": result.get("sheets_skipped", 0),
|
||
"total_records": result.get("total_records", 0),
|
||
"records_with_link": result.get("records_with_link", 0),
|
||
"records_synced": result.get("records_synced", 0),
|
||
"records_updated": result.get("records_updated", 0),
|
||
"records_failed": result.get("records_failed", 0),
|
||
},
|
||
success=result.get("success", False),
|
||
error_message=result.get("error_message"),
|
||
extra={"source": "task2_run_once_manual"},
|
||
)
|
||
return result
|
||
except Exception as e:
|
||
if started_sync_here and logger.get_active_sync_id():
|
||
logger.end_sync_with_stats(
|
||
stats={},
|
||
success=False,
|
||
error_message=str(e),
|
||
extra={
|
||
"source": "task2_run_once_manual",
|
||
"exception_type": type(e).__name__,
|
||
},
|
||
)
|
||
raise
|
||
|
||
if __name__ == "__main__":
|
||
print("=== 任务二同步服务测试 ===\n")
|
||
print("请使用 main.py 或 scheduler.py 运行同步服务")
|
||
print("或使用 test_phase4.py 进行测试")
|