Compare commits

..

15 Commits

28 changed files with 7199 additions and 10 deletions

1
.gitignore vendored
View File

@ -132,6 +132,7 @@ cython_debug/
# Custom
logs/
logs2/
.claude/
# Project documentation

24
config/config_task2.ini Normal file
View File

@ -0,0 +1,24 @@
# 任务二配置文件TAPD状态实时同步至腾讯智能表格
[TAPD]
# TAPD项目ID
workspace_id = 58335167
[SmartSheet]
# 智能表格文档ID任务二专用
# 支持配置多个docid用逗号分隔例如
# docid = doc1,doc2,doc3
# 同步时会依次对所有表格进行同步
docid = dcOsT3czWy0YEDg38vlDqwVCTjv0kzwC_GU2XmT9wSZctQ0ZJQUAV7vMQ3ljZx-n_NqxzEEYG2DiLAvNdNsHJwgQ,dcHWzWyaHpZNQwUkZzgH5Kfyx9cMvQzVjZIapajGDuXqjS4nEe0LQqOojBL8s3rlwghw4deOgVnbOqHLoxcKzaHg
#docid =dc2Q5Kb0T4zerbo4_ag0MMcXHCusIaFJX5fO6_8n-l_yV-bn5brZSi1kNw3kjme-qIs0LvPKbC5GDEEPaZ1BGlvA
[Schedule]
# 同步频率(分钟)
sync_interval = 30
[wework]
# 企业微信应用ID
agentid = 1000615
# 接收人列表用户ID多个用|分隔,@all表示全部成员
# receivers = 046364
receivers = 040005

View File

@ -1,4 +1,4 @@
{
"access_token": "SPFfB9jMxleGiJn76FQH6v5pploseAJXTCVkTVVxl1_PEmHZJiqXsNGpEyGAK4qmYe3WRxYJ57xgCQLRCHopVfeoDfP87IgxVCytCqQABGES5ndG05SVkrI-9evg8Z4kbstlsiRMmfPGGGoNUgL1kUoZc2No0FYytm8FTfulnAXfiTExzoF8OCTdEPc9mA0g8JKFhlkiS2F0agBESS_2_ewbcZvA0i44-ChTKRBdRa0",
"fetch_time": 1767599732.4166858
"access_token": "GQKX61Nh9c5A4Mp0FDOGy4nZgnFok2gefTA0_X4Y5PEL7NkpD9UHWwji3lWkZLsHMJf3dpbJ_l-NdichZ5qSZuPhF7kJNU47Blf2yLQRqFctmXMU6m1cWU80iLiY0vrX2EPzvldaHMR-al3HgKK6PUSU9T2a5Xp-lCjh9StPzEnQJUnwickV4PiPegLLGcH5F6jcM-9pHztkJ6pSV6bfP5QFFATAldmj-71Occib9V0",
"fetch_time": 1772269143.6670134
}

14
core/__init__.py Normal file
View File

@ -0,0 +1,14 @@
"""全局核心模块导出"""
from core.global_log_system import (
GlobalLogSystem,
create_task1_log_system,
create_task2_log_system,
)
__all__ = [
"GlobalLogSystem",
"create_task1_log_system",
"create_task2_log_system",
]

210
core/global_log_system.py Normal file
View File

@ -0,0 +1,210 @@
"""
全局日志系统内核
职责
1. 按天写入 jsonl 日志
2. 统一记录 API 调用事件
3. 记录同步开始/结束事件与统计
4. token 等敏感字段做脱敏
"""
from __future__ import annotations
import json
import re
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
class GlobalLogSystem:
"""全局日志记录器"""
ALLOWED_MODULES = {"smartsheet", "tapd", "wework"}
SENSITIVE_KEYS = {
"access_token",
"token",
"authorization",
"corpsecret",
"api_password",
"password",
"secret",
"auth",
}
TOKEN_PATTERNS = [
re.compile(r"(access_token=)([^&\s]+)", re.IGNORECASE),
re.compile(r"(corpsecret=)([^&\s]+)", re.IGNORECASE),
re.compile(r"(token=)([^&\s]+)", re.IGNORECASE),
]
def __init__(self, task_name: str, log_dir: str | Path):
self.task_name = task_name
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self._active_sync_id: Optional[str] = None
def start_sync(self,
trigger: str,
metadata: Optional[Dict[str, Any]] = None,
sync_id: Optional[str] = None) -> str:
"""记录一次同步开始事件并返回 sync_id"""
if sync_id is None:
sync_id = self._generate_sync_id()
event = {
"event_type": "start_sync",
"timestamp": self._now_string(),
"task": self.task_name,
"sync_id": sync_id,
"trigger": trigger,
"metadata": self._sanitize(metadata or {}),
}
self._active_sync_id = sync_id
self._write_event_safely(event)
return sync_id
def log_api(self,
module: str,
operation: str,
request_data: Optional[Dict[str, Any]],
response_data: Optional[Dict[str, Any]],
success: bool,
error_message: Optional[str] = None,
duration_ms: Optional[int] = None,
sync_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None) -> None:
"""记录单次 API 调用事件"""
if module not in self.ALLOWED_MODULES:
raise ValueError(
f"module 不合法: {module},允许值: {sorted(self.ALLOWED_MODULES)}"
)
resolved_sync_id = sync_id or self._active_sync_id
event = {
"event_type": "api_call",
"timestamp": self._now_string(),
"task": self.task_name,
"sync_id": resolved_sync_id,
"module": module,
"operation": operation,
"success": success,
"request": self._sanitize(request_data or {}),
"response": self._sanitize(response_data or {}),
"error_message": self._sanitize_text(error_message),
"duration_ms": duration_ms,
"extra": self._sanitize(extra or {}),
}
self._write_event_safely(event)
def end_sync_with_stats(self,
stats: Dict[str, Any],
success: bool,
error_message: Optional[str] = None,
sync_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None) -> None:
"""记录一次同步结束事件"""
resolved_sync_id = sync_id or self._active_sync_id
event = {
"event_type": "end_sync",
"timestamp": self._now_string(),
"task": self.task_name,
"sync_id": resolved_sync_id,
"success": success,
"stats": self._sanitize(stats),
"error_message": self._sanitize_text(error_message),
"extra": self._sanitize(extra or {}),
}
self._write_event_safely(event)
if resolved_sync_id == self._active_sync_id:
self._active_sync_id = None
def _write_event_safely(self, event: Dict[str, Any]) -> None:
"""安全写入:日志失败不影响主流程"""
try:
self._write_event(event)
except Exception as exc:
print(f"⚠️ 全局日志写入失败: {exc}")
def _write_event(self, event: Dict[str, Any]) -> None:
log_file = self._get_today_log_file()
with open(log_file, "a", encoding="utf-8") as handle:
handle.write(json.dumps(event, ensure_ascii=False) + "\n")
def _get_today_log_file(self) -> Path:
today = datetime.now().strftime("%Y-%m-%d")
return self.log_dir / f"api_log_{today}.jsonl"
def _generate_sync_id(self) -> str:
time_part = datetime.now().strftime("%Y%m%d_%H%M%S")
random_part = uuid.uuid4().hex[:8]
return f"{self.task_name}_{time_part}_{random_part}"
def _now_string(self) -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _sanitize(self, value: Any, parent_key: str = "") -> Any:
if isinstance(value, dict):
sanitized: Dict[str, Any] = {}
for key, item in value.items():
key_lower = str(key).lower()
if key_lower in self.SENSITIVE_KEYS:
sanitized[key] = self._mask_sensitive_value(item)
else:
sanitized[key] = self._sanitize(item, parent_key=key_lower)
return sanitized
if isinstance(value, list):
return [self._sanitize(item, parent_key=parent_key) for item in value]
if isinstance(value, tuple):
return tuple(self._sanitize(item, parent_key=parent_key) for item in value)
if isinstance(value, str):
if parent_key in self.SENSITIVE_KEYS:
return self._mask_sensitive_value(value)
return self._sanitize_text(value)
return value
def _sanitize_text(self, text: Optional[str]) -> Optional[str]:
if text is None:
return None
masked = text
for pattern in self.TOKEN_PATTERNS:
masked = pattern.sub(r"\1***", masked)
return masked
def _mask_sensitive_value(self, value: Any) -> str:
text = "" if value is None else str(value)
if not text:
return "***"
if text.lower().startswith("bearer "):
return "Bearer ***"
if len(text) <= 6:
return "***"
return f"{text[:3]}***{text[-2:]}"
def create_task1_log_system(project_root: Optional[Path] = None) -> GlobalLogSystem:
"""创建任务一日志系统(固定 logs/"""
root = project_root or Path(__file__).parent.parent
return GlobalLogSystem(task_name="task1", log_dir=root / "logs")
def create_task2_log_system(project_root: Optional[Path] = None) -> GlobalLogSystem:
"""创建任务二日志系统(固定 logs2/"""
root = project_root or Path(__file__).parent.parent
return GlobalLogSystem(task_name="task2", log_dir=root / "logs2")

111
docs/全局框架文档.md Normal file
View File

