Compare commits

...

15 Commits

28 changed files with 7199 additions and 10 deletions

1
.gitignore vendored
View File

@ -132,6 +132,7 @@ cython_debug/
# Custom # Custom
logs/ logs/
logs2/
.claude/ .claude/
# Project documentation # Project documentation

24
config/config_task2.ini Normal file
View File

@ -0,0 +1,24 @@
# 任务二配置文件TAPD状态实时同步至腾讯智能表格
[TAPD]
# TAPD项目ID
workspace_id = 58335167
[SmartSheet]
# 智能表格文档ID任务二专用
# 支持配置多个docid用逗号分隔例如
# docid = doc1,doc2,doc3
# 同步时会依次对所有表格进行同步
docid = dcOsT3czWy0YEDg38vlDqwVCTjv0kzwC_GU2XmT9wSZctQ0ZJQUAV7vMQ3ljZx-n_NqxzEEYG2DiLAvNdNsHJwgQ,dcHWzWyaHpZNQwUkZzgH5Kfyx9cMvQzVjZIapajGDuXqjS4nEe0LQqOojBL8s3rlwghw4deOgVnbOqHLoxcKzaHg
#docid =dc2Q5Kb0T4zerbo4_ag0MMcXHCusIaFJX5fO6_8n-l_yV-bn5brZSi1kNw3kjme-qIs0LvPKbC5GDEEPaZ1BGlvA
[Schedule]
# 同步频率(分钟)
sync_interval = 30
[wework]
# 企业微信应用ID
agentid = 1000615
# 接收人列表用户ID多个用|分隔,@all表示全部成员
# receivers = 046364
receivers = 040005

View File

@ -1,4 +1,4 @@
{ {
"access_token": "SPFfB9jMxleGiJn76FQH6v5pploseAJXTCVkTVVxl1_PEmHZJiqXsNGpEyGAK4qmYe3WRxYJ57xgCQLRCHopVfeoDfP87IgxVCytCqQABGES5ndG05SVkrI-9evg8Z4kbstlsiRMmfPGGGoNUgL1kUoZc2No0FYytm8FTfulnAXfiTExzoF8OCTdEPc9mA0g8JKFhlkiS2F0agBESS_2_ewbcZvA0i44-ChTKRBdRa0", "access_token": "GQKX61Nh9c5A4Mp0FDOGy4nZgnFok2gefTA0_X4Y5PEL7NkpD9UHWwji3lWkZLsHMJf3dpbJ_l-NdichZ5qSZuPhF7kJNU47Blf2yLQRqFctmXMU6m1cWU80iLiY0vrX2EPzvldaHMR-al3HgKK6PUSU9T2a5Xp-lCjh9StPzEnQJUnwickV4PiPegLLGcH5F6jcM-9pHztkJ6pSV6bfP5QFFATAldmj-71Occib9V0",
"fetch_time": 1767599732.4166858 "fetch_time": 1772269143.6670134
} }

14
core/__init__.py Normal file
View File

@ -0,0 +1,14 @@
"""全局核心模块导出"""
from core.global_log_system import (
GlobalLogSystem,
create_task1_log_system,
create_task2_log_system,
)
__all__ = [
"GlobalLogSystem",
"create_task1_log_system",
"create_task2_log_system",
]

210
core/global_log_system.py Normal file
View File

@ -0,0 +1,210 @@
"""
全局日志系统内核
职责
1. 按天写入 jsonl 日志
2. 统一记录 API 调用事件
3. 记录同步开始/结束事件与统计
4. token 等敏感字段做脱敏
"""
from __future__ import annotations
import json
import re
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
class GlobalLogSystem:
"""全局日志记录器"""
ALLOWED_MODULES = {"smartsheet", "tapd", "wework"}
SENSITIVE_KEYS = {
"access_token",
"token",
"authorization",
"corpsecret",
"api_password",
"password",
"secret",
"auth",
}
TOKEN_PATTERNS = [
re.compile(r"(access_token=)([^&\s]+)", re.IGNORECASE),
re.compile(r"(corpsecret=)([^&\s]+)", re.IGNORECASE),
re.compile(r"(token=)([^&\s]+)", re.IGNORECASE),
]
def __init__(self, task_name: str, log_dir: str | Path):
self.task_name = task_name
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self._active_sync_id: Optional[str] = None
def start_sync(self,
trigger: str,
metadata: Optional[Dict[str, Any]] = None,
sync_id: Optional[str] = None) -> str:
"""记录一次同步开始事件并返回 sync_id"""
if sync_id is None:
sync_id = self._generate_sync_id()
event = {
"event_type": "start_sync",
"timestamp": self._now_string(),
"task": self.task_name,
"sync_id": sync_id,
"trigger": trigger,
"metadata": self._sanitize(metadata or {}),
}
self._active_sync_id = sync_id
self._write_event_safely(event)
return sync_id
def log_api(self,
module: str,
operation: str,
request_data: Optional[Dict[str, Any]],
response_data: Optional[Dict[str, Any]],
success: bool,
error_message: Optional[str] = None,
duration_ms: Optional[int] = None,
sync_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None) -> None:
"""记录单次 API 调用事件"""
if module not in self.ALLOWED_MODULES:
raise ValueError(
f"module 不合法: {module},允许值: {sorted(self.ALLOWED_MODULES)}"
)
resolved_sync_id = sync_id or self._active_sync_id
event = {
"event_type": "api_call",
"timestamp": self._now_string(),
"task": self.task_name,
"sync_id": resolved_sync_id,
"module": module,
"operation": operation,
"success": success,
"request": self._sanitize(request_data or {}),
"response": self._sanitize(response_data or {}),
"error_message": self._sanitize_text(error_message),
"duration_ms": duration_ms,
"extra": self._sanitize(extra or {}),
}
self._write_event_safely(event)
def end_sync_with_stats(self,
stats: Dict[str, Any],
success: bool,
error_message: Optional[str] = None,
sync_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None) -> None:
"""记录一次同步结束事件"""
resolved_sync_id = sync_id or self._active_sync_id
event = {
"event_type": "end_sync",
"timestamp": self._now_string(),
"task": self.task_name,
"sync_id": resolved_sync_id,
"success": success,
"stats": self._sanitize(stats),
"error_message": self._sanitize_text(error_message),
"extra": self._sanitize(extra or {}),
}
self._write_event_safely(event)
if resolved_sync_id == self._active_sync_id:
self._active_sync_id = None
def _write_event_safely(self, event: Dict[str, Any]) -> None:
"""安全写入:日志失败不影响主流程"""
try:
self._write_event(event)
except Exception as exc:
print(f"⚠️ 全局日志写入失败: {exc}")
def _write_event(self, event: Dict[str, Any]) -> None:
log_file = self._get_today_log_file()
with open(log_file, "a", encoding="utf-8") as handle:
handle.write(json.dumps(event, ensure_ascii=False) + "\n")
def _get_today_log_file(self) -> Path:
today = datetime.now().strftime("%Y-%m-%d")
return self.log_dir / f"api_log_{today}.jsonl"
def _generate_sync_id(self) -> str:
time_part = datetime.now().strftime("%Y%m%d_%H%M%S")
random_part = uuid.uuid4().hex[:8]
return f"{self.task_name}_{time_part}_{random_part}"
def _now_string(self) -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _sanitize(self, value: Any, parent_key: str = "") -> Any:
if isinstance(value, dict):
sanitized: Dict[str, Any] = {}
for key, item in value.items():
key_lower = str(key).lower()
if key_lower in self.SENSITIVE_KEYS:
sanitized[key] = self._mask_sensitive_value(item)
else:
sanitized[key] = self._sanitize(item, parent_key=key_lower)
return sanitized
if isinstance(value, list):
return [self._sanitize(item, parent_key=parent_key) for item in value]
if isinstance(value, tuple):
return tuple(self._sanitize(item, parent_key=parent_key) for item in value)
if isinstance(value, str):
if parent_key in self.SENSITIVE_KEYS:
return self._mask_sensitive_value(value)
return self._sanitize_text(value)
return value
def _sanitize_text(self, text: Optional[str]) -> Optional[str]:
if text is None:
return None
masked = text
for pattern in self.TOKEN_PATTERNS:
masked = pattern.sub(r"\1***", masked)
return masked
def _mask_sensitive_value(self, value: Any) -> str:
text = "" if value is None else str(value)
if not text:
return "***"
if text.lower().startswith("bearer "):
return "Bearer ***"
if len(text) <= 6:
return "***"
return f"{text[:3]}***{text[-2:]}"
def create_task1_log_system(project_root: Optional[Path] = None) -> GlobalLogSystem:
"""创建任务一日志系统(固定 logs/"""
root = project_root or Path(__file__).parent.parent
return GlobalLogSystem(task_name="task1", log_dir=root / "logs")
def create_task2_log_system(project_root: Optional[Path] = None) -> GlobalLogSystem:
"""创建任务二日志系统(固定 logs2/"""
root = project_root or Path(__file__).parent.parent
return GlobalLogSystem(task_name="task2", log_dir=root / "logs2")

111
docs/全局框架文档.md Normal file
View File

