'日志迭代phase2'

This commit is contained in:
zelong 2026-03-10 14:23:17 +08:00
parent 0769cf4755
commit 403b8fee0d
6 changed files with 274 additions and 151 deletions

View File

@ -94,7 +94,7 @@
## 4. 模块接口演进计划(摘要)
- 阶段1新增全局日志内核模块定义统一接口。已完成
- 阶段2`src/api_logger.py` 改造成兼容层,保证旧调用可用。
- 阶段2`src/api_logger.py` 改造成兼容层,保证旧调用可用。(已完成)
- 阶段3`src2/logger.py` 与任务二编排层切换到统一内核,修复串目录。
- 阶段4更新查看工具与文档支持 `jsonl + sync_id`
@ -109,3 +109,9 @@
### 2026-02-28更新
- 新增 `core/global_log_system.py``core/__init__.py` 模块说明。
- 标记阶段1完成统一日志内核已落地待阶段2/3接线。
### 2026-02-28更新2
- 标记阶段2完成任务一已接入统一日志内核。
- `src/scheduler.py``job/sync_job` 已接入同步边界与统计事件。
- `src/main.py``run_once` 已接入手动兜底同步边界。
- `src/smartsheet.py` 已消除同请求双记录矛盾。

View File

@ -103,3 +103,43 @@
- **可回滚点**`core/` 新增为独立改动,可单独回滚。
- **关联文档**`docs/日志系统重构实施方案.md`
- **备注**:下一阶段优先完成任务一接入并验证“单次调用单条记录”。
## 阶段2接入任务一src
- **阶段名称**:日志系统重构 - 阶段2
- **日期**2026-02-28
- **负责人**Codex
- **目标**:将任务一接入全局日志内核,补齐同步分隔与统计,并修复双记录矛盾。
### 变更清单
- **新增文件**:无
- **修改文件**
- `src/api_logger.py`
- `src/main.py`
- `src/scheduler.py`
- `src/smartsheet.py`
- `docs/全局迭代日志.md`
- `docs/全局框架文档.md`
- **删除文件**:无
### 关键改动说明
- **日志结构变更**:任务一日志由旧 JSON 数组切换为 jsonl 事件流(通过兼容层接入)。
- **接口/调用链变更**
- `src/api_logger.py` 保留旧接口,内部转发到全局内核。
- `src/scheduler.py``job/sync_job` 增加 `start_sync/end_sync_with_stats`
- `src/main.py``run_once` 增加“无外层同步时自动兜底”的同步边界。
- **兼容性说明**:旧调用 `log_api_call(...)` 保持可用;历史 `api_type` 通过映射落入 `module` 三类。
### 验收结果
- **通过项**
- 任务一生产链路scheduler 触发)已具备每次同步分隔与统计写入。
- 任务一 API 日志已接入统一内核。
- `src/smartsheet.py` 已修复“同一次请求先 success 后 failure”的双记录问题。
- **未通过项**
- 任务二串目录问题尚未处理待阶段3
- **遗留风险**
- `main.py` 直跑与 scheduler 路径都可触发同步边界,后续需确保运维使用一致入口。
### 回滚与追踪
- **可回滚点**:可按文件粒度回滚(`src/api_logger.py``src/scheduler.py` 为关键点)。
- **关联文档**`docs/日志系统重构实施方案.md`
- **备注**:下一阶段优先处理任务二串目录与通知链路复用问题。

View File