@ -0,0 +1,111 @@
# 全局框架文档
> 用途:统一维护项目模块、脚本职责与接口定义,避免调用不存在接口与跨任务耦合失控。
> 范围:`src/``src2/`,以及新增的全局模块(如后续 `core/`)。
---
## 1. 日志系统总览(重构目标态)
### 1.1 目标架构
- **全局日志内核**:位于任务目录外,提供统一记录能力。
- **任务一适配层**:写入 `logs/`
- **任务二适配层**:写入 `logs2/`
- **同步边界管理**:每次同步使用唯一 `sync_id` 分隔,结束时写统计。
### 1.2 统一日志字段(约定)
- `event_type``start_sync` / `api_call` / `end_sync`
- `timestamp`:事件时间
- `task``task1` / `task2`
- `sync_id`:单次同步唯一标识
- `module``smartsheet` / `tapd` / `wework`
- `operation`:接口操作名
- `request`:请求快照(含脱敏)
- `response`:响应快照(含脱敏)
- `success`:调用是否成功
- `error_message`:失败原因
- `duration_ms`:调用耗时(毫秒)
- `stats`:同步完成统计(仅 `end_sync`
---
## 2. 模块清单与职责(当前状态)
## 2.0 全局核心(`core/`
- `core/global_log_system.py`
- **职责**提供统一日志内核jsonl 写入、同步分隔、API 事件、统计事件、脱敏)。
- **接口(对内)**`start_sync``log_api``end_sync_with_stats`
- **创建器**`create_task1_log_system`(固定 `logs/`)、`create_task2_log_system`(固定 `logs2/`)。
- `core/__init__.py`
- **职责**:导出全局日志内核与创建器。
## 2.1 任务一(`src/`
- `src/scheduler.py`
- **职责**:任务一调度入口,定时触发单次同步。
- **关键依赖**`src/main.py`
- `src/main.py`
- **职责**:任务一主流程编排(扫描、校验、开单、回写、通知)。
- **接口(对内)**`run_once(...)`
- `src/smartsheet.py`
- **职责**:智能表格 API 封装。
- **接口(对内)**`get_sheet_list``get_fields``get_records``update_records` 等。
- `src/tapd_api.py`
- **职责**TAPD Bug API 封装(创建、查询、附件上传)。
- **接口(对内)**`create_bug``get_bug``upload_attachment` 等。
- `src/token_manager.py`
- **职责**:企业微信 `access_token` 获取与缓存。
- **接口(对内)**`get_token()`
- `src/wework_notifier.py`
- **职责**:企业微信消息通知。
- **接口(对内)**`send_validation_failure_notification(...)`
- `src/api_logger.py`
- **职责**:现有日志记录器(后续将作为兼容层)。
## 2.2 任务二(`src2/`
- `src2/scheduler.py`
- **职责**:任务二调度入口,定时触发同步。
- **关键依赖**`src2/sync_service.py`
- `src2/sync_service.py`
- **职责**:任务二同步编排(读取、解析链接、查询 TAPD、回写、通知
- **接口(对内)**`sync_once()`
- `src2/smartsheet.py`
- **职责**:任务二智能表格 API 适配层。
- **关键点**:应固定写入 `logs2/`
- `src2/smartsheet_sync.py`
- **职责**:任务二表格字段检查、记录提取与回写构造。
- `src2/tapd_api.py`
- **职责**:任务二 TAPD Story 查询与状态映射。
- `src2/notifier.py`
- **职责**:任务二失败通知封装(当前复用任务一通知器,存在串目录风险)。
- `src2/logger.py`
- **职责**:任务二日志实例入口(后续接入统一内核)。
---
## 3. 现存问题与待改造点
- **串目录问题**:任务二复用 `TokenManager``WeWorkNotifier` 可能写入 `logs/`
- **双记录矛盾**:同一次请求在部分分支可能出现先成功后失败两条记录。
- **写入稳定性问题**:现有按 JSON 数组拼接的策略会造成结构损坏风险。
- **同步边界缺失**:缺少标准化 `start_sync` / `end_sync` 分隔与统计记录。
---
## 4. 模块接口演进计划(摘要)
- 阶段1新增全局日志内核模块定义统一接口。已完成
- 阶段2`src/api_logger.py` 改造成兼容层,保证旧调用可用。
- 阶段3`src2/logger.py` 与任务二编排层切换到统一内核,修复串目录。
- 阶段4更新查看工具与文档支持 `jsonl + sync_id`
---
## 5. 变更记录
### 2026-02-28
- 新建本框架文档。
- 写入日志系统重构目标态、模块职责清单、问题清单与演进路线。
### 2026-02-28更新
- 新增 `core/global_log_system.py``core/__init__.py` 模块说明。
- 标记阶段1完成统一日志内核已落地待阶段2/3接线。

105
docs/全局迭代日志.md Normal file
View File

@ -0,0 +1,105 @@
# 全局迭代日志
> 用途:跨任务(`src` / `src2`)记录每个阶段的增改内容、验收结果与回滚点。
> 约束:每次阶段验收通过后,必须追加一条日志记录。
---
## 记录模板
### 阶段:
- **阶段名称**
- **日期**
- **目标**
### 变更清单
- **新增文件**
- **修改文件**
- **删除文件**
### 关键改动说明
- **日志结构变更**
- **接口/调用链变更**
- **兼容性说明**
### 验收结果
- **通过项**
- **未通过项**
- **遗留风险**
### 回滚与追踪
- **可回滚点**
- **关联文档**
- **备注**
---
## 阶段日志
## 阶段0文档与约定先行
- **阶段名称**:日志系统重构 - 阶段0
- **日期**2026-02-28
- **目标**:建立重构方案与全局文档骨架,为后续代码改造提供统一约束。
### 变更清单
- **新增文件**
- `docs/日志系统重构实施方案.md`
- `docs/全局迭代日志.md`
- `docs/全局框架文档.md`
- **修改文件**:无
- **删除文件**:无
### 关键改动说明
- **日志结构变更**:确定后续采用 `jsonl`,并以 `sync_id` 分隔每次同步。
- **接口/调用链变更**:明确生产链路以 `src/scheduler.py``src2/scheduler.py` 为准。
- **兼容性说明**阶段0仅文档不影响现网行为。
### 验收结果
- **通过项**
- 分阶段路线、边界条件、验收清单已落文档。
- 已建立全局迭代日志与框架文档容器。
- **未通过项**:无
- **遗留风险**
- 任务二存在复用模块导致串目录风险待阶段3修复
- 现有日志写入策略存在双记录与结构损坏风险待阶段1/2修复
### 回滚与追踪
- **可回滚点**:当前为纯文档提交,可直接整提交回滚。
- **关联文档**`docs/日志系统重构实施方案.md`
- **备注**阶段1开始前需再次确认日志字段最终版。
## 阶段1实现全局日志内核
- **阶段名称**:日志系统重构 - 阶段1
- **日期**2026-02-28
- **负责人**Codex
- **目标**:在任务目录外提供可复用的统一日志内核,支持 jsonl、sync_id、token 脱敏。
### 变更清单
- **新增文件**
- `core/global_log_system.py`
- `core/__init__.py`
- **修改文件**
- `docs/全局迭代日志.md`
- `docs/全局框架文档.md`
- **删除文件**:无
### 关键改动说明
- **日志结构变更**:新增 `start_sync` / `api_call` / `end_sync` 三类事件模型。
- **接口/调用链变更**:提供 `start_sync``log_api``end_sync_with_stats` 三个核心接口。
- **兼容性说明**阶段1仅新增内核未接入任务一/任务二业务调用,不影响现网逻辑。
### 验收结果
- **通过项**
- 已支持按天写入 `api_log_YYYY-MM-DD.jsonl`
- 已支持 `sync_id` 生命周期记录。
- 已支持 token/secret 脱敏。
- 已提供任务一、任务二创建器(固定目录)。
- **未通过项**
- 尚未接入 `src` / `src2`,串目录与双记录矛盾仍待后续阶段修复。
- **遗留风险**
- 阶段2/3接线时若沿用旧 logger 分支逻辑,可能再次引入双记录。
### 回滚与追踪
- **可回滚点**`core/` 新增为独立改动,可单独回滚。
- **关联文档**`docs/日志系统重构实施方案.md`
- **备注**:下一阶段优先完成任务一接入并验证“单次调用单条记录”。

View File

@ -0,0 +1,119 @@
# 日志系统重构实施方案(任务一 + 任务二)
## 1. 目标与边界
### 1.1 重构目标
- 在任务一和任务二之外,提供统一的全局日志记录系统。
- 覆盖生产链路:`src/scheduler.py``src2/scheduler.py` 触发的同步流程。
- 在每一个发起 API 调用的地方记录请求与结果(成功/失败都记录)。
- 任务一日志落在 `logs/`,任务二日志落在 `logs2/`
- 每条日志必须包含 `module` 字段,允许值:`smartsheet` / `tapd` / `wework`
- 通过 `sync_id` 分隔每次同步,并在同步完成后写入当次统计信息。
### 1.2 明确要解决的问题
- 解决“串目录”问题:任务二不得写入 `logs/`,任务一不得写入 `logs2/`
- 解决“双记录矛盾”问题:同一次 API 调用只能有一条最终记录(不能先 success 再 failure
- 日志格式改为 `jsonl`(每行一条完整 JSON 记录)。
- 对 token 做脱敏,不做响应内容截断。
### 1.3 非目标
- 本次不改业务流程语义(只改日志系统与接线方式)。
- 本次不对历史 JSON 文件做离线迁移,仅保证新写入生效。
---
## 2. 总体设计
### 2.1 新增全局日志内核(任务外)
- 新增独立模块(建议路径:`core/global_log_system.py`)。
- 提供统一接口:
- `start_sync(...)`:开始一次同步,生成 `sync_id`
- `log_api(...)`:记录单次 API 调用结果(成功/失败统一出口)。
- `end_sync_with_stats(...)`:写入同步统计并结束。
- 通过构造参数确定任务上下文:`task_name``log_dir`、默认 `module`
### 2.2 日志存储格式
- 文件命名:`api_log_YYYY-MM-DD.jsonl`
- 存储目录:任务一 `logs/`,任务二 `logs2/`
- 记录模型:
- 通用字段:`event_type``timestamp``task``sync_id``module`
- API 事件字段:`operation``request``response``success``error_message``duration_ms`
- 同步边界字段:`start_sync` / `end_sync` 事件及统计 `stats`
### 2.3 脱敏规则
- `request/response` 中出现 token 字段统一脱敏(如 `access_token``Authorization``corpsecret``api_password`)。
- 脱敏策略:保留前后少量字符,中间替换为 `***`
---
## 3. 分阶段实施路线(小步走)
## 阶段0文档与约定先行
### 工作内容
- 新建并维护两份全局文档:
- `docs/全局迭代日志.md`
- `docs/全局框架文档.md`
- 在文档中建立日志重构专属章节、字段定义、阶段验收标准。
### 验收标准
- 文档结构可用于后续持续更新。
- 明确字段与职责边界(尤其是 `sync_id``task``module`)。
## 阶段1实现全局日志内核
### 工作内容
- 新增 `core/global_log_system.py`,实现 `jsonl` 写入、脱敏、同步分隔和统计写入。
- 新增兼容层,避免一次性改动过大。
### 验收标准
- 能独立写入 `logs/` / `logs2/`
- `start_sync -> 多条 log_api -> end_sync_with_stats` 链路完整。
## 阶段2接入任务一src
### 工作内容
- 改造 `src/api_logger.py` 为新内核兼容封装。
- 在任务一同步主流程增加 `sync_id` 生命周期。
- 覆盖所有 API 调用记录点,统一单次调用单条记录。
### 验收标准
- 任务一生产链路日志全部位于 `logs/`
- 无“双记录矛盾”。
## 阶段3接入任务二src2
### 工作内容
- 改造 `src2/logger.py` 接入同一内核并固定 `logs2/`
- 在任务二同步主流程增加 `sync_id` 生命周期。
- 修复复用模块导致的串目录问题(含 token 获取、wework 通知链路)。
### 验收标准
- 任务二生产链路日志全部位于 `logs2/`
- 与任务一日志完全隔离。
## 阶段4查看工具与收尾
### 工作内容
- 更新 `src/log_viewer.py` 支持读取 `jsonl`
- 更新 `logs/README.md`(新增 jsonl 规范与排障说明)。
### 验收标准
- 可按日期与 `sync_id` 追踪一次完整同步。
- 文档、代码、日志格式三者一致。
---
## 4. 核心风险与规避
- 风险1兼容层改造影响原调用。
- 规避:先保留 `log_api_call(...)` 旧接口,再逐步替换调用。
- 风险2任务二通过复用模块写错目录。
- 规避:统一使用可注入 logger/context禁止隐式全局默认目录。
- 风险3单次请求出现多状态记录。
- 规避:采用“单出口记录”,业务错误也只写一次最终状态。
---
## 5. 验收清单(最终)
- [ ] 生产链路(两个 scheduler所有 API 调用均有日志。
- [ ] 每次同步都有 `start_sync``end_sync`,并可通过 `sync_id` 聚合。
- [ ] 任务一只写 `logs/`,任务二只写 `logs2/`
- [ ] 无同一次 API 调用 success/failure 双写冲突。
- [ ] token 已脱敏。
- [ ] `jsonl` 可被查看工具读取。

2462
install.cmd Normal file

File diff suppressed because one or more lines are too long

View File

@ -22,7 +22,13 @@ from src.config import ConfigManager
class WeWorkAPITester:
"""企业微信API测试类"""
def __init__(self):
def __init__(self, auto_load_token=True):
"""
初始化企业微信API测试类
Args:
auto_load_token: 是否自动加载token默认为True
"""
self.access_token = None
self.log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'api_test_log.json')
self.base_url = "https://qyapi.weixin.qq.com/cgi-bin"
@ -30,12 +36,92 @@ class WeWorkAPITester:
# 确保日志文件存在
self._init_log_file()
# 自动加载token
if auto_load_token:
self._auto_load_token()
def _init_log_file(self):
"""初始化日志文件"""
if not os.path.exists(self.log_file):
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump({"records": []}, f, ensure_ascii=False, indent=2)
def _auto_load_token(self):
"""
自动加载access_token
优先从缓存读取如果缓存不存在或已过期则尝试从API获取新token
"""
print("\n=== 自动加载access_token ===")
# 先尝试从缓存读取
if self._load_token_from_cache_silent():
return
# 缓存无效尝试从API获取
print(" 尝试从API获取新token...")
try:
from src.token_manager import TokenManager
token_manager = TokenManager()
self.access_token = token_manager.get_token()
print(f" ✓ 成功获取access_token")
print(f" Token: {self.access_token[:20]}...")
except ValueError as e:
print(f" ⚠ 环境变量未配置: {e}")
print(" 请手动选择菜单选项1获取token")
except Exception as e:
print(f" ⚠ 自动获取token失败: {e}")
print(" 请手动选择菜单选项1获取token")
def _load_token_from_cache_silent(self):
"""
静默从缓存读取token不打印标题
Returns:
bool: 是否成功读取有效token
"""
cache_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'config',
'token_cache.json'
)
try:
if not os.path.exists(cache_file):
print(" 缓存文件不存在")
return False
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
access_token = cache_data.get('access_token')
fetch_time = cache_data.get('fetch_time')
if not access_token:
print(" 缓存文件中没有access_token")
return False
# 检查token是否过期7200秒 = 2小时提前5分钟刷新
import time
current_time = time.time()
elapsed_time = current_time - fetch_time
remaining_time = 7200 - elapsed_time
if remaining_time <= 300: # 剩余不足5分钟视为过期
print(f" 缓存的token已过期或即将过期")
return False
# token有效
self.access_token = access_token
print(f" ✓ 从缓存读取access_token成功")
print(f" Token: {self.access_token[:20]}...")
print(f" 剩余有效期: {int(remaining_time)}秒 ({int(remaining_time//60)}分钟)")
return True
except Exception as e:
print(f" 读取缓存失败: {e}")
return False
def _log_api_call(self, operation, request_data, response_data):
"""记录API调用到JSON文件"""
try:
@ -481,6 +567,110 @@ class TAPDAPITester:
except Exception as e:
print(f"✗ 记录日志失败: {str(e)}")
def get_story_fields_info(self):
"""
获取TAPD需求的所有字段配置及候选值
Returns:
dict: 字段配置信息失败返回None
"""
print("\n=== 获取TAPD需求字段配置 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
url = f"{self.base_url}/stories/get_fields_info"
params = {
"workspace_id": self.workspace_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_story_fields_info", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", {})
print(f"\n✓ 成功获取需求字段配置")
# 显示字段统计
if isinstance(data, dict):
field_count = len(data)
print(f"{field_count} 个字段配置")
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_story_fields.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 字段配置已保存到: {output_file}")
# 显示部分字段信息
print(f"\n需求字段列表预览:")
print("-" * 80)
for idx, (field_name, field_info) in enumerate(list(data.items())[:10], 1):
field_label = field_info.get('label', '(无标签)')
html_type = field_info.get('html_type', '(未知类型)')
print(f" {idx}. {field_name} ({field_label}) - 类型: {html_type}")
# 如果有候选值,显示
options = field_info.get('options', [])
if options:
if isinstance(options, dict):
option_list = list(options.values())[:5]
total_count = len(options)
elif isinstance(options, list):
option_list = options[:5]
total_count = len(options)
else:
option_list = []
total_count = 0
if option_list:
print(f" 候选值: {', '.join(str(o) for o in option_list)}" +
(f" ...等{total_count}" if total_count > 5 else ""))
if field_count > 10:
print(f" ... 还有 {field_count - 10} 个字段,详见输出文件")
print("-" * 80)
return data
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def get_bug_custom_fields(self):
"""
获取TAPD缺陷的所有字段配置及候选值
@ -586,6 +776,121 @@ class TAPDAPITester:
traceback.print_exc()
return None
def get_story(self, story_id):
"""
根据需求ID获取需求详情
Args:
story_id: 需求ID
Returns:
dict: 需求信息失败返回None
"""
print("\n=== 获取TAPD需求 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
# 验证story_id
if not story_id:
print("✗ 需求ID不能为空")
return None
url = f"{self.base_url}/stories"
params = {
"workspace_id": self.workspace_id,
"id": story_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
print(f" story_id: {story_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_story", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", [])
if not data:
print(f"\n✗ 未找到需求ID为 {story_id} 的需求")
return None
# TAPD返回的是列表取第一个
story_data = data[0] if isinstance(data, list) else data
story_info = story_data.get("Story", {})
print(f"\n✓ 成功获取需求信息")
print(f"\n需求详情:")
print("=" * 80)
# 显示关键字段
key_fields = [
("id", "ID"),
("name", "标题"),
("status", "状态"),
("priority", "优先级"),
("owner", "处理人"),
("creator", "创建人"),
("created", "创建时间"),
("modified", "最后修改时间"),
("iteration_id", "迭代ID"),
("description", "详细描述")
]
for field_name, field_label in key_fields:
value = story_info.get(field_name, '')
if field_name == "description" and value:
# 描述字段可能很长只显示前100个字符
if len(str(value)) > 100:
value = str(value)[:100] + "..."
print(f" {field_label}: {value}")
print("=" * 80)
# 保存完整数据到文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
f'tapd_story_{story_id}.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(story_info, f, ensure_ascii=False, indent=2)
print(f"\n✓ 完整需求信息已保存到: {output_file}")
return story_info
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def upload_attachment(self, file_path, entry_type, entry_id, owner=None, overwrite=False):
"""
上传附件到TAPD
@ -927,10 +1232,12 @@ def print_menu():
print("5. 发送应用消息")
print("\n【TAPD API】")
print("6. 获取缺陷字段配置")
print("7. 获取附件列表")
print("8. 上传附件")
print("7. 获取需求字段配置")
print("8. 获取需求")
print("9. 获取附件列表")
print("10. 上传附件")
print("\n【其他】")
print("9. 查看日志文件")
print("11. 查看日志文件")
print("0. 退出")
print("="*50)
@ -942,7 +1249,7 @@ def main():
while True:
print_menu()
choice = input("\n请选择操作 (0-9): ").strip()
choice = input("\n请选择操作 (0-11): ").strip()
if choice == "0":
print("\n感谢使用,再见!")
@ -1018,6 +1325,18 @@ def main():
tapd_tester.get_bug_custom_fields()
elif choice == "7":
# 获取TAPD需求字段配置
tapd_tester.get_story_fields_info()
elif choice == "8":
# 获取TAPD需求
story_id = input("\n请输入需求ID: ").strip()
if not story_id:
print("✗ 需求ID不能为空")
continue
tapd_tester.get_story(story_id)
elif choice == "9":
# 获取TAPD附件列表
print("\n=== 获取附件列表 ===")
print("是否需要添加筛选条件?")
@ -1055,7 +1374,7 @@ def main():
limit=limit
)
elif choice == "8":
elif choice == "10":
# 上传附件到TAPD
print("\n=== 上传附件 ===")
file_path = input("请输入文件路径: ").strip()
@ -1099,7 +1418,7 @@ def main():
overwrite=overwrite
)
elif choice == "9":
elif choice == "11":
print("\n=== 查看日志文件 ===")
try:
with open(wework_tester.log_file, 'r', encoding='utf-8') as f:

View File