@ -0,0 +1,111 @@
# 全局框架文档
> 用途:统一维护项目模块、脚本职责与接口定义,避免调用不存在接口与跨任务耦合失控。
> 范围:`src/``src2/`,以及新增的全局模块(如后续 `core/`)。
---
## 1. 日志系统总览(重构目标态)
### 1.1 目标架构
- **全局日志内核**:位于任务目录外,提供统一记录能力。
- **任务一适配层**:写入 `logs/`
- **任务二适配层**:写入 `logs2/`
- **同步边界管理**:每次同步使用唯一 `sync_id` 分隔,结束时写统计。
### 1.2 统一日志字段(约定)
- `event_type``start_sync` / `api_call` / `end_sync`
- `timestamp`:事件时间
- `task``task1` / `task2`
- `sync_id`:单次同步唯一标识
- `module``smartsheet` / `tapd` / `wework`
- `operation`:接口操作名
- `request`:请求快照(含脱敏)
- `response`:响应快照(含脱敏)
- `success`:调用是否成功
- `error_message`:失败原因
- `duration_ms`:调用耗时(毫秒)
- `stats`:同步完成统计(仅 `end_sync`
---
## 2. 模块清单与职责(当前状态)
## 2.0 全局核心(`core/`
- `core/global_log_system.py`
- **职责**提供统一日志内核jsonl 写入、同步分隔、API 事件、统计事件、脱敏)。
- **接口(对内)**`start_sync``log_api``end_sync_with_stats`
- **创建器**`create_task1_log_system`(固定 `logs/`)、`create_task2_log_system`(固定 `logs2/`)。
- `core/__init__.py`
- **职责**:导出全局日志内核与创建器。
## 2.1 任务一(`src/`
- `src/scheduler.py`
- **职责**:任务一调度入口,定时触发单次同步。
- **关键依赖**`src/main.py`
- `src/main.py`
- **职责**:任务一主流程编排(扫描、校验、开单、回写、通知)。
- **接口(对内)**`run_once(...)`
- `src/smartsheet.py`
- **职责**:智能表格 API 封装。
- **接口(对内)**`get_sheet_list``get_fields``get_records``update_records` 等。
- `src/tapd_api.py`
- **职责**TAPD Bug API 封装(创建、查询、附件上传)。
- **接口(对内)**`create_bug``get_bug``upload_attachment` 等。
- `src/token_manager.py`
- **职责**:企业微信 `access_token` 获取与缓存。
- **接口(对内)**`get_token()`
- `src/wework_notifier.py`
- **职责**:企业微信消息通知。
- **接口(对内)**`send_validation_failure_notification(...)`
- `src/api_logger.py`
- **职责**:现有日志记录器(后续将作为兼容层)。
## 2.2 任务二(`src2/`
- `src2/scheduler.py`
- **职责**:任务二调度入口,定时触发同步。
- **关键依赖**`src2/sync_service.py`
- `src2/sync_service.py`
- **职责**:任务二同步编排(读取、解析链接、查询 TAPD、回写、通知
- **接口(对内)**`sync_once()`
- `src2/smartsheet.py`
- **职责**:任务二智能表格 API 适配层。
- **关键点**:应固定写入 `logs2/`
- `src2/smartsheet_sync.py`
- **职责**:任务二表格字段检查、记录提取与回写构造。
- `src2/tapd_api.py`
- **职责**:任务二 TAPD Story 查询与状态映射。
- `src2/notifier.py`
- **职责**:任务二失败通知封装(当前复用任务一通知器,存在串目录风险)。
- `src2/logger.py`
- **职责**:任务二日志实例入口(后续接入统一内核)。
---
## 3. 现存问题与待改造点
- **串目录问题**:任务二复用 `TokenManager``WeWorkNotifier` 可能写入 `logs/`
- **双记录矛盾**:同一次请求在部分分支可能出现先成功后失败两条记录。
- **写入稳定性问题**:现有按 JSON 数组拼接的策略会造成结构损坏风险。
- **同步边界缺失**:缺少标准化 `start_sync` / `end_sync` 分隔与统计记录。
---
## 4. 模块接口演进计划(摘要)
- 阶段1新增全局日志内核模块定义统一接口。已完成
- 阶段2`src/api_logger.py` 改造成兼容层,保证旧调用可用。
- 阶段3`src2/logger.py` 与任务二编排层切换到统一内核,修复串目录。
- 阶段4更新查看工具与文档支持 `jsonl + sync_id`
---
## 5. 变更记录
### 2026-02-28
- 新建本框架文档。
- 写入日志系统重构目标态、模块职责清单、问题清单与演进路线。
### 2026-02-28更新
- 新增 `core/global_log_system.py``core/__init__.py` 模块说明。
- 标记阶段1完成统一日志内核已落地待阶段2/3接线。

105
docs/全局迭代日志.md Normal file
View File

@ -0,0 +1,105 @@
# 全局迭代日志
> 用途:跨任务(`src` / `src2`)记录每个阶段的增改内容、验收结果与回滚点。
> 约束:每次阶段验收通过后,必须追加一条日志记录。
---
## 记录模板
### 阶段:
- **阶段名称**
- **日期**
- **目标**
### 变更清单
- **新增文件**
- **修改文件**
- **删除文件**
### 关键改动说明
- **日志结构变更**
- **接口/调用链变更**
- **兼容性说明**
### 验收结果
- **通过项**
- **未通过项**
- **遗留风险**
### 回滚与追踪
- **可回滚点**
- **关联文档**
- **备注**
---
## 阶段日志
## 阶段0文档与约定先行
- **阶段名称**:日志系统重构 - 阶段0
- **日期**2026-02-28
- **目标**:建立重构方案与全局文档骨架,为后续代码改造提供统一约束。
### 变更清单
- **新增文件**
- `docs/日志系统重构实施方案.md`
- `docs/全局迭代日志.md`
- `docs/全局框架文档.md`
- **修改文件**:无
- **删除文件**:无
### 关键改动说明
- **日志结构变更**:确定后续采用 `jsonl`,并以 `sync_id` 分隔每次同步。
- **接口/调用链变更**:明确生产链路以 `src/scheduler.py``src2/scheduler.py` 为准。
- **兼容性说明**阶段0仅文档不影响现网行为。
### 验收结果
- **通过项**
- 分阶段路线、边界条件、验收清单已落文档。
- 已建立全局迭代日志与框架文档容器。
- **未通过项**:无
- **遗留风险**
- 任务二存在复用模块导致串目录风险待阶段3修复
- 现有日志写入策略存在双记录与结构损坏风险待阶段1/2修复
### 回滚与追踪
- **可回滚点**:当前为纯文档提交,可直接整提交回滚。
- **关联文档**`docs/日志系统重构实施方案.md`
- **备注**阶段1开始前需再次确认日志字段最终版。
## 阶段1实现全局日志内核
- **阶段名称**:日志系统重构 - 阶段1
- **日期**2026-02-28
- **负责人**Codex
- **目标**:在任务目录外提供可复用的统一日志内核,支持 jsonl、sync_id、token 脱敏。
### 变更清单
- **新增文件**
- `core/global_log_system.py`
- `core/__init__.py`
- **修改文件**
- `docs/全局迭代日志.md`
- `docs/全局框架文档.md`
- **删除文件**:无
### 关键改动说明
- **日志结构变更**:新增 `start_sync` / `api_call` / `end_sync` 三类事件模型。
- **接口/调用链变更**:提供 `start_sync``log_api``end_sync_with_stats` 三个核心接口。
- **兼容性说明**阶段1仅新增内核未接入任务一/任务二业务调用,不影响现网逻辑。
### 验收结果
- **通过项**
- 已支持按天写入 `api_log_YYYY-MM-DD.jsonl`
- 已支持 `sync_id` 生命周期记录。
- 已支持 token/secret 脱敏。
- 已提供任务一、任务二创建器(固定目录)。
- **未通过项**
- 尚未接入 `src` / `src2`,串目录与双记录矛盾仍待后续阶段修复。
- **遗留风险**
- 阶段2/3接线时若沿用旧 logger 分支逻辑,可能再次引入双记录。
### 回滚与追踪
- **可回滚点**`core/` 新增为独立改动,可单独回滚。
- **关联文档**`docs/日志系统重构实施方案.md`
- **备注**:下一阶段优先完成任务一接入并验证“单次调用单条记录”。

View File

@ -0,0 +1,119 @@
# 日志系统重构实施方案(任务一 + 任务二)
## 1. 目标与边界
### 1.1 重构目标
- 在任务一和任务二之外,提供统一的全局日志记录系统。
- 覆盖生产链路:`src/scheduler.py``src2/scheduler.py` 触发的同步流程。
- 在每一个发起 API 调用的地方记录请求与结果(成功/失败都记录)。
- 任务一日志落在 `logs/`,任务二日志落在 `logs2/`
- 每条日志必须包含 `module` 字段,允许值:`smartsheet` / `tapd` / `wework`
- 通过 `sync_id` 分隔每次同步,并在同步完成后写入当次统计信息。
### 1.2 明确要解决的问题
- 解决“串目录”问题:任务二不得写入 `logs/`,任务一不得写入 `logs2/`
- 解决“双记录矛盾”问题:同一次 API 调用只能有一条最终记录(不能先 success 再 failure
- 日志格式改为 `jsonl`(每行一条完整 JSON 记录)。
- 对 token 做脱敏,不做响应内容截断。
### 1.3 非目标
- 本次不改业务流程语义(只改日志系统与接线方式)。
- 本次不对历史 JSON 文件做离线迁移,仅保证新写入生效。
---
## 2. 总体设计
### 2.1 新增全局日志内核(任务外)
- 新增独立模块(建议路径:`core/global_log_system.py`)。
- 提供统一接口:
- `start_sync(...)`:开始一次同步,生成 `sync_id`
- `log_api(...)`:记录单次 API 调用结果(成功/失败统一出口)。
- `end_sync_with_stats(...)`:写入同步统计并结束。
- 通过构造参数确定任务上下文:`task_name``log_dir`、默认 `module`
### 2.2 日志存储格式
- 文件命名:`api_log_YYYY-MM-DD.jsonl`
- 存储目录:任务一 `logs/`,任务二 `logs2/`
- 记录模型:
- 通用字段:`event_type``timestamp``task``sync_id``module`
- API 事件字段:`operation``request``response``success``error_message``duration_ms`
- 同步边界字段:`start_sync` / `end_sync` 事件及统计 `stats`
### 2.3 脱敏规则
- `request/response` 中出现 token 字段统一脱敏(如 `access_token``Authorization``corpsecret``api_password`)。
- 脱敏策略:保留前后少量字符,中间替换为 `***`
---
## 3. 分阶段实施路线(小步走)
## 阶段0文档与约定先行
### 工作内容
- 新建并维护两份全局文档:
- `docs/全局迭代日志.md`
- `docs/全局框架文档.md`
- 在文档中建立日志重构专属章节、字段定义、阶段验收标准。
### 验收标准
- 文档结构可用于后续持续更新。
- 明确字段与职责边界(尤其是 `sync_id``task``module`)。
## 阶段1实现全局日志内核
### 工作内容
- 新增 `core/global_log_system.py`,实现 `jsonl` 写入、脱敏、同步分隔和统计写入。
- 新增兼容层,避免一次性改动过大。
### 验收标准
- 能独立写入 `logs/` / `logs2/`
- `start_sync -> 多条 log_api -> end_sync_with_stats` 链路完整。
## 阶段2接入任务一src
### 工作内容
- 改造 `src/api_logger.py` 为新内核兼容封装。
- 在任务一同步主流程增加 `sync_id` 生命周期。
- 覆盖所有 API 调用记录点,统一单次调用单条记录。
### 验收标准
- 任务一生产链路日志全部位于 `logs/`
- 无“双记录矛盾”。
## 阶段3接入任务二src2
### 工作内容
- 改造 `src2/logger.py` 接入同一内核并固定 `logs2/`
- 在任务二同步主流程增加 `sync_id` 生命周期。
- 修复复用模块导致的串目录问题(含 token 获取、wework 通知链路)。
### 验收标准
- 任务二生产链路日志全部位于 `logs2/`
- 与任务一日志完全隔离。
## 阶段4查看工具与收尾
### 工作内容
- 更新 `src/log_viewer.py` 支持读取 `jsonl`
- 更新 `logs/README.md`(新增 jsonl 规范与排障说明)。
### 验收标准
- 可按日期与 `sync_id` 追踪一次完整同步。
- 文档、代码、日志格式三者一致。
---
## 4. 核心风险与规避
- 风险1兼容层改造影响原调用。
- 规避:先保留 `log_api_call(...)` 旧接口,再逐步替换调用。
- 风险2任务二通过复用模块写错目录。
- 规避:统一使用可注入 logger/context禁止隐式全局默认目录。
- 风险3单次请求出现多状态记录。
- 规避:采用“单出口记录”,业务错误也只写一次最终状态。
---
## 5. 验收清单(最终)
- [ ] 生产链路(两个 scheduler所有 API 调用均有日志。
- [ ] 每次同步都有 `start_sync``end_sync`,并可通过 `sync_id` 聚合。
- [ ] 任务一只写 `logs/`,任务二只写 `logs2/`
- [ ] 无同一次 API 调用 success/failure 双写冲突。
- [ ] token 已脱敏。
- [ ] `jsonl` 可被查看工具读取。

2462
install.cmd Normal file

File diff suppressed because one or more lines are too long

View File

@ -22,7 +22,13 @@ from src.config import ConfigManager
class WeWorkAPITester: class WeWorkAPITester:
"""企业微信API测试类""" """企业微信API测试类"""
def __init__(self): def __init__(self, auto_load_token=True):
"""
初始化企业微信API测试类
Args:
auto_load_token: 是否自动加载token默认为True
"""
self.access_token = None self.access_token = None
self.log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'api_test_log.json') self.log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'api_test_log.json')
self.base_url = "https://qyapi.weixin.qq.com/cgi-bin" self.base_url = "https://qyapi.weixin.qq.com/cgi-bin"
@ -30,12 +36,92 @@ class WeWorkAPITester:
# 确保日志文件存在 # 确保日志文件存在
self._init_log_file() self._init_log_file()
# 自动加载token
if auto_load_token:
self._auto_load_token()
def _init_log_file(self): def _init_log_file(self):
"""初始化日志文件""" """初始化日志文件"""
if not os.path.exists(self.log_file): if not os.path.exists(self.log_file):
with open(self.log_file, 'w', encoding='utf-8') as f: with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump({"records": []}, f, ensure_ascii=False, indent=2) json.dump({"records": []}, f, ensure_ascii=False, indent=2)
def _auto_load_token(self):
"""
自动加载access_token
优先从缓存读取如果缓存不存在或已过期则尝试从API获取新token
"""
print("\n=== 自动加载access_token ===")
# 先尝试从缓存读取
if self._load_token_from_cache_silent():
return
# 缓存无效尝试从API获取
print(" 尝试从API获取新token...")
try:
from src.token_manager import TokenManager
token_manager = TokenManager()
self.access_token = token_manager.get_token()
print(f" ✓ 成功获取access_token")
print(f" Token: {self.access_token[:20]}...")
except ValueError as e:
print(f" ⚠ 环境变量未配置: {e}")
print(" 请手动选择菜单选项1获取token")
except Exception as e:
print(f" ⚠ 自动获取token失败: {e}")
print(" 请手动选择菜单选项1获取token")
def _load_token_from_cache_silent(self):
"""
静默从缓存读取token不打印标题
Returns:
bool: 是否成功读取有效token
"""
cache_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'config',
'token_cache.json'
)
try:
if not os.path.exists(cache_file):
print(" 缓存文件不存在")
return False
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
access_token = cache_data.get('access_token')
fetch_time = cache_data.get('fetch_time')
if not access_token:
print(" 缓存文件中没有access_token")
return False
# 检查token是否过期7200秒 = 2小时提前5分钟刷新
import time
current_time = time.time()
elapsed_time = current_time - fetch_time
remaining_time = 7200 - elapsed_time
if remaining_time <= 300: # 剩余不足5分钟视为过期
print(f" 缓存的token已过期或即将过期")
return False
# token有效
self.access_token = access_token
print(f" ✓ 从缓存读取access_token成功")
print(f" Token: {self.access_token[:20]}...")
print(f" 剩余有效期: {int(remaining_time)}秒 ({int(remaining_time//60)}分钟)")
return True
except Exception as e:
print(f" 读取缓存失败: {e}")
return False
def _log_api_call(self, operation, request_data, response_data): def _log_api_call(self, operation, request_data, response_data):
"""记录API调用到JSON文件""" """记录API调用到JSON文件"""
try: try:
@ -481,6 +567,110 @@ class TAPDAPITester:
except Exception as e: except Exception as e:
print(f"✗ 记录日志失败: {str(e)}") print(f"✗ 记录日志失败: {str(e)}")
def get_story_fields_info(self):
"""
获取TAPD需求的所有字段配置及候选值
Returns:
dict: 字段配置信息失败返回None
"""
print("\n=== 获取TAPD需求字段配置 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
url = f"{self.base_url}/stories/get_fields_info"
params = {
"workspace_id": self.workspace_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_story_fields_info", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", {})
print(f"\n✓ 成功获取需求字段配置")
# 显示字段统计
if isinstance(data, dict):
field_count = len(data)
print(f"{field_count} 个字段配置")
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_story_fields.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 字段配置已保存到: {output_file}")
# 显示部分字段信息
print(f"\n需求字段列表预览:")
print("-" * 80)
for idx, (field_name, field_info) in enumerate(list(data.items())[:10], 1):
field_label = field_info.get('label', '(无标签)')
html_type = field_info.get('html_type', '(未知类型)')
print(f" {idx}. {field_name} ({field_label}) - 类型: {html_type}")
# 如果有候选值,显示
options = field_info.get('options', [])
if options:
if isinstance(options, dict):
option_list = list(options.values())[:5]
total_count = len(options)
elif isinstance(options, list):
option_list = options[:5]
total_count = len(options)
else:
option_list = []
total_count = 0
if option_list:
print(f" 候选值: {', '.join(str(o) for o in option_list)}" +
(f" ...等{total_count}" if total_count > 5 else ""))
if field_count > 10:
print(f" ... 还有 {field_count - 10} 个字段,详见输出文件")
print("-" * 80)
return data
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def get_bug_custom_fields(self): def get_bug_custom_fields(self):
""" """
获取TAPD缺陷的所有字段配置及候选值 获取TAPD缺陷的所有字段配置及候选值
@ -586,6 +776,121 @@ class TAPDAPITester:
traceback.print_exc() traceback.print_exc()
return None return None
def get_story(self, story_id):
"""
根据需求ID获取需求详情
Args:
story_id: 需求ID
Returns:
dict: 需求信息失败返回None
"""
print("\n=== 获取TAPD需求 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
# 验证story_id
if not story_id:
print("✗ 需求ID不能为空")
return None
url = f"{self.base_url}/stories"
params = {
"workspace_id": self.workspace_id,
"id": story_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
print(f" story_id: {story_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_story", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", [])
if not data:
print(f"\n✗ 未找到需求ID为 {story_id} 的需求")
return None
# TAPD返回的是列表取第一个
story_data = data[0] if isinstance(data, list) else data
story_info = story_data.get("Story", {})
print(f"\n✓ 成功获取需求信息")
print(f"\n需求详情:")
print("=" * 80)
# 显示关键字段
key_fields = [
("id", "ID"),
("name", "标题"),
("status", "状态"),
("priority", "优先级"),
("owner", "处理人"),
("creator", "创建人"),
("created", "创建时间"),
("modified", "最后修改时间"),
("iteration_id", "迭代ID"),
("description", "详细描述")
]
for field_name, field_label in key_fields:
value = story_info.get(field_name, '')
if field_name == "description" and value:
# 描述字段可能很长只显示前100个字符
if len(str(value)) > 100:
value = str(value)[:100] + "..."
print(f" {field_label}: {value}")
print("=" * 80)
# 保存完整数据到文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
f'tapd_story_{story_id}.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(story_info, f, ensure_ascii=False, indent=2)
print(f"\n✓ 完整需求信息已保存到: {output_file}")
return story_info
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def upload_attachment(self, file_path, entry_type, entry_id, owner=None, overwrite=False): def upload_attachment(self, file_path, entry_type, entry_id, owner=None, overwrite=False):
""" """
上传附件到TAPD 上传附件到TAPD
@ -927,10 +1232,12 @@ def print_menu():
print("5. 发送应用消息") print("5. 发送应用消息")
print("\n【TAPD API】") print("\n【TAPD API】")
print("6. 获取缺陷字段配置") print("6. 获取缺陷字段配置")
print("7. 获取附件列表") print("7. 获取需求字段配置")
print("8. 上传附件") print("8. 获取需求")
print("9. 获取附件列表")
print("10. 上传附件")
print("\n【其他】") print("\n【其他】")
print("9. 查看日志文件") print("11. 查看日志文件")
print("0. 退出") print("0. 退出")
print("="*50) print("="*50)
@ -942,7 +1249,7 @@ def main():
while True: while True:
print_menu() print_menu()
choice = input("\n请选择操作 (0-9): ").strip() choice = input("\n请选择操作 (0-11): ").strip()
if choice == "0": if choice == "0":
print("\n感谢使用,再见!") print("\n感谢使用,再见!")
@ -1018,6 +1325,18 @@ def main():
tapd_tester.get_bug_custom_fields() tapd_tester.get_bug_custom_fields()
elif choice == "7": elif choice == "7":
# 获取TAPD需求字段配置
tapd_tester.get_story_fields_info()
elif choice == "8":
# 获取TAPD需求
story_id = input("\n请输入需求ID: ").strip()
if not story_id:
print("✗ 需求ID不能为空")
continue
tapd_tester.get_story(story_id)
elif choice == "9":
# 获取TAPD附件列表 # 获取TAPD附件列表
print("\n=== 获取附件列表 ===") print("\n=== 获取附件列表 ===")
print("是否需要添加筛选条件?") print("是否需要添加筛选条件?")
@ -1055,7 +1374,7 @@ def main():
limit=limit limit=limit
) )
elif choice == "8": elif choice == "10":
# 上传附件到TAPD # 上传附件到TAPD
print("\n=== 上传附件 ===") print("\n=== 上传附件 ===")
file_path = input("请输入文件路径: ").strip() file_path = input("请输入文件路径: ").strip()
@ -1099,7 +1418,7 @@ def main():
overwrite=overwrite overwrite=overwrite
) )
elif choice == "9": elif choice == "11":
print("\n=== 查看日志文件 ===") print("\n=== 查看日志文件 ===")
try: try:
with open(wework_tester.log_file, 'r', encoding='utf-8') as f: with open(wework_tester.log_file, 'r', encoding='utf-8') as f:

View File

