task2-phase3: 表格记录获取

This commit is contained in:
zelong 2026-01-08 16:26:57 +08:00
parent 397c14faee
commit d984828777
4 changed files with 685 additions and 3 deletions

View File

@ -6,7 +6,7 @@ workspace_id = 58335167
[SmartSheet]
# 智能表格文档ID任务二专用
docid = your_task2_doc_id
docid = dcOsT3czWy0YEDg38vlDqwVCTjv0kzwC_GU2XmT9wSZctQ0ZJQUAV7vMQ3ljZx-n_NqxzEEYG2DiLAvNdNsHJwgQ
[Schedule]
# 同步频率(分钟)

View File

@ -1,4 +1,4 @@
{
"access_token": "GKRSU-6KIm-q1lKGtlImu8cSa1HpyEJLFlYq9FS-Kqfc-T9cNd25A07qTG7BhaCbZS7eii7nselgxqWoczbj3zg8f-t_jCAILpJ2uCwZSXwQBLasB0c5SedemVyA59n4rK7fwqHMOd9j1LFVtxgjfnAcQeIv6xacYE9ZLVppImHBL-ZOl2yb8NCv43j51CzEOGEryTjLH4AhNAREXKYm1mQ64cIYD3vXuLpcAG9f-cU",
"fetch_time": 1767852793.0838747
"access_token": "OadV3tQWg6mWhvIU4fO5d56EkPyHxcygjX31jTd222norNXqKA9WRp7IzCiAMKmYfAp00N3esAoSx6SR3V0ywXU95xr2yy984mYxxYA1t5_GRQwnYZtMXz9h5U6W6Sk9vDz_aHVLEk69e8W4BZ0i_8XRrOWKci4Kox1IDWDVWcEEicyYu_v5-FmNBWIPaj0_ZdQD7B0wn_PcopGHEvsaJ9r5Y3gb9Ix4wQuAISzHMNM",
"fetch_time": 1767859873.4756753
}

352
src2/smartsheet_sync.py Normal file
View File

