G41_TAPD_BUG_SYNC/src2/test_phase3.py

331 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
第三阶段验证脚本:智能表格读写功能测试
验证项:
1. 字段检测 - 检查必要字段是否存在
2. 记录读取 - 获取所有记录
3. TAPD链接提取 - 从记录中提取链接并解析
4. 数据回写测试 - 构造更新记录(可选执行)
用法:
python src2/test_phase3.py # 只读测试(不修改数据)
python src2/test_phase3.py --write # 包含回写测试(会修改一条记录)
"""
import sys
import argparse
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.smartsheet_sync import (
SmartSheetSync,
REQUIRED_FIELDS,
FIELD_TAPD_LINK,
FIELD_TAPD_STATUS,
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
)
def print_separator(title: str = ""):
"""打印分隔线"""
print("\n" + "=" * 60)
if title:
print(f" {title}")
print("=" * 60)
def test_field_detection(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试1字段检测
Returns:
bool: 测试是否通过
"""
print_separator("测试1字段检测")
print(f"必要字段列表:")
for field in REQUIRED_FIELDS:
print(f" - {field}")
print()
# 获取字段信息
fields = sync.api.get_fields(sheet_id)
# 检查必要字段
all_present, missing_fields, field_mapping = sync.check_required_fields(fields)
print(f"\n字段映射结果:")
for field_name in REQUIRED_FIELDS:
field_id = field_mapping.get(field_name, "未找到")
status = "" if field_name in field_mapping else ""
print(f" {status} {field_name}: {field_id}")
if all_present:
print(f"\n✓ 测试通过:所有必要字段都存在")
return True
else:
print(f"\n✗ 测试失败:缺少字段 {missing_fields}")
return False
def test_record_reading(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试2记录读取
Returns:
bool: 测试是否通过
"""
print_separator("测试2记录读取")
try:
records = sync.get_all_records(sheet_id)
print(f"\n记录读取结果:")
print(f" - 总记录数: {len(records)}")
if len(records) > 0:
print(f"\n第一条记录示例:")
first_record = records[0]
record_id = first_record.get('record_id', 'N/A')
print(f" - record_id: {record_id}")
# 显示部分字段值
values = first_record.get('values', {})
print(f" - 字段数量: {len(values)}")
# 显示前5个字段
for i, (key, value) in enumerate(values.items()):
if i >= 5:
print(f" - ... 还有 {len(values) - 5} 个字段")
break
print(f" - {key}: {str(value)[:50]}...")
print(f"\n✓ 测试通过:成功读取 {len(records)} 条记录")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_tapd_link_extraction(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试3TAPD链接提取
Returns:
bool: 测试是否通过
"""
print_separator("测试3TAPD链接提取")
try:
records_with_link = sync.get_records_with_tapd_link(sheet_id)
print(f"\n链接提取结果:")
print(f" - 包含链接的记录数: {len(records_with_link)}")
# 统计解析结果
success_count = sum(1 for r in records_with_link if r["parse_success"])
fail_count = len(records_with_link) - success_count
print(f" - 解析成功: {success_count}")
print(f" - 解析失败: {fail_count}")
# 显示前3条成功解析的记录
success_records = [r for r in records_with_link if r["parse_success"]]
if success_records:
print(f"\n成功解析的记录示例最多3条")
for i, record_info in enumerate(success_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 单号: {record_info['story_id']}")
print(f" 类型: {record_info.get('link_type', 'N/A')}")
# 显示解析失败的记录
fail_records = [r for r in records_with_link if not r["parse_success"]]
if fail_records:
print(f"\n解析失败的记录最多3条")
for i, record_info in enumerate(fail_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 错误: {record_info.get('parse_error', 'N/A')}")
print(f"\n✓ 测试通过成功提取并解析TAPD链接")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
import traceback
traceback.print_exc()
return False
def test_update_record_structure(sync: SmartSheetSync) -> bool:
"""
测试4更新记录结构构造不实际写入
Returns:
bool: 测试是否通过
"""
print_separator("测试4更新记录结构构造")
try:
# 构造一个测试更新记录
test_record = sync.build_update_record(
record_id="test_record_id_123",
status="进行中",
owner="张三",
begin_date="2025-01-01",
due_date="2025-01-15"
)
print(f"构造的更新记录结构:")
print(f" record_id: {test_record['record_id']}")
print(f" values:")
for key, value in test_record['values'].items():
print(f" {key}: {value}")
# 验证结构
assert 'record_id' in test_record
assert 'values' in test_record
assert FIELD_TAPD_STATUS in test_record['values']
assert FIELD_OWNER in test_record['values']
assert FIELD_BEGIN_DATE in test_record['values']
assert FIELD_DUE_DATE in test_record['values']
print(f"\n✓ 测试通过:更新记录结构正确")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_multi_sheet_support(sync: SmartSheetSync) -> bool:
"""
测试5多子表支持
Returns:
bool: 测试是否通过
"""
print_separator("测试5多子表支持")
try:
# 获取所有子表
sheet_list = sync.api.get_sheet_list()
print(f"子表列表:")
for i, sheet in enumerate(sheet_list):
sheet_id = sheet.get('sheet_id', 'N/A')
title = sheet.get('title', 'N/A')
print(f" [{i+1}] {title} (ID: {sheet_id})")
print(f"\n共找到 {len(sheet_list)} 个子表")
if len(sheet_list) == 0:
print(f"\n⚠ 警告:没有找到子表")
return False
print(f"\n✓ 测试通过:成功获取子表列表")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="第三阶段验证脚本")
parser.add_argument("--write", action="store_true",
help="执行回写测试(会修改数据)")
args = parser.parse_args()
print("=" * 60)
print(" 任务二 第三阶段验证:智能表格读写功能")
print("=" * 60)
# 1. 加载配置
print("\n[初始化] 加载配置...")
config = Task2ConfigManager()
smartsheet_config = config.get_smartsheet_config()
docid = smartsheet_config.get('docid')
print(f" docid: {docid[:20]}...")
# 2. 获取token
print("\n[初始化] 获取access_token...")
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" token: {access_token[:20]}...")
# 3. 初始化同步模块
print("\n[初始化] 初始化SmartSheetSync...")
sync = SmartSheetSync(access_token, docid, test_mode=False)
print(" ✓ 初始化完成")
# 4. 获取子表列表
sheet_list = sync.api.get_sheet_list()
if not sheet_list:
print("\n✗ 错误:没有找到子表")
return 1
# 使用第一个子表进行测试
first_sheet = sheet_list[0]
sheet_id = first_sheet.get('sheet_id')
sheet_title = first_sheet.get('title')
print(f"\n使用子表进行测试: {sheet_title}")
# 5. 运行测试
results = []
# 测试5先执行多子表支持
results.append(("多子表支持", test_multi_sheet_support(sync)))
# 测试1字段检测
results.append(("字段检测", test_field_detection(sync, sheet_id)))
# 测试2记录读取
results.append(("记录读取", test_record_reading(sync, sheet_id)))
# 测试3TAPD链接提取
results.append(("TAPD链接提取", test_tapd_link_extraction(sync, sheet_id)))
# 测试4更新记录结构
results.append(("更新记录结构", test_update_record_structure(sync)))
# 6. 输出测试结果汇总
print_separator("测试结果汇总")
passed = 0
failed = 0
for name, result in results:
status = "✓ 通过" if result else "✗ 失败"
print(f" {status}: {name}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if failed == 0:
print("\n" + "=" * 60)
print(" ✓ 第三阶段验证全部通过!")
print("=" * 60)
return 0
else:
print("\n" + "=" * 60)
print(" ✗ 部分测试失败,请检查")
print("=" * 60)
return 1
if __name__ == "__main__":
sys.exit(main())