331 lines
9.9 KiB
Python
331 lines
9.9 KiB
Python
"""
|
||
第三阶段验证脚本:智能表格读写功能测试
|
||
|
||
验证项:
|
||
1. 字段检测 - 检查必要字段是否存在
|
||
2. 记录读取 - 获取所有记录
|
||
3. TAPD链接提取 - 从记录中提取链接并解析
|
||
4. 数据回写测试 - 构造更新记录(可选执行)
|
||
|
||
用法:
|
||
python src2/test_phase3.py # 只读测试(不修改数据)
|
||
python src2/test_phase3.py --write # 包含回写测试(会修改一条记录)
|
||
"""
|
||
|
||
import sys
|
||
import argparse
|
||
from pathlib import Path
|
||
|
||
# 将项目根目录添加到 Python 路径
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
from src.token_manager import TokenManager
|
||
from src2.config import Task2ConfigManager
|
||
from src2.smartsheet_sync import (
|
||
SmartSheetSync,
|
||
REQUIRED_FIELDS,
|
||
FIELD_TAPD_LINK,
|
||
FIELD_TAPD_STATUS,
|
||
FIELD_OWNER,
|
||
FIELD_BEGIN_DATE,
|
||
FIELD_DUE_DATE,
|
||
)
|
||
|
||
|
||
def print_separator(title: str = ""):
|
||
"""打印分隔线"""
|
||
print("\n" + "=" * 60)
|
||
if title:
|
||
print(f" {title}")
|
||
print("=" * 60)
|
||
|
||
|
||
def test_field_detection(sync: SmartSheetSync, sheet_id: str) -> bool:
|
||
"""
|
||
测试1:字段检测
|
||
|
||
Returns:
|
||
bool: 测试是否通过
|
||
"""
|
||
print_separator("测试1:字段检测")
|
||
|
||
print(f"必要字段列表:")
|
||
for field in REQUIRED_FIELDS:
|
||
print(f" - {field}")
|
||
print()
|
||
|
||
# 获取字段信息
|
||
fields = sync.api.get_fields(sheet_id)
|
||
|
||
# 检查必要字段
|
||
all_present, missing_fields, field_mapping = sync.check_required_fields(fields)
|
||
|
||
print(f"\n字段映射结果:")
|
||
for field_name in REQUIRED_FIELDS:
|
||
field_id = field_mapping.get(field_name, "未找到")
|
||
status = "✓" if field_name in field_mapping else "✗"
|
||
print(f" {status} {field_name}: {field_id}")
|
||
|
||
if all_present:
|
||
print(f"\n✓ 测试通过:所有必要字段都存在")
|
||
return True
|
||
else:
|
||
print(f"\n✗ 测试失败:缺少字段 {missing_fields}")
|
||
return False
|
||
|
||
|
||
def test_record_reading(sync: SmartSheetSync, sheet_id: str) -> bool:
|
||
"""
|
||
测试2:记录读取
|
||
|
||
Returns:
|
||
bool: 测试是否通过
|
||
"""
|
||
print_separator("测试2:记录读取")
|
||
|
||
try:
|
||
records = sync.get_all_records(sheet_id)
|
||
|
||
print(f"\n记录读取结果:")
|
||
print(f" - 总记录数: {len(records)}")
|
||
|
||
if len(records) > 0:
|
||
print(f"\n第一条记录示例:")
|
||
first_record = records[0]
|
||
record_id = first_record.get('record_id', 'N/A')
|
||
print(f" - record_id: {record_id}")
|
||
|
||
# 显示部分字段值
|
||
values = first_record.get('values', {})
|
||
print(f" - 字段数量: {len(values)}")
|
||
|
||
# 显示前5个字段
|
||
for i, (key, value) in enumerate(values.items()):
|
||
if i >= 5:
|
||
print(f" - ... 还有 {len(values) - 5} 个字段")
|
||
break
|
||
print(f" - {key}: {str(value)[:50]}...")
|
||
|
||
print(f"\n✓ 测试通过:成功读取 {len(records)} 条记录")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 测试失败:{e}")
|
||
return False
|
||
|
||
|
||
def test_tapd_link_extraction(sync: SmartSheetSync, sheet_id: str) -> bool:
|
||
"""
|
||
测试3:TAPD链接提取
|
||
|
||
Returns:
|
||
bool: 测试是否通过
|
||
"""
|
||
print_separator("测试3:TAPD链接提取")
|
||
|
||
try:
|
||
records_with_link = sync.get_records_with_tapd_link(sheet_id)
|
||
|
||
print(f"\n链接提取结果:")
|
||
print(f" - 包含链接的记录数: {len(records_with_link)}")
|
||
|
||
# 统计解析结果
|
||
success_count = sum(1 for r in records_with_link if r["parse_success"])
|
||
fail_count = len(records_with_link) - success_count
|
||
|
||
print(f" - 解析成功: {success_count}")
|
||
print(f" - 解析失败: {fail_count}")
|
||
|
||
# 显示前3条成功解析的记录
|
||
success_records = [r for r in records_with_link if r["parse_success"]]
|
||
if success_records:
|
||
print(f"\n成功解析的记录示例(最多3条):")
|
||
for i, record_info in enumerate(success_records[:3]):
|
||
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
|
||
print(f" 链接: {record_info['tapd_link'][:60]}...")
|
||
print(f" 单号: {record_info['story_id']}")
|
||
print(f" 类型: {record_info.get('link_type', 'N/A')}")
|
||
|
||
# 显示解析失败的记录
|
||
fail_records = [r for r in records_with_link if not r["parse_success"]]
|
||
if fail_records:
|
||
print(f"\n解析失败的记录(最多3条):")
|
||
for i, record_info in enumerate(fail_records[:3]):
|
||
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
|
||
print(f" 链接: {record_info['tapd_link'][:60]}...")
|
||
print(f" 错误: {record_info.get('parse_error', 'N/A')}")
|
||
|
||
print(f"\n✓ 测试通过:成功提取并解析TAPD链接")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 测试失败:{e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
def test_update_record_structure(sync: SmartSheetSync) -> bool:
|
||
"""
|
||
测试4:更新记录结构构造(不实际写入)
|
||
|
||
Returns:
|
||
bool: 测试是否通过
|
||
"""
|
||
print_separator("测试4:更新记录结构构造")
|
||
|
||
try:
|
||
# 构造一个测试更新记录
|
||
test_record = sync.build_update_record(
|
||
record_id="test_record_id_123",
|
||
status="进行中",
|
||
owner="张三",
|
||
begin_date="2025-01-01",
|
||
due_date="2025-01-15"
|
||
)
|
||
|
||
print(f"构造的更新记录结构:")
|
||
print(f" record_id: {test_record['record_id']}")
|
||
print(f" values:")
|
||
for key, value in test_record['values'].items():
|
||
print(f" {key}: {value}")
|
||
|
||
# 验证结构
|
||
assert 'record_id' in test_record
|
||
assert 'values' in test_record
|
||
assert FIELD_TAPD_STATUS in test_record['values']
|
||
assert FIELD_OWNER in test_record['values']
|
||
assert FIELD_BEGIN_DATE in test_record['values']
|
||
assert FIELD_DUE_DATE in test_record['values']
|
||
|
||
print(f"\n✓ 测试通过:更新记录结构正确")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 测试失败:{e}")
|
||
return False
|
||
|
||
|
||
def test_multi_sheet_support(sync: SmartSheetSync) -> bool:
|
||
"""
|
||
测试5:多子表支持
|
||
|
||
Returns:
|
||
bool: 测试是否通过
|
||
"""
|
||
print_separator("测试5:多子表支持")
|
||
|
||
try:
|
||
# 获取所有子表
|
||
sheet_list = sync.api.get_sheet_list()
|
||
|
||
print(f"子表列表:")
|
||
for i, sheet in enumerate(sheet_list):
|
||
sheet_id = sheet.get('sheet_id', 'N/A')
|
||
title = sheet.get('title', 'N/A')
|
||
print(f" [{i+1}] {title} (ID: {sheet_id})")
|
||
|
||
print(f"\n共找到 {len(sheet_list)} 个子表")
|
||
|
||
if len(sheet_list) == 0:
|
||
print(f"\n⚠ 警告:没有找到子表")
|
||
return False
|
||
|
||
print(f"\n✓ 测试通过:成功获取子表列表")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 测试失败:{e}")
|
||
return False
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
parser = argparse.ArgumentParser(description="第三阶段验证脚本")
|
||
parser.add_argument("--write", action="store_true",
|
||
help="执行回写测试(会修改数据)")
|
||
args = parser.parse_args()
|
||
|
||
print("=" * 60)
|
||
print(" 任务二 第三阶段验证:智能表格读写功能")
|
||
print("=" * 60)
|
||
|
||
# 1. 加载配置
|
||
print("\n[初始化] 加载配置...")
|
||
config = Task2ConfigManager()
|
||
smartsheet_config = config.get_smartsheet_config()
|
||
docid = smartsheet_config.get('docid')
|
||
print(f" docid: {docid[:20]}...")
|
||
|
||
# 2. 获取token
|
||
print("\n[初始化] 获取access_token...")
|
||
token_manager = TokenManager()
|
||
access_token = token_manager.get_token()
|
||
print(f" token: {access_token[:20]}...")
|
||
|
||
# 3. 初始化同步模块
|
||
print("\n[初始化] 初始化SmartSheetSync...")
|
||
sync = SmartSheetSync(access_token, docid, test_mode=False)
|
||
print(" ✓ 初始化完成")
|
||
|
||
# 4. 获取子表列表
|
||
sheet_list = sync.api.get_sheet_list()
|
||
if not sheet_list:
|
||
print("\n✗ 错误:没有找到子表")
|
||
return 1
|
||
|
||
# 使用第一个子表进行测试
|
||
first_sheet = sheet_list[0]
|
||
sheet_id = first_sheet.get('sheet_id')
|
||
sheet_title = first_sheet.get('title')
|
||
print(f"\n使用子表进行测试: {sheet_title}")
|
||
|
||
# 5. 运行测试
|
||
results = []
|
||
|
||
# 测试5先执行(多子表支持)
|
||
results.append(("多子表支持", test_multi_sheet_support(sync)))
|
||
|
||
# 测试1:字段检测
|
||
results.append(("字段检测", test_field_detection(sync, sheet_id)))
|
||
|
||
# 测试2:记录读取
|
||
results.append(("记录读取", test_record_reading(sync, sheet_id)))
|
||
|
||
# 测试3:TAPD链接提取
|
||
results.append(("TAPD链接提取", test_tapd_link_extraction(sync, sheet_id)))
|
||
|
||
# 测试4:更新记录结构
|
||
results.append(("更新记录结构", test_update_record_structure(sync)))
|
||
|
||
# 6. 输出测试结果汇总
|
||
print_separator("测试结果汇总")
|
||
|
||
passed = 0
|
||
failed = 0
|
||
for name, result in results:
|
||
status = "✓ 通过" if result else "✗ 失败"
|
||
print(f" {status}: {name}")
|
||
if result:
|
||
passed += 1
|
||
else:
|
||
failed += 1
|
||
|
||
print(f"\n总计: {passed} 通过, {failed} 失败")
|
||
|
||
if failed == 0:
|
||
print("\n" + "=" * 60)
|
||
print(" ✓ 第三阶段验证全部通过!")
|
||
print("=" * 60)
|
||
return 0
|
||
else:
|
||
print("\n" + "=" * 60)
|
||
print(" ✗ 部分测试失败,请检查")
|
||
print("=" * 60)
|
||
return 1
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|