task2-phase4: 定时同步回写

This commit is contained in:
zelong 2026-01-09 11:45:03 +08:00
parent d984828777
commit 7cdd3e9390
6 changed files with 1190 additions and 11 deletions

156
src2/main.py Normal file
View File

@ -0,0 +1,156 @@
"""
任务二主程序入口
TAPD状态实时同步至腾讯智能表格
功能
1. 命令行参数解析
2. 单次执行模式
3. 执行结果统计
"""
import sys
import argparse
from pathlib import Path
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.sync_service import run_once
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='TAPD状态同步工具 - 将TAPD需求状态同步到腾讯智能表格',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
# 执行一次同步
python src2/main.py
# 指定配置文件
python src2/main.py --config /path/to/config.ini
# 测试模式(显示详细信息)
python src2/main.py --test
# 手动传入access_token
python src2/main.py --token YOUR_ACCESS_TOKEN
"""
)
parser.add_argument(
'-c', '--config',
default=None,
help='配置文件路径(默认: config/config_task2.ini'
)
parser.add_argument(
'-t', '--token',
default=None,
help='手动传入access_token默认自动获取'
)
parser.add_argument(
'--test',
action='store_true',
help='启用测试模式显示详细的API调用信息'
)
return parser.parse_args()
def print_result_summary(result: dict):
"""打印执行结果摘要"""
print("\n" + "=" * 60)
print("执行结果摘要")
print("=" * 60)
if result["success"]:
print("状态: ✓ 成功")
else:
print("状态: ✗ 失败")
if result.get("error_message"):
print(f"错误: {result['error_message']}")
print(f"\n子表统计:")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f"\n记录统计:")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
print(f" 同步失败: {result['records_failed']}")
# 显示各子表详情
if result.get("sheet_results"):
print(f"\n子表详情:")
for sheet in result["sheet_results"]:
status = "跳过" if sheet["skipped"] else "完成"
print(f" - {sheet['sheet_title']}: {status}")
if sheet["skipped"]:
print(f" 原因: {sheet['skip_reason']}")
else:
print(f" 链接: {sheet['records_with_link']} | "
f"更新: {sheet['records_updated']} | "
f"失败: {sheet['records_failed']}")
print("=" * 60)
def main():
"""主函数"""
print("\n" + "=" * 60)
print("TAPD状态同步工具 (任务二)")
print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
try:
# 解析命令行参数
args = parse_arguments()
# 初始化配置管理器
print("\n正在加载配置...")
config_manager = Task2ConfigManager(config_path=args.config)
config_manager.print_config()
# 获取access_token
access_token = args.token
if access_token is None:
print("正在获取access_token...")
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" ✓ access_token获取成功")
# 执行同步
print("\n开始执行同步...")
result = run_once(
config_manager=config_manager,
access_token=access_token,
test_mode=args.test
)
# 打印结果摘要
print_result_summary(result)
# 返回状态码
return 0 if result["success"] else 1
except KeyboardInterrupt:
print("\n\n用户中断执行")
return 130
except Exception as e:
print(f"\n✗ 执行失败: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())

354
src2/scheduler.py Normal file
View File

@ -0,0 +1,354 @@
"""
任务二调度器模块
负责定时执行TAPD状态同步任务
功能
1. 按配置频率定时执行
2. 优雅退出Ctrl+C
3. 运行统计
"""
import sys
import time
import signal
import argparse
from pathlib import Path
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 导入schedule库
try:
import schedule
except ImportError:
print("错误: 缺少schedule库")
print("请运行: pip install schedule")
sys.exit(1)
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.sync_service import run_once
class Task2Scheduler:
"""任务二调度器"""
def __init__(self, config_path=None, verbose=False):
"""
初始化调度器
Args:
config_path: 配置文件路径
verbose: 是否显示详细信息
"""
self.config_path = config_path
self.verbose = verbose
self.running = True
# 统计信息
self.stats = {
'start_time': None,
'total_runs': 0,
'success_runs': 0,
'failed_runs': 0,
'total_records_synced': 0,
'total_records_updated': 0,
'last_run_time': None
}
# 初始化配置管理器
self._init_config()
# 初始化TokenManager
self._init_token_manager()
# 获取调度配置
self.sync_interval = self.config['schedule']['sync_interval']
def _init_config(self):
"""初始化配置"""
try:
print("正在加载配置文件...")
self.config_manager = Task2ConfigManager(config_path=self.config_path)
self.config = self.config_manager.get_all_config()
print("✓ 配置文件加载成功")
except Exception as e:
print(f"✗ 配置文件加载失败: {e}")
raise
def _init_token_manager(self):
"""初始化TokenManager"""
try:
print("正在初始化TokenManager...")
self.token_manager = TokenManager()
print("✓ TokenManager初始化成功")
except Exception as e:
print(f"✗ TokenManager初始化失败: {e}")
raise
def job(self):
"""执行一次同步任务"""
print("\n" + "=" * 80)
print(f"开始执行同步任务 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
try:
# 获取access_token
access_token = self.token_manager.get_token()
# 执行一次同步流程
result = run_once(
config_manager=self.config_manager,
access_token=access_token,
test_mode=self.verbose
)
# 更新统计信息
self.stats['total_runs'] += 1
self.stats['last_run_time'] = datetime.now()
if result['success']:
self.stats['success_runs'] += 1
self.stats['total_records_synced'] += result['records_synced']
self.stats['total_records_updated'] += result['records_updated']
print("\n" + "-" * 80)
print("本次执行统计:")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
print(f" 同步失败: {result['records_failed']}")
print("-" * 80)
else:
self.stats['failed_runs'] += 1
print("\n" + "-" * 80)
print("本次执行失败:")
print(f" 错误信息: {result.get('error_message', '未知错误')}")
print("-" * 80)
except Exception as e:
self.stats['total_runs'] += 1
self.stats['failed_runs'] += 1
self.stats['last_run_time'] = datetime.now()
print("\n" + "-" * 80)
print("本次执行异常:")
print(f" 错误类型: {type(e).__name__}")
print(f" 错误信息: {e}")
print("-" * 80)
if self.verbose:
import traceback
print("\n详细错误信息:")
traceback.print_exc()
# 显示下次执行时间
self._show_next_run_time()
def _show_next_run_time(self):
"""显示下次执行时间"""
next_run = schedule.idle_seconds()
if next_run is not None:
next_run_time = datetime.now().timestamp() + next_run
next_run_str = datetime.fromtimestamp(next_run_time).strftime('%Y-%m-%d %H:%M:%S')
print(f"\n下次执行时间: {next_run_str} (约 {int(next_run / 60)} 分钟后)")
def _startup_check(self):
"""启动检查"""
print("\n" + "=" * 80)
print("启动检查")
print("=" * 80)
checks_passed = True
# 1. 检查配置文件
print("\n[1/3] 检查配置文件...")
try:
print(f" ✓ 配置文件已加载: {self.config_path or '默认路径'}")
print(f" ✓ TAPD workspace_id: {self.config['tapd']['workspace_id']}")
print(f" ✓ SmartSheet docid: {self.config['smartsheet']['docid'][:20]}...")
print(f" ✓ 同步间隔: {self.sync_interval} 分钟")
except Exception as e:
print(f" ✗ 配置检查失败: {e}")
checks_passed = False
# 2. 检查环境变量
print("\n[2/3] 检查环境变量...")
import os
required_env_vars = {
'CORPID': '企业微信CorpID',
'CORPSECRET': '企业微信CorpSecret',
'TAPD_API_USER': 'TAPD API用户名',
'TAPD_API_PASSWORD': 'TAPD API密码'
}
for env_var, description in required_env_vars.items():
if os.environ.get(env_var):
print(f"{description} ({env_var}): 已设置")
else:
print(f"{description} ({env_var}): 未设置")
checks_passed = False
# 3. 测试access_token获取
print("\n[3/3] 测试access_token获取...")
try:
access_token = self.token_manager.get_token()
print(f" ✓ access_token获取成功: {access_token[:10]}...(已隐藏)")
except Exception as e:
print(f" ✗ access_token获取失败: {e}")
checks_passed = False
print("\n" + "=" * 80)
if checks_passed:
print("启动检查通过 ✓")
else:
print("启动检查失败 ✗")
print("\n请检查上述错误并修复后重试")
return False
print("=" * 80)
return True
def start(self):
"""启动调度器"""
print("\n" + "=" * 80)
print("TAPD状态同步调度器 (任务二)")
print("版本: 1.0.0 (第四阶段)")
print("=" * 80)
# 执行启动检查
if not self._startup_check():
print("\n调度器启动失败")
sys.exit(1)
# 记录启动时间
self.stats['start_time'] = datetime.now()
# 注册信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
# 配置定时任务
schedule.every(self.sync_interval).minutes.do(self.job)
print(f"\n调度器已启动:")
print(f" - 同步任务: 立即执行一次,然后每 {self.sync_interval} 分钟执行一次")
print("按 Ctrl+C 停止调度器")
print()
# 立即执行一次同步任务
self.job()
# 进入调度循环
while self.running:
schedule.run_pending()
time.sleep(1)
def _signal_handler(self, signum, frame):
"""信号处理函数"""
print("\n\n" + "=" * 80)
print("收到停止信号,正在优雅退出...")
print("=" * 80)
self.running = False
self._print_final_stats()
def _print_final_stats(self):
"""打印最终统计信息"""
print("\n" + "=" * 80)
print("运行统计")
print("=" * 80)
if self.stats['start_time']:
start_time_str = self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')
print(f"启动时间: {start_time_str}")
if self.stats['last_run_time']:
last_run_str = self.stats['last_run_time'].strftime('%Y-%m-%d %H:%M:%S')
print(f"最后执行: {last_run_str}")
if self.stats['start_time']:
running_time = datetime.now() - self.stats['start_time']
hours = int(running_time.total_seconds() // 3600)
minutes = int((running_time.total_seconds() % 3600) // 60)
print(f"运行时长: {hours} 小时 {minutes} 分钟")
print(f"\n同步任务统计:")
print(f" 总执行次数: {self.stats['total_runs']}")
print(f" 成功次数: {self.stats['success_runs']}")
print(f" 失败次数: {self.stats['failed_runs']}")
print(f" 总同步记录: {self.stats['total_records_synced']}")
print(f" 总更新记录: {self.stats['total_records_updated']}")
if self.stats['total_runs'] > 0:
success_rate = (self.stats['success_runs'] / self.stats['total_runs']) * 100
print(f" 成功率: {success_rate:.1f}%")
print("=" * 80)
print("调度器已停止")
print("=" * 80)
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='TAPD状态同步调度器 - 定时执行同步任务',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
# 使用默认配置启动调度器
python src2/scheduler.py
# 指定配置文件路径
python src2/scheduler.py --config /path/to/config.ini
# 显示详细信息
python src2/scheduler.py --verbose
"""
)
parser.add_argument(
'-c', '--config',
default=None,
help='配置文件路径(默认: config/config_task2.ini'
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
help='显示详细输出信息'
)
return parser.parse_args()
def main():
"""主函数"""
try:
# 解析命令行参数
args = parse_arguments()
# 创建并启动调度器
scheduler = Task2Scheduler(
config_path=args.config,
verbose=args.verbose
)
scheduler.start()
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
print(f"\n✗ 调度器启动失败: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@ -31,6 +31,7 @@ FIELD_TAPD_STATUS = "TAPD状态" # 工具回写
FIELD_OWNER = "处理人" # 工具回写
FIELD_BEGIN_DATE = "TAPD预计开始日期" # 工具回写
FIELD_DUE_DATE = "TAPD预计完成日期" # 工具回写
FIELD_SYNC_STATUS = "同步状态" # 工具回写,标记同步结果
# 必要字段列表
REQUIRED_FIELDS = [
@ -39,6 +40,7 @@ REQUIRED_FIELDS = [
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
FIELD_SYNC_STATUS,
]
@ -161,7 +163,7 @@ class SmartSheetSync:
def build_update_record(self, record_id: str, status: str = None,
owner: str = None, begin_date: str = None,
due_date: str = None) -> Dict:
due_date: str = None, sync_status: str = None) -> Dict:
"""
构造更新记录的数据结构
@ -171,24 +173,29 @@ class SmartSheetSync:
owner: 处理人
begin_date: 预计开始日期
due_date: 预计完成日期
sync_status: 同步状态"成功" "失败"
Returns:
Dict: 更新记录的数据结构
"""
values = {}
# 只添加非空的字段
if status is not None:
values[FIELD_TAPD_STATUS] = [{"text": status}]
# 只添加非空的字段,每个字段值需要包含 type 和 text
# 跳过 None 和空字符串
if status is not None and status != "":
values[FIELD_TAPD_STATUS] = [{"type": "text", "text": status}]
if owner is not None:
values[FIELD_OWNER] = [{"text": owner}]
if owner is not None and owner != "":
values[FIELD_OWNER] = [{"type": "text", "text": owner}]
if begin_date is not None:
values[FIELD_BEGIN_DATE] = [{"text": begin_date}]
if begin_date is not None and begin_date != "":
values[FIELD_BEGIN_DATE] = [{"type": "text", "text": begin_date}]
if due_date is not None:
values[FIELD_DUE_DATE] = [{"text": due_date}]
if due_date is not None and due_date != "":
values[FIELD_DUE_DATE] = [{"type": "text", "text": due_date}]
if sync_status is not None and sync_status != "":
values[FIELD_SYNC_STATUS] = [{"type": "text", "text": sync_status}]
return {
"record_id": record_id,
@ -197,7 +204,7 @@ class SmartSheetSync:
def batch_update_records(self, sheet_id: str, update_records: List[Dict]) -> Dict:
"""
批量回写状态信息
批量回写状态信息使用任务一的API带debug参数
Args:
sheet_id: 子表ID
@ -210,6 +217,7 @@ class SmartSheetSync:
print(" ⚠ 没有需要更新的记录")
return {"records": []}
# 直接使用任务一的 update_records 方法已添加debug=1
return self.api.update_records(sheet_id, update_records)
def get_records_with_tapd_link(self, sheet_id: str) -> List[Dict]:
@ -232,6 +240,7 @@ class SmartSheetSync:
all_records = self.get_all_records(sheet_id)
records_with_link = []
skipped_synced_count = 0 # 统计跳过的已同步记录数
for record in all_records:
tapd_link = self.extract_tapd_link(record)
@ -239,6 +248,12 @@ class SmartSheetSync:
if not tapd_link:
continue
# 检查同步状态字段,如果不为空则跳过
sync_status = self.api.get_field_value_by_title(record, FIELD_SYNC_STATUS)
if sync_status is not None and sync_status != "":
skipped_synced_count += 1
continue
record_id = record.get('record_id', '')
# 解析链接
@ -265,6 +280,8 @@ class SmartSheetSync:
fail_count = len(records_with_link) - success_count
print(f" ✓ 找到 {len(records_with_link)} 条包含TAPD链接的记录")
if skipped_synced_count > 0:
print(f" - 跳过已同步记录: {skipped_synced_count}")
print(f" - 链接解析成功: {success_count}")
if fail_count > 0:
print(f" - 链接解析失败: {fail_count}")

362
src2/sync_service.py Normal file
View File

@ -0,0 +1,362 @@
"""
任务二同步服务模块
整合链接解析TAPD查询表格回写实现完整的同步流程
功能
1. 单次同步流程
2. 执行统计
3. 错误处理
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.logger import get_task2_logger
from src2.tapd_api import TAPDStoryApi
from src2.smartsheet_sync import SmartSheetSync, REQUIRED_FIELDS
class SyncService:
"""TAPD状态同步服务"""
def __init__(self, config_manager: Task2ConfigManager = None,
access_token: str = None, test_mode: bool = False):
"""
初始化同步服务
Args:
config_manager: 配置管理器如果为None则自动创建
access_token: 企业微信access_token如果为None则自动获取
test_mode: 是否启用测试模式
"""
self.test_mode = test_mode
self.logger = get_task2_logger()
# 初始化配置管理器
if config_manager is None:
self.config_manager = Task2ConfigManager()
else:
self.config_manager = config_manager
# 获取配置
self.config = self.config_manager.get_all_config()
self.workspace_id = self.config['tapd']['workspace_id']
self.docid = self.config['smartsheet']['docid']
# 获取access_token
if access_token is None:
token_manager = TokenManager()
self.access_token = token_manager.get_token()
else:
self.access_token = access_token
# 初始化API模块
self.tapd_api = TAPDStoryApi(self.workspace_id, test_mode=test_mode)
self.smartsheet = SmartSheetSync(self.access_token, self.docid, test_mode=test_mode)
print(f" ✓ 同步服务初始化完成")
def sync_once(self) -> Dict[str, Any]:
"""
执行一次完整的同步流程
Returns:
Dict: 同步结果统计
"""
result = {
"success": False,
"start_time": datetime.now().isoformat(),
"end_time": None,
"sheets_processed": 0,
"sheets_skipped": 0,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"error_message": None,
"sheet_results": []
}
try:
# 1. 获取所有子表
print("\n正在获取子表列表...")
sheets = self.smartsheet.api.get_sheet_list()
print(f" ✓ 找到 {len(sheets)} 个子表")
# 2. 处理每个子表
for sheet in sheets:
sheet_id = sheet.get('sheet_id', '')
sheet_title = sheet.get('title', '未命名')
sheet_result = self._process_sheet(sheet_id, sheet_title)
result["sheet_results"].append(sheet_result)
if sheet_result["skipped"]:
result["sheets_skipped"] += 1
else:
result["sheets_processed"] += 1
result["total_records"] += sheet_result["total_records"]
result["records_with_link"] += sheet_result["records_with_link"]
result["records_synced"] += sheet_result["records_synced"]
result["records_updated"] += sheet_result["records_updated"]
result["records_failed"] += sheet_result["records_failed"]
result["success"] = True
except Exception as e:
result["error_message"] = str(e)
print(f"\n✗ 同步失败: {e}")
if self.test_mode:
import traceback
traceback.print_exc()
result["end_time"] = datetime.now().isoformat()
return result
def _process_sheet(self, sheet_id: str, sheet_title: str) -> Dict[str, Any]:
"""
处理单个子表的同步
Args:
sheet_id: 子表ID
sheet_title: 子表标题
Returns:
Dict: 子表处理结果
"""
print(f"\n{'='*60}")
print(f"处理子表: {sheet_title}")
print(f"{'='*60}")
sheet_result = {
"sheet_id": sheet_id,
"sheet_title": sheet_title,
"skipped": False,
"skip_reason": None,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"details": []
}
try:
# 1. 获取字段信息并检查必要字段
fields = self.smartsheet.api.get_fields(sheet_id)
all_present, missing_fields, field_mapping = self.smartsheet.check_required_fields(fields)
if not all_present:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
print(f" ⚠ 跳过此子表: {sheet_result['skip_reason']}")
return sheet_result
# 2. 获取包含TAPD链接的记录
records_with_link = self.smartsheet.get_records_with_tapd_link(sheet_id)
sheet_result["records_with_link"] = len(records_with_link)
if not records_with_link:
print(f" 没有包含TAPD链接的记录")
return sheet_result
# 3. 处理每条记录
success_records = [] # 成功同步的记录(包含业务字段 + 同步状态)
failed_record_ids = [] # 失败的记录ID列表
for record_info in records_with_link:
record_result = self._process_record(record_info)
sheet_result["details"].append(record_result)
if record_result["success"]:
sheet_result["records_synced"] += 1
if record_result["update_record"]:
success_records.append(record_result["update_record"])
sheet_result["records_updated"] += 1
else:
sheet_result["records_failed"] += 1
failed_record_ids.append(record_info["record_id"])
# 4. 批量回写成功记录(业务字段 + 同步状态=成功)
if success_records:
print(f"\n正在回写 {len(success_records)} 条成功记录...")
try:
self.smartsheet.batch_update_records(sheet_id, success_records)
print(f" ✓ 成功记录回写完成")
except Exception as e:
print(f" ✗ 成功记录回写失败: {e}")
# 5. 批量回写失败记录(只写入同步状态=失败)
if failed_record_ids:
print(f"\n正在回写 {len(failed_record_ids)} 条失败记录的状态...")
try:
failed_updates = [
self.smartsheet.build_update_record(
record_id=record_id,
sync_status="失败"
)
for record_id in failed_record_ids
]
self.smartsheet.batch_update_records(sheet_id, failed_updates)
print(f" ✓ 失败记录状态回写完成")
except Exception as e:
print(f" ✗ 失败记录状态回写失败: {e}")
sheet_result["total_records"] = len(records_with_link)
except Exception as e:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"处理异常: {str(e)}"
print(f" ✗ 处理子表异常: {e}")
return sheet_result
def _process_record(self, record_info: Dict) -> Dict[str, Any]:
"""
处理单条记录的同步
Args:
record_info: 记录信息包含解析结果
Returns:
Dict: 记录处理结果
"""
record_result = {
"record_id": record_info["record_id"],
"tapd_link": record_info["tapd_link"],
"success": False,
"update_record": None,
"error_message": None,
"story_info": None
}
# 检查链接解析结果
if not record_info["parse_success"]:
record_result["error_message"] = record_info.get("parse_error", "链接解析失败")
return record_result
story_id = record_info["story_id"]
try:
# 查询TAPD获取需求信息
story_info = self.tapd_api.get_story(story_id)
record_result["story_info"] = story_info
# 提取需要同步的字段
status = story_info.get('status', '')
owner = story_info.get('owner', '')
begin_date = story_info.get('begin', '')
due_date = story_info.get('due', '')
# 获取当前字段值,判断是否需要更新
current_values = self.smartsheet.get_current_field_values(record_info["record"])
needs_update = self._check_needs_update(
current_values, status, owner, begin_date, due_date
)
# 构造更新记录(包含业务字段 + 同步状态=成功)
# 即使业务字段没有变化,也要写入同步状态
update_record = self.smartsheet.build_update_record(
record_id=record_info["record_id"],
status=status,
owner=owner,
begin_date=begin_date,
due_date=due_date,
sync_status="成功"
)
record_result["update_record"] = update_record
record_result["success"] = True
except Exception as e:
record_result["error_message"] = str(e)
return record_result
def _check_needs_update(self, current_values: Dict,
status: str, owner: str,
begin_date: str, due_date: str) -> bool:
"""
检查是否需要更新记录
Args:
current_values: 当前字段值
status: 新状态
owner: 新处理人
begin_date: 新开始日期
due_date: 新结束日期
Returns:
bool: 是否需要更新
"""
# 提取当前值的文本内容
def extract_text(value):
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, list) and len(value) > 0:
first = value[0]
if isinstance(first, dict):
return first.get('text', '')
return str(first)
if isinstance(value, dict):
return value.get('text', '')
return str(value)
current_status = extract_text(current_values.get('TAPD状态'))
current_owner = extract_text(current_values.get('处理人'))
current_begin = extract_text(current_values.get('TAPD预计开始日期'))
current_due = extract_text(current_values.get('TAPD预计完成日期'))
# 比较是否有变化
if current_status != status:
return True
if current_owner != owner:
return True
if current_begin != begin_date:
return True
if current_due != due_date:
return True
return False
def run_once(config_manager: Task2ConfigManager = None,
access_token: str = None,
test_mode: bool = False) -> Dict[str, Any]:
"""
执行一次同步流程供调度器调用
Args:
config_manager: 配置管理器
access_token: access_token
test_mode: 测试模式
Returns:
Dict: 同步结果
"""
service = SyncService(
config_manager=config_manager,
access_token=access_token,
test_mode=test_mode
)
return service.sync_once()
if __name__ == "__main__":
print("=== 任务二同步服务测试 ===\n")
print("请使用 main.py 或 scheduler.py 运行同步服务")
print("或使用 test_phase4.py 进行测试")

160
src2/test_phase4.py Normal file
View File

@ -0,0 +1,160 @@
"""
任务二第四阶段验证脚本
验证同步服务与主程序的完整功能
验证项
1. 同步服务初始化
2. 单次同步流程
3. 调度器初始化
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_sync_service_init():
"""测试1: 同步服务初始化"""
print("\n" + "=" * 60)
print("测试1: 同步服务初始化")
print("=" * 60)
try:
from src2.sync_service import SyncService
from src2.config import Task2ConfigManager
# 初始化配置
config_manager = Task2ConfigManager()
print("✓ 配置管理器初始化成功")
# 初始化同步服务(测试模式)
service = SyncService(
config_manager=config_manager,
test_mode=True
)
print("✓ 同步服务初始化成功")
# 验证属性
assert service.workspace_id is not None
assert service.docid is not None
assert service.access_token is not None
print("✓ 服务属性验证通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_sync_once():
"""测试2: 单次同步流程"""
print("\n" + "=" * 60)
print("测试2: 单次同步流程")
print("=" * 60)
try:
from src2.sync_service import run_once
# 执行一次同步
print("正在执行同步...")
result = run_once(test_mode=False)
# 验证结果结构
assert 'success' in result
assert 'sheets_processed' in result
assert 'records_with_link' in result
assert 'records_synced' in result
print("✓ 结果结构验证通过")
# 打印结果摘要
print(f"\n同步结果:")
print(f" 成功: {result['success']}")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
return result['success']
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_scheduler_init():
"""测试3: 调度器初始化"""
print("\n" + "=" * 60)
print("测试3: 调度器初始化")
print("=" * 60)
try:
from src2.scheduler import Task2Scheduler
# 初始化调度器(不启动)
scheduler = Task2Scheduler(verbose=False)
print("✓ 调度器初始化成功")
# 验证属性
assert scheduler.config is not None
assert scheduler.sync_interval > 0
print(f"✓ 同步间隔: {scheduler.sync_interval} 分钟")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主测试函数"""
print("\n" + "=" * 60)
print("任务二第四阶段验证")
print("=" * 60)
results = {}
# 测试1: 同步服务初始化
results['sync_service_init'] = test_sync_service_init()
# 测试2: 单次同步流程
results['sync_once'] = test_sync_once()
# 测试3: 调度器初始化
results['scheduler_init'] = test_scheduler_init()
# 打印总结
print("\n" + "=" * 60)
print("验证结果总结")
print("=" * 60)
all_passed = True
for test_name, passed in results.items():
status = "✓ 通过" if passed else "✗ 失败"
print(f" {test_name}: {status}")
if not passed:
all_passed = False
print("=" * 60)
if all_passed:
print("所有测试通过!第四阶段验收完成。")
else:
print("部分测试失败,请检查错误信息。")
print("=" * 60)
return 0 if all_passed else 1
if __name__ == "__main__":
sys.exit(main())

130
src2/test_update_records.py Normal file
View File

@ -0,0 +1,130 @@
"""
测试 update_records 功能的独立脚本
用于调试智能表格的记录更新功能
"""
import sys
import json
import requests
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
def test_update_records():
"""测试更新记录功能"""
print("=" * 60)
print("测试 update_records 功能")
print("=" * 60)
# 1. 获取 access_token
print("\n[1/3] 获取 access_token...")
try:
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" ✓ access_token: {access_token[:20]}...(已隐藏)")
except Exception as e:
print(f" ✗ 获取失败: {e}")
return False
# 2. 准备测试数据
print("\n[2/3] 准备测试数据...")
BASE_URL = "https://qyapi.weixin.qq.com/cgi-bin/wedoc"
# 测试数据
data = {
"docid": "dcOsT3czWy0YEDg38vlDqwVCTjv0kzwC_GU2XmT9wSZctQ0ZJQUAV7vMQ3ljZx-n_NqxzEEYG2DiLAvNdNsHJwgQ",
"sheet_id": "56YEeR",
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"records": [
{
"record_id": "rNrk6o",
"values": {
"TAPD状态": [
{
"type": "text",
"text": "已完成"
}
],
"处理人": [
{
"type": "text",
"text": ""
}
],
"TAPD预计完成日期": [
{
"type": "text",
"text": "2025-11-28"
}
]
}
}
]
}
print(" ✓ 测试数据已准备")
print(f" - docid: {data['docid'][:20]}...")
print(f" - sheet_id: {data['sheet_id']}")
print(f" - 记录数: {len(data['records'])}")
print(f" - record_id: {data['records'][0]['record_id']}")
# 3. 发送请求
print("\n[3/3] 发送 update_records 请求...")
# 构造完整URL带debug=1
url = f"{BASE_URL}/smartsheet/update_records?access_token={access_token}&debug=1"
print(f"\n请求信息:")
print(f" URL: {BASE_URL}/smartsheet/update_records?access_token=***&debug=1")
print(f" Method: POST")
print(f"\n请求数据:")
print(json.dumps(data, ensure_ascii=False, indent=2))
try:
# 发送POST请求
print("\n正在发送请求...")
response = requests.post(url, json=data, timeout=30)
# 打印响应信息
print(f"\n响应信息:")
print(f" 状态码: {response.status_code}")
print(f"\n响应数据:")
result = response.json()
print(json.dumps(result, ensure_ascii=False, indent=2))
# 检查结果
print("\n" + "=" * 60)
if result.get('errcode', 0) == 0:
print("✓ 测试成功!")
updated_records = result.get('records', [])
print(f" 更新记录数: {len(updated_records)}")
return True
else:
print("✗ 测试失败!")
print(f" 错误码: {result.get('errcode')}")
print(f" 错误信息: {result.get('errmsg')}")
return False
except Exception as e:
print(f"\n✗ 请求异常: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主函数"""
success = test_update_records()
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())