G41_TAPD_BUG_SYNC/src2/smartsheet_sync.py

438 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
任务二智能表格同步模块
负责智能表格的数据读取和回写
功能:
1. 检测必要字段是否存在
2. 读取所有记录
3. 提取TAPD链接
4. 构造更新记录
5. 批量回写状态信息
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.smartsheet import SmartSheetAPI
from src2.link_parser import parse_tapd_link, extract_story_id
from src2.logger import get_task2_logger
# ============================================================
# Column-title constants (must match the smart sheet's column names exactly)
# ============================================================
FIELD_TAPD_LINK = "TAPD链接"  # filled in by the user; the ticket ID is parsed from it
FIELD_TAPD_STATUS = "TAPD状态"  # written back by the tool
FIELD_OWNER = "处理人"  # written back by the tool
FIELD_BEGIN_DATE = "TAPD预计开始日期"  # written back by the tool
FIELD_DUE_DATE = "TAPD预计完成日期"  # written back by the tool
FIELD_PLAN = "计划"  # written back by the tool (TAPD plan field)
FIELD_SYNC_STATUS = "同步状态"  # written back by the tool; marks the sync result

# Required columns: a sheet is only processed when every one of these exists.
REQUIRED_FIELDS = [
    FIELD_TAPD_LINK,
    FIELD_TAPD_STATUS,
    FIELD_OWNER,
    FIELD_BEGIN_DATE,
    FIELD_DUE_DATE,
    FIELD_PLAN,
    FIELD_SYNC_STATUS,
]
class SmartSheetSync:
    """Read records from a smart sheet and write TAPD information back to it.

    Wraps a ``SmartSheetAPI`` client and works with the column titles defined
    by the module-level ``FIELD_*`` constants.
    """

    def __init__(self, access_token: str, docid: str, test_mode: bool = False):
        """Create the API client and the task-2 logger.

        Args:
            access_token: WeCom (企业微信) access token.
            docid: Document ID of the smart sheet.
            test_mode: Forwarded to ``SmartSheetAPI`` and stored on the instance.
        """
        self.api = SmartSheetAPI(access_token, docid, test_mode)
        self.logger = get_task2_logger()
        self.test_mode = test_mode

    def check_required_fields(self, fields: List[Dict]) -> Tuple[bool, List[str], Dict[str, str]]:
        """Check that every column in ``REQUIRED_FIELDS`` exists.

        Args:
            fields: Field descriptors (dicts with ``field_title`` and
                ``field_id`` keys), as obtained from ``get_fields``.
                ``None`` is treated as an empty list.

        Returns:
            A 3-tuple ``(all_present, missing_fields, field_mapping)``:
            whether all required columns exist, the titles that are missing,
            and a mapping from field title to field ID.
        """
        # Entries lacking either a title or an ID are ignored.
        field_mapping: Dict[str, str] = {}
        for field in fields or []:
            field_title = field.get('field_title', '')
            field_id = field.get('field_id', '')
            if field_title and field_id:
                field_mapping[field_title] = field_id

        missing_fields = [name for name in REQUIRED_FIELDS if name not in field_mapping]
        all_present = not missing_fields
        if all_present:
            print(" ✓ 所有必要字段都存在")
        else:
            print(f" ⚠ 缺少必要字段: {', '.join(missing_fields)}")
        return (all_present, missing_fields, field_mapping)

    def get_all_records(self, sheet_id: str) -> List[Dict]:
        """Fetch every record of a sub-sheet, following pagination.

        Args:
            sheet_id: Sub-sheet ID.

        Returns:
            All records returned by the API, in the order received.
        """
        print("正在获取所有记录...")
        all_records: List[Dict] = []
        offset = 0
        limit = 100
        while True:
            result = self.api.get_records(sheet_id, limit=limit, offset=offset)
            records = result['records']
            total = result['total']
            all_records.extend(records)
            print(f" - 已获取 {len(all_records)}/{total} 条记录")
            if len(all_records) >= total:
                break
            if not records:
                # The reported total is larger than what the API can actually
                # deliver (e.g. rows removed mid-scan). Stop instead of
                # requesting empty pages forever.
                break
            # Advance by what was actually received so that a short page
            # (fewer than `limit` rows) does not cause rows to be skipped.
            offset += len(records)
        print(f" ✓ 共获取 {len(all_records)} 条记录")
        return all_records

    def extract_tapd_link(self, record: Dict) -> Optional[str]:
        """Extract the TAPD link from a record.

        The cell value may be a plain string, a dict (``url`` preferred over
        ``text``), or a list whose first element is one of those two shapes.

        Args:
            record: Record object.

        Returns:
            The link string, or ``None`` when the cell is empty or has an
            unrecognised shape.
        """
        link_value = self.api.get_field_value_by_title(record, FIELD_TAPD_LINK)
        if not link_value:
            return None
        if isinstance(link_value, list):
            # Non-empty here (empty lists are falsy); only the first item counts.
            link_value = link_value[0]
        if isinstance(link_value, str):
            return link_value
        if isinstance(link_value, dict):
            return link_value.get('url') or link_value.get('text')
        return None

    def build_update_record(self, record_id: str, status: Optional[str] = None,
                            owner: Optional[str] = None, begin_date: Optional[str] = None,
                            due_date: Optional[str] = None, plan: Optional[str] = None,
                            sync_status: Optional[str] = None) -> Dict:
        """Build the payload for updating one record.

        Args:
            record_id: Record ID.
            status: TAPD status (Chinese label).
            owner: Owner / handler.
            begin_date: Planned start date.
            due_date: Planned completion date.
            plan: Plan (Chinese name).
            sync_status: Sync status, e.g. "成功" or "失败".

        Returns:
            ``{"record_id": ..., "values": {...}}`` where ``values`` holds one
            text cell per argument that is neither ``None`` nor ``""``.
        """
        candidates = (
            (FIELD_TAPD_STATUS, status),
            (FIELD_OWNER, owner),
            (FIELD_BEGIN_DATE, begin_date),
            (FIELD_DUE_DATE, due_date),
            (FIELD_PLAN, plan),
            (FIELD_SYNC_STATUS, sync_status),
        )
        # Each cell value is a list with a single {"type", "text"} item;
        # None and empty strings are left out so they do not overwrite cells.
        values = {
            title: [{"type": "text", "text": text}]
            for title, text in candidates
            if text is not None and text != ""
        }
        return {
            "record_id": record_id,
            "values": values
        }

    def batch_update_records(self, sheet_id: str, update_records: List[Dict]) -> Dict:
        """Write a batch of update payloads back to the sheet.

        Delegates to ``SmartSheetAPI.update_records``.
        NOTE(review): the original comments say that method already sends
        ``debug=1``; this cannot be confirmed from this file.

        Args:
            sheet_id: Sub-sheet ID.
            update_records: Payloads as produced by ``build_update_record``.

        Returns:
            The API result, or ``{"records": []}`` when there is nothing to update.
        """
        if not update_records:
            print(" ⚠ 没有需要更新的记录")
            return {"records": []}
        return self.api.update_records(sheet_id, update_records)

    def get_records_with_tapd_link(self, sheet_id: str,
                                   all_records: Optional[List[Dict]] = None) -> List[Dict]:
        """Collect not-yet-synced records that carry a TAPD link.

        A record is included when it has a TAPD link and its sync-status cell
        is ``None`` or ``""``. Records whose link fails to parse are still
        included, flagged with ``parse_success=False``.

        Args:
            sheet_id: Sub-sheet ID.
            all_records: Optional pre-fetched record list; when ``None`` the
                records are fetched with ``get_all_records``.

        Returns:
            One info dict per record with keys ``record``, ``record_id``,
            ``tapd_link``, ``parse_success``, ``story_id`` and either
            ``link_type`` (parse succeeded) or ``parse_error`` (parse failed).
        """
        print("正在获取包含TAPD链接的新记录...")
        if all_records is None:
            all_records = self.get_all_records(sheet_id)

        records_with_link = []
        skipped_synced_count = 0
        for record in all_records:
            tapd_link = self.extract_tapd_link(record)
            if not tapd_link:
                continue
            # Any non-empty sync status means the record was handled before.
            sync_status = self.api.get_field_value_by_title(record, FIELD_SYNC_STATUS)
            if sync_status is not None and sync_status != "":
                skipped_synced_count += 1
                continue

            success, result, link_type = parse_tapd_link(tapd_link)
            record_info = {
                "record": record,
                "record_id": record.get('record_id', ''),
                "tapd_link": tapd_link,
                "parse_success": success,
            }
            if success:
                record_info["story_id"] = result
                record_info["link_type"] = link_type
            else:
                # On failure the second tuple element is stored as the error.
                record_info["story_id"] = None
                record_info["parse_error"] = result
            records_with_link.append(record_info)

        success_count = sum(1 for info in records_with_link if info["parse_success"])
        fail_count = len(records_with_link) - success_count
        print(f" ✓ 找到 {len(records_with_link)} 条包含TAPD链接的记录")
        if skipped_synced_count > 0:
            print(f" - 跳过已同步记录: {skipped_synced_count}")
        print(f" - 链接解析成功: {success_count}")
        if fail_count > 0:
            print(f" - 链接解析失败: {fail_count}")
        return records_with_link

    def get_current_field_values(self, record: Dict) -> Dict[str, Any]:
        """Return the record's current values for the written-back columns.

        Args:
            record: Record object.

        Returns:
            Mapping from column title to current value for the status, owner,
            start date, due date and plan columns (sync status is excluded).
        """
        titles = (
            FIELD_TAPD_STATUS,
            FIELD_OWNER,
            FIELD_BEGIN_DATE,
            FIELD_DUE_DATE,
            FIELD_PLAN,
        )
        return {
            title: self.api.get_field_value_by_title(record, title)
            for title in titles
        }

    def get_synced_records_for_update(self, sheet_id: str,
                                      terminal_statuses: List[str],
                                      all_records: Optional[List[Dict]] = None) -> List[Dict]:
        """Collect already-synced records that still need refreshing.

        A record qualifies when its sync status equals "成功", it has a TAPD
        link that parses successfully, and its TAPD status is not one of
        ``terminal_statuses``.

        Args:
            sheet_id: Sub-sheet ID.
            terminal_statuses: Final statuses to skip (e.g. ['已完成', '取消']).
                ``None`` is treated as "no terminal statuses".
            all_records: Optional pre-fetched record list; when ``None`` the
                records are fetched with ``get_all_records``.

        Returns:
            One info dict per record with keys ``record``, ``record_id``,
            ``tapd_link``, ``story_id`` and ``current_status``.
        """
        print("正在获取需要持续同步的记录...")
        if all_records is None:
            all_records = self.get_all_records(sheet_id)
        terminal = terminal_statuses or []

        records_for_update = []
        skipped_terminal_count = 0
        for record in all_records:
            sync_status = self.api.get_field_value_by_title(record, FIELD_SYNC_STATUS)
            if sync_status != "成功":
                continue
            tapd_link = self.extract_tapd_link(record)
            if not tapd_link:
                continue
            tapd_status = self.api.get_field_value_by_title(record, FIELD_TAPD_STATUS)
            if tapd_status in terminal:
                skipped_terminal_count += 1
                continue
            # Records whose link cannot be parsed are dropped without counting.
            success, result, _ = parse_tapd_link(tapd_link)
            if not success:
                continue
            records_for_update.append({
                "record": record,
                "record_id": record.get('record_id', ''),
                "tapd_link": tapd_link,
                "story_id": result,
                "current_status": tapd_status,
            })

        print(f" ✓ 找到 {len(records_for_update)} 条需要持续同步的记录")
        if skipped_terminal_count > 0:
            print(f" - 跳过终态记录: {skipped_terminal_count}")
        return records_for_update