@ -20,6 +20,7 @@ from src.mapper import FieldMapper, BugCreationResult
from src.token_manager import TokenManager from src.token_manager import TokenManager
from src.status_mapper import BugStatusMapper from src.status_mapper import BugStatusMapper
from src.wework_notifier import WeWorkNotifier from src.wework_notifier import WeWorkNotifier
from src.api_logger import get_logger
def parse_arguments(): def parse_arguments():
@ -388,6 +389,17 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
error_msg = f"字段映射失败: {e}" error_msg = f"字段映射失败: {e}"
print(f"{error_msg}") print(f"{error_msg}")
# 记录失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="create_bug_failure",
request_data={"record_id": record_id, "title": title},
response_data={},
success=False,
error_message=error_msg
)
result = BugCreationResult( result = BugCreationResult(
record_id=record_id, record_id=record_id,
success=False, success=False,
@ -400,6 +412,17 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
error_msg = f"TAPD API调用失败: {e}" error_msg = f"TAPD API调用失败: {e}"
print(f"{error_msg}") print(f"{error_msg}")
# 记录失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="create_bug_failure",
request_data={"record_id": record_id, "title": title},
response_data={},
success=False,
error_message=error_msg
)
result = BugCreationResult( result = BugCreationResult(
record_id=record_id, record_id=record_id,
success=False, success=False,
@ -412,6 +435,17 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
error_msg = f"未预期的错误: {type(e).__name__}: {e}" error_msg = f"未预期的错误: {type(e).__name__}: {e}"
print(f"{error_msg}") print(f"{error_msg}")
# 记录失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="create_bug_failure",
request_data={"record_id": record_id, "title": title},
response_data={},
success=False,
error_message=error_msg
)
result = BugCreationResult( result = BugCreationResult(
record_id=record_id, record_id=record_id,
success=False, success=False,
@ -638,11 +672,26 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
for invalid_record in validation_result['invalid_records']: for invalid_record in validation_result['invalid_records']:
record_data = invalid_record['record_data'] record_data = invalid_record['record_data']
missing_fields = invalid_record['missing_fields'] missing_fields = invalid_record['missing_fields']
error_msg = f"校验失败,缺失字段: {', '.join(missing_fields)}"
# 记录校验失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="validation_failure",
request_data={
"record_id": record_data.get('record_id', '未知'),
"title": record_data.get('标题', '(无标题)')
},
response_data={},
success=False,
error_message=error_msg
)
validation_failed_result = BugCreationResult( validation_failed_result = BugCreationResult(
record_id=record_data.get('record_id', '未知'), record_id=record_data.get('record_id', '未知'),
success=False, success=False,
error_message=f"校验失败,缺失字段: {', '.join(missing_fields)}" error_message=error_msg
) )
validation_failed_results.append(validation_failed_result) validation_failed_results.append(validation_failed_result)

View File

@ -60,6 +60,7 @@ class SmartSheetAPI:
print("=" * 80) print("=" * 80)
print(f"请求方法: {method}") print(f"请求方法: {method}")
print(f"请求URL: {self.BASE_URL}/{endpoint}") print(f"请求URL: {self.BASE_URL}/{endpoint}")
print(f"完整URL含参数: {url}") # 显示完整URL
if data: if data:
import json import json
print(f"请求数据:") print(f"请求数据:")
@ -94,6 +95,15 @@ class SmartSheetAPI:
# 检查企业微信API返回的错误码 # 检查企业微信API返回的错误码
if result.get('errcode', 0) != 0: if result.get('errcode', 0) != 0:
error_msg = result.get('errmsg', '未知错误') error_msg = result.get('errmsg', '未知错误')
# 记录API调用日志业务错误
self.logger.log_api_call(
api_type="smartsheet",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=False,
error_message=f"errcode={result['errcode']}, errmsg={error_msg}"
)
raise RuntimeError(f"API调用失败: errcode={result['errcode']}, errmsg={error_msg}") raise RuntimeError(f"API调用失败: errcode={result['errcode']}, errmsg={error_msg}")
return result return result

3
src2/__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""
任务二TAPD状态实时同步至腾讯智能表格
"""

196
src2/config.py Normal file
View File

@ -0,0 +1,196 @@
"""
任务二配置管理模块
复用任务一的ConfigManager读取任务二专用配置文件
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径,以便导入 src 模块
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.config import ConfigManager as BaseConfigManager
from typing import List
class Task2ConfigManager(BaseConfigManager):
"""任务二配置管理器继承自任务一的ConfigManager"""
def __init__(self, config_path=None):
"""
初始化任务二配置管理器
Args:
config_path: 配置文件路径如果为None则使用任务二默认路径
"""
if config_path is None:
# 默认路径:项目根目录/config/config_task2.ini
config_path = project_root / "config" / "config_task2.ini"
super().__init__(config_path)
def get_tapd_config(self):
"""
获取TAPD配置任务二版本不需要reporter
Returns:
dict: 包含workspace_id的字典
"""
if not self.config.has_section('TAPD'):
raise ValueError("配置文件缺少[TAPD]节")
if not self.config.has_option('TAPD', 'workspace_id'):
raise ValueError("配置文件[TAPD]节缺少workspace_id配置项")
workspace_id = self.config.get('TAPD', 'workspace_id').strip()
if not workspace_id:
raise ValueError("workspace_id配置项不能为空")
return {
'workspace_id': workspace_id
}
def get_smartsheet_config(self):
"""
获取智能表格配置任务二版本支持多个docid
Returns:
dict: 包含docid_list的字典
Raises:
ValueError: 配置项缺失时抛出
"""
if not self.config.has_section('SmartSheet'):
raise ValueError("配置文件缺少[SmartSheet]节")
if not self.config.has_option('SmartSheet', 'docid'):
raise ValueError("配置文件[SmartSheet]节缺少docid配置项")
docid_raw = self.config.get('SmartSheet', 'docid').strip()
if not docid_raw:
raise ValueError("docid配置项不能为空")
# 解析逗号分隔的多个docid
docid_list = [d.strip() for d in docid_raw.split(',') if d.strip()]
if not docid_list:
raise ValueError("docid配置项解析后为空")
return {
'docid': docid_list[0], # 保持向后兼容返回第一个docid
'docid_list': docid_list # 新增返回所有docid列表
}
def get_docid_list(self) -> List[str]:
"""
获取所有配置的docid列表
Returns:
List[str]: docid列表
"""
return self.get_smartsheet_config()['docid_list']
def get_schedule_config(self):
"""
获取调度配置任务二版本只需要sync_interval
Returns:
dict: 包含sync_interval的字典
"""
default_sync_interval = 15
if not self.config.has_section('Schedule'):
return {'sync_interval': default_sync_interval}
if not self.config.has_option('Schedule', 'sync_interval'):
return {'sync_interval': default_sync_interval}
sync_interval_str = self.config.get('Schedule', 'sync_interval').strip()
try:
sync_interval = int(sync_interval_str)
except ValueError:
raise ValueError(f"sync_interval必须为整数当前值: {sync_interval_str}")
if sync_interval <= 0:
raise ValueError(f"sync_interval必须为正整数当前值: {sync_interval}")
return {'sync_interval': sync_interval}
def get_wework_config(self):
"""
获取企业微信推送配置
Returns:
dict: 包含agentid和receivers的字典如果配置不存在则返回None
"""
if not self.config.has_section('wework'):
return None
agentid = self.config.get('wework', 'agentid', fallback='').strip()
receivers = self.config.get('wework', 'receivers', fallback='').strip()
# 如果配置不完整返回None
if not agentid or not receivers:
return None
return {
'agentid': agentid,
'receivers': receivers
}
def get_all_config(self):
"""获取所有配置"""
return {
'tapd': self.get_tapd_config(),
'smartsheet': self.get_smartsheet_config(),
'schedule': self.get_schedule_config(),
'wework': self.get_wework_config()
}
def print_config(self):
"""打印当前配置信息"""
print("\n=== 任务二配置信息 ===")
try:
tapd_config = self.get_tapd_config()
print(f"[TAPD]")
print(f" workspace_id: {tapd_config['workspace_id']}")
except ValueError as e:
print(f"[TAPD] 配置错误: {e}")
try:
smartsheet_config = self.get_smartsheet_config()
print(f"[SmartSheet]")
docid_list = smartsheet_config['docid_list']
print(f" docid数量: {len(docid_list)}")
for i, docid in enumerate(docid_list, 1):
# 显示docid的前20个字符便于识别
display_id = docid[:20] + "..." if len(docid) > 20 else docid
print(f" docid_{i}: {display_id}")
except ValueError as e:
print(f"[SmartSheet] 配置错误: {e}")
try:
schedule_config = self.get_schedule_config()
print(f"[Schedule]")
print(f" sync_interval: {schedule_config['sync_interval']} 分钟")
except ValueError as e:
print(f"[Schedule] 配置错误: {e}")
wework_config = self.get_wework_config()
if wework_config:
print(f"[wework]")
print(f" agentid: {wework_config['agentid']}")
print(f" receivers: {wework_config['receivers']}")
else:
print(f"[wework] 未配置(推送功能将被禁用)")
print("======================\n")
if __name__ == "__main__":
try:
config = Task2ConfigManager()
config.print_config()
except Exception as e:
print(f"错误: {e}")

115
src2/link_parser.py Normal file
View File

@ -0,0 +1,115 @@
"""
TAPD链接解析模块
负责从TAPD链接中提取需求单号
支持的链接格式
1. 列表页弹窗链接: https://www.tapd.cn/tapd_fe/{workspace_id}/story/list?...dialog_preview_id=story_{单号}
2. 详情页链接: https://www.tapd.cn/{workspace_id}/prong/stories/view/{单号}
"""
import re
from typing import Tuple, Optional
# 链接类型常量
LINK_TYPE_DIALOG = "dialog" # 列表页弹窗链接
LINK_TYPE_VIEW = "view" # 详情页链接
LINK_TYPE_UNKNOWN = "unknown"
def parse_tapd_link(url: str) -> Tuple[bool, str, str]:
"""
解析TAPD链接提取需求单号
Args:
url: TAPD链接字符串
Returns:
Tuple[bool, str, str]: (是否成功, 单号或错误信息, 链接类型)
- 成功时: (True, "1234567890", "dialog" "view")
- 失败时: (False, "错误信息", "unknown")
"""
if not url:
return (False, "链接为空", LINK_TYPE_UNKNOWN)
# 确保是字符串类型
if not isinstance(url, str):
url = str(url)
url = url.strip()
if not url:
return (False, "链接为空", LINK_TYPE_UNKNOWN)
# 格式一:列表页弹窗链接
# 匹配 dialog_preview_id=story_(\d+)
pattern_dialog = r'dialog_preview_id=story_(\d+)'
match_dialog = re.search(pattern_dialog, url)
if match_dialog:
story_id = match_dialog.group(1)
return (True, story_id, LINK_TYPE_DIALOG)
# 格式二:详情页链接
# 匹配 /stories/view/(\d+)
pattern_view = r'/stories/view/(\d+)'
match_view = re.search(pattern_view, url)
if match_view:
story_id = match_view.group(1)
return (True, story_id, LINK_TYPE_VIEW)
# 未匹配到任何格式
return (False, f"无法识别的链接格式: {url[:100]}", LINK_TYPE_UNKNOWN)
def extract_story_id(url: str) -> Optional[str]:
"""
从TAPD链接中提取需求单号简化接口
Args:
url: TAPD链接字符串
Returns:
Optional[str]: 成功返回单号失败返回None
"""
success, result, _ = parse_tapd_link(url)
return result if success else None
def is_valid_tapd_link(url: str) -> bool:
"""
检查是否为有效的TAPD链接
Args:
url: TAPD链接字符串
Returns:
bool: 是否为有效链接
"""
success, _, _ = parse_tapd_link(url)
return success
if __name__ == "__main__":
print("=== TAPD链接解析器测试 ===\n")
# 测试用例
test_cases = [
# 格式一:列表页弹窗链接
"https://www.tapd.cn/tapd_fe/58335167/story/list?dialog_preview_id=story_1158335167001044388",
# 格式二:详情页链接
"https://www.tapd.cn/58335167/prong/stories/view/1158335167001044388",
# 无效链接
"https://www.tapd.cn/58335167/bugtrace/bugs/view/123456",
"https://www.google.com",
"",
None,
]
for i, url in enumerate(test_cases, 1):
print(f"测试 {i}: {url}")
success, result, link_type = parse_tapd_link(url)
if success:
print(f" ✓ 解析成功: 单号={result}, 类型={link_type}")
else:
print(f" ✗ 解析失败: {result}")
print()

56
src2/logger.py Normal file
View File

@ -0,0 +1,56 @@
"""
任务二日志模块
创建独立的 APILogger 实例日志写入 logs2/ 目录
设计说明
- 不修改 src/api_logger.py get_logger() 全局单例
- 任务二使用独立的 logger 实例避免与任务一冲突
- 两个任务可以同时运行日志互不干扰
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.api_logger import APILogger
# 任务二日志目录
TASK2_LOG_DIR = project_root / "logs2"
# 任务二专用的 logger 实例(模块级单例)
_task2_logger = None
def get_task2_logger() -> APILogger:
"""
获取任务二专用的日志记录器
Returns:
APILogger: 任务二专用的日志记录器实例
"""
global _task2_logger
if _task2_logger is None:
_task2_logger = APILogger(log_dir=str(TASK2_LOG_DIR))
return _task2_logger
if __name__ == "__main__":
print("=== 任务二日志模块测试 ===\n")
logger = get_task2_logger()
# 测试记录一条日志
logger.log_api_call(
api_type="test",
operation="task2/test_log",
request_data={"test": "任务二日志测试"},
response_data={"status": "ok"},
success=True
)
print(f"日志目录: {TASK2_LOG_DIR}")
print(f"日志文件: {logger._get_today_log_file()}")
print("\n测试完成,请检查 logs2/ 目录")

156
src2/main.py Normal file
View File

@ -0,0 +1,156 @@
"""
任务二主程序入口
TAPD状态实时同步至腾讯智能表格
功能
1. 命令行参数解析
2. 单次执行模式
3. 执行结果统计
"""
import sys
import argparse
from pathlib import Path
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.sync_service import run_once
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='TAPD状态同步工具 - 将TAPD需求状态同步到腾讯智能表格',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
# 执行一次同步
python src2/main.py
# 指定配置文件
python src2/main.py --config /path/to/config.ini
# 测试模式(显示详细信息)
python src2/main.py --test
# 手动传入access_token
python src2/main.py --token YOUR_ACCESS_TOKEN
"""
)
parser.add_argument(
'-c', '--config',
default=None,
help='配置文件路径(默认: config/config_task2.ini'
)
parser.add_argument(
'-t', '--token',
default=None,
help='手动传入access_token默认自动获取'
)
parser.add_argument(
'--test',
action='store_true',
help='启用测试模式显示详细的API调用信息'
)
return parser.parse_args()
def print_result_summary(result: dict):
"""打印执行结果摘要"""
print("\n" + "=" * 60)
print("执行结果摘要")
print("=" * 60)
if result["success"]:
print("状态: ✓ 成功")
else:
print("状态: ✗ 失败")
if result.get("error_message"):
print(f"错误: {result['error_message']}")
print(f"\n子表统计:")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f"\n记录统计:")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
print(f" 同步失败: {result['records_failed']}")
# 显示各子表详情
if result.get("sheet_results"):
print(f"\n子表详情:")
for sheet in result["sheet_results"]:
status = "跳过" if sheet["skipped"] else "完成"
print(f" - {sheet['sheet_title']}: {status}")
if sheet["skipped"]:
print(f" 原因: {sheet['skip_reason']}")
else:
print(f" 链接: {sheet['records_with_link']} | "
f"更新: {sheet['records_updated']} | "
f"失败: {sheet['records_failed']}")
print("=" * 60)
def main():
"""主函数"""
print("\n" + "=" * 60)
print("TAPD状态同步工具 (任务二)")
print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
try:
# 解析命令行参数
args = parse_arguments()
# 初始化配置管理器
print("\n正在加载配置...")
config_manager = Task2ConfigManager(config_path=args.config)
config_manager.print_config()
# 获取access_token
access_token = args.token
if access_token is None:
print("正在获取access_token...")
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" ✓ access_token获取成功")
# 执行同步
print("\n开始执行同步...")
result = run_once(
config_manager=config_manager,
access_token=access_token,
test_mode=args.test
)
# 打印结果摘要
print_result_summary(result)
# 返回状态码
return 0 if result["success"] else 1
except KeyboardInterrupt:
print("\n\n用户中断执行")
return 130
except Exception as e:
print(f"\n✗ 执行失败: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())

136
src2/notifier.py Normal file
View File

@ -0,0 +1,136 @@
"""
任务二企业微信消息推送模块
用于发送同步失败通知
"""
import sys
from pathlib import Path
from typing import List, Dict
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.wework_notifier import WeWorkNotifier
def send_sync_failure_notification(access_token: str, agentid: str,
receivers: str, failed_records: List[Dict]) -> bool:
"""
发送同步失败通知
Args:
access_token: 企业微信access_token
agentid: 应用ID
receivers: 接收人列表
failed_records: 失败记录列表每条记录包含
- sheet_title: 子表标题
- record_id: 记录ID
- tapd_link: TAPD链接
- error_message: 失败原因
Returns:
bool: 是否发送成功
"""
if not failed_records:
return True
# 构造消息内容
content = _build_sync_failure_message(failed_records)
# 使用任务一的推送器发送消息
notifier = WeWorkNotifier(access_token, agentid, receivers)
return notifier._send_text_message(content)
def _build_sync_failure_message(failed_records: List[Dict]) -> str:
"""
构造同步失败消息内容支持多表格多子表分组
Args:
failed_records: 失败记录列表每条记录可包含
- doc_index: 表格序号可选
- docid_short: 表格ID简写可选
- sheet_title: 子表标题
- record_id: 记录ID
- tapd_link: TAPD链接
- error_message: 失败原因
Returns:
str: 格式化的消息内容
"""
# 按表格和子表分组
records_by_doc = {}
for record in failed_records:
doc_index = record.get('doc_index', 1)
docid_short = record.get('docid_short', '')
doc_key = (doc_index, docid_short)
if doc_key not in records_by_doc:
records_by_doc[doc_key] = {}
sheet_title = record.get('sheet_title', '未知子表')
if sheet_title not in records_by_doc[doc_key]:
records_by_doc[doc_key][sheet_title] = []
records_by_doc[doc_key][sheet_title].append(record)
# 消息头部
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
total_count = len(failed_records)
doc_count = len(records_by_doc)
lines = [
"【autoTAPD 同步失败通知】",
f"时间: {timestamp}",
f"失败数量: {total_count}",
]
# 如果有多个表格,显示表格数量
if doc_count > 1:
lines.append(f"涉及表格: {doc_count}")
lines.extend([
"",
"以下记录同步失败,请检查:",
"=" * 40
])
# 按表格和子表分组显示失败记录
global_idx = 1
for (doc_index, docid_short), sheets in sorted(records_by_doc.items()):
# 如果有多个表格,显示表格标识
if doc_count > 1:
doc_label = f"表格{doc_index}"
if docid_short:
doc_label += f" ({docid_short})"
lines.append(f"\n{'#'*20}")
lines.append(f"# {doc_label}")
lines.append(f"{'#'*20}")
for sheet_title, sheet_records in sheets.items():
lines.append(f"\n【子表:{sheet_title}")
lines.append("")
for record in sheet_records:
record_id = record.get('record_id', '未知')
tapd_link = record.get('tapd_link', '(无链接)')
error_message = record.get('error_message', '未知错误')
lines.append(f"[{global_idx}] 记录ID: {record_id}")
lines.append(f"TAPD链接: {tapd_link}")
lines.append(f"失败原因: {error_message}")
lines.append("")
global_idx += 1
lines.append("=" * 40)
lines.append("系统将在下次同步时自动重试失败记录。")
return "\n".join(lines)
if __name__ == "__main__":
print("=== 任务二推送模块 ===")
print("此模块提供同步失败通知功能")
print("请通过 sync_service 或 main.py 调用")

354
src2/scheduler.py Normal file
View File

@ -0,0 +1,354 @@
"""
任务二调度器模块
负责定时执行TAPD状态同步任务
功能
1. 按配置频率定时执行
2. 优雅退出Ctrl+C
3. 运行统计
"""
import sys
import time
import signal
import argparse
from pathlib import Path
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 导入schedule库
try:
import schedule
except ImportError:
print("错误: 缺少schedule库")
print("请运行: pip install schedule")
sys.exit(1)
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.sync_service import run_once
class Task2Scheduler:
"""任务二调度器"""
def __init__(self, config_path=None, verbose=False):
"""
初始化调度器
Args:
config_path: 配置文件路径
verbose: 是否显示详细信息
"""
self.config_path = config_path
self.verbose = verbose
self.running = True
# 统计信息
self.stats = {
'start_time': None,
'total_runs': 0,
'success_runs': 0,
'failed_runs': 0,
'total_records_synced': 0,
'total_records_updated': 0,
'last_run_time': None
}
# 初始化配置管理器
self._init_config()
# 初始化TokenManager
self._init_token_manager()
# 获取调度配置
self.sync_interval = self.config['schedule']['sync_interval']
def _init_config(self):
"""初始化配置"""
try:
print("正在加载配置文件...")
self.config_manager = Task2ConfigManager(config_path=self.config_path)
self.config = self.config_manager.get_all_config()
print("✓ 配置文件加载成功")
except Exception as e:
print(f"✗ 配置文件加载失败: {e}")
raise
def _init_token_manager(self):
"""初始化TokenManager"""
try:
print("正在初始化TokenManager...")
self.token_manager = TokenManager()
print("✓ TokenManager初始化成功")
except Exception as e:
print(f"✗ TokenManager初始化失败: {e}")
raise
def job(self):
"""执行一次同步任务"""
print("\n" + "=" * 80)
print(f"开始执行同步任务 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
try:
# 获取access_token
access_token = self.token_manager.get_token()
# 执行一次同步流程
result = run_once(
config_manager=self.config_manager,
access_token=access_token,
test_mode=self.verbose
)
# 更新统计信息
self.stats['total_runs'] += 1
self.stats['last_run_time'] = datetime.now()
if result['success']:
self.stats['success_runs'] += 1
self.stats['total_records_synced'] += result['records_synced']
self.stats['total_records_updated'] += result['records_updated']
print("\n" + "-" * 80)
print("本次执行统计:")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
print(f" 同步失败: {result['records_failed']}")
print("-" * 80)
else:
self.stats['failed_runs'] += 1
print("\n" + "-" * 80)
print("本次执行失败:")
print(f" 错误信息: {result.get('error_message', '未知错误')}")
print("-" * 80)
except Exception as e:
self.stats['total_runs'] += 1
self.stats['failed_runs'] += 1
self.stats['last_run_time'] = datetime.now()
print("\n" + "-" * 80)
print("本次执行异常:")
print(f" 错误类型: {type(e).__name__}")
print(f" 错误信息: {e}")
print("-" * 80)
if self.verbose:
import traceback
print("\n详细错误信息:")
traceback.print_exc()
# 显示下次执行时间
self._show_next_run_time()
def _show_next_run_time(self):
"""显示下次执行时间"""
next_run = schedule.idle_seconds()
if next_run is not None:
next_run_time = datetime.now().timestamp() + next_run
next_run_str = datetime.fromtimestamp(next_run_time).strftime('%Y-%m-%d %H:%M:%S')
print(f"\n下次执行时间: {next_run_str} (约 {int(next_run / 60)} 分钟后)")
def _startup_check(self):
"""启动检查"""
print("\n" + "=" * 80)
print("启动检查")
print("=" * 80)
checks_passed = True
# 1. 检查配置文件
print("\n[1/3] 检查配置文件...")
try:
print(f" ✓ 配置文件已加载: {self.config_path or '默认路径'}")
print(f" ✓ TAPD workspace_id: {self.config['tapd']['workspace_id']}")
print(f" ✓ SmartSheet docid: {self.config['smartsheet']['docid'][:20]}...")
print(f" ✓ 同步间隔: {self.sync_interval} 分钟")
except Exception as e:
print(f" ✗ 配置检查失败: {e}")
checks_passed = False
# 2. 检查环境变量
print("\n[2/3] 检查环境变量...")
import os
required_env_vars = {
'CORPID': '企业微信CorpID',
'CORPSECRET': '企业微信CorpSecret',
'TAPD_API_USER': 'TAPD API用户名',
'TAPD_API_PASSWORD': 'TAPD API密码'
}
for env_var, description in required_env_vars.items():
if os.environ.get(env_var):
print(f"{description} ({env_var}): 已设置")
else:
print(f"{description} ({env_var}): 未设置")
checks_passed = False
# 3. 测试access_token获取
print("\n[3/3] 测试access_token获取...")
try:
access_token = self.token_manager.get_token()
print(f" ✓ access_token获取成功: {access_token[:10]}...(已隐藏)")
except Exception as e:
print(f" ✗ access_token获取失败: {e}")
checks_passed = False
print("\n" + "=" * 80)
if checks_passed:
print("启动检查通过 ✓")
else:
print("启动检查失败 ✗")
print("\n请检查上述错误并修复后重试")
return False
print("=" * 80)
return True
def start(self):
"""启动调度器"""
print("\n" + "=" * 80)
print("TAPD状态同步调度器 (任务二)")
print("版本: 1.0.0 (第四阶段)")
print("=" * 80)
# 执行启动检查
if not self._startup_check():
print("\n调度器启动失败")
sys.exit(1)
# 记录启动时间
self.stats['start_time'] = datetime.now()
# 注册信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
# 配置定时任务
schedule.every(self.sync_interval).minutes.do(self.job)
print(f"\n调度器已启动:")
print(f" - 同步任务: 立即执行一次,然后每 {self.sync_interval} 分钟执行一次")
print("按 Ctrl+C 停止调度器")
print()
# 立即执行一次同步任务
self.job()
# 进入调度循环
while self.running:
schedule.run_pending()
time.sleep(1)
def _signal_handler(self, signum, frame):
"""信号处理函数"""
print("\n\n" + "=" * 80)
print("收到停止信号,正在优雅退出...")
print("=" * 80)
self.running = False
self._print_final_stats()
def _print_final_stats(self):
"""打印最终统计信息"""
print("\n" + "=" * 80)
print("运行统计")
print("=" * 80)
if self.stats['start_time']:
start_time_str = self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')
print(f"启动时间: {start_time_str}")
if self.stats['last_run_time']:
last_run_str = self.stats['last_run_time'].strftime('%Y-%m-%d %H:%M:%S')
print(f"最后执行: {last_run_str}")
if self.stats['start_time']:
running_time = datetime.now() - self.stats['start_time']
hours = int(running_time.total_seconds() // 3600)
minutes = int((running_time.total_seconds() % 3600) // 60)
print(f"运行时长: {hours} 小时 {minutes} 分钟")
print(f"\n同步任务统计:")
print(f" 总执行次数: {self.stats['total_runs']}")
print(f" 成功次数: {self.stats['success_runs']}")
print(f" 失败次数: {self.stats['failed_runs']}")
print(f" 总同步记录: {self.stats['total_records_synced']}")
print(f" 总更新记录: {self.stats['total_records_updated']}")
if self.stats['total_runs'] > 0:
success_rate = (self.stats['success_runs'] / self.stats['total_runs']) * 100
print(f" 成功率: {success_rate:.1f}%")
print("=" * 80)
print("调度器已停止")
print("=" * 80)
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='TAPD状态同步调度器 - 定时执行同步任务',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
# 使用默认配置启动调度器
python src2/scheduler.py
# 指定配置文件路径
python src2/scheduler.py --config /path/to/config.ini
# 显示详细信息
python src2/scheduler.py --verbose
"""
)
parser.add_argument(
'-c', '--config',
default=None,
help='配置文件路径(默认: config/config_task2.ini'
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
help='显示详细输出信息'
)
return parser.parse_args()
def main():
"""主函数"""
try:
# 解析命令行参数
args = parse_arguments()
# 创建并启动调度器
scheduler = Task2Scheduler(
config_path=args.config,
verbose=args.verbose
)
scheduler.start()
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
print(f"\n✗ 调度器启动失败: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())

45
src2/smartsheet.py Normal file
View File

@ -0,0 +1,45 @@
"""
任务二专用的智能表格API模块
继承自任务一的 SmartSheetAPI使用任务二专用的日志记录器
设计说明
- 继承 src.smartsheet.SmartSheetAPI 的所有功能
- 重写 __init__ 方法使用 get_task2_logger() 替代 get_logger()
- 确保所有智能表格 API 调用日志写入 logs2/ 目录
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.smartsheet import SmartSheetAPI
from src2.logger import get_task2_logger
class SmartSheetAPITask2(SmartSheetAPI):
"""任务二专用的智能表格API类"""
def __init__(self, access_token: str, docid: str, test_mode: bool = False):
"""
初始化智能表格API任务二专用
Args:
access_token: 企业微信access_token
docid: 智能表格文档ID
test_mode: 是否启用测试模式显示API返回结果
"""
# 调用父类初始化
super().__init__(access_token, docid, test_mode)
# 替换为任务二专用的日志记录器
self.logger = get_task2_logger()
if __name__ == "__main__":
print("=== 任务二智能表格API模块 ===\n")
print("此模块继承自 src.smartsheet.SmartSheetAPI")
print("使用任务二专用的日志记录器,日志写入 logs2/ 目录")
print("\n请通过 SmartSheetSync 类使用此模块")

461
src2/smartsheet_sync.py Normal file
View File

@ -0,0 +1,461 @@
"""
任务二智能表格同步模块
负责智能表格的数据读取和回写
功能
1. 检测必要字段是否存在
2. 读取所有记录
3. 提取TAPD链接
4. 构造更新记录
5. 批量回写状态信息
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src2.smartsheet import SmartSheetAPITask2
from src2.link_parser import parse_tapd_link, extract_story_id
from src2.logger import get_task2_logger
# ============================================================
# 字段名称常量(与智能表格列名完全一致)
# ============================================================
FIELD_TAPD_LINK = "TAPD链接" # 用户填写,解析单号
FIELD_TAPD_STATUS = "TAPD状态(🈲勿手改)" # 工具回写
FIELD_OWNER = "处理人(🈲勿手改)" # 工具回写
FIELD_BEGIN_DATE = "TAPD预计开始日期(🈲勿手改)" # 工具回写
FIELD_DUE_DATE = "TAPD预计完成日期(🈲勿手改)" # 工具回写
FIELD_PLAN = "发布计划(🈲勿手改)" # 工具回写TAPD发布计划字段
FIELD_SYNC_STATUS = "同步状态(🈲勿手改)" # 工具回写,标记同步结果
# 必要字段列表
REQUIRED_FIELDS = [
FIELD_TAPD_LINK,
FIELD_TAPD_STATUS,
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
FIELD_PLAN,
FIELD_SYNC_STATUS,
]
class SmartSheetSync:
"""智能表格同步类"""
def __init__(self, access_token: str, docid: str, test_mode: bool = False):
"""
初始化智能表格同步模块
Args:
access_token: 企业微信access_token
docid: 智能表格文档ID
test_mode: 是否启用测试模式
"""
self.api = SmartSheetAPITask2(access_token, docid, test_mode)
self.logger = get_task2_logger()
self.test_mode = test_mode
def check_required_fields(self, fields: List[Dict]) -> Tuple[bool, List[str], Dict[str, str]]:
"""
检测必要字段是否存在
Args:
fields: 字段列表从get_fields获取
Returns:
Tuple[bool, List[str], Dict[str, str]]:
- 是否所有必要字段都存在
- 缺失的字段列表
- 字段名称到字段ID的映射
"""
# 构建字段映射
field_mapping = {}
for field in fields:
field_title = field.get('field_title', '')
field_id = field.get('field_id', '')
if field_title and field_id:
field_mapping[field_title] = field_id
# 检查必要字段
missing_fields = []
for required_field in REQUIRED_FIELDS:
if required_field not in field_mapping:
missing_fields.append(required_field)
all_present = len(missing_fields) == 0
if all_present:
print(f" ✓ 所有必要字段都存在")
else:
print(f" ⚠ 缺少必要字段: {', '.join(missing_fields)}")
return (all_present, missing_fields, field_mapping)
def get_all_records(self, sheet_id: str) -> List[Dict]:
"""
获取子表的所有记录支持分页
Args:
sheet_id: 子表ID
Returns:
List[Dict]: 所有记录列表
"""
print(f"正在获取所有记录...")
all_records = []
offset = 0
limit = 100
while True:
result = self.api.get_records(sheet_id, limit=limit, offset=offset)
records = result['records']
total = result['total']
all_records.extend(records)
print(f" - 已获取 {len(all_records)}/{total} 条记录")
if len(all_records) >= total:
break
offset += limit
print(f" ✓ 共获取 {len(all_records)} 条记录")
return all_records
def extract_tapd_link(self, record: Dict) -> Optional[str]:
"""
从记录中提取TAPD链接
Args:
record: 记录对象
Returns:
Optional[str]: TAPD链接字符串如果不存在则返回None
"""
link_value = self.api.get_field_value_by_title(record, FIELD_TAPD_LINK)
if not link_value:
return None
# 链接字段可能是字符串或包含url的对象
if isinstance(link_value, str):
return link_value
elif isinstance(link_value, dict):
# 可能是 {url: "...", text: "..."} 格式
return link_value.get('url') or link_value.get('text')
elif isinstance(link_value, list):
# 可能是列表格式
if len(link_value) > 0:
first_item = link_value[0]
if isinstance(first_item, dict):
return first_item.get('url') or first_item.get('text')
elif isinstance(first_item, str):
return first_item
return None
def build_update_record(self, record_id: str, status: str = None,
owner: str = None, begin_date: str = None,
due_date: str = None, plan: str = None,
sync_status: str = None) -> Dict:
"""
构造更新记录的数据结构
Args:
record_id: 记录ID
status: TAPD状态中文
owner: 处理人
begin_date: 预计开始日期
due_date: 预计完成日期
plan: 计划中文名称
sync_status: 同步状态"成功" "失败"
Returns:
Dict: 更新记录的数据结构
"""
values = {}
# 处理字段值:
# - None: 不更新该字段(跳过)
# - 空字符串 "": 清空该字段(传入空数组)
# - 非空字符串: 更新为新值
if status is not None:
if status == "":
values[FIELD_TAPD_STATUS] = []
else:
values[FIELD_TAPD_STATUS] = [{"type": "text", "text": status}]
if owner is not None:
if owner == "":
values[FIELD_OWNER] = []
else:
values[FIELD_OWNER] = [{"type": "text", "text": owner}]
if begin_date is not None:
if begin_date == "":
values[FIELD_BEGIN_DATE] = []
else:
values[FIELD_BEGIN_DATE] = [{"type": "text", "text": begin_date}]
if due_date is not None:
if due_date == "":
values[FIELD_DUE_DATE] = []
else:
values[FIELD_DUE_DATE] = [{"type": "text", "text": due_date}]
if plan is not None:
if plan == "":
values[FIELD_PLAN] = []
else:
values[FIELD_PLAN] = [{"type": "text", "text": plan}]
if sync_status is not None:
if sync_status == "":
values[FIELD_SYNC_STATUS] = []
else:
values[FIELD_SYNC_STATUS] = [{"type": "text", "text": sync_status}]
return {
"record_id": record_id,
"values": values
}
def batch_update_records(self, sheet_id: str, update_records: List[Dict]) -> Dict:
"""
批量回写状态信息使用任务一的API带debug参数
Args:
sheet_id: 子表ID
update_records: 需要更新的记录列表
Returns:
Dict: 更新结果
"""
if not update_records:
print(" ⚠ 没有需要更新的记录")
return {"records": []}
# 直接使用任务一的 update_records 方法已添加debug=1
return self.api.update_records(sheet_id, update_records)
def get_records_with_tapd_link(self, sheet_id: str,
all_records: List[Dict] = None) -> List[Dict]:
"""
获取所有包含TAPD链接的新记录同步状态为空
Args:
sheet_id: 子表ID
all_records: 可选已获取的所有记录列表避免重复获取
Returns:
List[Dict]: 包含TAPD链接的记录列表
"""
print(f"正在获取包含TAPD链接的新记录...")
if all_records is None:
all_records = self.get_all_records(sheet_id)
records_with_link = []
skipped_synced_count = 0
for record in all_records:
tapd_link = self.extract_tapd_link(record)
if not tapd_link:
continue
# 检查同步状态字段,如果不为空则跳过
sync_status = self.api.get_field_value_by_title(record, FIELD_SYNC_STATUS)
if sync_status is not None and sync_status != "":
skipped_synced_count += 1
continue
record_id = record.get('record_id', '')
# 解析链接
success, result, link_type = parse_tapd_link(tapd_link)
record_info = {
"record": record,
"record_id": record_id,
"tapd_link": tapd_link,
"parse_success": success,
}
if success:
record_info["story_id"] = result
record_info["link_type"] = link_type
else:
record_info["story_id"] = None
record_info["parse_error"] = result
records_with_link.append(record_info)
# 统计
success_count = sum(1 for r in records_with_link if r["parse_success"])
fail_count = len(records_with_link) - success_count
print(f" ✓ 找到 {len(records_with_link)} 条包含TAPD链接的记录")
if skipped_synced_count > 0:
print(f" - 跳过已同步记录: {skipped_synced_count}")
print(f" - 链接解析成功: {success_count}")
if fail_count > 0:
print(f" - 链接解析失败: {fail_count}")
return records_with_link
def get_current_field_values(self, record: Dict) -> Dict[str, Any]:
"""
获取记录当前的字段值
Args:
record: 记录对象
Returns:
Dict: 当前字段值
"""
return {
FIELD_TAPD_STATUS: self.api.get_field_value_by_title(record, FIELD_TAPD_STATUS),
FIELD_OWNER: self.api.get_field_value_by_title(record, FIELD_OWNER),
FIELD_BEGIN_DATE: self.api.get_field_value_by_title(record, FIELD_BEGIN_DATE),
FIELD_DUE_DATE: self.api.get_field_value_by_title(record, FIELD_DUE_DATE),
FIELD_PLAN: self.api.get_field_value_by_title(record, FIELD_PLAN),
}
def get_synced_records_for_update(self, sheet_id: str,
terminal_statuses: List[str],
all_records: List[Dict] = None) -> List[Dict]:
"""
获取需要持续同步的已同步记录
筛选条件
- 同步状态 = "成功"
- TAPD状态 不在终态列表中
Args:
sheet_id: 子表ID
terminal_statuses: 终态列表 ['已完成', '取消']
all_records: 可选已获取的所有记录列表避免重复获取
Returns:
List[Dict]: 需要持续同步的记录列表
"""
print(f"正在获取需要持续同步的记录...")
if all_records is None:
all_records = self.get_all_records(sheet_id)
records_for_update = []
skipped_terminal_count = 0
for record in all_records:
# 检查同步状态是否为成功(兼容新旧格式)
# 旧格式: "成功"
# 新格式: "✅ 同步成功 01-14 15:30"
sync_status = self.api.get_field_value_by_title(record, FIELD_SYNC_STATUS)
sync_status_str = str(sync_status) if sync_status else ""
if not (sync_status == "成功" or "同步成功" in sync_status_str):
continue
# 检查TAPD链接是否存在
tapd_link = self.extract_tapd_link(record)
if not tapd_link:
continue
# 检查TAPD状态是否为终态
tapd_status = self.api.get_field_value_by_title(record, FIELD_TAPD_STATUS)
if tapd_status in terminal_statuses:
skipped_terminal_count += 1
continue
# 解析链接获取story_id
success, result, link_type = parse_tapd_link(tapd_link)
if not success:
continue
record_info = {
"record": record,
"record_id": record.get('record_id', ''),
"tapd_link": tapd_link,
"story_id": result,
"current_status": tapd_status,
}
records_for_update.append(record_info)
print(f" ✓ 找到 {len(records_for_update)} 条需要持续同步的记录")
if skipped_terminal_count > 0:
print(f" - 跳过终态记录: {skipped_terminal_count}")
return records_for_update
def process_sheet(api: SmartSheetSync, sheet_id: str, sheet_title: str) -> Dict:
"""
处理单个子表的同步流程
Args:
api: SmartSheetSync实例
sheet_id: 子表ID
sheet_title: 子表标题
Returns:
Dict: 处理结果统计
"""
print(f"\n{'='*60}")
print(f"处理子表: {sheet_title}")
print(f"{'='*60}")
result = {
"sheet_id": sheet_id,
"sheet_title": sheet_title,
"success": False,
"skipped": False,
"skip_reason": None,
"total_records": 0,
"records_with_link": 0,
"parse_success": 0,
"parse_fail": 0,
}
# 1. 获取字段信息
fields = api.api.get_fields(sheet_id)
# 2. 检查必要字段
all_present, missing_fields, field_mapping = api.check_required_fields(fields)
if not all_present:
result["skipped"] = True
result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
print(f" ⚠ 跳过此子表: {result['skip_reason']}")
return result
# 3. 获取包含TAPD链接的记录
records_with_link = api.get_records_with_tapd_link(sheet_id)
result["records_with_link"] = len(records_with_link)
result["parse_success"] = sum(1 for r in records_with_link if r["parse_success"])
result["parse_fail"] = result["records_with_link"] - result["parse_success"]
result["success"] = True
return result
if __name__ == "__main__":
print("=== 智能表格同步模块测试 ===\n")
print("此模块提供以下功能:")
print("1. check_required_fields() - 检测必要字段")
print("2. get_all_records() - 获取所有记录")
print("3. extract_tapd_link() - 提取TAPD链接")
print("4. build_update_record() - 构造更新记录")
print("5. batch_update_records() - 批量回写")
print("6. get_records_with_tapd_link() - 获取包含链接的记录")
print("\n请运行 test_phase3.py 进行完整测试")

812
src2/sync_service.py Normal file
View File

@ -0,0 +1,812 @@
"""
任务二同步服务模块
整合链接解析TAPD查询表格回写实现完整的同步流程
功能
1. 单次同步流程
2. 执行统计
3. 错误处理
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta, timezone
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.logger import get_task2_logger
from src2.tapd_api import TAPDStoryApi, TERMINAL_STATUSES, StoryNotFoundException
from src2.smartsheet_sync import SmartSheetSync, REQUIRED_FIELDS
class CacheEntry:
"""TAPD查询缓存条目"""
def __init__(self, success: bool, data: Optional[Dict] = None,
error: Optional[Exception] = None):
self.success = success
self.data = data
self.error = error
self.timestamp = datetime.now()
class SyncService:
"""TAPD状态同步服务支持多表格同步"""
def __init__(self, config_manager: Task2ConfigManager = None,
access_token: str = None, test_mode: bool = False):
"""
初始化同步服务
Args:
config_manager: 配置管理器如果为None则自动创建
access_token: 企业微信access_token如果为None则自动获取
test_mode: 是否启用测试模式
"""
self.test_mode = test_mode
self.logger = get_task2_logger()
# 初始化配置管理器
if config_manager is None:
self.config_manager = Task2ConfigManager()
else:
self.config_manager = config_manager
# 获取配置
self.config = self.config_manager.get_all_config()
self.workspace_id = self.config['tapd']['workspace_id']
# 获取所有docid列表
self.docid_list = self.config['smartsheet']['docid_list']
print(f" 配置了 {len(self.docid_list)} 个智能表格")
# 获取access_token
if access_token is None:
token_manager = TokenManager()
self.access_token = token_manager.get_token()
else:
self.access_token = access_token
# 初始化TAPD API所有表格共用
self.tapd_api = TAPDStoryApi(self.workspace_id, test_mode=test_mode)
# 获取计划字段映射(每次同步时实时获取,所有表格共用)
print(f" 正在获取计划字段映射...")
try:
self.plan_mapping = self.tapd_api.get_plan_mapping()
print(f" ✓ 计划字段映射获取完成,共 {len(self.plan_mapping)} 个选项")
except Exception as e:
print(f" ⚠ 计划字段映射获取失败: {e},将使用空映射")
self.plan_mapping = {}
# TAPD查询缓存在sync_once中初始化
self._story_cache: Dict[str, CacheEntry] = {}
self._cache_stats = {
"total_queries": 0,
"cache_hits": 0,
"cache_misses": 0,
"api_calls": 0,
"cached_failures": 0
}
print(f" ✓ 同步服务初始化完成")
def _get_beijing_time_str(self) -> str:
"""
获取北京时间格式字符串MM-dd HH:mm
Returns:
str: 格式化的北京时间字符串例如 "01-14 15:30"
"""
# 北京时间是 UTC+8
beijing_tz = timezone(timedelta(hours=8))
beijing_time = datetime.now(beijing_tz)
return beijing_time.strftime("%m-%d %H:%M")
def sync_once(self) -> Dict[str, Any]:
"""
执行一次完整的同步流程遍历所有配置的智能表格
Returns:
Dict: 同步结果统计
"""
# 初始化本次同步的缓存
self._story_cache = {}
self._cache_stats = {
"total_queries": 0,
"cache_hits": 0,
"cache_misses": 0,
"api_calls": 0,
"cached_failures": 0
}
result = {
"success": False,
"start_time": datetime.now().isoformat(),
"end_time": None,
"docs_total": len(self.docid_list),
"docs_success": 0,
"docs_failed": 0,
"sheets_processed": 0,
"sheets_skipped": 0,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"error_message": None,
"doc_results": [] # 每个表格的详细结果
}
all_failed_records = [] # 汇总所有表格的失败记录
# 遍历所有配置的智能表格
for doc_index, docid in enumerate(self.docid_list, 1):
doc_result = self._sync_single_doc(docid, doc_index, len(self.docid_list))
result["doc_results"].append(doc_result)
if doc_result["success"]:
result["docs_success"] += 1
# 累加统计数据
result["sheets_processed"] += doc_result["sheets_processed"]
result["sheets_skipped"] += doc_result["sheets_skipped"]
result["total_records"] += doc_result["total_records"]
result["records_with_link"] += doc_result["records_with_link"]
result["records_synced"] += doc_result["records_synced"]
result["records_updated"] += doc_result["records_updated"]
result["records_failed"] += doc_result["records_failed"]
# 收集失败记录
all_failed_records.extend(doc_result.get("all_failed_records", []))
else:
result["docs_failed"] += 1
# 判断整体是否成功(至少有一个表格成功)
result["success"] = result["docs_success"] > 0
# 发送失败通知(汇总所有表格的失败记录)
if all_failed_records:
self._send_failure_notification(all_failed_records)
# 记录缓存统计
self._log_cache_statistics()
result["cache_stats"] = self._cache_stats.copy()
result["end_time"] = datetime.now().isoformat()
return result
def _sync_single_doc(self, docid: str, doc_index: int, total_docs: int) -> Dict[str, Any]:
"""
同步单个智能表格
Args:
docid: 智能表格文档ID
doc_index: 当前表格序号从1开始
total_docs: 表格总数
Returns:
Dict: 单个表格的同步结果
"""
# 显示docid的前16个字符便于识别
display_id = docid[:16] + "..." if len(docid) > 16 else docid
print(f"\n{'#'*70}")
print(f"# [表格 {doc_index}/{total_docs}] docid: {display_id}")
print(f"{'#'*70}")
doc_result = {
"docid": docid,
"doc_index": doc_index,
"success": False,
"sheets_processed": 0,
"sheets_skipped": 0,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"error_message": None,
"sheet_results": [],
"all_failed_records": []
}
try:
# 为当前表格创建SmartSheetSync实例
smartsheet = SmartSheetSync(self.access_token, docid, test_mode=self.test_mode)
self.current_smartsheet = smartsheet # 供_process_sheet等方法使用
# 获取所有子表
print("\n正在获取子表列表...")
sheets = smartsheet.api.get_sheet_list()
print(f" ✓ 找到 {len(sheets)} 个子表")
# 处理每个子表
for sheet in sheets:
sheet_id = sheet.get('sheet_id', '')
sheet_title = sheet.get('title', '未命名')
sheet_result = self._process_sheet(sheet_id, sheet_title, doc_index)
doc_result["sheet_results"].append(sheet_result)
if sheet_result["skipped"]:
doc_result["sheets_skipped"] += 1
else:
doc_result["sheets_processed"] += 1
doc_result["total_records"] += sheet_result["total_records"]
doc_result["records_with_link"] += sheet_result["records_with_link"]
doc_result["records_synced"] += sheet_result["records_synced"]
doc_result["records_updated"] += sheet_result["records_updated"]
doc_result["records_failed"] += sheet_result["records_failed"]
# 收集失败记录(添加表格标识)
for failed in sheet_result.get("failed_records", []):
failed["doc_index"] = doc_index
failed["docid_short"] = display_id
doc_result["all_failed_records"].append(failed)
doc_result["success"] = True
print(f"\n✓ [表格 {doc_index}/{total_docs}] 同步完成")
except Exception as e:
doc_result["error_message"] = str(e)
print(f"\n✗ [表格 {doc_index}/{total_docs}] 同步失败: {e}")
if self.test_mode:
import traceback
traceback.print_exc()
return doc_result
def _process_sheet(self, sheet_id: str, sheet_title: str, doc_index: int = 1) -> Dict[str, Any]:
"""
处理单个子表的同步
Args:
sheet_id: 子表ID
sheet_title: 子表标题
doc_index: 表格序号用于日志显示
Returns:
Dict: 子表处理结果
"""
print(f"\n{'='*60}")
print(f"处理子表: {sheet_title} (表格{doc_index})")
print(f"{'='*60}")
sheet_result = {
"sheet_id": sheet_id,
"sheet_title": sheet_title,
"skipped": False,
"skip_reason": None,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"details": [],
"failed_records": [] # 收集失败记录的详细信息
}
try:
# 1. 获取字段信息并检查必要字段
fields = self.current_smartsheet.api.get_fields(sheet_id)
all_present, missing_fields, field_mapping = self.current_smartsheet.check_required_fields(fields)
if not all_present:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
print(f" ⚠ 跳过此子表: {sheet_result['skip_reason']}")
return sheet_result
# 2. 获取所有记录(只获取一次,供新记录同步和持续同步共用)
print(f"正在获取所有记录...")
all_records = self.current_smartsheet.get_all_records(sheet_id)
# 3. 获取包含TAPD链接的新记录同步状态为空
records_with_link = self.current_smartsheet.get_records_with_tapd_link(
sheet_id, all_records=all_records
)
sheet_result["records_with_link"] = len(records_with_link)
# 4. 处理新记录
if records_with_link:
print(f"\n--- 新记录同步 ---")
success_records = [] # 成功同步的记录
failed_records_info = [] # 失败的记录信息列表包含ID和状态
for record_info in records_with_link:
record_result = self._process_record(record_info)
sheet_result["details"].append(record_result)
if record_result["success"]:
sheet_result["records_synced"] += 1
if record_result["update_record"]:
success_records.append(record_result["update_record"])
sheet_result["records_updated"] += 1
else:
sheet_result["records_failed"] += 1
# 收集失败记录的ID和对应的同步状态
failed_records_info.append({
"record_id": record_info["record_id"],
"sync_status": record_result.get("sync_status", "⚠️ 同步失败请联系PM")
})
# 收集失败记录的详细信息用于推送
failed_record = {
"sheet_title": sheet_title,
"record_id": record_info["record_id"],
"tapd_link": record_info.get("tapd_link", "(无链接)"),
"error_message": record_result.get("error_message", "未知错误")
}
sheet_result["failed_records"].append(failed_record)
# 4. 批量回写成功记录
if success_records:
print(f"\n正在回写 {len(success_records)} 条成功记录...")
try:
self.current_smartsheet.batch_update_records(sheet_id, success_records)
print(f" ✓ 成功记录回写完成")
except Exception as e:
print(f" ✗ 成功记录回写失败: {e}")
# 5. 批量回写失败记录(根据不同失败原因回写不同状态)
if failed_records_info:
print(f"\n正在回写 {len(failed_records_info)} 条失败记录的状态...")
try:
failed_updates = [
self.current_smartsheet.build_update_record(
record_id=info["record_id"],
sync_status=info["sync_status"]
)
for info in failed_records_info
]
self.current_smartsheet.batch_update_records(sheet_id, failed_updates)
print(f" ✓ 失败记录状态回写完成")
except Exception as e:
print(f" ✗ 失败记录状态回写失败: {e}")
else:
print(f" 没有新记录需要同步")
# 7. 持续同步:更新已同步记录的最新状态
print(f"\n--- 持续同步 ---")
update_result = self._process_synced_records_update(sheet_id, all_records=all_records)
sheet_result["continuous_sync"] = update_result
sheet_result["total_records"] = len(records_with_link)
except Exception as e:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"处理异常: {str(e)}"
print(f" ✗ 处理子表异常: {e}")
return sheet_result
def _get_story_with_cache(self, story_id: str) -> Dict:
"""
带缓存的获取需求详情
Args:
story_id: 需求ID
Returns:
Dict: 需求详细信息
Raises:
StoryNotFoundException: 需求不存在
Exception: 其他API错误
"""
self._cache_stats["total_queries"] += 1
# 检查缓存
if story_id in self._story_cache:
cache_entry = self._story_cache[story_id]
self._cache_stats["cache_hits"] += 1
if cache_entry.success:
# 缓存命中 - 成功记录
if self.test_mode:
print(f" [缓存命中] story_id={story_id}")
return cache_entry.data
else:
# 缓存命中 - 失败记录
self._cache_stats["cached_failures"] += 1
if self.test_mode:
print(f" [缓存命中-失败] story_id={story_id}")
raise cache_entry.error
# 缓存未命中调用API
self._cache_stats["cache_misses"] += 1
self._cache_stats["api_calls"] += 1
if self.test_mode:
print(f" [API调用] story_id={story_id}")
try:
story_info = self.tapd_api.get_story(story_id)
# 缓存成功结果
self._story_cache[story_id] = CacheEntry(success=True, data=story_info)
return story_info
except StoryNotFoundException as e:
# 缓存确定性失败(单号无效)
self._story_cache[story_id] = CacheEntry(success=False, error=e)
raise
except Exception as e:
# 不缓存非确定性失败网络错误、API限流等
raise
def _process_record(self, record_info: Dict) -> Dict[str, Any]:
"""
处理单条记录的同步
Args:
record_info: 记录信息包含解析结果
Returns:
Dict: 记录处理结果
"""
record_result = {
"record_id": record_info["record_id"],
"tapd_link": record_info["tapd_link"],
"success": False,
"update_record": None,
"error_message": None,
"story_info": None,
"sync_status": None # 用于失败记录的状态回写
}
# 检查链接解析结果
if not record_info["parse_success"]:
record_result["error_message"] = record_info.get("parse_error", "链接解析失败")
record_result["sync_status"] = "❌ 单号无效"
# 记录链接解析失败日志
self.logger.log_api_call(
api_type="task2",
operation="link_parse_failure",
request_data={
"record_id": record_info["record_id"],
"tapd_link": record_info["tapd_link"]
},
response_data={},
success=False,
error_message=record_result["error_message"]
)
return record_result
story_id = record_info["story_id"]
try:
# 查询TAPD获取需求信息使用缓存
story_info = self._get_story_with_cache(story_id)
record_result["story_info"] = story_info
# 提取需要同步的字段None 转为空字符串)
status = story_info.get('status') or ''
owner = story_info.get('owner') or ''
begin_date = story_info.get('begin') or ''
due_date = story_info.get('due') or ''
# 提取发布计划字段并转换为中文名称
plan_id = story_info.get('release_id') or ''
plan_name = self.tapd_api.map_plan_id_to_name(plan_id)
# 获取当前字段值,判断是否需要更新
current_values = self.current_smartsheet.get_current_field_values(record_info["record"])
needs_update = self._check_needs_update(
current_values, status, owner, begin_date, due_date, plan_name
)
# 生成同步成功状态(包含时间戳)
time_str = self._get_beijing_time_str()
sync_status_success = f"✅ 同步成功 {time_str}"
# 构造更新记录(包含业务字段 + 同步状态=成功+时间戳)
# 即使业务字段没有变化,也要写入同步状态
update_record = self.current_smartsheet.build_update_record(
record_id=record_info["record_id"],
status=status,
owner=owner,
begin_date=begin_date,
due_date=due_date,
plan=plan_name,
sync_status=sync_status_success
)
record_result["update_record"] = update_record
record_result["success"] = True
except StoryNotFoundException as e:
# 单号无效TAPD中未找到该需求
record_result["error_message"] = str(e)
record_result["sync_status"] = "❌ 单号无效"
# 记录TAPD查询失败日志
self.logger.log_api_call(
api_type="task2",
operation="sync_record_failure",
request_data={
"record_id": record_info["record_id"],
"story_id": story_id
},
response_data={},
success=False,
error_message=record_result["error_message"]
)
except Exception as e:
# 其他异常API调用失败等
record_result["error_message"] = str(e)
record_result["sync_status"] = "⚠️ 同步失败请联系PM"
# 记录TAPD查询失败日志
self.logger.log_api_call(
api_type="task2",
operation="sync_record_failure",
request_data={
"record_id": record_info["record_id"],
"story_id": story_id
},
response_data={},
success=False,
error_message=record_result["error_message"]
)
return record_result
def _check_needs_update(self, current_values: Dict,
status: str, owner: str,
begin_date: str, due_date: str,
plan: str = "") -> bool:
"""
检查是否需要更新记录
Args:
current_values: 当前字段值
status: 新状态
owner: 新处理人
begin_date: 新开始日期
due_date: 新结束日期
plan: 新计划
Returns:
bool: 是否需要更新
"""
# 提取当前值的文本内容
def extract_text(value):
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, list) and len(value) > 0:
first = value[0]
if isinstance(first, dict):
return first.get('text', '')
return str(first)
if isinstance(value, dict):
return value.get('text', '')
return str(value)
current_status = extract_text(current_values.get('TAPD状态'))
current_owner = extract_text(current_values.get('处理人'))
current_begin = extract_text(current_values.get('TAPD预计开始日期'))
current_due = extract_text(current_values.get('TAPD预计完成日期'))
current_plan = extract_text(current_values.get('计划'))
# 比较是否有变化
if current_status != status:
return True
if current_owner != owner:
return True
if current_begin != begin_date:
return True
if current_due != due_date:
return True
if current_plan != plan:
return True
return False
def _process_synced_records_update(self, sheet_id: str,
all_records: List[Dict] = None) -> Dict[str, Any]:
"""
处理已同步记录的状态更新持续同步
Args:
sheet_id: 子表ID
all_records: 可选已获取的所有记录列表
Returns:
Dict: 更新结果统计
"""
result = {
"checked": 0,
"updated": 0,
"failed": 0,
}
# 获取需要持续同步的记录
records = self.current_smartsheet.get_synced_records_for_update(
sheet_id, TERMINAL_STATUSES, all_records=all_records
)
if not records:
print(f" 没有需要持续同步的记录")
return result
result["checked"] = len(records)
updates = []
for record_info in records:
try:
# 查询TAPD最新状态使用缓存
story_info = self._get_story_with_cache(record_info["story_id"])
# 提取最新字段值None 转为空字符串)
new_status = story_info.get('status') or ''
new_owner = story_info.get('owner') or ''
new_begin = story_info.get('begin') or ''
new_due = story_info.get('due') or ''
plan_id = story_info.get('release_id') or ''
new_plan = self.tapd_api.map_plan_id_to_name(plan_id)
# 获取当前值并比较
current = self.current_smartsheet.get_current_field_values(record_info["record"])
# 调试:打印当前值和新值(含类型)
print(f"\n [DEBUG] 记录 {record_info['record_id']} 字段比较:")
print(f" 结束日期: 当前='{current.get('TAPD预计完成日期')}' (type={type(current.get('TAPD预计完成日期'))}) vs 新='{new_due}' (type={type(new_due)})")
needs_update = self._check_needs_update(
current, new_status, new_owner, new_begin, new_due, new_plan
)
print(f" needs_update = {needs_update}")
if needs_update:
# 生成同步成功状态(包含时间戳)
time_str = self._get_beijing_time_str()
sync_status_success = f"✅ 同步成功 {time_str}"
update_record = self.current_smartsheet.build_update_record(
record_id=record_info["record_id"],
status=new_status,
owner=new_owner,
begin_date=new_begin,
due_date=new_due,
plan=new_plan,
sync_status=sync_status_success
)
updates.append(update_record)
except Exception as e:
result["failed"] += 1
error_msg = str(e)
# 记录持续同步失败日志
self.logger.log_api_call(
api_type="task2",
operation="continuous_sync_failure",
request_data={
"record_id": record_info["record_id"],
"story_id": record_info["story_id"]
},
response_data={},
success=False,
error_message=error_msg
)
if self.test_mode:
print(f" ✗ 记录 {record_info['record_id']} 更新失败: {e}")
# 批量更新
if updates:
print(f" 正在更新 {len(updates)} 条记录...")
self.current_smartsheet.batch_update_records(sheet_id, updates)
result["updated"] = len(updates)
print(f" ✓ 持续同步更新完成")
else:
print(f" ✓ 所有记录状态均未变化")
return result
def _log_cache_statistics(self):
"""记录缓存统计信息到日志"""
stats = self._cache_stats
if stats["total_queries"] == 0:
return
hit_rate = (stats["cache_hits"] / stats["total_queries"]) * 100
print(f"\n{'='*60}")
print(f"TAPD查询缓存统计")
print(f"{'='*60}")
print(f" 总查询次数: {stats['total_queries']}")
print(f" 缓存命中: {stats['cache_hits']} ({hit_rate:.1f}%)")
print(f" 缓存未命中: {stats['cache_misses']}")
print(f" 实际API调用: {stats['api_calls']}")
print(f" 缓存失败记录命中: {stats['cached_failures']}")
print(f" 缓存条目数: {len(self._story_cache)}")
print(f"{'='*60}\n")
# 记录到日志文件
self.logger.log_api_call(
api_type="task2",
operation="cache_statistics",
request_data={},
response_data=stats,
success=True,
error_message=None
)
def _send_failure_notification(self, failed_records: List[Dict]) -> None:
"""
发送同步失败通知
Args:
failed_records: 失败记录列表
"""
try:
# 获取企业微信配置
wework_config = self.config.get('wework')
if not wework_config:
print(" 未配置企业微信推送,跳过失败通知")
return
agentid = wework_config.get('agentid')
receivers = wework_config.get('receivers')
if not agentid or not receivers:
print(" ⚠ 企业微信推送配置不完整,跳过失败通知")
return
# 发送推送通知
print(f"\n正在发送同步失败通知...")
from src2.notifier import send_sync_failure_notification
success = send_sync_failure_notification(
self.access_token,
agentid,
receivers,
failed_records
)
if success:
print(f" ✓ 失败通知已发送")
else:
print(f" ✗ 失败通知发送失败")
except Exception as e:
print(f" ✗ 发送失败通知时出错: {e}")
if self.test_mode:
import traceback
traceback.print_exc()
def run_once(config_manager: Task2ConfigManager = None,
access_token: str = None,
test_mode: bool = False) -> Dict[str, Any]:
"""
执行一次同步流程供调度器调用
Args:
config_manager: 配置管理器
access_token: access_token
test_mode: 测试模式
Returns:
Dict: 同步结果
"""
service = SyncService(
config_manager=config_manager,
access_token=access_token,
test_mode=test_mode
)
return service.sync_once()
if __name__ == "__main__":
print("=== 任务二同步服务测试 ===\n")
print("请使用 main.py 或 scheduler.py 运行同步服务")
print("或使用 test_phase4.py 进行测试")

381
src2/tapd_api.py Normal file
View File

@ -0,0 +1,381 @@
"""
TAPD API调用模块任务二专用
负责与TAPD Open API交互查询需求Story信息
与任务一的区别
- 任务一创建和管理Bug单
- 任务二查询需求Story状态信息
"""
import os
import requests
import time
from typing import Dict, Optional, Any
from requests.auth import HTTPBasicAuth
# 导入任务二专用的日志模块
from src2.logger import get_task2_logger
# ============================================================
# 自定义异常类
# ============================================================
class StoryNotFoundException(Exception):
"""当TAPD需求Story不存在时抛出的异常"""
pass
# TAPD状态值映射表
# 将API返回的状态代码转换为中文显示文本
STATUS_MAPPING = {
"status_5": "进行中",
"status_7": "未开始",
"status_8": "已完成",
"status_9": "待验收",
"status_10": "联调",
"status_12": "取消",
"status_13": "待评审",
}
# 需求终态列表(这些状态不需要持续同步)
TERMINAL_STATUSES = ['已完成', '取消']
def map_status(status_code: str) -> str:
"""
将TAPD状态代码转换为中文显示文本
Args:
status_code: TAPD API返回的状态代码 "status_5"
Returns:
str: 中文显示文本 "进行中"未知状态返回原始值
"""
if not status_code:
return "未知"
return STATUS_MAPPING.get(status_code, status_code)
class TAPDStoryApi:
"""TAPD需求API封装类任务二专用"""
# TAPD API基础URL与任务一相同
BASE_URL = "https://tapd-api.bilibili.co/tapd"
# 发布计划字段名称
PLAN_FIELD_NAME = "release_id"
def __init__(self, workspace_id: str, test_mode: bool = False):
"""
初始化TAPD Story API
Args:
workspace_id: TAPD项目ID
test_mode: 是否启用测试模式显示API请求和响应
Raises:
ValueError: 环境变量未设置时抛出
"""
self.workspace_id = workspace_id
self.test_mode = test_mode
self.session = requests.Session()
# 从环境变量读取认证信息(与任务一共用)
self.api_user = os.environ.get('TAPD_API_USER')
self.api_password = os.environ.get('TAPD_API_PASSWORD')
if not self.api_user or not self.api_password:
raise ValueError(
"TAPD认证信息未设置。请设置环境变量:\n"
" - TAPD_API_USER\n"
" - TAPD_API_PASSWORD"
)
# 设置Basic Auth
self.auth = HTTPBasicAuth(self.api_user, self.api_password)
# 初始化任务二专用的日志记录器
self.logger = get_task2_logger()
# 计划字段映射缓存ID -> 中文名称)
self._plan_mapping = None
print(f" ✓ TAPD Story API初始化完成 (workspace_id: {workspace_id})")
if test_mode:
print(f" ⚠ 测试模式已启用将显示所有API调用的详细信息")
def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
"""
发起TAPD API GET请求的通用方法支持429错误重试
Args:
endpoint: API端点 "stories"
params: URL查询参数
Returns:
Dict: API响应数据
Raises:
RuntimeError: API调用失败时抛出
"""
url = f"{self.BASE_URL}/{endpoint}"
# 准备日志记录的请求数据
log_request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
# 测试模式:显示请求信息
if self.test_mode:
print("\n" + "=" * 60)
print(f"【测试模式】TAPD API调用: {endpoint}")
print("=" * 60)
print(f"请求URL: {url}")
if params:
print(f"URL参数:")
for key, value in params.items():
print(f" {key}: {value}")
# 429错误重试逻辑最多重试1次
max_retries = 1
retry_count = 0
while retry_count <= max_retries:
try:
response = self.session.get(
url,
params=params,
auth=self.auth,
timeout=30
)
# 测试模式:显示响应信息
if self.test_mode:
print(f"\n响应状态码: {response.status_code}")
try:
import json
result = response.json()
print(f"响应数据:")
print(json.dumps(result, ensure_ascii=False, indent=2))
except:
print(f"响应内容: {response.text[:500]}")
print("=" * 60)
# 检查是否是429错误在raise_for_status之前检查
if response.status_code == 429:
if retry_count < max_retries:
retry_count += 1
wait_seconds = 60
print(f"\n⚠️ 触发TAPD API限流 (429 Too Many Requests)")
print(f" [开始等待] 等待 {wait_seconds} 秒后重试... (第 {retry_count}/{max_retries} 次重试)")
print(f" [重要] 在等待期间,代码会阻塞在这里,不会继续处理其他记录")
# 记录429错误日志
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={"status_code": 429, "retry_count": retry_count},
success=False,
error_message=f"429 Too Many Requests, 等待{wait_seconds}秒后重试"
)
time.sleep(wait_seconds)
print(f" [等待结束] 开始重试请求...")
continue # 重试
else:
# 已达到最大重试次数,抛出异常
error_msg = "TAPD API限流 (429 Too Many Requests),重试后仍然失败"
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={"status_code": 429, "retry_count": retry_count},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
# 对于非429错误调用raise_for_status检查HTTP状态
response.raise_for_status()
result = response.json()
# 检查TAPD API返回的状态
if result.get('status') != 1:
error_msg = result.get('info', '未知错误')
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=False,
error_message=error_msg
)
raise RuntimeError(f"TAPD API调用失败: {error_msg}")
# 记录成功日志
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=True
)
return result
except requests.exceptions.Timeout:
error_msg = f"TAPD API请求超时: {endpoint}"
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
except requests.exceptions.RequestException as e:
# 如果是HTTPError且状态码是429由上面的status_code检查处理
# 这里不应该到达因为429在response.status_code检查时已处理
error_msg = f"TAPD API请求失败: {e}"
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
# 理论上不会到达这里
raise RuntimeError("TAPD API请求失败未知错误")
def get_story(self, story_id: str) -> Dict:
"""
获取需求详情
Args:
story_id: 需求ID
Returns:
Dict: 需求详细信息
Raises:
RuntimeError: 获取失败时抛出
"""
params = {
'workspace_id': self.workspace_id,
'id': story_id
}
result = self._make_request("stories", params=params)
# TAPD API返回格式: {"status": 1, "data": [{"Story": {...}}]}
data = result.get('data', [])
if not isinstance(data, list) or len(data) == 0:
raise StoryNotFoundException(f"未找到需求: {story_id}")
# 取第一个元素
first_item = data[0]
# 提取Story对象
if isinstance(first_item, dict) and 'Story' in first_item:
story_info = first_item['Story']
else:
raise RuntimeError(f"API返回数据格式异常: {first_item}")
if not story_info:
raise StoryNotFoundException(f"未找到需求: {story_id}")
# 转换状态为中文
raw_status = story_info.get('status', '')
story_info['raw_status'] = raw_status
story_info['status'] = map_status(raw_status)
return story_info
def get_story_url(self, story_id: str) -> str:
"""
生成需求的访问URL
Args:
story_id: 需求ID
Returns:
str: 需求的访问URL
"""
return f"https://www.tapd.cn/{self.workspace_id}/prong/stories/view/{story_id}"
def get_story_fields_info(self) -> Dict:
"""
获取需求所有字段及候选值
Returns:
Dict: 字段信息包含各字段的名称选项等
Raises:
RuntimeError: 获取失败时抛出
"""
params = {
'workspace_id': self.workspace_id
}
result = self._make_request("stories/get_fields_info", params=params)
return result.get('data', {})
def get_plan_mapping(self) -> Dict[str, str]:
"""
获取发布计划字段的ID到中文名称映射
Returns:
Dict[str, str]: 发布计划ID到中文名称的映射
例如: {"1010104801000069739": "v2test", ...}
"""
# 获取字段信息
fields_info = self.get_story_fields_info()
# 提取计划字段的options
plan_field = fields_info.get(self.PLAN_FIELD_NAME, {})
options = plan_field.get('options', {})
# 缓存映射
self._plan_mapping = options
if self.test_mode:
print(f"\n【测试模式】计划字段映射:")
for plan_id, plan_name in options.items():
print(f" {plan_id} -> {plan_name}")
return options
def map_plan_id_to_name(self, plan_id: str) -> str:
"""
将发布计划ID转换为中文名称
Args:
plan_id: 发布计划ID "1010104801000069739"
Returns:
str: 中文名称 "v2test"未找到则返回空字符串
"""
if not plan_id or plan_id == "0":
return ""
# 如果映射未初始化,先获取
if self._plan_mapping is None:
self.get_plan_mapping()
return self._plan_mapping.get(plan_id, "")
if __name__ == "__main__":
print("=== TAPD Story API 测试 ===\n")
print("请使用 test_phase2.py 进行完整测试")