@ -1,170 +1,152 @@
"""
API调用日志记录模块
负责记录所有API调用的请求和响应到JSON文件
任务一日志兼容层
说明
1. 对外保持 APILogger / get_logger 接口不变
2. 内部接入 core.global_log_system jsonl 日志内核
3. 支持同步边界start_sync / end_sync_with_stats
"""
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Optional
from core.global_log_system import GlobalLogSystem
class APILogger:
"""API调用日志记录器"""
"""API 日志记录器(兼容旧接口)"""
def __init__(self, log_dir: Optional[str] = None):
"""
初始化日志记录器
def __init__(self,
log_dir: Optional[str] = None,
task_name: Optional[str] = None):
project_root = Path(__file__).parent.parent
Args:
log_dir: 日志目录路径如果为None则使用默认路径
"""
if log_dir is None:
# 默认路径:项目根目录/logs/
project_root = Path(__file__).parent.parent
self.log_dir = project_root / "logs"
resolved_log_dir = project_root / "logs"
else:
self.log_dir = Path(log_dir)
resolved_log_dir = Path(log_dir)
# 确保日志目录存在
self.log_dir.mkdir(exist_ok=True)
inferred_task_name = self._infer_task_name(task_name, resolved_log_dir)
self.core = GlobalLogSystem(task_name=inferred_task_name, log_dir=resolved_log_dir)
@property
def task_name(self) -> str:
return self.core.task_name
@property
def log_dir(self) -> Path:
return self.core.log_dir
def _get_today_log_file(self) -> Path:
"""
获取今天的日志文件路径
"""兼容旧方法:返回当天 jsonl 文件路径"""
return self.core._get_today_log_file()
Returns:
Path: 今天的日志文件路径格式api_log_YYYY-MM-DD.json
"""
today = datetime.now().strftime("%Y-%m-%d")
log_file = self.log_dir / f"api_log_{today}.json"
def start_sync(self,
trigger: str,
metadata: Optional[Dict[str, Any]] = None,
sync_id: Optional[str] = None) -> str:
return self.core.start_sync(trigger=trigger, metadata=metadata, sync_id=sync_id)
# 如果文件不存在,初始化
if not log_file.exists():
with open(log_file, 'w', encoding='utf-8') as f:
json.dump({"records": []}, f, ensure_ascii=False, indent=2)
def end_sync_with_stats(self,
stats: Dict[str, Any],
success: bool,
error_message: Optional[str] = None,
sync_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None) -> None:
self.core.end_sync_with_stats(
stats=stats,
success=success,
error_message=error_message,
sync_id=sync_id,
extra=extra,
)
return log_file
def get_active_sync_id(self) -> Optional[str]:
return self.core._active_sync_id
def log_api_call(self, api_type: str, operation: str,
def log_api_call(self,
api_type: str,
operation: str,
request_data: Dict[str, Any],
response_data: Dict[str, Any],
success: bool = True,
error_message: Optional[str] = None):
error_message: Optional[str] = None,
duration_ms: Optional[int] = None,
extra: Optional[Dict[str, Any]] = None) -> None:
"""
记录API调用使用追加式写入避免读取整个文件
Args:
api_type: API类型 "smartsheet", "tapd", "wework"
operation: 操作名称 "get_records", "create_bug"
request_data: 请求数据包含urlmethodparams等
response_data: 响应数据
success: 是否成功
error_message: 错误信息如果失败
兼容旧接口
- api_type 允许历史值 task1/task2/test
- 内部统一映射为 module: smartsheet/tapd/wework
"""
try:
# 获取今天的日志文件
log_file = self._get_today_log_file()
module = self._resolve_module(api_type, operation, request_data)
# 构造新记录
record = {
"api_type": api_type,
"operation": operation,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"success": success,
"request": request_data,
"response": response_data
}
payload_extra = dict(extra or {})
if api_type not in GlobalLogSystem.ALLOWED_MODULES:
payload_extra["original_api_type"] = api_type
if error_message:
record["error_message"] = error_message
self.core.log_api(
module=module,
operation=operation,
request_data=request_data,
response_data=response_data,
success=success,
error_message=error_message,
duration_ms=duration_ms,
extra=payload_extra,
)
# 使用追加式写入
with open(log_file, 'r+', encoding='utf-8') as f:
# 定位到文件末尾
f.seek(0, 2)
file_size = f.tell()
def _resolve_module(self,
api_type: str,
operation: str,
request_data: Optional[Dict[str, Any]]) -> str:
normalized_type = (api_type or "").strip().lower()
if normalized_type in GlobalLogSystem.ALLOWED_MODULES:
return normalized_type
if file_size == 0:
# 空文件,写入初始结构
f.write('{"records": [\n')
f.write(json.dumps(record, ensure_ascii=False, indent=2))
f.write('\n]}')
else:
# 回退到最后的 ]}
f.seek(file_size - 3)
# 添加逗号和新记录
f.write(',\n')
f.write(json.dumps(record, ensure_ascii=False, indent=2))
f.write('\n]}')
request_url = ""
if isinstance(request_data, dict):
request_url = str(request_data.get("url", "")).lower()
except Exception as e:
# 日志记录失败不应该影响主流程
print(f"⚠ API日志记录失败: {str(e)}")
operation_lower = (operation or "").lower()
if "tapd" in request_url:
return "tapd"
if "wedoc" in request_url or "smartsheet" in request_url:
return "smartsheet"
if "qyapi.weixin.qq.com/cgi-bin/message" in request_url:
return "wework"
if "qyapi.weixin.qq.com/cgi-bin/gettoken" in request_url:
return "wework"
if any(key in operation_lower for key in ["tapd", "bug", "story", "attachment"]):
return "tapd"
if any(key in operation_lower for key in ["wework", "token", "message", "notify"]):
return "wework"
if any(key in operation_lower for key in ["sheet", "record", "field", "validation"]):
return "smartsheet"
return "smartsheet"
def _infer_task_name(self, task_name: Optional[str], log_dir: Path) -> str:
if task_name:
return task_name
dir_name = log_dir.name.lower()
if dir_name == "logs2":
return "task2"
return "task1"
# 全局单例
_global_logger = None
_global_logger: Optional[APILogger] = None
def get_logger() -> APILogger:
"""
获取全局API日志记录器单例
Returns:
APILogger: 日志记录器实例
"""
"""获取任务一默认日志记录器(单例)"""
global _global_logger
if _global_logger is None:
_global_logger = APILogger()
return _global_logger
if __name__ == "__main__":
# 测试代码
print("=== API日志记录器测试 ===\n")
logger = APILogger()
# 测试记录一个成功的API调用
print("测试1: 记录成功的API调用...")
logger.log_api_call(
api_type="smartsheet",
operation="get_records",
request_data={
"url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_records",
"method": "POST",
"params": {"docid": "test123", "sheet_id": "sheet456"}
},
response_data={
"errcode": 0,
"errmsg": "ok",
"records": []
},
success=True
)
print("✓ 成功记录API调用")
# 测试记录一个失败的API调用
print("\n测试2: 记录失败的API调用...")
logger.log_api_call(
api_type="tapd",
operation="create_bug",
request_data={
"url": "https://api.tapd.cn/bugs",
"method": "POST",
"data": {"title": "测试bug"}
},
response_data={
"status": 0,
"info": "参数错误"
},
success=False,
error_message="缺少必填参数workspace_id"
)
print("✓ 成功记录失败的API调用")
log_file = logger._get_today_log_file()
print(f"\n日志文件: {log_file}")
print(f"日志目录: {logger.log_dir}")