@ -0,0 +1,352 @@
"""
任务二智能表格同步模块
负责智能表格的数据读取和回写
功能
1. 检测必要字段是否存在
2. 读取所有记录
3. 提取TAPD链接
4. 构造更新记录
5. 批量回写状态信息
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.smartsheet import SmartSheetAPI
from src2.link_parser import parse_tapd_link, extract_story_id
from src2.logger import get_task2_logger
# ============================================================
# 字段名称常量(与智能表格列名完全一致)
# ============================================================
FIELD_TAPD_LINK = "TAPD链接" # 用户填写,解析单号
FIELD_TAPD_STATUS = "TAPD状态" # 工具回写
FIELD_OWNER = "处理人" # 工具回写
FIELD_BEGIN_DATE = "TAPD预计开始日期" # 工具回写
FIELD_DUE_DATE = "TAPD预计完成日期" # 工具回写
# 必要字段列表
REQUIRED_FIELDS = [
FIELD_TAPD_LINK,
FIELD_TAPD_STATUS,
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
]
class SmartSheetSync:
"""智能表格同步类"""
def __init__(self, access_token: str, docid: str, test_mode: bool = False):
"""
初始化智能表格同步模块
Args:
access_token: 企业微信access_token
docid: 智能表格文档ID
test_mode: 是否启用测试模式
"""
self.api = SmartSheetAPI(access_token, docid, test_mode)
self.logger = get_task2_logger()
self.test_mode = test_mode
def check_required_fields(self, fields: List[Dict]) -> Tuple[bool, List[str], Dict[str, str]]:
"""
检测必要字段是否存在
Args:
fields: 字段列表从get_fields获取
Returns:
Tuple[bool, List[str], Dict[str, str]]:
- 是否所有必要字段都存在
- 缺失的字段列表
- 字段名称到字段ID的映射
"""
# 构建字段映射
field_mapping = {}
for field in fields:
field_title = field.get('field_title', '')
field_id = field.get('field_id', '')
if field_title and field_id:
field_mapping[field_title] = field_id
# 检查必要字段
missing_fields = []
for required_field in REQUIRED_FIELDS:
if required_field not in field_mapping:
missing_fields.append(required_field)
all_present = len(missing_fields) == 0
if all_present:
print(f" ✓ 所有必要字段都存在")
else:
print(f" ⚠ 缺少必要字段: {', '.join(missing_fields)}")
return (all_present, missing_fields, field_mapping)
def get_all_records(self, sheet_id: str) -> List[Dict]:
"""
获取子表的所有记录支持分页
Args:
sheet_id: 子表ID
Returns:
List[Dict]: 所有记录列表
"""
print(f"正在获取所有记录...")
all_records = []
offset = 0
limit = 100
while True:
result = self.api.get_records(sheet_id, limit=limit, offset=offset)
records = result['records']
total = result['total']
all_records.extend(records)
print(f" - 已获取 {len(all_records)}/{total} 条记录")
if len(all_records) >= total:
break
offset += limit
print(f" ✓ 共获取 {len(all_records)} 条记录")
return all_records
def extract_tapd_link(self, record: Dict) -> Optional[str]:
"""
从记录中提取TAPD链接
Args:
record: 记录对象
Returns:
Optional[str]: TAPD链接字符串如果不存在则返回None
"""
link_value = self.api.get_field_value_by_title(record, FIELD_TAPD_LINK)
if not link_value:
return None
# 链接字段可能是字符串或包含url的对象
if isinstance(link_value, str):
return link_value
elif isinstance(link_value, dict):
# 可能是 {url: "...", text: "..."} 格式
return link_value.get('url') or link_value.get('text')
elif isinstance(link_value, list):
# 可能是列表格式
if len(link_value) > 0:
first_item = link_value[0]
if isinstance(first_item, dict):
return first_item.get('url') or first_item.get('text')
elif isinstance(first_item, str):
return first_item
return None
def build_update_record(self, record_id: str, status: str = None,
owner: str = None, begin_date: str = None,
due_date: str = None) -> Dict:
"""
构造更新记录的数据结构
Args:
record_id: 记录ID
status: TAPD状态中文
owner: 处理人
begin_date: 预计开始日期
due_date: 预计完成日期
Returns:
Dict: 更新记录的数据结构
"""
values = {}
# 只添加非空的字段
if status is not None:
values[FIELD_TAPD_STATUS] = [{"text": status}]
if owner is not None:
values[FIELD_OWNER] = [{"text": owner}]
if begin_date is not None:
values[FIELD_BEGIN_DATE] = [{"text": begin_date}]
if due_date is not None:
values[FIELD_DUE_DATE] = [{"text": due_date}]
return {
"record_id": record_id,
"values": values
}
def batch_update_records(self, sheet_id: str, update_records: List[Dict]) -> Dict:
"""
批量回写状态信息
Args:
sheet_id: 子表ID
update_records: 需要更新的记录列表
Returns:
Dict: 更新结果
"""
if not update_records:
print(" ⚠ 没有需要更新的记录")
return {"records": []}
return self.api.update_records(sheet_id, update_records)
def get_records_with_tapd_link(self, sheet_id: str) -> List[Dict]:
"""
获取所有包含TAPD链接的记录
Args:
sheet_id: 子表ID
Returns:
List[Dict]: 包含TAPD链接的记录列表每条记录包含
- record: 原始记录对象
- record_id: 记录ID
- tapd_link: TAPD链接
- story_id: 解析出的需求单号如果解析成功
- parse_success: 链接解析是否成功
- parse_error: 解析失败的错误信息
"""
print(f"正在获取包含TAPD链接的记录...")
all_records = self.get_all_records(sheet_id)
records_with_link = []
for record in all_records:
tapd_link = self.extract_tapd_link(record)
if not tapd_link:
continue
record_id = record.get('record_id', '')
# 解析链接
success, result, link_type = parse_tapd_link(tapd_link)
record_info = {
"record": record,
"record_id": record_id,
"tapd_link": tapd_link,
"parse_success": success,
}
if success:
record_info["story_id"] = result
record_info["link_type"] = link_type
else:
record_info["story_id"] = None
record_info["parse_error"] = result
records_with_link.append(record_info)
# 统计
success_count = sum(1 for r in records_with_link if r["parse_success"])
fail_count = len(records_with_link) - success_count
print(f" ✓ 找到 {len(records_with_link)} 条包含TAPD链接的记录")
print(f" - 链接解析成功: {success_count}")
if fail_count > 0:
print(f" - 链接解析失败: {fail_count}")
return records_with_link
def get_current_field_values(self, record: Dict) -> Dict[str, Any]:
"""
获取记录当前的字段值
Args:
record: 记录对象
Returns:
Dict: 当前字段值
"""
return {
FIELD_TAPD_STATUS: self.api.get_field_value_by_title(record, FIELD_TAPD_STATUS),
FIELD_OWNER: self.api.get_field_value_by_title(record, FIELD_OWNER),
FIELD_BEGIN_DATE: self.api.get_field_value_by_title(record, FIELD_BEGIN_DATE),
FIELD_DUE_DATE: self.api.get_field_value_by_title(record, FIELD_DUE_DATE),
}
def process_sheet(api: SmartSheetSync, sheet_id: str, sheet_title: str) -> Dict:
"""
处理单个子表的同步流程
Args:
api: SmartSheetSync实例
sheet_id: 子表ID
sheet_title: 子表标题
Returns:
Dict: 处理结果统计
"""
print(f"\n{'='*60}")
print(f"处理子表: {sheet_title}")
print(f"{'='*60}")
result = {
"sheet_id": sheet_id,
"sheet_title": sheet_title,
"success": False,
"skipped": False,
"skip_reason": None,
"total_records": 0,
"records_with_link": 0,
"parse_success": 0,
"parse_fail": 0,
}
# 1. 获取字段信息
fields = api.api.get_fields(sheet_id)
# 2. 检查必要字段
all_present, missing_fields, field_mapping = api.check_required_fields(fields)
if not all_present:
result["skipped"] = True
result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
print(f" ⚠ 跳过此子表: {result['skip_reason']}")
return result
# 3. 获取包含TAPD链接的记录
records_with_link = api.get_records_with_tapd_link(sheet_id)
result["records_with_link"] = len(records_with_link)
result["parse_success"] = sum(1 for r in records_with_link if r["parse_success"])
result["parse_fail"] = result["records_with_link"] - result["parse_success"]
result["success"] = True
return result
if __name__ == "__main__":
print("=== 智能表格同步模块测试 ===\n")
print("此模块提供以下功能:")
print("1. check_required_fields() - 检测必要字段")
print("2. get_all_records() - 获取所有记录")
print("3. extract_tapd_link() - 提取TAPD链接")
print("4. build_update_record() - 构造更新记录")
print("5. batch_update_records() - 批量回写")
print("6. get_records_with_tapd_link() - 获取包含链接的记录")
print("\n请运行 test_phase3.py 进行完整测试")

330
src2/test_phase3.py Normal file
View File

@ -0,0 +1,330 @@
"""
第三阶段验证脚本智能表格读写功能测试
验证项
1. 字段检测 - 检查必要字段是否存在
2. 记录读取 - 获取所有记录
3. TAPD链接提取 - 从记录中提取链接并解析
4. 数据回写测试 - 构造更新记录可选执行
用法
python src2/test_phase3.py # 只读测试(不修改数据)
python src2/test_phase3.py --write # 包含回写测试(会修改一条记录)
"""
import sys
import argparse
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.smartsheet_sync import (
SmartSheetSync,
REQUIRED_FIELDS,
FIELD_TAPD_LINK,
FIELD_TAPD_STATUS,
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
)
def print_separator(title: str = ""):
"""打印分隔线"""
print("\n" + "=" * 60)
if title:
print(f" {title}")
print("=" * 60)
def test_field_detection(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试1字段检测
Returns:
bool: 测试是否通过
"""
print_separator("测试1字段检测")
print(f"必要字段列表:")
for field in REQUIRED_FIELDS:
print(f" - {field}")
print()
# 获取字段信息
fields = sync.api.get_fields(sheet_id)
# 检查必要字段
all_present, missing_fields, field_mapping = sync.check_required_fields(fields)
print(f"\n字段映射结果:")
for field_name in REQUIRED_FIELDS:
field_id = field_mapping.get(field_name, "未找到")
status = "" if field_name in field_mapping else ""
print(f" {status} {field_name}: {field_id}")
if all_present:
print(f"\n✓ 测试通过:所有必要字段都存在")
return True
else:
print(f"\n✗ 测试失败:缺少字段 {missing_fields}")
return False
def test_record_reading(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试2记录读取
Returns:
bool: 测试是否通过
"""
print_separator("测试2记录读取")
try:
records = sync.get_all_records(sheet_id)
print(f"\n记录读取结果:")
print(f" - 总记录数: {len(records)}")
if len(records) > 0:
print(f"\n第一条记录示例:")
first_record = records[0]
record_id = first_record.get('record_id', 'N/A')
print(f" - record_id: {record_id}")
# 显示部分字段值
values = first_record.get('values', {})
print(f" - 字段数量: {len(values)}")
# 显示前5个字段
for i, (key, value) in enumerate(values.items()):
if i >= 5:
print(f" - ... 还有 {len(values) - 5} 个字段")
break
print(f" - {key}: {str(value)[:50]}...")
print(f"\n✓ 测试通过:成功读取 {len(records)} 条记录")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_tapd_link_extraction(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试3TAPD链接提取
Returns:
bool: 测试是否通过
"""
print_separator("测试3TAPD链接提取")
try:
records_with_link = sync.get_records_with_tapd_link(sheet_id)
print(f"\n链接提取结果:")
print(f" - 包含链接的记录数: {len(records_with_link)}")
# 统计解析结果
success_count = sum(1 for r in records_with_link if r["parse_success"])
fail_count = len(records_with_link) - success_count
print(f" - 解析成功: {success_count}")
print(f" - 解析失败: {fail_count}")
# 显示前3条成功解析的记录
success_records = [r for r in records_with_link if r["parse_success"]]
if success_records:
print(f"\n成功解析的记录示例最多3条")
for i, record_info in enumerate(success_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 单号: {record_info['story_id']}")
print(f" 类型: {record_info.get('link_type', 'N/A')}")
# 显示解析失败的记录
fail_records = [r for r in records_with_link if not r["parse_success"]]
if fail_records:
print(f"\n解析失败的记录最多3条")
for i, record_info in enumerate(fail_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 错误: {record_info.get('parse_error', 'N/A')}")
print(f"\n✓ 测试通过成功提取并解析TAPD链接")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
import traceback
traceback.print_exc()
return False
def test_update_record_structure(sync: SmartSheetSync) -> bool:
"""
测试4更新记录结构构造不实际写入
Returns:
bool: 测试是否通过
"""
print_separator("测试4更新记录结构构造")
try:
# 构造一个测试更新记录
test_record = sync.build_update_record(
record_id="test_record_id_123",
status="进行中",
owner="张三",
begin_date="2025-01-01",
due_date="2025-01-15"
)
print(f"构造的更新记录结构:")
print(f" record_id: {test_record['record_id']}")
print(f" values:")
for key, value in test_record['values'].items():
print(f" {key}: {value}")
# 验证结构
assert 'record_id' in test_record
assert 'values' in test_record
assert FIELD_TAPD_STATUS in test_record['values']
assert FIELD_OWNER in test_record['values']
assert FIELD_BEGIN_DATE in test_record['values']
assert FIELD_DUE_DATE in test_record['values']
print(f"\n✓ 测试通过:更新记录结构正确")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_multi_sheet_support(sync: SmartSheetSync) -> bool:
"""
测试5多子表支持
Returns:
bool: 测试是否通过
"""
print_separator("测试5多子表支持")
try:
# 获取所有子表
sheet_list = sync.api.get_sheet_list()
print(f"子表列表:")
for i, sheet in enumerate(sheet_list):
sheet_id = sheet.get('sheet_id', 'N/A')
title = sheet.get('title', 'N/A')
print(f" [{i+1}] {title} (ID: {sheet_id})")
print(f"\n共找到 {len(sheet_list)} 个子表")
if len(sheet_list) == 0:
print(f"\n⚠ 警告:没有找到子表")
return False
print(f"\n✓ 测试通过:成功获取子表列表")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="第三阶段验证脚本")
parser.add_argument("--write", action="store_true",
help="执行回写测试(会修改数据)")
args = parser.parse_args()
print("=" * 60)
print(" 任务二 第三阶段验证:智能表格读写功能")
print("=" * 60)
# 1. 加载配置
print("\n[初始化] 加载配置...")
config = Task2ConfigManager()
smartsheet_config = config.get_smartsheet_config()
docid = smartsheet_config.get('docid')
print(f" docid: {docid[:20]}...")
# 2. 获取token
print("\n[初始化] 获取access_token...")
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" token: {access_token[:20]}...")
# 3. 初始化同步模块
print("\n[初始化] 初始化SmartSheetSync...")
sync = SmartSheetSync(access_token, docid, test_mode=False)
print(" ✓ 初始化完成")
# 4. 获取子表列表
sheet_list = sync.api.get_sheet_list()
if not sheet_list:
print("\n✗ 错误:没有找到子表")
return 1
# 使用第一个子表进行测试
first_sheet = sheet_list[0]
sheet_id = first_sheet.get('sheet_id')
sheet_title = first_sheet.get('title')
print(f"\n使用子表进行测试: {sheet_title}")
# 5. 运行测试
results = []
# 测试5先执行多子表支持
results.append(("多子表支持", test_multi_sheet_support(sync)))
# 测试1字段检测
results.append(("字段检测", test_field_detection(sync, sheet_id)))
# 测试2记录读取
results.append(("记录读取", test_record_reading(sync, sheet_id)))
# 测试3TAPD链接提取
results.append(("TAPD链接提取", test_tapd_link_extraction(sync, sheet_id)))
# 测试4更新记录结构
results.append(("更新记录结构", test_update_record_structure(sync)))
# 6. 输出测试结果汇总
print_separator("测试结果汇总")
passed = 0
failed = 0
for name, result in results:
status = "✓ 通过" if result else "✗ 失败"
print(f" {status}: {name}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if failed == 0:
print("\n" + "=" * 60)
print(" ✓ 第三阶段验证全部通过!")
print("=" * 60)
return 0
else:
print("\n" + "=" * 60)
print(" ✗ 部分测试失败,请检查")
print("=" * 60)
return 1
if __name__ == "__main__":
sys.exit(main())