def process_sheet(api: "SmartSheetSync", sheet_id: str, sheet_title: str) -> Dict:
    """Run the inspection flow for a single sub-sheet and return statistics.

    Steps: fetch the sheet's fields, verify the required columns, fetch all
    records once, then collect the unsynced records that carry a TAPD link.

    Args:
        api: SmartSheetSync instance.
        sheet_id: Sub-sheet ID.
        sheet_title: Sub-sheet title (used for console output and the result).

    Returns:
        A dict with keys ``sheet_id``, ``sheet_title``, ``success``,
        ``skipped``, ``skip_reason``, ``total_records``, ``records_with_link``,
        ``parse_success`` and ``parse_fail``. When required columns are
        missing, ``skipped`` is True, ``success`` stays False and the counters
        stay 0.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"处理子表: {sheet_title}")
    print(banner)

    result = {
        "sheet_id": sheet_id,
        "sheet_title": sheet_title,
        "success": False,
        "skipped": False,
        "skip_reason": None,
        "total_records": 0,
        "records_with_link": 0,
        "parse_success": 0,
        "parse_fail": 0,
    }

    # 1. Fetch the field (column) descriptors.
    fields = api.api.get_fields(sheet_id)

    # 2. Skip the sheet when a required column is missing.
    all_present, missing_fields, _ = api.check_required_fields(fields)
    if not all_present:
        result["skipped"] = True
        result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
        print(f" ⚠ 跳过此子表: {result['skip_reason']}")
        return result

    # 3. Fetch the records once so the total can be reported, and hand the
    #    same list on so it is not fetched a second time.
    all_records = api.get_all_records(sheet_id)
    result["total_records"] = len(all_records)

    # 4. Collect the unsynced records that carry a TAPD link.
    records_with_link = api.get_records_with_tapd_link(sheet_id, all_records=all_records)
    result["records_with_link"] = len(records_with_link)
    result["parse_success"] = sum(1 for info in records_with_link if info["parse_success"])
    result["parse_fail"] = result["records_with_link"] - result["parse_success"]
    result["success"] = True
    return result
if __name__ == "__main__":
    # Running the module directly only prints a summary of what it offers
    # and points to the test script; no API calls are made.
    _usage_lines = (
        "=== 智能表格同步模块测试 ===\n",
        "此模块提供以下功能:",
        "1. check_required_fields() - 检测必要字段",
        "2. get_all_records() - 获取所有记录",
        "3. extract_tapd_link() - 提取TAPD链接",
        "4. build_update_record() - 构造更新记录",
        "5. batch_update_records() - 批量回写",
        "6. get_records_with_tapd_link() - 获取包含链接的记录",
        "\n请运行 test_phase3.py 进行完整测试",
    )
    for _usage_line in _usage_lines:
        print(_usage_line)