View File

@ -608,6 +608,35 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
'error_message': None
}
logger = get_logger()
started_sync_here = False
if not logger.get_active_sync_id():
logger.start_sync(
trigger="task1_run_once_manual",
metadata={
"entry": "src/main.py:run_once",
"test_mode": test_mode,
},
)
started_sync_here = True
def _finalize_sync_and_return() -> Dict:
if started_sync_here:
logger.end_sync_with_stats(
stats={
"scanned_count": result.get('scanned_count', 0),
"valid_count": result.get('valid_count', 0),
"invalid_count": result.get('invalid_count', 0),
"bugs_created": result.get('bugs_created', 0),
"bugs_failed": result.get('bugs_failed', 0),
"writeback_success": result.get('writeback_success', 0),
},
success=result.get('success', False),
error_message=result.get('error_message'),
extra={"source": "run_once"},
)
return result
try:
# 获取配置信息
all_config = config_manager.get_all_config()
@ -789,7 +818,7 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
print("=" * 60)
result['success'] = True
return result
return _finalize_sync_and_return()
except FileNotFoundError as e:
result['error_message'] = f"文件未找到: {e}"
@ -797,7 +826,7 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
print("\n解决方案:")
print(" 1. 检查配置文件是否存在")
print(" 2. 使用 --config 参数指定正确的配置文件路径")
return result
return _finalize_sync_and_return()
except ValueError as e:
result['error_message'] = f"参数错误: {e}"
@ -806,7 +835,7 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
print(" 1. 检查配置文件中的配置项是否完整")
print(" 2. 确保所有必填项都已填写")
print(" 3. 检查access_token是否正确")
return result
return _finalize_sync_and_return()
except Exception as e:
result['error_message'] = f"未预期的错误: {type(e).__name__}: {e}"
@ -816,7 +845,7 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
import traceback
print("\n详细错误信息:")
traceback.print_exc()
return result
return _finalize_sync_and_return()
def main():

View File