210
src2/test_phase2.py Normal file
View File

@ -0,0 +1,210 @@
"""
任务二第二阶段验证脚本
测试链接解析和TAPD API功能
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src2.link_parser import parse_tapd_link, extract_story_id, is_valid_tapd_link
from src2.tapd_api import TAPDStoryApi, map_status, STATUS_MAPPING
def test_link_parser():
"""测试链接解析功能"""
print("=" * 60)
print("测试1: 链接解析器")
print("=" * 60)
test_cases = [
# 格式一:列表页弹窗链接
(
"https://www.tapd.cn/tapd_fe/58335167/story/list?dialog_preview_id=story_1158335167001044388",
True,
"1158335167001044388",
"dialog"
),
# 格式二:详情页链接
(
"https://www.tapd.cn/58335167/prong/stories/view/1158335167001044388",
True,
"1158335167001044388",
"view"
),
# 无效链接Bug链接
(
"https://www.tapd.cn/58335167/bugtrace/bugs/view/123456",
False,
None,
"unknown"
),
# 无效链接:其他网站
(
"https://www.google.com",
False,
None,
"unknown"
),
# 空链接
(
"",
False,
None,
"unknown"
),
]
passed = 0
failed = 0
for i, (url, expected_success, expected_id, expected_type) in enumerate(test_cases, 1):
success, result, link_type = parse_tapd_link(url)
# 检查结果
if success == expected_success and link_type == expected_type:
if success and result == expected_id:
print(f" [{i}] PASS: {url[:50]}...")
passed += 1
elif not success:
print(f" [{i}] PASS: 正确识别无效链接")
passed += 1
else:
print(f" [{i}] FAIL: 单号不匹配 (期望={expected_id}, 实际={result})")
failed += 1
else:
print(f" [{i}] FAIL: {url[:50]}...")
print(f" 期望: success={expected_success}, type={expected_type}")
print(f" 实际: success={success}, type={link_type}")
failed += 1
print(f"\n链接解析测试结果: {passed} 通过, {failed} 失败")
return failed == 0
def test_status_mapping():
"""测试状态映射功能"""
print("\n" + "=" * 60)
print("测试2: 状态映射")
print("=" * 60)
test_cases = [
("status_5", "进行中"),
("status_7", "未开始"),
("status_8", "已完成"),
("status_9", "待验收"),
("status_10", "联调"),
("status_12", "取消"),
("status_13", "待评审"),
("status_99", "status_99"), # 未知状态返回原值
("", "未知"),
(None, "未知"),
]
passed = 0
failed = 0
for status_code, expected in test_cases:
result = map_status(status_code)
if result == expected:
print(f" PASS: {status_code} -> {result}")
passed += 1
else:
print(f" FAIL: {status_code} -> {result} (期望: {expected})")
failed += 1
print(f"\n状态映射测试结果: {passed} 通过, {failed} 失败")
return failed == 0
def test_tapd_api(story_id: str = None):
"""测试TAPD API功能"""
print("\n" + "=" * 60)
print("测试3: TAPD API")
print("=" * 60)
# 从配置读取workspace_id
from src2.config import Task2ConfigManager
config = Task2ConfigManager()
tapd_config = config.get_tapd_config()
workspace_id = tapd_config['workspace_id']
print(f" workspace_id: {workspace_id}")
try:
# 初始化API
api = TAPDStoryApi(workspace_id, test_mode=True)
print(" ✓ API初始化成功")
except ValueError as e:
print(f" ✗ API初始化失败: {e}")
return False
if not story_id:
print("\n 跳过需求查询测试未提供story_id")
print(" 用法: python test_phase2.py <story_id>")
return True
# 测试获取需求详情
print(f"\n 测试获取需求: {story_id}")
try:
story = api.get_story(story_id)
print(f" ✓ 获取成功")
print(f" - ID: {story.get('id')}")
print(f" - 名称: {story.get('name')}")
print(f" - 状态: {story.get('status')}")
print(f" - 处理人: {story.get('owner')}")
print(f" - 预计开始: {story.get('begin')}")
print(f" - 预计结束: {story.get('due')}")
return True
except RuntimeError as e:
print(f" ✗ 获取失败: {e}")
return False
def main():
"""主函数"""
print("=" * 60)
print("任务二第二阶段验证")
print("=" * 60)
# 获取命令行参数
story_id = None
if len(sys.argv) > 1:
story_id = sys.argv[1]
results = []
# 测试1: 链接解析
results.append(("链接解析", test_link_parser()))
# 测试2: 状态映射
results.append(("状态映射", test_status_mapping()))
# 测试3: TAPD API
results.append(("TAPD API", test_tapd_api(story_id)))
# 汇总结果
print("\n" + "=" * 60)
print("验收结果汇总")
print("=" * 60)
all_passed = True
for name, passed in results:
status = "✓ PASS" if passed else "✗ FAIL"
print(f" {status}: {name}")
if not passed:
all_passed = False
if all_passed:
print("\n所有测试通过!第二阶段验收完成。")
else:
print("\n部分测试失败,请检查。")
return 0 if all_passed else 1
if __name__ == "__main__":
sys.exit(main())

330
src2/test_phase3.py Normal file
View File

@ -0,0 +1,330 @@
"""
第三阶段验证脚本智能表格读写功能测试
验证项
1. 字段检测 - 检查必要字段是否存在
2. 记录读取 - 获取所有记录
3. TAPD链接提取 - 从记录中提取链接并解析
4. 数据回写测试 - 构造更新记录可选执行
用法
python src2/test_phase3.py # 只读测试(不修改数据)
python src2/test_phase3.py --write # 包含回写测试(会修改一条记录)
"""
import sys
import argparse
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.smartsheet_sync import (
SmartSheetSync,
REQUIRED_FIELDS,
FIELD_TAPD_LINK,
FIELD_TAPD_STATUS,
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
)
def print_separator(title: str = ""):
"""打印分隔线"""
print("\n" + "=" * 60)
if title:
print(f" {title}")
print("=" * 60)
def test_field_detection(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试1字段检测
Returns:
bool: 测试是否通过
"""
print_separator("测试1字段检测")
print(f"必要字段列表:")
for field in REQUIRED_FIELDS:
print(f" - {field}")
print()
# 获取字段信息
fields = sync.api.get_fields(sheet_id)
# 检查必要字段
all_present, missing_fields, field_mapping = sync.check_required_fields(fields)
print(f"\n字段映射结果:")
for field_name in REQUIRED_FIELDS:
field_id = field_mapping.get(field_name, "未找到")
status = "" if field_name in field_mapping else ""
print(f" {status} {field_name}: {field_id}")
if all_present:
print(f"\n✓ 测试通过:所有必要字段都存在")
return True
else:
print(f"\n✗ 测试失败:缺少字段 {missing_fields}")
return False
def test_record_reading(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试2记录读取
Returns:
bool: 测试是否通过
"""
print_separator("测试2记录读取")
try:
records = sync.get_all_records(sheet_id)
print(f"\n记录读取结果:")
print(f" - 总记录数: {len(records)}")
if len(records) > 0:
print(f"\n第一条记录示例:")
first_record = records[0]
record_id = first_record.get('record_id', 'N/A')
print(f" - record_id: {record_id}")
# 显示部分字段值
values = first_record.get('values', {})
print(f" - 字段数量: {len(values)}")
# 显示前5个字段
for i, (key, value) in enumerate(values.items()):
if i >= 5:
print(f" - ... 还有 {len(values) - 5} 个字段")
break
print(f" - {key}: {str(value)[:50]}...")
print(f"\n✓ 测试通过:成功读取 {len(records)} 条记录")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_tapd_link_extraction(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试3TAPD链接提取
Returns:
bool: 测试是否通过
"""
print_separator("测试3TAPD链接提取")
try:
records_with_link = sync.get_records_with_tapd_link(sheet_id)
print(f"\n链接提取结果:")
print(f" - 包含链接的记录数: {len(records_with_link)}")
# 统计解析结果
success_count = sum(1 for r in records_with_link if r["parse_success"])
fail_count = len(records_with_link) - success_count
print(f" - 解析成功: {success_count}")
print(f" - 解析失败: {fail_count}")
# 显示前3条成功解析的记录
success_records = [r for r in records_with_link if r["parse_success"]]
if success_records:
print(f"\n成功解析的记录示例最多3条")
for i, record_info in enumerate(success_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 单号: {record_info['story_id']}")
print(f" 类型: {record_info.get('link_type', 'N/A')}")
# 显示解析失败的记录
fail_records = [r for r in records_with_link if not r["parse_success"]]
if fail_records:
print(f"\n解析失败的记录最多3条")
for i, record_info in enumerate(fail_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 错误: {record_info.get('parse_error', 'N/A')}")
print(f"\n✓ 测试通过成功提取并解析TAPD链接")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
import traceback
traceback.print_exc()
return False
def test_update_record_structure(sync: SmartSheetSync) -> bool:
"""
测试4更新记录结构构造不实际写入
Returns:
bool: 测试是否通过
"""
print_separator("测试4更新记录结构构造")
try:
# 构造一个测试更新记录
test_record = sync.build_update_record(
record_id="test_record_id_123",
status="进行中",
owner="张三",
begin_date="2025-01-01",
due_date="2025-01-15"
)
print(f"构造的更新记录结构:")
print(f" record_id: {test_record['record_id']}")
print(f" values:")
for key, value in test_record['values'].items():
print(f" {key}: {value}")
# 验证结构
assert 'record_id' in test_record
assert 'values' in test_record
assert FIELD_TAPD_STATUS in test_record['values']
assert FIELD_OWNER in test_record['values']
assert FIELD_BEGIN_DATE in test_record['values']
assert FIELD_DUE_DATE in test_record['values']
print(f"\n✓ 测试通过:更新记录结构正确")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_multi_sheet_support(sync: SmartSheetSync) -> bool:
"""
测试5多子表支持
Returns:
bool: 测试是否通过
"""
print_separator("测试5多子表支持")
try:
# 获取所有子表
sheet_list = sync.api.get_sheet_list()
print(f"子表列表:")
for i, sheet in enumerate(sheet_list):
sheet_id = sheet.get('sheet_id', 'N/A')
title = sheet.get('title', 'N/A')
print(f" [{i+1}] {title} (ID: {sheet_id})")
print(f"\n共找到 {len(sheet_list)} 个子表")
if len(sheet_list) == 0:
print(f"\n⚠ 警告:没有找到子表")
return False
print(f"\n✓ 测试通过:成功获取子表列表")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="第三阶段验证脚本")
parser.add_argument("--write", action="store_true",
help="执行回写测试(会修改数据)")
args = parser.parse_args()
print("=" * 60)
print(" 任务二 第三阶段验证:智能表格读写功能")
print("=" * 60)
# 1. 加载配置
print("\n[初始化] 加载配置...")
config = Task2ConfigManager()
smartsheet_config = config.get_smartsheet_config()
docid = smartsheet_config.get('docid')
print(f" docid: {docid[:20]}...")
# 2. 获取token
print("\n[初始化] 获取access_token...")
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" token: {access_token[:20]}...")
# 3. 初始化同步模块
print("\n[初始化] 初始化SmartSheetSync...")
sync = SmartSheetSync(access_token, docid, test_mode=False)
print(" ✓ 初始化完成")
# 4. 获取子表列表
sheet_list = sync.api.get_sheet_list()
if not sheet_list:
print("\n✗ 错误:没有找到子表")
return 1
# 使用第一个子表进行测试
first_sheet = sheet_list[0]
sheet_id = first_sheet.get('sheet_id')
sheet_title = first_sheet.get('title')
print(f"\n使用子表进行测试: {sheet_title}")
# 5. 运行测试
results = []
# 测试5先执行多子表支持
results.append(("多子表支持", test_multi_sheet_support(sync)))
# 测试1字段检测
results.append(("字段检测", test_field_detection(sync, sheet_id)))
# 测试2记录读取
results.append(("记录读取", test_record_reading(sync, sheet_id)))
# 测试3TAPD链接提取
results.append(("TAPD链接提取", test_tapd_link_extraction(sync, sheet_id)))
# 测试4更新记录结构
results.append(("更新记录结构", test_update_record_structure(sync)))
# 6. 输出测试结果汇总
print_separator("测试结果汇总")
passed = 0
failed = 0
for name, result in results:
status = "✓ 通过" if result else "✗ 失败"
print(f" {status}: {name}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if failed == 0:
print("\n" + "=" * 60)
print(" ✓ 第三阶段验证全部通过!")
print("=" * 60)
return 0
else:
print("\n" + "=" * 60)
print(" ✗ 部分测试失败,请检查")
print("=" * 60)
return 1
if __name__ == "__main__":
sys.exit(main())

160
src2/test_phase4.py Normal file
View File

@ -0,0 +1,160 @@
"""
任务二第四阶段验证脚本
验证同步服务与主程序的完整功能
验证项
1. 同步服务初始化
2. 单次同步流程
3. 调度器初始化
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_sync_service_init():
"""测试1: 同步服务初始化"""
print("\n" + "=" * 60)
print("测试1: 同步服务初始化")
print("=" * 60)
try:
from src2.sync_service import SyncService
from src2.config import Task2ConfigManager
# 初始化配置
config_manager = Task2ConfigManager()
print("✓ 配置管理器初始化成功")
# 初始化同步服务(测试模式)
service = SyncService(
config_manager=config_manager,
test_mode=True
)
print("✓ 同步服务初始化成功")
# 验证属性
assert service.workspace_id is not None
assert service.docid is not None
assert service.access_token is not None
print("✓ 服务属性验证通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_sync_once():
"""测试2: 单次同步流程"""
print("\n" + "=" * 60)
print("测试2: 单次同步流程")
print("=" * 60)
try:
from src2.sync_service import run_once
# 执行一次同步
print("正在执行同步...")
result = run_once(test_mode=False)
# 验证结果结构
assert 'success' in result
assert 'sheets_processed' in result
assert 'records_with_link' in result
assert 'records_synced' in result
print("✓ 结果结构验证通过")
# 打印结果摘要
print(f"\n同步结果:")
print(f" 成功: {result['success']}")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
return result['success']
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_scheduler_init():
"""测试3: 调度器初始化"""
print("\n" + "=" * 60)
print("测试3: 调度器初始化")
print("=" * 60)
try:
from src2.scheduler import Task2Scheduler
# 初始化调度器(不启动)
scheduler = Task2Scheduler(verbose=False)
print("✓ 调度器初始化成功")
# 验证属性
assert scheduler.config is not None
assert scheduler.sync_interval > 0
print(f"✓ 同步间隔: {scheduler.sync_interval} 分钟")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主测试函数"""
print("\n" + "=" * 60)
print("任务二第四阶段验证")
print("=" * 60)
results = {}
# 测试1: 同步服务初始化
results['sync_service_init'] = test_sync_service_init()
# 测试2: 单次同步流程
results['sync_once'] = test_sync_once()
# 测试3: 调度器初始化
results['scheduler_init'] = test_scheduler_init()
# 打印总结
print("\n" + "=" * 60)
print("验证结果总结")
print("=" * 60)
all_passed = True
for test_name, passed in results.items():
status = "✓ 通过" if passed else "✗ 失败"
print(f" {test_name}: {status}")
if not passed:
all_passed = False
print("=" * 60)
if all_passed:
print("所有测试通过!第四阶段验收完成。")
else:
print("部分测试失败,请检查错误信息。")
print("=" * 60)
return 0 if all_passed else 1
if __name__ == "__main__":
sys.exit(main())

220
src2/test_setup.py Normal file
View File

@ -0,0 +1,220 @@
"""
任务二第一阶段验证脚本
验证基础框架搭建是否正确
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_directory_structure():
"""测试1: 验证目录结构"""
print("=" * 50)
print("测试1: 验证目录结构")
print("=" * 50)
src2_dir = project_root / "src2"
logs2_dir = project_root / "logs2"
config_file = project_root / "config" / "config_task2.ini"
results = []
# 检查 src2 目录
if src2_dir.exists() and src2_dir.is_dir():
print(f" [OK] src2/ 目录存在")
results.append(True)
else:
print(f" [FAIL] src2/ 目录不存在")
results.append(False)
# 检查 logs2 目录
if logs2_dir.exists() and logs2_dir.is_dir():
print(f" [OK] logs2/ 目录存在")
results.append(True)
else:
print(f" [FAIL] logs2/ 目录不存在")
results.append(False)
# 检查配置文件
if config_file.exists():
print(f" [OK] config/config_task2.ini 存在")
results.append(True)
else:
print(f" [FAIL] config/config_task2.ini 不存在")
results.append(False)
return all(results)
def test_config_read():
"""测试2: 验证配置文件读取"""
print("\n" + "=" * 50)
print("测试2: 验证配置文件读取")
print("=" * 50)
try:
from src2.config import Task2ConfigManager
config = Task2ConfigManager()
# 读取TAPD配置
tapd_config = config.get_tapd_config()
print(f" [OK] TAPD配置读取成功")
print(f" workspace_id: {tapd_config['workspace_id']}")
# 读取SmartSheet配置
smartsheet_config = config.get_smartsheet_config()
print(f" [OK] SmartSheet配置读取成功")
print(f" docid: {smartsheet_config['docid']}")
# 读取Schedule配置
schedule_config = config.get_schedule_config()
print(f" [OK] Schedule配置读取成功")
print(f" sync_interval: {schedule_config['sync_interval']} 分钟")
return True
except Exception as e:
print(f" [FAIL] 配置读取失败: {e}")
return False
def test_logger():
"""测试3: 验证日志写入"""
print("\n" + "=" * 50)
print("测试3: 验证日志写入到 logs2/")
print("=" * 50)
try:
from src2.logger import get_task2_logger, TASK2_LOG_DIR
logger = get_task2_logger()
# 写入测试日志
logger.log_api_call(
api_type="test",
operation="task2/setup_test",
request_data={"test": "验证脚本测试"},
response_data={"status": "success"},
success=True
)
# 检查日志文件是否创建
log_file = logger._get_today_log_file()
if log_file.exists():
print(f" [OK] 日志写入成功")
print(f" 日志目录: {TASK2_LOG_DIR}")
print(f" 日志文件: {log_file.name}")
return True
else:
print(f" [FAIL] 日志文件未创建")
return False
except Exception as e:
print(f" [FAIL] 日志测试失败: {e}")
return False
def test_token_manager():
"""测试4: 验证Token管理器复用"""
print("\n" + "=" * 50)
print("测试4: 验证Token管理器复用")
print("=" * 50)
try:
from src.token_manager import TokenManager
# 创建TokenManager实例使用默认缓存路径
token_manager = TokenManager()
print(f" [OK] TokenManager导入成功")
print(f" 缓存文件: {token_manager.cache_file_path}")
# 尝试获取token
token = token_manager.get_token()
print(f" [OK] Token获取成功")
print(f" Token前20字符: {token[:20]}...")
return True
except ValueError as e:
print(f" [WARN] 环境变量未设置: {e}")
print(f" 这不影响框架搭建,后续运行时需要设置")
return True # 环境变量未设置不算失败
except Exception as e:
print(f" [FAIL] Token测试失败: {e}")
return False
def test_smartsheet_api():
"""测试5: 验证SmartSheetAPI复用"""
print("\n" + "=" * 50)
print("测试5: 验证SmartSheetAPI复用")
print("=" * 50)
try:
from src.smartsheet import SmartSheetAPI
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
print(f" [OK] SmartSheetAPI导入成功")
# 获取配置
config = Task2ConfigManager()
docid = config.get_smartsheet_config()['docid']
# 获取token
token_manager = TokenManager()
token = token_manager.get_token()
# 创建SmartSheetAPI实例
api = SmartSheetAPI(token, docid)
print(f" [OK] SmartSheetAPI实例创建成功")
print(f" docid: {docid}")
return True
except Exception as e:
print(f" [FAIL] SmartSheetAPI测试失败: {e}")
return False
def main():
"""运行所有测试"""
print("\n" + "=" * 50)
print("任务二第一阶段验证")
print("=" * 50)
results = {
"目录结构": test_directory_structure(),
"配置读取": test_config_read(),
"日志写入": test_logger(),
"Token管理": test_token_manager(),
"SmartSheetAPI": test_smartsheet_api()
}
# 汇总结果
print("\n" + "=" * 50)
print("验证结果汇总")
print("=" * 50)
passed = 0
failed = 0
for name, result in results.items():
status = "[OK]" if result else "[FAIL]"
print(f" {status} {name}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if failed == 0:
print("\n第一阶段验收通过!")
else:
print("\n请检查失败项并修复")
return failed == 0
if __name__ == "__main__":
main()

130
src2/test_update_records.py Normal file
View File

@ -0,0 +1,130 @@
"""
测试 update_records 功能的独立脚本
用于调试智能表格的记录更新功能
"""
import sys
import json
import requests
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
def test_update_records():
"""测试更新记录功能"""
print("=" * 60)
print("测试 update_records 功能")
print("=" * 60)
# 1. 获取 access_token
print("\n[1/3] 获取 access_token...")
try:
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" ✓ access_token: {access_token[:20]}...(已隐藏)")
except Exception as e:
print(f" ✗ 获取失败: {e}")
return False
# 2. 准备测试数据
print("\n[2/3] 准备测试数据...")
BASE_URL = "https://qyapi.weixin.qq.com/cgi-bin/wedoc"
# 测试数据
data = {
"docid": "dcOsT3czWy0YEDg38vlDqwVCTjv0kzwC_GU2XmT9wSZctQ0ZJQUAV7vMQ3ljZx-n_NqxzEEYG2DiLAvNdNsHJwgQ",
"sheet_id": "56YEeR",
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"records": [
{
"record_id": "rNrk6o",
"values": {
"TAPD状态": [
{
"type": "text",
"text": "已完成"
}
],
"处理人": [
{
"type": "text",
"text": ""
}
],
"TAPD预计完成日期": [
{
"type": "text",
"text": "2025-11-28"
}
]
}
}
]
}
print(" ✓ 测试数据已准备")
print(f" - docid: {data['docid'][:20]}...")
print(f" - sheet_id: {data['sheet_id']}")
print(f" - 记录数: {len(data['records'])}")
print(f" - record_id: {data['records'][0]['record_id']}")
# 3. 发送请求
print("\n[3/3] 发送 update_records 请求...")
# 构造完整URL带debug=1
url = f"{BASE_URL}/smartsheet/update_records?access_token={access_token}&debug=1"
print(f"\n请求信息:")
print(f" URL: {BASE_URL}/smartsheet/update_records?access_token=***&debug=1")
print(f" Method: POST")
print(f"\n请求数据:")
print(json.dumps(data, ensure_ascii=False, indent=2))
try:
# 发送POST请求
print("\n正在发送请求...")
response = requests.post(url, json=data, timeout=30)
# 打印响应信息
print(f"\n响应信息:")
print(f" 状态码: {response.status_code}")
print(f"\n响应数据:")
result = response.json()
print(json.dumps(result, ensure_ascii=False, indent=2))
# 检查结果
print("\n" + "=" * 60)
if result.get('errcode', 0) == 0:
print("✓ 测试成功!")
updated_records = result.get('records', [])
print(f" 更新记录数: {len(updated_records)}")
return True
else:
print("✗ 测试失败!")
print(f" 错误码: {result.get('errcode')}")
print(f" 错误信息: {result.get('errmsg')}")
return False
except Exception as e:
print(f"\n✗ 请求异常: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主函数"""
success = test_update_records()
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())