@ -20,6 +20,7 @@ from src.mapper import FieldMapper, BugCreationResult
from src.token_manager import TokenManager
from src.status_mapper import BugStatusMapper
from src.wework_notifier import WeWorkNotifier
from src.api_logger import get_logger
def parse_arguments():
@ -388,6 +389,17 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
error_msg = f"字段映射失败: {e}"
print(f"{error_msg}")
# 记录失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="create_bug_failure",
request_data={"record_id": record_id, "title": title},
response_data={},
success=False,
error_message=error_msg
)
result = BugCreationResult(
record_id=record_id,
success=False,
@ -400,6 +412,17 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
error_msg = f"TAPD API调用失败: {e}"
print(f"{error_msg}")
# 记录失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="create_bug_failure",
request_data={"record_id": record_id, "title": title},
response_data={},
success=False,
error_message=error_msg
)
result = BugCreationResult(
record_id=record_id,
success=False,
@ -412,6 +435,17 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
error_msg = f"未预期的错误: {type(e).__name__}: {e}"
print(f"{error_msg}")
# 记录失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="create_bug_failure",
request_data={"record_id": record_id, "title": title},
response_data={},
success=False,
error_message=error_msg
)
result = BugCreationResult(
record_id=record_id,
success=False,
@ -638,11 +672,26 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
for invalid_record in validation_result['invalid_records']:
record_data = invalid_record['record_data']
missing_fields = invalid_record['missing_fields']
error_msg = f"校验失败,缺失字段: {', '.join(missing_fields)}"
# 记录校验失败日志
logger = get_logger()
logger.log_api_call(
api_type="task1",
operation="validation_failure",
request_data={
"record_id": record_data.get('record_id', '未知'),
"title": record_data.get('标题', '(无标题)')
},
response_data={},
success=False,
error_message=error_msg
)
validation_failed_result = BugCreationResult(
record_id=record_data.get('record_id', '未知'),
success=False,
error_message=f"校验失败,缺失字段: {', '.join(missing_fields)}"
error_message=error_msg
)
validation_failed_results.append(validation_failed_result)

View File

@ -60,6 +60,7 @@ class SmartSheetAPI:
print("=" * 80)
print(f"请求方法: {method}")
print(f"请求URL: {self.BASE_URL}/{endpoint}")
print(f"完整URL含参数: {url}") # 显示完整URL
if data:
import json
print(f"请求数据:")
@ -94,6 +95,15 @@ class SmartSheetAPI:
# 检查企业微信API返回的错误码
if result.get('errcode', 0) != 0:
error_msg = result.get('errmsg', '未知错误')
# 记录API调用日志业务错误
self.logger.log_api_call(
api_type="smartsheet",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=False,
error_message=f"errcode={result['errcode']}, errmsg={error_msg}"
)
raise RuntimeError(f"API调用失败: errcode={result['errcode']}, errmsg={error_msg}")
return result

3
src2/__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""
任务二TAPD状态实时同步至腾讯智能表格
"""

196
src2/config.py Normal file
View File

@ -0,0 +1,196 @@
"""
任务二配置管理模块
复用任务一的ConfigManager读取任务二专用配置文件
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径,以便导入 src 模块
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.config import ConfigManager as BaseConfigManager
from typing import List
class Task2ConfigManager(BaseConfigManager):
"""任务二配置管理器继承自任务一的ConfigManager"""
def __init__(self, config_path=None):
"""
初始化任务二配置管理器
Args:
config_path: 配置文件路径如果为None则使用任务二默认路径
"""
if config_path is None:
# 默认路径:项目根目录/config/config_task2.ini
config_path = project_root / "config" / "config_task2.ini"
super().__init__(config_path)
def get_tapd_config(self):
"""
获取TAPD配置任务二版本不需要reporter
Returns:
dict: 包含workspace_id的字典
"""
if not self.config.has_section('TAPD'):
raise ValueError("配置文件缺少[TAPD]节")
if not self.config.has_option('TAPD', 'workspace_id'):
raise ValueError("配置文件[TAPD]节缺少workspace_id配置项")
workspace_id = self.config.get('TAPD', 'workspace_id').strip()
if not workspace_id:
raise ValueError("workspace_id配置项不能为空")
return {
'workspace_id': workspace_id
}
def get_smartsheet_config(self):
"""
获取智能表格配置任务二版本支持多个docid
Returns:
dict: 包含docid_list的字典
Raises:
ValueError: 配置项缺失时抛出
"""
if not self.config.has_section('SmartSheet'):
raise ValueError("配置文件缺少[SmartSheet]节")
if not self.config.has_option('SmartSheet', 'docid'):
raise ValueError("配置文件[SmartSheet]节缺少docid配置项")
docid_raw = self.config.get('SmartSheet', 'docid').strip()
if not docid_raw:
raise ValueError("docid配置项不能为空")
# 解析逗号分隔的多个docid
docid_list = [d.strip() for d in docid_raw.split(',') if d.strip()]
if not docid_list:
raise ValueError("docid配置项解析后为空")
return {
'docid': docid_list[0], # 保持向后兼容返回第一个docid
'docid_list': docid_list # 新增返回所有docid列表
}
def get_docid_list(self) -> List[str]:
"""
获取所有配置的docid列表
Returns:
List[str]: docid列表
"""
return self.get_smartsheet_config()['docid_list']
def get_schedule_config(self):
"""
获取调度配置任务二版本只需要sync_interval
Returns:
dict: 包含sync_interval的字典
"""
default_sync_interval = 15
if not self.config.has_section('Schedule'):
return {'sync_interval': default_sync_interval}
if not self.config.has_option('Schedule', 'sync_interval'):
return {'sync_interval': default_sync_interval}
sync_interval_str = self.config.get('Schedule', 'sync_interval').strip()
try:
sync_interval = int(sync_interval_str)
except ValueError:
raise ValueError(f"sync_interval必须为整数当前值: {sync_interval_str}")
if sync_interval <= 0:
raise ValueError(f"sync_interval必须为正整数当前值: {sync_interval}")
return {'sync_interval': sync_interval}
def get_wework_config(self):
"""
获取企业微信推送配置
Returns:
dict: 包含agentid和receivers的字典如果配置不存在则返回None
"""
if not self.config.has_section('wework'):
return None
agentid = self.config.get('wework', 'agentid', fallback='').strip()
receivers = self.config.get('wework', 'receivers', fallback='').strip()
# 如果配置不完整返回None
if not agentid or not receivers:
return None
return {
'agentid': agentid,
'receivers': receivers
}
def get_all_config(self):
"""获取所有配置"""
return {
'tapd': self.get_tapd_config(),
'smartsheet': self.get_smartsheet_config(),
'schedule': self.get_schedule_config(),
'wework': self.get_wework_config()
}
def print_config(self):
"""打印当前配置信息"""
print("\n=== 任务二配置信息 ===")
try:
tapd_config = self.get_tapd_config()
print(f"[TAPD]")
print(f" workspace_id: {tapd_config['workspace_id']}")
except ValueError as e:
print(f"[TAPD] 配置错误: {e}")
try:
smartsheet_config = self.get_smartsheet_config()
print(f"[SmartSheet]")
docid_list = smartsheet_config['docid_list']
print(f" docid数量: {len(docid_list)}")
for i, docid in enumerate(docid_list, 1):
# 显示docid的前20个字符便于识别
display_id = docid[:20] + "..." if len(docid) > 20 else docid
print(f" docid_{i}: {display_id}")
except ValueError as e:
print(f"[SmartSheet] 配置错误: {e}")
try:
schedule_config = self.get_schedule_config()
print(f"[Schedule]")
print(f" sync_interval: {schedule_config['sync_interval']} 分钟")
except ValueError as e:
print(f"[Schedule] 配置错误: {e}")
wework_config = self.get_wework_config()
if wework_config:
print(f"[wework]")
print(f" agentid: {wework_config['agentid']}")
print(f" receivers: {wework_config['receivers']}")
else:
print(f"[wework] 未配置(推送功能将被禁用)")
print("======================\n")
if __name__ == "__main__":
try:
config = Task2ConfigManager()
config.print_config()
except Exception as e:
print(f"错误: {e}")

115
src2/link_parser.py Normal file
View File

@ -0,0 +1,115 @@
"""
TAPD链接解析模块
负责从TAPD链接中提取需求单号
支持的链接格式
1. 列表页弹窗链接: https://www.tapd.cn/tapd_fe/{workspace_id}/story/list?...dialog_preview_id=story_{单号}
2. 详情页链接: https://www.tapd.cn/{workspace_id}/prong/stories/view/{单号}
"""
import re
from typing import Tuple, Optional
# 链接类型常量
LINK_TYPE_DIALOG = "dialog" # 列表页弹窗链接
LINK_TYPE_VIEW = "view" # 详情页链接
LINK_TYPE_UNKNOWN = "unknown"
def parse_tapd_link(url: str) -> Tuple[bool, str, str]:
"""
解析TAPD链接提取需求单号
Args:
url: TAPD链接字符串
Returns:
Tuple[bool, str, str]: (是否成功, 单号或错误信息, 链接类型)
- 成功时: (True, "1234567890", "dialog" "view")
- 失败时: (False, "错误信息", "unknown")
"""
if not url:
return (False, "链接为空", LINK_TYPE_UNKNOWN)
# 确保是字符串类型
if not isinstance(url, str):
url = str(url)
url = url.strip()
if not url:
return (False, "链接为空", LINK_TYPE_UNKNOWN)
# 格式一:列表页弹窗链接
# 匹配 dialog_preview_id=story_(\d+)
pattern_dialog = r'dialog_preview_id=story_(\d+)'
match_dialog = re.search(pattern_dialog, url)
if match_dialog:
story_id = match_dialog.group(1)
return (True, story_id, LINK_TYPE_DIALOG)
# 格式二:详情页链接
# 匹配 /stories/view/(\d+)
pattern_view = r'/stories/view/(\d+)'
match_view = re.search(pattern_view, url)
if match_view:
story_id = match_view.group(1)
return (True, story_id, LINK_TYPE_VIEW)
# 未匹配到任何格式
return (False, f"无法识别的链接格式: {url[:100]}", LINK_TYPE_UNKNOWN)
def extract_story_id(url: str) -> Optional[str]:
"""
从TAPD链接中提取需求单号简化接口
Args:
url: TAPD链接字符串
Returns:
Optional[str]: 成功返回单号失败返回None
"""
success, result, _ = parse_tapd_link(url)
return result if success else None
def is_valid_tapd_link(url: str) -> bool:
"""
检查是否为有效的TAPD链接
Args:
url: TAPD链接字符串
Returns:
bool: 是否为有效链接
"""
success, _, _ = parse_tapd_link(url)
return success
if __name__ == "__main__":
print("=== TAPD链接解析器测试 ===\n")
# 测试用例
test_cases = [
# 格式一:列表页弹窗链接
"https://www.tapd.cn/tapd_fe/58335167/story/list?dialog_preview_id=story_1158335167001044388",
# 格式二:详情页链接
"https://www.tapd.cn/58335167/prong/stories/view/1158335167001044388",
# 无效链接
"https://www.tapd.cn/58335167/bugtrace/bugs/view/123456",
"https://www.google.com",
"",
None,
]
for i, url in enumerate(test_cases, 1):
print(f"测试 {i}: {url}")
success, result, link_type = parse_tapd_link(url)
if success:
print(f" ✓ 解析成功: 单号={result}, 类型={link_type}")
else:
print(f" ✗ 解析失败: {result}")
print()

56
src2/logger.py Normal file
View File

@ -0,0 +1,56 @@
"""
任务二日志模块
创建独立的 APILogger 实例日志写入 logs2/ 目录
设计说明
- 不修改 src/api_logger.py get_logger() 全局单例
- 任务二使用独立的 logger 实例避免与任务一冲突
- 两个任务可以同时运行日志互不干扰
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.api_logger import APILogger
# 任务二日志目录
TASK2_LOG_DIR = project_root / "logs2"
# 任务二专用的 logger 实例(模块级单例)
_task2_logger = None
def get_task2_logger() -> APILogger:
"""
获取任务二专用的日志记录器
Returns:
APILogger: 任务二专用的日志记录器实例
"""
global _task2_logger
if _task2_logger is None:
_task2_logger = APILogger(log_dir=str(TASK2_LOG_DIR))
return _task2_logger
if __name__ == "__main__":
print("=== 任务二日志模块测试 ===\n")
logger = get_task2_logger()
# 测试记录一条日志
logger.log_api_call(
api_type="test",
operation="task2/test_log",
request_data={"test": "任务二日志测试"},
response_data={"status": "ok"},
success=True
)
print(f"日志目录: {TASK2_LOG_DIR}")
print(f"日志文件: {logger._get_today_log_file()}")
print("\n测试完成,请检查 logs2/ 目录")

156
src2/main.py Normal file
View File

@ -0,0 +1,156 @@
"""
任务二主程序入口
TAPD状态实时同步至腾讯智能表格
功能
1. 命令行参数解析
2. 单次执行模式
3. 执行结果统计
"""
import sys
import argparse
from pathlib import Path
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.sync_service import run_once
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='TAPD状态同步工具 - 将TAPD需求状态同步到腾讯智能表格',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
# 执行一次同步
python src2/main.py
# 指定配置文件
python src2/main.py --config /path/to/config.ini
# 测试模式(显示详细信息)
python src2/main.py --test
# 手动传入access_token
python src2/main.py --token YOUR_ACCESS_TOKEN
"""
)
parser.add_argument(
'-c', '--config',
default=None,
help='配置文件路径(默认: config/config_task2.ini'
)
parser.add_argument(
'-t', '--token',
default=None,
help='手动传入access_token默认自动获取'
)
parser.add_argument(
'--test',
action='store_true',
help='启用测试模式显示详细的API调用信息'
)
return parser.parse_args()
def print_result_summary(result: dict):
"""打印执行结果摘要"""
print("\n" + "=" * 60)
print("执行结果摘要")
print("=" * 60)
if result["success"]:
print("状态: ✓ 成功")
else:
print("状态: ✗ 失败")
if result.get("error_message"):
print(f"错误: {result['error_message']}")
print(f"\n子表统计:")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f"\n记录统计:")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
print(f" 同步失败: {result['records_failed']}")
# 显示各子表详情
if result.get("sheet_results"):
print(f"\n子表详情:")
for sheet in result["sheet_results"]:
status = "跳过" if sheet["skipped"] else "完成"
print(f" - {sheet['sheet_title']}: {status}")
if sheet["skipped"]:
print(f" 原因: {sheet['skip_reason']}")
else:
print(f" 链接: {sheet['records_with_link']} | "
f"更新: {sheet['records_updated']} | "
f"失败: {sheet['records_failed']}")
print("=" * 60)
def main():
"""主函数"""
print("\n" + "=" * 60)
print("TAPD状态同步工具 (任务二)")
print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
try:
# 解析命令行参数
args = parse_arguments()
# 初始化配置管理器
print("\n正在加载配置...")
config_manager = Task2ConfigManager(config_path=args.config)
config_manager.print_config()
# 获取access_token
access_token = args.token
if access_token is None:
print("正在获取access_token...")
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" ✓ access_token获取成功")
# 执行同步
print("\n开始执行同步...")
result = run_once(
config_manager=config_manager,
access_token=access_token,
test_mode=args.test
)
# 打印结果摘要
print_result_summary(result)
# 返回状态码
return 0 if result["success"] else 1
except KeyboardInterrupt:
print("\n\n用户中断执行")
return 130
except Exception as e:
print(f"\n✗ 执行失败: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())

136
src2/notifier.py Normal file
View File

@ -0,0 +1,136 @@
"""
任务二企业微信消息推送模块
用于发送同步失败通知
"""
import sys
from pathlib import Path
from typing import List, Dict
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.wework_notifier import WeWorkNotifier
def send_sync_failure_notification(access_token: str, agentid: str,
receivers: str, failed_records: List[Dict]) -> bool:
"""
发送同步失败通知
Args:
access_token: 企业微信access_token
agentid: 应用ID
receivers: 接收人列表
failed_records: 失败记录列表每条记录包含
- sheet_title: 子表标题
- record_id: 记录ID
- tapd_link: TAPD链接
- error_message: 失败原因
Returns:
bool: 是否发送成功
"""
if not failed_records:
return True
# 构造消息内容
content = _build_sync_failure_message(failed_records)
# 使用任务一的推送器发送消息
notifier = WeWorkNotifier(access_token, agentid, receivers)
return notifier._send_text_message(content)
def _build_sync_failure_message(failed_records: List[Dict]) -> str:
"""
构造同步失败消息内容支持多表格多子表分组
Args:
failed_records: 失败记录列表每条记录可包含
- doc_index: 表格序号可选
- docid_short: 表格ID简写可选
- sheet_title: 子表标题
- record_id: 记录ID
- tapd_link: TAPD链接
- error_message: 失败原因
Returns:
str: 格式化的消息内容
"""
# 按表格和子表分组
records_by_doc = {}
for record in failed_records:
doc_index = record.get('doc_index', 1)
docid_short = record.get('docid_short', '')
doc_key = (doc_index, docid_short)
if doc_key not in records_by_doc:
records_by_doc[doc_key] = {}
sheet_title = record.get('sheet_title', '未知子表')
if sheet_title not in records_by_doc[doc_key]:
records_by_doc[doc_key][sheet_title] = []
records_by_doc[doc_key][sheet_title].append(record)
# 消息头部
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
total_count = len(failed_records)
doc_count = len(records_by_doc)
lines = [
"【autoTAPD 同步失败通知】",
f"时间: {timestamp}",
f"失败数量: {total_count}",
]
# 如果有多个表格,显示表格数量
if doc_count > 1:
lines.append(f"涉及表格: {doc_count}")
lines.extend([
"",
"以下记录同步失败,请检查:",
"=" * 40
])
# 按表格和子表分组显示失败记录
global_idx = 1
for (doc_index, docid_short), sheets in sorted(records_by_doc.items()):
# 如果有多个表格,显示表格标识
if doc_count > 1:
doc_label = f"表格{doc_index}"
if docid_short:
doc_label += f" ({docid_short})"
lines.append(f"\n{'#'*20}")
lines.append(f"# {doc_label}")
lines.append(f"{'#'*20}")
for sheet_title, sheet_records in sheets.items():
lines.append(f"\n【子表:{sheet_title}")
lines.append("")
for record in sheet_records:
record_id = record.get('record_id', '未知')
tapd_link = record.get('tapd_link', '(无链接)')
error_message = record.get('error_message', '未知错误')
lines.append(f"[{global_idx}] 记录ID: {record_id}")
lines.append(f"TAPD链接: {tapd_link}")
lines.append(f"失败原因: {error_message}")
lines.append("")
global_idx += 1
lines.append("=" * 40)
lines.append("系统将在下次同步时自动重试失败记录。")
return "\n".join(lines)
if __name__ == "__main__":
print("=== 任务二推送模块 ===")
print("此模块提供同步失败通知功能")
print("请通过 sync_service 或 main.py 调用")

354
src2/scheduler.py Normal file
View File

@ -0,0 +1,354 @@
"""
任务二调度器模块
负责定时执行TAPD状态同步任务
功能
1. 按配置频率定时执行
2. 优雅退出Ctrl+C
3. 运行统计
"""
import sys
import time
import signal
import argparse
from pathlib import Path
from datetime import datetime
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 导入schedule库
try:
import schedule
except ImportError:
print("错误: 缺少schedule库")
print("请运行: pip install schedule")
sys.exit(1)
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.sync_service import run_once
class Task2Scheduler:
"""任务二调度器"""
def __init__(self, config_path=None, verbose=False):
"""
初始化调度器
Args:
config_path: 配置文件路径
verbose: 是否显示详细信息
"""
self.config_path = config_path
self.verbose = verbose
self.running = True
# 统计信息
self.stats = {
'start_time': None,
'total_runs': 0,
'success_runs': 0,
'failed_runs': 0,
'total_records_synced': 0,
'total_records_updated': 0,
'last_run_time': None
}
# 初始化配置管理器
self._init_config()
# 初始化TokenManager
self._init_token_manager()
# 获取调度配置
self.sync_interval = self.config['schedule']['sync_interval']
def _init_config(self):
"""初始化配置"""
try:
print("正在加载配置文件...")
self.config_manager = Task2ConfigManager(config_path=self.config_path)
self.config = self.config_manager.get_all_config()
print("✓ 配置文件加载成功")
except Exception as e:
print(f"✗ 配置文件加载失败: {e}")
raise
def _init_token_manager(self):
"""初始化TokenManager"""
try:
print("正在初始化TokenManager...")
self.token_manager = TokenManager()
print("✓ TokenManager初始化成功")
except Exception as e:
print(f"✗ TokenManager初始化失败: {e}")
raise
def job(self):
"""执行一次同步任务"""
print("\n" + "=" * 80)
print(f"开始执行同步任务 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
try:
# 获取access_token
access_token = self.token_manager.get_token()
# 执行一次同步流程
result = run_once(
config_manager=self.config_manager,
access_token=access_token,
test_mode=self.verbose
)
# 更新统计信息
self.stats['total_runs'] += 1
self.stats['last_run_time'] = datetime.now()
if result['success']:
self.stats['success_runs'] += 1
self.stats['total_records_synced'] += result['records_synced']
self.stats['total_records_updated'] += result['records_updated']
print("\n" + "-" * 80)
print("本次执行统计:")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
print(f" 同步失败: {result['records_failed']}")
print("-" * 80)
else:
self.stats['failed_runs'] += 1
print("\n" + "-" * 80)
print("本次执行失败:")
print(f" 错误信息: {result.get('error_message', '未知错误')}")
print("-" * 80)
except Exception as e:
self.stats['total_runs'] += 1
self.stats['failed_runs'] += 1
self.stats['last_run_time'] = datetime.now()
print("\n" + "-" * 80)
print("本次执行异常:")
print(f" 错误类型: {type(e).__name__}")
print(f" 错误信息: {e}")
print("-" * 80)
if self.verbose:
import traceback
print("\n详细错误信息:")
traceback.print_exc()
# 显示下次执行时间
self._show_next_run_time()
def _show_next_run_time(self):
"""显示下次执行时间"""
next_run = schedule.idle_seconds()
if next_run is not None:
next_run_time = datetime.now().timestamp() + next_run
next_run_str = datetime.fromtimestamp(next_run_time).strftime('%Y-%m-%d %H:%M:%S')
print(f"\n下次执行时间: {next_run_str} (约 {int(next_run / 60)} 分钟后)")
def _startup_check(self):
"""启动检查"""
print("\n" + "=" * 80)
print("启动检查")
print("=" * 80)
checks_passed = True
# 1. 检查配置文件
print("\n[1/3] 检查配置文件...")
try:
print(f" ✓ 配置文件已加载: {self.config_path or '默认路径'}")
print(f" ✓ TAPD workspace_id: {self.config['tapd']['workspace_id']}")
print(f" ✓ SmartSheet docid: {self.config['smartsheet']['docid'][:20]}...")
print(f" ✓ 同步间隔: {self.sync_interval} 分钟")
except Exception as e:
print(f" ✗ 配置检查失败: {e}")
checks_passed = False
# 2. 检查环境变量
print("\n[2/3] 检查环境变量...")
import os
required_env_vars = {
'CORPID': '企业微信CorpID',
'CORPSECRET': '企业微信CorpSecret',
'TAPD_API_USER': 'TAPD API用户名',
'TAPD_API_PASSWORD': 'TAPD API密码'
}
for env_var, description in required_env_vars.items():
if os.environ.get(env_var):
print(f"{description} ({env_var}): 已设置")
else:
print(f"{description} ({env_var}): 未设置")
checks_passed = False
# 3. 测试access_token获取
print("\n[3/3] 测试access_token获取...")
try:
access_token = self.token_manager.get_token()
print(f" ✓ access_token获取成功: {access_token[:10]}...(已隐藏)")
except Exception as e:
print(f" ✗ access_token获取失败: {e}")
checks_passed = False
print("\n" + "=" * 80)
if checks_passed:
print("启动检查通过 ✓")
else:
print("启动检查失败 ✗")
print("\n请检查上述错误并修复后重试")
return False
print("=" * 80)
return True
def start(self):
"""启动调度器"""
print("\n" + "=" * 80)
print("TAPD状态同步调度器 (任务二)")
print("版本: 1.0.0 (第四阶段)")
print("=" * 80)
# 执行启动检查
if not self._startup_check():
print("\n调度器启动失败")
sys.exit(1)
# 记录启动时间
self.stats['start_time'] = datetime.now()
# 注册信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
# 配置定时任务
schedule.every(self.sync_interval).minutes.do(self.job)
print(f"\n调度器已启动:")
print(f" - 同步任务: 立即执行一次,然后每 {self.sync_interval} 分钟执行一次")
print("按 Ctrl+C 停止调度器")
print()
# 立即执行一次同步任务
self.job()
# 进入调度循环
while self.running:
schedule.run_pending()
time.sleep(1)
def _signal_handler(self, signum, frame):
"""信号处理函数"""
print("\n\n" + "=" * 80)
print("收到停止信号,正在优雅退出...")
print("=" * 80)
self.running = False
self._print_final_stats()
def _print_final_stats(self):
"""打印最终统计信息"""
print("\n" + "=" * 80)
print("运行统计")
print("=" * 80)
if self.stats['start_time']:
start_time_str = self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')
print(f"启动时间: {start_time_str}")
if self.stats['last_run_time']:
last_run_str = self.stats['last_run_time'].strftime('%Y-%m-%d %H:%M:%S')
print(f"最后执行: {last_run_str}")
if self.stats['start_time']:
running_time = datetime.now() - self.stats['start_time']
hours = int(running_time.total_seconds() // 3600)
minutes = int((running_time.total_seconds() % 3600) // 60)
print(f"运行时长: {hours} 小时 {minutes} 分钟")
print(f"\n同步任务统计:")
print(f" 总执行次数: {self.stats['total_runs']}")
print(f" 成功次数: {self.stats['success_runs']}")
print(f" 失败次数: {self.stats['failed_runs']}")
print(f" 总同步记录: {self.stats['total_records_synced']}")
print(f" 总更新记录: {self.stats['total_records_updated']}")
if self.stats['total_runs'] > 0:
success_rate = (self.stats['success_runs'] / self.stats['total_runs']) * 100
print(f" 成功率: {success_rate:.1f}%")
print("=" * 80)
print("调度器已停止")
print("=" * 80)
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='TAPD状态同步调度器 - 定时执行同步任务',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
# 使用默认配置启动调度器
python src2/scheduler.py
# 指定配置文件路径
python src2/scheduler.py --config /path/to/config.ini
# 显示详细信息
python src2/scheduler.py --verbose
"""
)
parser.add_argument(
'-c', '--config',
default=None,
help='配置文件路径(默认: config/config_task2.ini'
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
help='显示详细输出信息'
)
return parser.parse_args()
def main():
"""主函数"""
try:
# 解析命令行参数
args = parse_arguments()
# 创建并启动调度器
scheduler = Task2Scheduler(
config_path=args.config,
verbose=args.verbose
)
scheduler.start()
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
print(f"\n✗ 调度器启动失败: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())

45
src2/smartsheet.py Normal file
View File

@ -0,0 +1,45 @@
"""
任务二专用的智能表格API模块
继承自任务一的 SmartSheetAPI使用任务二专用的日志记录器
设计说明
- 继承 src.smartsheet.SmartSheetAPI 的所有功能
- 重写 __init__ 方法使用 get_task2_logger() 替代 get_logger()
- 确保所有智能表格 API 调用日志写入 logs2/ 目录
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.smartsheet import SmartSheetAPI
from src2.logger import get_task2_logger
class SmartSheetAPITask2(SmartSheetAPI):
"""任务二专用的智能表格API类"""
def __init__(self, access_token: str, docid: str, test_mode: bool = False):
"""
初始化智能表格API任务二专用
Args:
access_token: 企业微信access_token
docid: 智能表格文档ID
test_mode: 是否启用测试模式显示API返回结果
"""
# 调用父类初始化
super().__init__(access_token, docid, test_mode)
# 替换为任务二专用的日志记录器
self.logger = get_task2_logger()
if __name__ == "__main__":
print("=== 任务二智能表格API模块 ===\n")
print("此模块继承自 src.smartsheet.SmartSheetAPI")
print("使用任务二专用的日志记录器,日志写入 logs2/ 目录")
print("\n请通过 SmartSheetSync 类使用此模块")

461
src2/smartsheet_sync.py Normal file
View File

@ -0,0 +1,461 @@
"""
任务二智能表格同步模块
负责智能表格的数据读取和回写
功能
1. 检测必要字段是否存在
2. 读取所有记录
3. 提取TAPD链接
4. 构造更新记录
5. 批量回写状态信息
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src2.smartsheet import SmartSheetAPITask2
from src2.link_parser import parse_tapd_link, extract_story_id
from src2.logger import get_task2_logger
# ============================================================
# 字段名称常量(与智能表格列名完全一致)
# ============================================================
FIELD_TAPD_LINK = "TAPD链接" # 用户填写,解析单号
FIELD_TAPD_STATUS = "TAPD状态(🈲勿手改)" # 工具回写
FIELD_OWNER = "处理人(🈲勿手改)" # 工具回写
FIELD_BEGIN_DATE = "TAPD预计开始日期(🈲勿手改)" # 工具回写
FIELD_DUE_DATE = "TAPD预计完成日期(🈲勿手改)" # 工具回写
FIELD_PLAN = "发布计划(🈲勿手改)" # 工具回写TAPD发布计划字段
FIELD_SYNC_STATUS = "同步状态(🈲勿手改)" # 工具回写,标记同步结果
# 必要字段列表
REQUIRED_FIELDS = [
FIELD_TAPD_LINK,
FIELD_TAPD_STATUS,
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
FIELD_PLAN,
FIELD_SYNC_STATUS,
]
class SmartSheetSync:
"""智能表格同步类"""
def __init__(self, access_token: str, docid: str, test_mode: bool = False):
"""
初始化智能表格同步模块
Args:
access_token: 企业微信access_token
docid: 智能表格文档ID
test_mode: 是否启用测试模式
"""
self.api = SmartSheetAPITask2(access_token, docid, test_mode)
self.logger = get_task2_logger()
self.test_mode = test_mode
def check_required_fields(self, fields: List[Dict]) -> Tuple[bool, List[str], Dict[str, str]]:
"""
检测必要字段是否存在
Args:
fields: 字段列表从get_fields获取
Returns:
Tuple[bool, List[str], Dict[str, str]]:
- 是否所有必要字段都存在
- 缺失的字段列表
- 字段名称到字段ID的映射
"""
# 构建字段映射
field_mapping = {}
for field in fields:
field_title = field.get('field_title', '')
field_id = field.get('field_id', '')
if field_title and field_id:
field_mapping[field_title] = field_id
# 检查必要字段
missing_fields = []
for required_field in REQUIRED_FIELDS:
if required_field not in field_mapping:
missing_fields.append(required_field)
all_present = len(missing_fields) == 0
if all_present:
print(f" ✓ 所有必要字段都存在")
else:
print(f" ⚠ 缺少必要字段: {', '.join(missing_fields)}")
return (all_present, missing_fields, field_mapping)
def get_all_records(self, sheet_id: str) -> List[Dict]:
"""
获取子表的所有记录支持分页
Args:
sheet_id: 子表ID
Returns:
List[Dict]: 所有记录列表
"""
print(f"正在获取所有记录...")
all_records = []
offset = 0
limit = 100
while True:
result = self.api.get_records(sheet_id, limit=limit, offset=offset)
records = result['records']
total = result['total']
all_records.extend(records)
print(f" - 已获取 {len(all_records)}/{total} 条记录")
if len(all_records) >= total:
break
offset += limit
print(f" ✓ 共获取 {len(all_records)} 条记录")
return all_records
def extract_tapd_link(self, record: Dict) -> Optional[str]:
"""
从记录中提取TAPD链接
Args:
record: 记录对象
Returns:
Optional[str]: TAPD链接字符串如果不存在则返回None
"""
link_value = self.api.get_field_value_by_title(record, FIELD_TAPD_LINK)
if not link_value:
return None
# 链接字段可能是字符串或包含url的对象
if isinstance(link_value, str):
return link_value
elif isinstance(link_value, dict):
# 可能是 {url: "...", text: "..."} 格式
return link_value.get('url') or link_value.get('text')
elif isinstance(link_value, list):
# 可能是列表格式
if len(link_value) > 0:
first_item = link_value[0]
if isinstance(first_item, dict):
return first_item.get('url') or first_item.get('text')
elif isinstance(first_item, str):
return first_item
return None
def build_update_record(self, record_id: str, status: str = None,
owner: str = None, begin_date: str = None,
due_date: str = None, plan: str = None,
sync_status: str = None) -> Dict:
"""
构造更新记录的数据结构
Args:
record_id: 记录ID
status: TAPD状态中文
owner: 处理人
begin_date: 预计开始日期
due_date: 预计完成日期
plan: 计划中文名称
sync_status: 同步状态"成功" "失败"
Returns:
Dict: 更新记录的数据结构
"""
values = {}
# 处理字段值:
# - None: 不更新该字段(跳过)
# - 空字符串 "": 清空该字段(传入空数组)
# - 非空字符串: 更新为新值
if status is not None:
if status == "":
values[FIELD_TAPD_STATUS] = []
else:
values[FIELD_TAPD_STATUS] = [{"type": "text", "text": status}]
if owner is not None:
if owner == "":
values[FIELD_OWNER] = []
else:
values[FIELD_OWNER] = [{"type": "text", "text": owner}]
if begin_date is not None:
if begin_date == "":
values[FIELD_BEGIN_DATE] = []
else:
values[FIELD_BEGIN_DATE] = [{"type": "text", "text": begin_date}]
if due_date is not None:
if due_date == "":
values[FIELD_DUE_DATE] = []
else:
values[FIELD_DUE_DATE] = [{"type": "text", "text": due_date}]
if plan is not None:
if plan == "":
values[FIELD_PLAN] = []
else:
values[FIELD_PLAN] = [{"type": "text", "text": plan}]
if sync_status is not None:
if sync_status == "":
values[FIELD_SYNC_STATUS] = []
else:
values[FIELD_SYNC_STATUS] = [{"type": "text", "text": sync_status}]
return {
"record_id": record_id,
"values": values
}
def batch_update_records(self, sheet_id: str, update_records: List[Dict]) -> Dict:
"""
批量回写状态信息使用任务一的API带debug参数
Args:
sheet_id: 子表ID
update_records: 需要更新的记录列表
Returns:
Dict: 更新结果
"""
if not update_records:
print(" ⚠ 没有需要更新的记录")
return {"records": []}
# 直接使用任务一的 update_records 方法已添加debug=1
return self.api.update_records(sheet_id, update_records)
def get_records_with_tapd_link(self, sheet_id: str,
all_records: List[Dict] = None) -> List[Dict]:
"""
获取所有包含TAPD链接的新记录同步状态为空
Args:
sheet_id: 子表ID
all_records: 可选已获取的所有记录列表避免重复获取
Returns:
List[Dict]: 包含TAPD链接的记录列表
"""
print(f"正在获取包含TAPD链接的新记录...")
if all_records is None:
all_records = self.get_all_records(sheet_id)
records_with_link = []
skipped_synced_count = 0
for record in all_records:
tapd_link = self.extract_tapd_link(record)
if not tapd_link:
continue
# 检查同步状态字段,如果不为空则跳过
sync_status = self.api.get_field_value_by_title(record, FIELD_SYNC_STATUS)
if sync_status is not None and sync_status != "":
skipped_synced_count += 1
continue
record_id = record.get('record_id', '')
# 解析链接
success, result, link_type = parse_tapd_link(tapd_link)
record_info = {
"record": record,
"record_id": record_id,
"tapd_link": tapd_link,
"parse_success": success,
}
if success:
record_info["story_id"] = result
record_info["link_type"] = link_type
else:
record_info["story_id"] = None
record_info["parse_error"] = result
records_with_link.append(record_info)
# 统计
success_count = sum(1 for r in records_with_link if r["parse_success"])
fail_count = len(records_with_link) - success_count
print(f" ✓ 找到 {len(records_with_link)} 条包含TAPD链接的记录")
if skipped_synced_count > 0:
print(f" - 跳过已同步记录: {skipped_synced_count}")
print(f" - 链接解析成功: {success_count}")
if fail_count > 0:
print(f" - 链接解析失败: {fail_count}")
return records_with_link
def get_current_field_values(self, record: Dict) -> Dict[str, Any]:
"""
获取记录当前的字段值
Args:
record: 记录对象
Returns:
Dict: 当前字段值
"""
return {
FIELD_TAPD_STATUS: self.api.get_field_value_by_title(record, FIELD_TAPD_STATUS),
FIELD_OWNER: self.api.get_field_value_by_title(record, FIELD_OWNER),
FIELD_BEGIN_DATE: self.api.get_field_value_by_title(record, FIELD_BEGIN_DATE),
FIELD_DUE_DATE: self.api.get_field_value_by_title(record, FIELD_DUE_DATE),
FIELD_PLAN: self.api.get_field_value_by_title(record, FIELD_PLAN),
}
def get_synced_records_for_update(self, sheet_id: str,
terminal_statuses: List[str],
all_records: List[Dict] = None) -> List[Dict]:
"""
获取需要持续同步的已同步记录
筛选条件
- 同步状态 = "成功"
- TAPD状态 不在终态列表中
Args:
sheet_id: 子表ID
terminal_statuses: 终态列表 ['已完成', '取消']
all_records: 可选已获取的所有记录列表避免重复获取
Returns:
List[Dict]: 需要持续同步的记录列表
"""
print(f"正在获取需要持续同步的记录...")
if all_records is None:
all_records = self.get_all_records(sheet_id)
records_for_update = []
skipped_terminal_count = 0
for record in all_records:
# 检查同步状态是否为成功(兼容新旧格式)
# 旧格式: "成功"
# 新格式: "✅ 同步成功 01-14 15:30"
sync_status = self.api.get_field_value_by_title(record, FIELD_SYNC_STATUS)
sync_status_str = str(sync_status) if sync_status else ""
if not (sync_status == "成功" or "同步成功" in sync_status_str):
continue
# 检查TAPD链接是否存在
tapd_link = self.extract_tapd_link(record)
if not tapd_link:
continue
# 检查TAPD状态是否为终态
tapd_status = self.api.get_field_value_by_title(record, FIELD_TAPD_STATUS)
if tapd_status in terminal_statuses:
skipped_terminal_count += 1
continue
# 解析链接获取story_id
success, result, link_type = parse_tapd_link(tapd_link)
if not success:
continue
record_info = {
"record": record,
"record_id": record.get('record_id', ''),
"tapd_link": tapd_link,
"story_id": result,
"current_status": tapd_status,
}
records_for_update.append(record_info)
print(f" ✓ 找到 {len(records_for_update)} 条需要持续同步的记录")
if skipped_terminal_count > 0:
print(f" - 跳过终态记录: {skipped_terminal_count}")
return records_for_update
def process_sheet(api: SmartSheetSync, sheet_id: str, sheet_title: str) -> Dict:
"""
处理单个子表的同步流程
Args:
api: SmartSheetSync实例
sheet_id: 子表ID
sheet_title: 子表标题
Returns:
Dict: 处理结果统计
"""
print(f"\n{'='*60}")
print(f"处理子表: {sheet_title}")
print(f"{'='*60}")
result = {
"sheet_id": sheet_id,
"sheet_title": sheet_title,
"success": False,
"skipped": False,
"skip_reason": None,
"total_records": 0,
"records_with_link": 0,
"parse_success": 0,
"parse_fail": 0,
}
# 1. 获取字段信息
fields = api.api.get_fields(sheet_id)
# 2. 检查必要字段
all_present, missing_fields, field_mapping = api.check_required_fields(fields)
if not all_present:
result["skipped"] = True
result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
print(f" ⚠ 跳过此子表: {result['skip_reason']}")
return result
# 3. 获取包含TAPD链接的记录
records_with_link = api.get_records_with_tapd_link(sheet_id)
result["records_with_link"] = len(records_with_link)
result["parse_success"] = sum(1 for r in records_with_link if r["parse_success"])
result["parse_fail"] = result["records_with_link"] - result["parse_success"]
result["success"] = True
return result
if __name__ == "__main__":
print("=== 智能表格同步模块测试 ===\n")
print("此模块提供以下功能:")
print("1. check_required_fields() - 检测必要字段")
print("2. get_all_records() - 获取所有记录")
print("3. extract_tapd_link() - 提取TAPD链接")
print("4. build_update_record() - 构造更新记录")
print("5. batch_update_records() - 批量回写")
print("6. get_records_with_tapd_link() - 获取包含链接的记录")
print("\n请运行 test_phase3.py 进行完整测试")

812
src2/sync_service.py Normal file
View File

@ -0,0 +1,812 @@
"""
任务二同步服务模块
整合链接解析TAPD查询表格回写实现完整的同步流程
功能
1. 单次同步流程
2. 执行统计
3. 错误处理
"""
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta, timezone
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.logger import get_task2_logger
from src2.tapd_api import TAPDStoryApi, TERMINAL_STATUSES, StoryNotFoundException
from src2.smartsheet_sync import SmartSheetSync, REQUIRED_FIELDS
class CacheEntry:
"""TAPD查询缓存条目"""
def __init__(self, success: bool, data: Optional[Dict] = None,
error: Optional[Exception] = None):
self.success = success
self.data = data
self.error = error
self.timestamp = datetime.now()
class SyncService:
"""TAPD状态同步服务支持多表格同步"""
def __init__(self, config_manager: Task2ConfigManager = None,
access_token: str = None, test_mode: bool = False):
"""
初始化同步服务
Args:
config_manager: 配置管理器如果为None则自动创建
access_token: 企业微信access_token如果为None则自动获取
test_mode: 是否启用测试模式
"""
self.test_mode = test_mode
self.logger = get_task2_logger()
# 初始化配置管理器
if config_manager is None:
self.config_manager = Task2ConfigManager()
else:
self.config_manager = config_manager
# 获取配置
self.config = self.config_manager.get_all_config()
self.workspace_id = self.config['tapd']['workspace_id']
# 获取所有docid列表
self.docid_list = self.config['smartsheet']['docid_list']
print(f" 配置了 {len(self.docid_list)} 个智能表格")
# 获取access_token
if access_token is None:
token_manager = TokenManager()
self.access_token = token_manager.get_token()
else:
self.access_token = access_token
# 初始化TAPD API所有表格共用
self.tapd_api = TAPDStoryApi(self.workspace_id, test_mode=test_mode)
# 获取计划字段映射(每次同步时实时获取,所有表格共用)
print(f" 正在获取计划字段映射...")
try:
self.plan_mapping = self.tapd_api.get_plan_mapping()
print(f" ✓ 计划字段映射获取完成,共 {len(self.plan_mapping)} 个选项")
except Exception as e:
print(f" ⚠ 计划字段映射获取失败: {e},将使用空映射")
self.plan_mapping = {}
# TAPD查询缓存在sync_once中初始化
self._story_cache: Dict[str, CacheEntry] = {}
self._cache_stats = {
"total_queries": 0,
"cache_hits": 0,
"cache_misses": 0,
"api_calls": 0,
"cached_failures": 0
}
print(f" ✓ 同步服务初始化完成")
def _get_beijing_time_str(self) -> str:
"""
获取北京时间格式字符串MM-dd HH:mm
Returns:
str: 格式化的北京时间字符串例如 "01-14 15:30"
"""
# 北京时间是 UTC+8
beijing_tz = timezone(timedelta(hours=8))
beijing_time = datetime.now(beijing_tz)
return beijing_time.strftime("%m-%d %H:%M")
def sync_once(self) -> Dict[str, Any]:
"""
执行一次完整的同步流程遍历所有配置的智能表格
Returns:
Dict: 同步结果统计
"""
# 初始化本次同步的缓存
self._story_cache = {}
self._cache_stats = {
"total_queries": 0,
"cache_hits": 0,
"cache_misses": 0,
"api_calls": 0,
"cached_failures": 0
}
result = {
"success": False,
"start_time": datetime.now().isoformat(),
"end_time": None,
"docs_total": len(self.docid_list),
"docs_success": 0,
"docs_failed": 0,
"sheets_processed": 0,
"sheets_skipped": 0,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"error_message": None,
"doc_results": [] # 每个表格的详细结果
}
all_failed_records = [] # 汇总所有表格的失败记录
# 遍历所有配置的智能表格
for doc_index, docid in enumerate(self.docid_list, 1):
doc_result = self._sync_single_doc(docid, doc_index, len(self.docid_list))
result["doc_results"].append(doc_result)
if doc_result["success"]:
result["docs_success"] += 1
# 累加统计数据
result["sheets_processed"] += doc_result["sheets_processed"]
result["sheets_skipped"] += doc_result["sheets_skipped"]
result["total_records"] += doc_result["total_records"]
result["records_with_link"] += doc_result["records_with_link"]
result["records_synced"] += doc_result["records_synced"]
result["records_updated"] += doc_result["records_updated"]
result["records_failed"] += doc_result["records_failed"]
# 收集失败记录
all_failed_records.extend(doc_result.get("all_failed_records", []))
else:
result["docs_failed"] += 1
# 判断整体是否成功(至少有一个表格成功)
result["success"] = result["docs_success"] > 0
# 发送失败通知(汇总所有表格的失败记录)
if all_failed_records:
self._send_failure_notification(all_failed_records)
# 记录缓存统计
self._log_cache_statistics()
result["cache_stats"] = self._cache_stats.copy()
result["end_time"] = datetime.now().isoformat()
return result
def _sync_single_doc(self, docid: str, doc_index: int, total_docs: int) -> Dict[str, Any]:
"""
同步单个智能表格
Args:
docid: 智能表格文档ID
doc_index: 当前表格序号从1开始
total_docs: 表格总数
Returns:
Dict: 单个表格的同步结果
"""
# 显示docid的前16个字符便于识别
display_id = docid[:16] + "..." if len(docid) > 16 else docid
print(f"\n{'#'*70}")
print(f"# [表格 {doc_index}/{total_docs}] docid: {display_id}")
print(f"{'#'*70}")
doc_result = {
"docid": docid,
"doc_index": doc_index,
"success": False,
"sheets_processed": 0,
"sheets_skipped": 0,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"error_message": None,
"sheet_results": [],
"all_failed_records": []
}
try:
# 为当前表格创建SmartSheetSync实例
smartsheet = SmartSheetSync(self.access_token, docid, test_mode=self.test_mode)
self.current_smartsheet = smartsheet # 供_process_sheet等方法使用
# 获取所有子表
print("\n正在获取子表列表...")
sheets = smartsheet.api.get_sheet_list()
print(f" ✓ 找到 {len(sheets)} 个子表")
# 处理每个子表
for sheet in sheets:
sheet_id = sheet.get('sheet_id', '')
sheet_title = sheet.get('title', '未命名')
sheet_result = self._process_sheet(sheet_id, sheet_title, doc_index)
doc_result["sheet_results"].append(sheet_result)
if sheet_result["skipped"]:
doc_result["sheets_skipped"] += 1
else:
doc_result["sheets_processed"] += 1
doc_result["total_records"] += sheet_result["total_records"]
doc_result["records_with_link"] += sheet_result["records_with_link"]
doc_result["records_synced"] += sheet_result["records_synced"]
doc_result["records_updated"] += sheet_result["records_updated"]
doc_result["records_failed"] += sheet_result["records_failed"]
# 收集失败记录(添加表格标识)
for failed in sheet_result.get("failed_records", []):
failed["doc_index"] = doc_index
failed["docid_short"] = display_id
doc_result["all_failed_records"].append(failed)
doc_result["success"] = True
print(f"\n✓ [表格 {doc_index}/{total_docs}] 同步完成")
except Exception as e:
doc_result["error_message"] = str(e)
print(f"\n✗ [表格 {doc_index}/{total_docs}] 同步失败: {e}")
if self.test_mode:
import traceback
traceback.print_exc()
return doc_result
def _process_sheet(self, sheet_id: str, sheet_title: str, doc_index: int = 1) -> Dict[str, Any]:
"""
处理单个子表的同步
Args:
sheet_id: 子表ID
sheet_title: 子表标题
doc_index: 表格序号用于日志显示
Returns:
Dict: 子表处理结果
"""
print(f"\n{'='*60}")
print(f"处理子表: {sheet_title} (表格{doc_index})")
print(f"{'='*60}")
sheet_result = {
"sheet_id": sheet_id,
"sheet_title": sheet_title,
"skipped": False,
"skip_reason": None,
"total_records": 0,
"records_with_link": 0,
"records_synced": 0,
"records_updated": 0,
"records_failed": 0,
"details": [],
"failed_records": [] # 收集失败记录的详细信息
}
try:
# 1. 获取字段信息并检查必要字段
fields = self.current_smartsheet.api.get_fields(sheet_id)
all_present, missing_fields, field_mapping = self.current_smartsheet.check_required_fields(fields)
if not all_present:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"缺少必要字段: {', '.join(missing_fields)}"
print(f" ⚠ 跳过此子表: {sheet_result['skip_reason']}")
return sheet_result
# 2. 获取所有记录(只获取一次,供新记录同步和持续同步共用)
print(f"正在获取所有记录...")
all_records = self.current_smartsheet.get_all_records(sheet_id)
# 3. 获取包含TAPD链接的新记录同步状态为空
records_with_link = self.current_smartsheet.get_records_with_tapd_link(
sheet_id, all_records=all_records
)
sheet_result["records_with_link"] = len(records_with_link)
# 4. 处理新记录
if records_with_link:
print(f"\n--- 新记录同步 ---")
success_records = [] # 成功同步的记录
failed_records_info = [] # 失败的记录信息列表包含ID和状态
for record_info in records_with_link:
record_result = self._process_record(record_info)
sheet_result["details"].append(record_result)
if record_result["success"]:
sheet_result["records_synced"] += 1
if record_result["update_record"]:
success_records.append(record_result["update_record"])
sheet_result["records_updated"] += 1
else:
sheet_result["records_failed"] += 1
# 收集失败记录的ID和对应的同步状态
failed_records_info.append({
"record_id": record_info["record_id"],
"sync_status": record_result.get("sync_status", "⚠️ 同步失败请联系PM")
})
# 收集失败记录的详细信息用于推送
failed_record = {
"sheet_title": sheet_title,
"record_id": record_info["record_id"],
"tapd_link": record_info.get("tapd_link", "(无链接)"),
"error_message": record_result.get("error_message", "未知错误")
}
sheet_result["failed_records"].append(failed_record)
# 4. 批量回写成功记录
if success_records:
print(f"\n正在回写 {len(success_records)} 条成功记录...")
try:
self.current_smartsheet.batch_update_records(sheet_id, success_records)
print(f" ✓ 成功记录回写完成")
except Exception as e:
print(f" ✗ 成功记录回写失败: {e}")
# 5. 批量回写失败记录(根据不同失败原因回写不同状态)
if failed_records_info:
print(f"\n正在回写 {len(failed_records_info)} 条失败记录的状态...")
try:
failed_updates = [
self.current_smartsheet.build_update_record(
record_id=info["record_id"],
sync_status=info["sync_status"]
)
for info in failed_records_info
]
self.current_smartsheet.batch_update_records(sheet_id, failed_updates)
print(f" ✓ 失败记录状态回写完成")
except Exception as e:
print(f" ✗ 失败记录状态回写失败: {e}")
else:
print(f" 没有新记录需要同步")
# 7. 持续同步:更新已同步记录的最新状态
print(f"\n--- 持续同步 ---")
update_result = self._process_synced_records_update(sheet_id, all_records=all_records)
sheet_result["continuous_sync"] = update_result
sheet_result["total_records"] = len(records_with_link)
except Exception as e:
sheet_result["skipped"] = True
sheet_result["skip_reason"] = f"处理异常: {str(e)}"
print(f" ✗ 处理子表异常: {e}")
return sheet_result
def _get_story_with_cache(self, story_id: str) -> Dict:
"""
带缓存的获取需求详情
Args:
story_id: 需求ID
Returns:
Dict: 需求详细信息
Raises:
StoryNotFoundException: 需求不存在
Exception: 其他API错误
"""
self._cache_stats["total_queries"] += 1
# 检查缓存
if story_id in self._story_cache:
cache_entry = self._story_cache[story_id]
self._cache_stats["cache_hits"] += 1
if cache_entry.success:
# 缓存命中 - 成功记录
if self.test_mode:
print(f" [缓存命中] story_id={story_id}")
return cache_entry.data
else:
# 缓存命中 - 失败记录
self._cache_stats["cached_failures"] += 1
if self.test_mode:
print(f" [缓存命中-失败] story_id={story_id}")
raise cache_entry.error
# 缓存未命中调用API
self._cache_stats["cache_misses"] += 1
self._cache_stats["api_calls"] += 1
if self.test_mode:
print(f" [API调用] story_id={story_id}")
try:
story_info = self.tapd_api.get_story(story_id)
# 缓存成功结果
self._story_cache[story_id] = CacheEntry(success=True, data=story_info)
return story_info
except StoryNotFoundException as e:
# 缓存确定性失败(单号无效)
self._story_cache[story_id] = CacheEntry(success=False, error=e)
raise
except Exception as e:
# 不缓存非确定性失败网络错误、API限流等
raise
def _process_record(self, record_info: Dict) -> Dict[str, Any]:
"""
处理单条记录的同步
Args:
record_info: 记录信息包含解析结果
Returns:
Dict: 记录处理结果
"""
record_result = {
"record_id": record_info["record_id"],
"tapd_link": record_info["tapd_link"],
"success": False,
"update_record": None,
"error_message": None,
"story_info": None,
"sync_status": None # 用于失败记录的状态回写
}
# 检查链接解析结果
if not record_info["parse_success"]:
record_result["error_message"] = record_info.get("parse_error", "链接解析失败")
record_result["sync_status"] = "❌ 单号无效"
# 记录链接解析失败日志
self.logger.log_api_call(
api_type="task2",
operation="link_parse_failure",
request_data={
"record_id": record_info["record_id"],
"tapd_link": record_info["tapd_link"]
},
response_data={},
success=False,
error_message=record_result["error_message"]
)
return record_result
story_id = record_info["story_id"]
try:
# 查询TAPD获取需求信息使用缓存
story_info = self._get_story_with_cache(story_id)
record_result["story_info"] = story_info
# 提取需要同步的字段None 转为空字符串)
status = story_info.get('status') or ''
owner = story_info.get('owner') or ''
begin_date = story_info.get('begin') or ''
due_date = story_info.get('due') or ''
# 提取发布计划字段并转换为中文名称
plan_id = story_info.get('release_id') or ''
plan_name = self.tapd_api.map_plan_id_to_name(plan_id)
# 获取当前字段值,判断是否需要更新
current_values = self.current_smartsheet.get_current_field_values(record_info["record"])
needs_update = self._check_needs_update(
current_values, status, owner, begin_date, due_date, plan_name
)
# 生成同步成功状态(包含时间戳)
time_str = self._get_beijing_time_str()
sync_status_success = f"✅ 同步成功 {time_str}"
# 构造更新记录(包含业务字段 + 同步状态=成功+时间戳)
# 即使业务字段没有变化,也要写入同步状态
update_record = self.current_smartsheet.build_update_record(
record_id=record_info["record_id"],
status=status,
owner=owner,
begin_date=begin_date,
due_date=due_date,
plan=plan_name,
sync_status=sync_status_success
)
record_result["update_record"] = update_record
record_result["success"] = True
except StoryNotFoundException as e:
# 单号无效TAPD中未找到该需求
record_result["error_message"] = str(e)
record_result["sync_status"] = "❌ 单号无效"
# 记录TAPD查询失败日志
self.logger.log_api_call(
api_type="task2",
operation="sync_record_failure",
request_data={
"record_id": record_info["record_id"],
"story_id": story_id
},
response_data={},
success=False,
error_message=record_result["error_message"]
)
except Exception as e:
# 其他异常API调用失败等
record_result["error_message"] = str(e)
record_result["sync_status"] = "⚠️ 同步失败请联系PM"
# 记录TAPD查询失败日志
self.logger.log_api_call(
api_type="task2",
operation="sync_record_failure",
request_data={
"record_id": record_info["record_id"],
"story_id": story_id
},
response_data={},
success=False,
error_message=record_result["error_message"]
)
return record_result
def _check_needs_update(self, current_values: Dict,
status: str, owner: str,
begin_date: str, due_date: str,
plan: str = "") -> bool:
"""
检查是否需要更新记录
Args:
current_values: 当前字段值
status: 新状态
owner: 新处理人
begin_date: 新开始日期
due_date: 新结束日期
plan: 新计划
Returns:
bool: 是否需要更新
"""
# 提取当前值的文本内容
def extract_text(value):
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, list) and len(value) > 0:
first = value[0]
if isinstance(first, dict):
return first.get('text', '')
return str(first)
if isinstance(value, dict):
return value.get('text', '')
return str(value)
current_status = extract_text(current_values.get('TAPD状态'))
current_owner = extract_text(current_values.get('处理人'))
current_begin = extract_text(current_values.get('TAPD预计开始日期'))
current_due = extract_text(current_values.get('TAPD预计完成日期'))
current_plan = extract_text(current_values.get('计划'))
# 比较是否有变化
if current_status != status:
return True
if current_owner != owner:
return True
if current_begin != begin_date:
return True
if current_due != due_date:
return True
if current_plan != plan:
return True
return False
def _process_synced_records_update(self, sheet_id: str,
all_records: List[Dict] = None) -> Dict[str, Any]:
"""
处理已同步记录的状态更新持续同步
Args:
sheet_id: 子表ID
all_records: 可选已获取的所有记录列表
Returns:
Dict: 更新结果统计
"""
result = {
"checked": 0,
"updated": 0,
"failed": 0,
}
# 获取需要持续同步的记录
records = self.current_smartsheet.get_synced_records_for_update(
sheet_id, TERMINAL_STATUSES, all_records=all_records
)
if not records:
print(f" 没有需要持续同步的记录")
return result
result["checked"] = len(records)
updates = []
for record_info in records:
try:
# 查询TAPD最新状态使用缓存
story_info = self._get_story_with_cache(record_info["story_id"])
# 提取最新字段值None 转为空字符串)
new_status = story_info.get('status') or ''
new_owner = story_info.get('owner') or ''
new_begin = story_info.get('begin') or ''
new_due = story_info.get('due') or ''
plan_id = story_info.get('release_id') or ''
new_plan = self.tapd_api.map_plan_id_to_name(plan_id)
# 获取当前值并比较
current = self.current_smartsheet.get_current_field_values(record_info["record"])
# 调试:打印当前值和新值(含类型)
print(f"\n [DEBUG] 记录 {record_info['record_id']} 字段比较:")
print(f" 结束日期: 当前='{current.get('TAPD预计完成日期')}' (type={type(current.get('TAPD预计完成日期'))}) vs 新='{new_due}' (type={type(new_due)})")
needs_update = self._check_needs_update(
current, new_status, new_owner, new_begin, new_due, new_plan
)
print(f" needs_update = {needs_update}")
if needs_update:
# 生成同步成功状态(包含时间戳)
time_str = self._get_beijing_time_str()
sync_status_success = f"✅ 同步成功 {time_str}"
update_record = self.current_smartsheet.build_update_record(
record_id=record_info["record_id"],
status=new_status,
owner=new_owner,
begin_date=new_begin,
due_date=new_due,
plan=new_plan,
sync_status=sync_status_success
)
updates.append(update_record)
except Exception as e:
result["failed"] += 1
error_msg = str(e)
# 记录持续同步失败日志
self.logger.log_api_call(
api_type="task2",
operation="continuous_sync_failure",
request_data={
"record_id": record_info["record_id"],
"story_id": record_info["story_id"]
},
response_data={},
success=False,
error_message=error_msg
)
if self.test_mode:
print(f" ✗ 记录 {record_info['record_id']} 更新失败: {e}")
# 批量更新
if updates:
print(f" 正在更新 {len(updates)} 条记录...")
self.current_smartsheet.batch_update_records(sheet_id, updates)
result["updated"] = len(updates)
print(f" ✓ 持续同步更新完成")
else:
print(f" ✓ 所有记录状态均未变化")
return result
def _log_cache_statistics(self):
"""记录缓存统计信息到日志"""
stats = self._cache_stats
if stats["total_queries"] == 0:
return
hit_rate = (stats["cache_hits"] / stats["total_queries"]) * 100
print(f"\n{'='*60}")
print(f"TAPD查询缓存统计")
print(f"{'='*60}")
print(f" 总查询次数: {stats['total_queries']}")
print(f" 缓存命中: {stats['cache_hits']} ({hit_rate:.1f}%)")
print(f" 缓存未命中: {stats['cache_misses']}")
print(f" 实际API调用: {stats['api_calls']}")
print(f" 缓存失败记录命中: {stats['cached_failures']}")
print(f" 缓存条目数: {len(self._story_cache)}")
print(f"{'='*60}\n")
# 记录到日志文件
self.logger.log_api_call(
api_type="task2",
operation="cache_statistics",
request_data={},
response_data=stats,
success=True,
error_message=None
)
def _send_failure_notification(self, failed_records: List[Dict]) -> None:
"""
发送同步失败通知
Args:
failed_records: 失败记录列表
"""
try:
# 获取企业微信配置
wework_config = self.config.get('wework')
if not wework_config:
print(" 未配置企业微信推送,跳过失败通知")
return
agentid = wework_config.get('agentid')
receivers = wework_config.get('receivers')
if not agentid or not receivers:
print(" ⚠ 企业微信推送配置不完整,跳过失败通知")
return
# 发送推送通知
print(f"\n正在发送同步失败通知...")
from src2.notifier import send_sync_failure_notification
success = send_sync_failure_notification(
self.access_token,
agentid,
receivers,
failed_records
)
if success:
print(f" ✓ 失败通知已发送")
else:
print(f" ✗ 失败通知发送失败")
except Exception as e:
print(f" ✗ 发送失败通知时出错: {e}")
if self.test_mode:
import traceback
traceback.print_exc()
def run_once(config_manager: Task2ConfigManager = None,
access_token: str = None,
test_mode: bool = False) -> Dict[str, Any]:
"""
执行一次同步流程供调度器调用
Args:
config_manager: 配置管理器
access_token: access_token
test_mode: 测试模式
Returns:
Dict: 同步结果
"""
service = SyncService(
config_manager=config_manager,
access_token=access_token,
test_mode=test_mode
)
return service.sync_once()
if __name__ == "__main__":
print("=== 任务二同步服务测试 ===\n")
print("请使用 main.py 或 scheduler.py 运行同步服务")
print("或使用 test_phase4.py 进行测试")

381
src2/tapd_api.py Normal file
View File

@ -0,0 +1,381 @@
"""
TAPD API调用模块任务二专用
负责与TAPD Open API交互查询需求Story信息
与任务一的区别
- 任务一创建和管理Bug单
- 任务二查询需求Story状态信息
"""
import os
import requests
import time
from typing import Dict, Optional, Any
from requests.auth import HTTPBasicAuth
# 导入任务二专用的日志模块
from src2.logger import get_task2_logger
# ============================================================
# 自定义异常类
# ============================================================
class StoryNotFoundException(Exception):
"""当TAPD需求Story不存在时抛出的异常"""
pass
# TAPD状态值映射表
# 将API返回的状态代码转换为中文显示文本
STATUS_MAPPING = {
"status_5": "进行中",
"status_7": "未开始",
"status_8": "已完成",
"status_9": "待验收",
"status_10": "联调",
"status_12": "取消",
"status_13": "待评审",
}
# 需求终态列表(这些状态不需要持续同步)
TERMINAL_STATUSES = ['已完成', '取消']
def map_status(status_code: str) -> str:
"""
将TAPD状态代码转换为中文显示文本
Args:
status_code: TAPD API返回的状态代码 "status_5"
Returns:
str: 中文显示文本 "进行中"未知状态返回原始值
"""
if not status_code:
return "未知"
return STATUS_MAPPING.get(status_code, status_code)
class TAPDStoryApi:
"""TAPD需求API封装类任务二专用"""
# TAPD API基础URL与任务一相同
BASE_URL = "https://tapd-api.bilibili.co/tapd"
# 发布计划字段名称
PLAN_FIELD_NAME = "release_id"
def __init__(self, workspace_id: str, test_mode: bool = False):
"""
初始化TAPD Story API
Args:
workspace_id: TAPD项目ID
test_mode: 是否启用测试模式显示API请求和响应
Raises:
ValueError: 环境变量未设置时抛出
"""
self.workspace_id = workspace_id
self.test_mode = test_mode
self.session = requests.Session()
# 从环境变量读取认证信息(与任务一共用)
self.api_user = os.environ.get('TAPD_API_USER')
self.api_password = os.environ.get('TAPD_API_PASSWORD')
if not self.api_user or not self.api_password:
raise ValueError(
"TAPD认证信息未设置。请设置环境变量:\n"
" - TAPD_API_USER\n"
" - TAPD_API_PASSWORD"
)
# 设置Basic Auth
self.auth = HTTPBasicAuth(self.api_user, self.api_password)
# 初始化任务二专用的日志记录器
self.logger = get_task2_logger()
# 计划字段映射缓存ID -> 中文名称)
self._plan_mapping = None
print(f" ✓ TAPD Story API初始化完成 (workspace_id: {workspace_id})")
if test_mode:
print(f" ⚠ 测试模式已启用将显示所有API调用的详细信息")
def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
"""
发起TAPD API GET请求的通用方法支持429错误重试
Args:
endpoint: API端点 "stories"
params: URL查询参数
Returns:
Dict: API响应数据
Raises:
RuntimeError: API调用失败时抛出
"""
url = f"{self.BASE_URL}/{endpoint}"
# 准备日志记录的请求数据
log_request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
# 测试模式:显示请求信息
if self.test_mode:
print("\n" + "=" * 60)
print(f"【测试模式】TAPD API调用: {endpoint}")
print("=" * 60)
print(f"请求URL: {url}")
if params:
print(f"URL参数:")
for key, value in params.items():
print(f" {key}: {value}")
# 429错误重试逻辑最多重试1次
max_retries = 1
retry_count = 0
while retry_count <= max_retries:
try:
response = self.session.get(
url,
params=params,
auth=self.auth,
timeout=30
)
# 测试模式:显示响应信息
if self.test_mode:
print(f"\n响应状态码: {response.status_code}")
try:
import json
result = response.json()
print(f"响应数据:")
print(json.dumps(result, ensure_ascii=False, indent=2))
except:
print(f"响应内容: {response.text[:500]}")
print("=" * 60)
# 检查是否是429错误在raise_for_status之前检查
if response.status_code == 429:
if retry_count < max_retries:
retry_count += 1
wait_seconds = 60
print(f"\n⚠️ 触发TAPD API限流 (429 Too Many Requests)")
print(f" [开始等待] 等待 {wait_seconds} 秒后重试... (第 {retry_count}/{max_retries} 次重试)")
print(f" [重要] 在等待期间,代码会阻塞在这里,不会继续处理其他记录")
# 记录429错误日志
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={"status_code": 429, "retry_count": retry_count},
success=False,
error_message=f"429 Too Many Requests, 等待{wait_seconds}秒后重试"
)
time.sleep(wait_seconds)
print(f" [等待结束] 开始重试请求...")
continue # 重试
else:
# 已达到最大重试次数,抛出异常
error_msg = "TAPD API限流 (429 Too Many Requests),重试后仍然失败"
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={"status_code": 429, "retry_count": retry_count},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
# 对于非429错误调用raise_for_status检查HTTP状态
response.raise_for_status()
result = response.json()
# 检查TAPD API返回的状态
if result.get('status') != 1:
error_msg = result.get('info', '未知错误')
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=False,
error_message=error_msg
)
raise RuntimeError(f"TAPD API调用失败: {error_msg}")
# 记录成功日志
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=True
)
return result
except requests.exceptions.Timeout:
error_msg = f"TAPD API请求超时: {endpoint}"
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
except requests.exceptions.RequestException as e:
# 如果是HTTPError且状态码是429由上面的status_code检查处理
# 这里不应该到达因为429在response.status_code检查时已处理
error_msg = f"TAPD API请求失败: {e}"
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
# 理论上不会到达这里
raise RuntimeError("TAPD API请求失败未知错误")
def get_story(self, story_id: str) -> Dict:
"""
获取需求详情
Args:
story_id: 需求ID
Returns:
Dict: 需求详细信息
Raises:
RuntimeError: 获取失败时抛出
"""
params = {
'workspace_id': self.workspace_id,
'id': story_id
}
result = self._make_request("stories", params=params)
# TAPD API返回格式: {"status": 1, "data": [{"Story": {...}}]}
data = result.get('data', [])
if not isinstance(data, list) or len(data) == 0:
raise StoryNotFoundException(f"未找到需求: {story_id}")
# 取第一个元素
first_item = data[0]
# 提取Story对象
if isinstance(first_item, dict) and 'Story' in first_item:
story_info = first_item['Story']
else:
raise RuntimeError(f"API返回数据格式异常: {first_item}")
if not story_info:
raise StoryNotFoundException(f"未找到需求: {story_id}")
# 转换状态为中文
raw_status = story_info.get('status', '')
story_info['raw_status'] = raw_status
story_info['status'] = map_status(raw_status)
return story_info
def get_story_url(self, story_id: str) -> str:
"""
生成需求的访问URL
Args:
story_id: 需求ID
Returns:
str: 需求的访问URL
"""
return f"https://www.tapd.cn/{self.workspace_id}/prong/stories/view/{story_id}"
def get_story_fields_info(self) -> Dict:
"""
获取需求所有字段及候选值
Returns:
Dict: 字段信息包含各字段的名称选项等
Raises:
RuntimeError: 获取失败时抛出
"""
params = {
'workspace_id': self.workspace_id
}
result = self._make_request("stories/get_fields_info", params=params)
return result.get('data', {})
def get_plan_mapping(self) -> Dict[str, str]:
"""
获取发布计划字段的ID到中文名称映射
Returns:
Dict[str, str]: 发布计划ID到中文名称的映射
例如: {"1010104801000069739": "v2test", ...}
"""
# 获取字段信息
fields_info = self.get_story_fields_info()
# 提取计划字段的options
plan_field = fields_info.get(self.PLAN_FIELD_NAME, {})
options = plan_field.get('options', {})
# 缓存映射
self._plan_mapping = options
if self.test_mode:
print(f"\n【测试模式】计划字段映射:")
for plan_id, plan_name in options.items():
print(f" {plan_id} -> {plan_name}")
return options
def map_plan_id_to_name(self, plan_id: str) -> str:
"""
将发布计划ID转换为中文名称
Args:
plan_id: 发布计划ID "1010104801000069739"
Returns:
str: 中文名称 "v2test"未找到则返回空字符串
"""
if not plan_id or plan_id == "0":
return ""
# 如果映射未初始化,先获取
if self._plan_mapping is None:
self.get_plan_mapping()
return self._plan_mapping.get(plan_id, "")
if __name__ == "__main__":
print("=== TAPD Story API 测试 ===\n")
print("请使用 test_phase2.py 进行完整测试")

210
src2/test_phase2.py Normal file
View File

@ -0,0 +1,210 @@
"""
任务二第二阶段验证脚本
测试链接解析和TAPD API功能
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src2.link_parser import parse_tapd_link, extract_story_id, is_valid_tapd_link
from src2.tapd_api import TAPDStoryApi, map_status, STATUS_MAPPING
def test_link_parser():
"""测试链接解析功能"""
print("=" * 60)
print("测试1: 链接解析器")
print("=" * 60)
test_cases = [
# 格式一:列表页弹窗链接
(
"https://www.tapd.cn/tapd_fe/58335167/story/list?dialog_preview_id=story_1158335167001044388",
True,
"1158335167001044388",
"dialog"
),
# 格式二:详情页链接
(
"https://www.tapd.cn/58335167/prong/stories/view/1158335167001044388",
True,
"1158335167001044388",
"view"
),
# 无效链接Bug链接
(
"https://www.tapd.cn/58335167/bugtrace/bugs/view/123456",
False,
None,
"unknown"
),
# 无效链接:其他网站
(
"https://www.google.com",
False,
None,
"unknown"
),
# 空链接
(
"",
False,
None,
"unknown"
),
]
passed = 0
failed = 0
for i, (url, expected_success, expected_id, expected_type) in enumerate(test_cases, 1):
success, result, link_type = parse_tapd_link(url)
# 检查结果
if success == expected_success and link_type == expected_type:
if success and result == expected_id:
print(f" [{i}] PASS: {url[:50]}...")
passed += 1
elif not success:
print(f" [{i}] PASS: 正确识别无效链接")
passed += 1
else:
print(f" [{i}] FAIL: 单号不匹配 (期望={expected_id}, 实际={result})")
failed += 1
else:
print(f" [{i}] FAIL: {url[:50]}...")
print(f" 期望: success={expected_success}, type={expected_type}")
print(f" 实际: success={success}, type={link_type}")
failed += 1
print(f"\n链接解析测试结果: {passed} 通过, {failed} 失败")
return failed == 0
def test_status_mapping():
"""测试状态映射功能"""
print("\n" + "=" * 60)
print("测试2: 状态映射")
print("=" * 60)
test_cases = [
("status_5", "进行中"),
("status_7", "未开始"),
("status_8", "已完成"),
("status_9", "待验收"),
("status_10", "联调"),
("status_12", "取消"),
("status_13", "待评审"),
("status_99", "status_99"), # 未知状态返回原值
("", "未知"),
(None, "未知"),
]
passed = 0
failed = 0
for status_code, expected in test_cases:
result = map_status(status_code)
if result == expected:
print(f" PASS: {status_code} -> {result}")
passed += 1
else:
print(f" FAIL: {status_code} -> {result} (期望: {expected})")
failed += 1
print(f"\n状态映射测试结果: {passed} 通过, {failed} 失败")
return failed == 0
def test_tapd_api(story_id: str = None):
"""测试TAPD API功能"""
print("\n" + "=" * 60)
print("测试3: TAPD API")
print("=" * 60)
# 从配置读取workspace_id
from src2.config import Task2ConfigManager
config = Task2ConfigManager()
tapd_config = config.get_tapd_config()
workspace_id = tapd_config['workspace_id']
print(f" workspace_id: {workspace_id}")
try:
# 初始化API
api = TAPDStoryApi(workspace_id, test_mode=True)
print(" ✓ API初始化成功")
except ValueError as e:
print(f" ✗ API初始化失败: {e}")
return False
if not story_id:
print("\n 跳过需求查询测试未提供story_id")
print(" 用法: python test_phase2.py <story_id>")
return True
# 测试获取需求详情
print(f"\n 测试获取需求: {story_id}")
try:
story = api.get_story(story_id)
print(f" ✓ 获取成功")
print(f" - ID: {story.get('id')}")
print(f" - 名称: {story.get('name')}")
print(f" - 状态: {story.get('status')}")
print(f" - 处理人: {story.get('owner')}")
print(f" - 预计开始: {story.get('begin')}")
print(f" - 预计结束: {story.get('due')}")
return True
except RuntimeError as e:
print(f" ✗ 获取失败: {e}")
return False
def main():
"""主函数"""
print("=" * 60)
print("任务二第二阶段验证")
print("=" * 60)
# 获取命令行参数
story_id = None
if len(sys.argv) > 1:
story_id = sys.argv[1]
results = []
# 测试1: 链接解析
results.append(("链接解析", test_link_parser()))
# 测试2: 状态映射
results.append(("状态映射", test_status_mapping()))
# 测试3: TAPD API
results.append(("TAPD API", test_tapd_api(story_id)))
# 汇总结果
print("\n" + "=" * 60)
print("验收结果汇总")
print("=" * 60)
all_passed = True
for name, passed in results:
status = "✓ PASS" if passed else "✗ FAIL"
print(f" {status}: {name}")
if not passed:
all_passed = False
if all_passed:
print("\n所有测试通过!第二阶段验收完成。")
else:
print("\n部分测试失败,请检查。")
return 0 if all_passed else 1
if __name__ == "__main__":
sys.exit(main())

330
src2/test_phase3.py Normal file
View File

@ -0,0 +1,330 @@
"""
第三阶段验证脚本智能表格读写功能测试
验证项
1. 字段检测 - 检查必要字段是否存在
2. 记录读取 - 获取所有记录
3. TAPD链接提取 - 从记录中提取链接并解析
4. 数据回写测试 - 构造更新记录可选执行
用法
python src2/test_phase3.py # 只读测试(不修改数据)
python src2/test_phase3.py --write # 包含回写测试(会修改一条记录)
"""
import sys
import argparse
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
from src2.smartsheet_sync import (
SmartSheetSync,
REQUIRED_FIELDS,
FIELD_TAPD_LINK,
FIELD_TAPD_STATUS,
FIELD_OWNER,
FIELD_BEGIN_DATE,
FIELD_DUE_DATE,
)
def print_separator(title: str = ""):
"""打印分隔线"""
print("\n" + "=" * 60)
if title:
print(f" {title}")
print("=" * 60)
def test_field_detection(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试1字段检测
Returns:
bool: 测试是否通过
"""
print_separator("测试1字段检测")
print(f"必要字段列表:")
for field in REQUIRED_FIELDS:
print(f" - {field}")
print()
# 获取字段信息
fields = sync.api.get_fields(sheet_id)
# 检查必要字段
all_present, missing_fields, field_mapping = sync.check_required_fields(fields)
print(f"\n字段映射结果:")
for field_name in REQUIRED_FIELDS:
field_id = field_mapping.get(field_name, "未找到")
status = "" if field_name in field_mapping else ""
print(f" {status} {field_name}: {field_id}")
if all_present:
print(f"\n✓ 测试通过:所有必要字段都存在")
return True
else:
print(f"\n✗ 测试失败:缺少字段 {missing_fields}")
return False
def test_record_reading(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试2记录读取
Returns:
bool: 测试是否通过
"""
print_separator("测试2记录读取")
try:
records = sync.get_all_records(sheet_id)
print(f"\n记录读取结果:")
print(f" - 总记录数: {len(records)}")
if len(records) > 0:
print(f"\n第一条记录示例:")
first_record = records[0]
record_id = first_record.get('record_id', 'N/A')
print(f" - record_id: {record_id}")
# 显示部分字段值
values = first_record.get('values', {})
print(f" - 字段数量: {len(values)}")
# 显示前5个字段
for i, (key, value) in enumerate(values.items()):
if i >= 5:
print(f" - ... 还有 {len(values) - 5} 个字段")
break
print(f" - {key}: {str(value)[:50]}...")
print(f"\n✓ 测试通过:成功读取 {len(records)} 条记录")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_tapd_link_extraction(sync: SmartSheetSync, sheet_id: str) -> bool:
"""
测试3TAPD链接提取
Returns:
bool: 测试是否通过
"""
print_separator("测试3TAPD链接提取")
try:
records_with_link = sync.get_records_with_tapd_link(sheet_id)
print(f"\n链接提取结果:")
print(f" - 包含链接的记录数: {len(records_with_link)}")
# 统计解析结果
success_count = sum(1 for r in records_with_link if r["parse_success"])
fail_count = len(records_with_link) - success_count
print(f" - 解析成功: {success_count}")
print(f" - 解析失败: {fail_count}")
# 显示前3条成功解析的记录
success_records = [r for r in records_with_link if r["parse_success"]]
if success_records:
print(f"\n成功解析的记录示例最多3条")
for i, record_info in enumerate(success_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 单号: {record_info['story_id']}")
print(f" 类型: {record_info.get('link_type', 'N/A')}")
# 显示解析失败的记录
fail_records = [r for r in records_with_link if not r["parse_success"]]
if fail_records:
print(f"\n解析失败的记录最多3条")
for i, record_info in enumerate(fail_records[:3]):
print(f"\n [{i+1}] record_id: {record_info['record_id']}")
print(f" 链接: {record_info['tapd_link'][:60]}...")
print(f" 错误: {record_info.get('parse_error', 'N/A')}")
print(f"\n✓ 测试通过成功提取并解析TAPD链接")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
import traceback
traceback.print_exc()
return False
def test_update_record_structure(sync: SmartSheetSync) -> bool:
"""
测试4更新记录结构构造不实际写入
Returns:
bool: 测试是否通过
"""
print_separator("测试4更新记录结构构造")
try:
# 构造一个测试更新记录
test_record = sync.build_update_record(
record_id="test_record_id_123",
status="进行中",
owner="张三",
begin_date="2025-01-01",
due_date="2025-01-15"
)
print(f"构造的更新记录结构:")
print(f" record_id: {test_record['record_id']}")
print(f" values:")
for key, value in test_record['values'].items():
print(f" {key}: {value}")
# 验证结构
assert 'record_id' in test_record
assert 'values' in test_record
assert FIELD_TAPD_STATUS in test_record['values']
assert FIELD_OWNER in test_record['values']
assert FIELD_BEGIN_DATE in test_record['values']
assert FIELD_DUE_DATE in test_record['values']
print(f"\n✓ 测试通过:更新记录结构正确")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def test_multi_sheet_support(sync: SmartSheetSync) -> bool:
"""
测试5多子表支持
Returns:
bool: 测试是否通过
"""
print_separator("测试5多子表支持")
try:
# 获取所有子表
sheet_list = sync.api.get_sheet_list()
print(f"子表列表:")
for i, sheet in enumerate(sheet_list):
sheet_id = sheet.get('sheet_id', 'N/A')
title = sheet.get('title', 'N/A')
print(f" [{i+1}] {title} (ID: {sheet_id})")
print(f"\n共找到 {len(sheet_list)} 个子表")
if len(sheet_list) == 0:
print(f"\n⚠ 警告:没有找到子表")
return False
print(f"\n✓ 测试通过:成功获取子表列表")
return True
except Exception as e:
print(f"\n✗ 测试失败:{e}")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="第三阶段验证脚本")
parser.add_argument("--write", action="store_true",
help="执行回写测试(会修改数据)")
args = parser.parse_args()
print("=" * 60)
print(" 任务二 第三阶段验证:智能表格读写功能")
print("=" * 60)
# 1. 加载配置
print("\n[初始化] 加载配置...")
config = Task2ConfigManager()
smartsheet_config = config.get_smartsheet_config()
docid = smartsheet_config.get('docid')
print(f" docid: {docid[:20]}...")
# 2. 获取token
print("\n[初始化] 获取access_token...")
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" token: {access_token[:20]}...")
# 3. 初始化同步模块
print("\n[初始化] 初始化SmartSheetSync...")
sync = SmartSheetSync(access_token, docid, test_mode=False)
print(" ✓ 初始化完成")
# 4. 获取子表列表
sheet_list = sync.api.get_sheet_list()
if not sheet_list:
print("\n✗ 错误:没有找到子表")
return 1
# 使用第一个子表进行测试
first_sheet = sheet_list[0]
sheet_id = first_sheet.get('sheet_id')
sheet_title = first_sheet.get('title')
print(f"\n使用子表进行测试: {sheet_title}")
# 5. 运行测试
results = []
# 测试5先执行多子表支持
results.append(("多子表支持", test_multi_sheet_support(sync)))
# 测试1字段检测
results.append(("字段检测", test_field_detection(sync, sheet_id)))
# 测试2记录读取
results.append(("记录读取", test_record_reading(sync, sheet_id)))
# 测试3TAPD链接提取
results.append(("TAPD链接提取", test_tapd_link_extraction(sync, sheet_id)))
# 测试4更新记录结构
results.append(("更新记录结构", test_update_record_structure(sync)))
# 6. 输出测试结果汇总
print_separator("测试结果汇总")
passed = 0
failed = 0
for name, result in results:
status = "✓ 通过" if result else "✗ 失败"
print(f" {status}: {name}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if failed == 0:
print("\n" + "=" * 60)
print(" ✓ 第三阶段验证全部通过!")
print("=" * 60)
return 0
else:
print("\n" + "=" * 60)
print(" ✗ 部分测试失败,请检查")
print("=" * 60)
return 1
if __name__ == "__main__":
sys.exit(main())

160
src2/test_phase4.py Normal file
View File

@ -0,0 +1,160 @@
"""
任务二第四阶段验证脚本
验证同步服务与主程序的完整功能
验证项
1. 同步服务初始化
2. 单次同步流程
3. 调度器初始化
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_sync_service_init():
"""测试1: 同步服务初始化"""
print("\n" + "=" * 60)
print("测试1: 同步服务初始化")
print("=" * 60)
try:
from src2.sync_service import SyncService
from src2.config import Task2ConfigManager
# 初始化配置
config_manager = Task2ConfigManager()
print("✓ 配置管理器初始化成功")
# 初始化同步服务(测试模式)
service = SyncService(
config_manager=config_manager,
test_mode=True
)
print("✓ 同步服务初始化成功")
# 验证属性
assert service.workspace_id is not None
assert service.docid is not None
assert service.access_token is not None
print("✓ 服务属性验证通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_sync_once():
"""测试2: 单次同步流程"""
print("\n" + "=" * 60)
print("测试2: 单次同步流程")
print("=" * 60)
try:
from src2.sync_service import run_once
# 执行一次同步
print("正在执行同步...")
result = run_once(test_mode=False)
# 验证结果结构
assert 'success' in result
assert 'sheets_processed' in result
assert 'records_with_link' in result
assert 'records_synced' in result
print("✓ 结果结构验证通过")
# 打印结果摘要
print(f"\n同步结果:")
print(f" 成功: {result['success']}")
print(f" 处理子表: {result['sheets_processed']}")
print(f" 跳过子表: {result['sheets_skipped']}")
print(f" 包含链接: {result['records_with_link']}")
print(f" 同步成功: {result['records_synced']}")
print(f" 需要更新: {result['records_updated']}")
return result['success']
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_scheduler_init():
"""测试3: 调度器初始化"""
print("\n" + "=" * 60)
print("测试3: 调度器初始化")
print("=" * 60)
try:
from src2.scheduler import Task2Scheduler
# 初始化调度器(不启动)
scheduler = Task2Scheduler(verbose=False)
print("✓ 调度器初始化成功")
# 验证属性
assert scheduler.config is not None
assert scheduler.sync_interval > 0
print(f"✓ 同步间隔: {scheduler.sync_interval} 分钟")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主测试函数"""
print("\n" + "=" * 60)
print("任务二第四阶段验证")
print("=" * 60)
results = {}
# 测试1: 同步服务初始化
results['sync_service_init'] = test_sync_service_init()
# 测试2: 单次同步流程
results['sync_once'] = test_sync_once()
# 测试3: 调度器初始化
results['scheduler_init'] = test_scheduler_init()
# 打印总结
print("\n" + "=" * 60)
print("验证结果总结")
print("=" * 60)
all_passed = True
for test_name, passed in results.items():
status = "✓ 通过" if passed else "✗ 失败"
print(f" {test_name}: {status}")
if not passed:
all_passed = False
print("=" * 60)
if all_passed:
print("所有测试通过!第四阶段验收完成。")
else:
print("部分测试失败,请检查错误信息。")
print("=" * 60)
return 0 if all_passed else 1
if __name__ == "__main__":
sys.exit(main())

220
src2/test_setup.py Normal file
View File

@ -0,0 +1,220 @@
"""
任务二第一阶段验证脚本
验证基础框架搭建是否正确
"""
import sys
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_directory_structure():
"""测试1: 验证目录结构"""
print("=" * 50)
print("测试1: 验证目录结构")
print("=" * 50)
src2_dir = project_root / "src2"
logs2_dir = project_root / "logs2"
config_file = project_root / "config" / "config_task2.ini"
results = []
# 检查 src2 目录
if src2_dir.exists() and src2_dir.is_dir():
print(f" [OK] src2/ 目录存在")
results.append(True)
else:
print(f" [FAIL] src2/ 目录不存在")
results.append(False)
# 检查 logs2 目录
if logs2_dir.exists() and logs2_dir.is_dir():
print(f" [OK] logs2/ 目录存在")
results.append(True)
else:
print(f" [FAIL] logs2/ 目录不存在")
results.append(False)
# 检查配置文件
if config_file.exists():
print(f" [OK] config/config_task2.ini 存在")
results.append(True)
else:
print(f" [FAIL] config/config_task2.ini 不存在")
results.append(False)
return all(results)
def test_config_read():
"""测试2: 验证配置文件读取"""
print("\n" + "=" * 50)
print("测试2: 验证配置文件读取")
print("=" * 50)
try:
from src2.config import Task2ConfigManager
config = Task2ConfigManager()
# 读取TAPD配置
tapd_config = config.get_tapd_config()
print(f" [OK] TAPD配置读取成功")
print(f" workspace_id: {tapd_config['workspace_id']}")
# 读取SmartSheet配置
smartsheet_config = config.get_smartsheet_config()
print(f" [OK] SmartSheet配置读取成功")
print(f" docid: {smartsheet_config['docid']}")
# 读取Schedule配置
schedule_config = config.get_schedule_config()
print(f" [OK] Schedule配置读取成功")
print(f" sync_interval: {schedule_config['sync_interval']} 分钟")
return True
except Exception as e:
print(f" [FAIL] 配置读取失败: {e}")
return False
def test_logger():
"""测试3: 验证日志写入"""
print("\n" + "=" * 50)
print("测试3: 验证日志写入到 logs2/")
print("=" * 50)
try:
from src2.logger import get_task2_logger, TASK2_LOG_DIR
logger = get_task2_logger()
# 写入测试日志
logger.log_api_call(
api_type="test",
operation="task2/setup_test",
request_data={"test": "验证脚本测试"},
response_data={"status": "success"},
success=True
)
# 检查日志文件是否创建
log_file = logger._get_today_log_file()
if log_file.exists():
print(f" [OK] 日志写入成功")
print(f" 日志目录: {TASK2_LOG_DIR}")
print(f" 日志文件: {log_file.name}")
return True
else:
print(f" [FAIL] 日志文件未创建")
return False
except Exception as e:
print(f" [FAIL] 日志测试失败: {e}")
return False
def test_token_manager():
"""测试4: 验证Token管理器复用"""
print("\n" + "=" * 50)
print("测试4: 验证Token管理器复用")
print("=" * 50)
try:
from src.token_manager import TokenManager
# 创建TokenManager实例使用默认缓存路径
token_manager = TokenManager()
print(f" [OK] TokenManager导入成功")
print(f" 缓存文件: {token_manager.cache_file_path}")
# 尝试获取token
token = token_manager.get_token()
print(f" [OK] Token获取成功")
print(f" Token前20字符: {token[:20]}...")
return True
except ValueError as e:
print(f" [WARN] 环境变量未设置: {e}")
print(f" 这不影响框架搭建,后续运行时需要设置")
return True # 环境变量未设置不算失败
except Exception as e:
print(f" [FAIL] Token测试失败: {e}")
return False
def test_smartsheet_api():
"""测试5: 验证SmartSheetAPI复用"""
print("\n" + "=" * 50)
print("测试5: 验证SmartSheetAPI复用")
print("=" * 50)
try:
from src.smartsheet import SmartSheetAPI
from src.token_manager import TokenManager
from src2.config import Task2ConfigManager
print(f" [OK] SmartSheetAPI导入成功")
# 获取配置
config = Task2ConfigManager()
docid = config.get_smartsheet_config()['docid']
# 获取token
token_manager = TokenManager()
token = token_manager.get_token()
# 创建SmartSheetAPI实例
api = SmartSheetAPI(token, docid)
print(f" [OK] SmartSheetAPI实例创建成功")
print(f" docid: {docid}")
return True
except Exception as e:
print(f" [FAIL] SmartSheetAPI测试失败: {e}")
return False
def main():
"""运行所有测试"""
print("\n" + "=" * 50)
print("任务二第一阶段验证")
print("=" * 50)
results = {
"目录结构": test_directory_structure(),
"配置读取": test_config_read(),
"日志写入": test_logger(),
"Token管理": test_token_manager(),
"SmartSheetAPI": test_smartsheet_api()
}
# 汇总结果
print("\n" + "=" * 50)
print("验证结果汇总")
print("=" * 50)
passed = 0
failed = 0
for name, result in results.items():
status = "[OK]" if result else "[FAIL]"
print(f" {status} {name}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if failed == 0:
print("\n第一阶段验收通过!")
else:
print("\n请检查失败项并修复")
return failed == 0
if __name__ == "__main__":
main()

130
src2/test_update_records.py Normal file
View File

@ -0,0 +1,130 @@
"""
测试 update_records 功能的独立脚本
用于调试智能表格的记录更新功能
"""
import sys
import json
import requests
from pathlib import Path
# 将项目根目录添加到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.token_manager import TokenManager
def test_update_records():
"""测试更新记录功能"""
print("=" * 60)
print("测试 update_records 功能")
print("=" * 60)
# 1. 获取 access_token
print("\n[1/3] 获取 access_token...")
try:
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f" ✓ access_token: {access_token[:20]}...(已隐藏)")
except Exception as e:
print(f" ✗ 获取失败: {e}")
return False
# 2. 准备测试数据
print("\n[2/3] 准备测试数据...")
BASE_URL = "https://qyapi.weixin.qq.com/cgi-bin/wedoc"
# 测试数据
data = {
"docid": "dcOsT3czWy0YEDg38vlDqwVCTjv0kzwC_GU2XmT9wSZctQ0ZJQUAV7vMQ3ljZx-n_NqxzEEYG2DiLAvNdNsHJwgQ",
"sheet_id": "56YEeR",
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"records": [
{
"record_id": "rNrk6o",
"values": {
"TAPD状态": [
{
"type": "text",
"text": "已完成"
}
],
"处理人": [
{
"type": "text",
"text": ""
}
],
"TAPD预计完成日期": [
{
"type": "text",
"text": "2025-11-28"
}
]
}
}
]
}
print(" ✓ 测试数据已准备")
print(f" - docid: {data['docid'][:20]}...")
print(f" - sheet_id: {data['sheet_id']}")
print(f" - 记录数: {len(data['records'])}")
print(f" - record_id: {data['records'][0]['record_id']}")
# 3. 发送请求
print("\n[3/3] 发送 update_records 请求...")
# 构造完整URL带debug=1
url = f"{BASE_URL}/smartsheet/update_records?access_token={access_token}&debug=1"
print(f"\n请求信息:")
print(f" URL: {BASE_URL}/smartsheet/update_records?access_token=***&debug=1")
print(f" Method: POST")
print(f"\n请求数据:")
print(json.dumps(data, ensure_ascii=False, indent=2))
try:
# 发送POST请求
print("\n正在发送请求...")
response = requests.post(url, json=data, timeout=30)
# 打印响应信息
print(f"\n响应信息:")
print(f" 状态码: {response.status_code}")
print(f"\n响应数据:")
result = response.json()
print(json.dumps(result, ensure_ascii=False, indent=2))
# 检查结果
print("\n" + "=" * 60)
if result.get('errcode', 0) == 0:
print("✓ 测试成功!")
updated_records = result.get('records', [])
print(f" 更新记录数: {len(updated_records)}")
return True
else:
print("✗ 测试失败!")
print(f" 错误码: {result.get('errcode')}")
print(f" 错误信息: {result.get('errmsg')}")
return False
except Exception as e:
print(f"\n✗ 请求异常: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主函数"""
success = test_update_records()
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())