@ -26,6 +26,7 @@ from src.config import ConfigManager
from src.token_manager import TokenManager
from src.main import run_once
from src.sync_status import BugStatusSyncer
from src.api_logger import get_logger
class AutoTAPDScheduler:
@ -90,6 +91,14 @@ class AutoTAPDScheduler:
print(f"开始执行开单任务 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
logger = get_logger()
logger.start_sync(
trigger="task1_scheduler_job",
metadata={"entry": "src/scheduler.py:job"}
)
result = None
try:
# 获取access_token
access_token = self.token_manager.get_token()
@ -127,6 +136,20 @@ class AutoTAPDScheduler:
print(f" 错误信息: {result.get('error_message', '未知错误')}")
print("-" * 80)
logger.end_sync_with_stats(
stats={
"scanned_count": result.get('scanned_count', 0),
"valid_count": result.get('valid_count', 0),
"invalid_count": result.get('invalid_count', 0),
"bugs_created": result.get('bugs_created', 0),
"bugs_failed": result.get('bugs_failed', 0),
"writeback_success": result.get('writeback_success', 0),
},
success=result.get('success', False),
error_message=result.get('error_message'),
extra={"source": "scheduler.job"}
)
except Exception as e:
# 捕获所有异常,确保不影响后续执行
self.stats['total_runs'] += 1
@ -144,6 +167,20 @@ class AutoTAPDScheduler:
print("\n详细错误信息:")
traceback.print_exc()
logger.end_sync_with_stats(
stats={
"scanned_count": 0,
"valid_count": 0,
"invalid_count": 0,
"bugs_created": 0,
"bugs_failed": 0,
"writeback_success": 0,
},
success=False,
error_message=f"{type(e).__name__}: {e}",
extra={"source": "scheduler.job"}
)
# 显示下次执行时间
next_run = schedule.idle_seconds()
if next_run is not None:
@ -157,6 +194,14 @@ class AutoTAPDScheduler:
print(f"开始执行bug状态同步 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
logger = get_logger()
logger.start_sync(
trigger="task1_scheduler_sync_job",
metadata={"entry": "src/scheduler.py:sync_job"}
)
result = None
try:
# 获取access_token
access_token = self.token_manager.get_token()
@ -193,6 +238,16 @@ class AutoTAPDScheduler:
print(f" 错误信息: {result.get('error_message', '未知错误')}")
print("-" * 80)
logger.end_sync_with_stats(
stats={
"checked_count": result.get('checked_count', 0),
"updated_count": result.get('updated_count', 0),
},
success=result.get('success', False),
error_message=result.get('error_message'),
extra={"source": "scheduler.sync_job"}
)
except Exception as e:
# 捕获所有异常,确保不影响后续执行
self.stats['total_sync_runs'] += 1
@ -210,6 +265,16 @@ class AutoTAPDScheduler:
print("\n详细错误信息:")
traceback.print_exc()
logger.end_sync_with_stats(
stats={
"checked_count": 0,
"updated_count": 0,
},
success=False,
error_message=f"{type(e).__name__}: {e}",
extra={"source": "scheduler.sync_job"}
)
# 显示下次执行时间
next_run = schedule.idle_seconds()
if next_run is not None:

View File

@ -75,6 +75,21 @@ class SmartSheetAPI:
response.raise_for_status()
result = response.json()
# 先判断业务错误,避免同一次请求出现 success/failure 双记录
if result.get('errcode', 0) != 0:
error_msg = result.get('errmsg', '未知错误')
self.logger.log_api_call(
api_type="smartsheet",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=False,
error_message=f"errcode={result['errcode']}, errmsg={error_msg}"
)
raise RuntimeError(
f"API调用失败: errcode={result['errcode']}, errmsg={error_msg}"
)
# 记录API调用日志成功
self.logger.log_api_call(
api_type="smartsheet",
@ -92,20 +107,6 @@ class SmartSheetAPI:
print(json.dumps(result, ensure_ascii=False, indent=2))
print("=" * 80)
# 检查企业微信API返回的错误码
if result.get('errcode', 0) != 0:
error_msg = result.get('errmsg', '未知错误')
# 记录API调用日志业务错误
self.logger.log_api_call(
api_type="smartsheet",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=False,
error_message=f"errcode={result['errcode']}, errmsg={error_msg}"
)
raise RuntimeError(f"API调用失败: errcode={result['errcode']}, errmsg={error_msg}")
return result
except requests.exceptions.Timeout: