From 9c448ed2062662b51f34feb6508170a945c6a5ae Mon Sep 17 00:00:00 2001 From: zelong <2895587166@qq.com> Date: Wed, 24 Dec 2025 16:28:39 +0800 Subject: [PATCH] 'fix_bug_status_and_auto' --- src/api_logger.py | 127 ++++++++++++++ src/api_test.py | 391 +++++++++++++++++++++++++++++++++++++++++-- src/main.py | 78 ++++++--- src/mapper.py | 2 +- src/smartsheet.py | 49 +++++- src/status_mapper.py | 130 ++++++++++++++ src/sync_status.py | 18 +- src/tapd_api.py | 76 ++++++++- 8 files changed, 814 insertions(+), 57 deletions(-) create mode 100644 src/api_logger.py create mode 100644 src/status_mapper.py diff --git a/src/api_logger.py b/src/api_logger.py new file mode 100644 index 0000000..3fecf2b --- /dev/null +++ b/src/api_logger.py @@ -0,0 +1,127 @@ +""" +API调用日志记录模块 +负责记录所有API调用的请求和响应到JSON文件 +""" + +import json +import os +from datetime import datetime +from typing import Dict, Any, Optional +from pathlib import Path + + +class APILogger: + """API调用日志记录器""" + + def __init__(self, log_file: Optional[str] = None): + """ + 初始化日志记录器 + + Args: + log_file: 日志文件路径,如果为None则使用默认路径 + """ + if log_file is None: + # 默认路径:项目根目录/logs/api_log.json + project_root = Path(__file__).parent.parent + log_dir = project_root / "logs" + log_dir.mkdir(exist_ok=True) + self.log_file = log_dir / "api_log.json" + else: + self.log_file = Path(log_file) + + # 确保日志文件存在 + self._init_log_file() + + def _init_log_file(self): + """初始化日志文件""" + if not self.log_file.exists(): + with open(self.log_file, 'w', encoding='utf-8') as f: + json.dump({"records": []}, f, ensure_ascii=False, indent=2) + + def log_api_call(self, api_type: str, operation: str, + request_data: Dict[str, Any], + response_data: Dict[str, Any], + success: bool = True, + error_message: Optional[str] = None): + """ + 记录API调用 + + Args: + api_type: API类型(如 "smartsheet", "tapd", "wework") + operation: 操作名称(如 "get_records", "create_bug") + request_data: 请求数据(包含url、method、params等) + response_data: 响应数据 + success: 是否成功 + error_message: 错误信息(如果失败) + """ + try: + # 读取现有记录 + with open(self.log_file, 'r', encoding='utf-8') as f: + log_data = json.load(f) + + # 添加新记录 + record = { + "api_type": api_type, + "operation": operation, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "success": success, + "request": request_data, + "response": response_data + } + + if error_message: + record["error_message"] = error_message + + log_data["records"].append(record) + + # 写回文件 + with open(self.log_file, 'w', encoding='utf-8') as f: + json.dump(log_data, f, ensure_ascii=False, indent=2) + + except Exception as e: + # 日志记录失败不应该影响主流程 + print(f"⚠ API日志记录失败: {str(e)}") + + +# 全局单例 +_global_logger = None + + +def get_logger() -> APILogger: + """ + 获取全局API日志记录器单例 + + Returns: + APILogger: 日志记录器实例 + """ + global _global_logger + if _global_logger is None: + _global_logger = APILogger() + return _global_logger + + +if __name__ == "__main__": + # 测试代码 + print("=== API日志记录器测试 ===\n") + + logger = APILogger() + + # 测试记录一个成功的API调用 + logger.log_api_call( + api_type="smartsheet", + operation="get_records", + request_data={ + "url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_records", + "method": "POST", + "params": {"docid": "test123", "sheet_id": "sheet456"} + }, + response_data={ + "errcode": 0, + "errmsg": "ok", + "records": [] + }, + success=True + ) + + print("✓ 成功记录API调用") + print(f"日志文件: {logger.log_file}") diff --git a/src/api_test.py b/src/api_test.py index d5b6364..9617e71 100644 --- a/src/api_test.py +++ b/src/api_test.py @@ -8,7 +8,15 @@ import requests import json import os +import sys from datetime import datetime +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from src.config import ConfigManager class WeWorkAPITester: @@ -52,6 +60,66 @@ class WeWorkAPITester: except Exception as e: print(f"✗ 记录日志失败: {str(e)}") + def load_token_from_cache(self): + """ + 从缓存文件读取access_token + + Returns: + bool: 是否成功读取token + """ + print("\n=== 从缓存读取access_token ===") + + # token缓存文件路径 + cache_file = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + 'config', + 'token_cache.json' + ) + + try: + # 检查文件是否存在 + if not os.path.exists(cache_file): + print(f"✗ 缓存文件不存在: {cache_file}") + return False + + # 读取缓存文件 + with open(cache_file, 'r', encoding='utf-8') as f: + cache_data = json.load(f) + + access_token = cache_data.get('access_token') + fetch_time = cache_data.get('fetch_time') + + if not access_token: + print("✗ 缓存文件中没有access_token") + return False + + # 检查token是否过期(7200秒 = 2小时) + import time + current_time = time.time() + elapsed_time = current_time - fetch_time + remaining_time = 7200 - elapsed_time + + if remaining_time <= 0: + print("✗ 缓存的token已过期") + print(f" 获取时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fetch_time))}") + print(f" 已过期 {int(-remaining_time)} 秒") + return False + + # token有效,加载到内存 + self.access_token = access_token + print(f"✓ 成功从缓存读取access_token") + print(f" Token: {self.access_token[:20]}...") + print(f" 获取时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fetch_time))}") + print(f" 剩余有效期: {int(remaining_time)}秒 ({int(remaining_time//60)}分钟)") + return True + + except json.JSONDecodeError: + print(f"✗ 缓存文件格式错误") + return False + except Exception as e: + print(f"✗ 读取缓存失败: {str(e)}") + return False + def get_access_token(self, corpid, corpsecret): """ 获取access_token @@ -267,42 +335,309 @@ class WeWorkAPITester: print(f"✗ 请求异常: {str(e)}") return False + def send_message(self, content): + """ + 发送应用消息 + + Args: + content: 消息内容 + + Returns: + bool: 是否成功 + """ + print("\n=== 发送应用消息 ===") + + if not self.access_token: + print("✗ 请先获取access_token") + return False + + # 从配置文件读取agentid和receivers + try: + config_manager = ConfigManager() + config = config_manager.config + if not config.has_section('wework'): + print("✗ 配置文件缺少[wework]节") + return False + if not config.has_option('wework', 'agentid'): + print("✗ 配置文件[wework]节缺少agentid配置项") + return False + if not config.has_option('wework', 'receivers'): + print("✗ 配置文件[wework]节缺少receivers配置项") + return False + + agentid = config.get('wework', 'agentid').strip() + receivers = config.get('wework', 'receivers').strip() + + print(f"✓ 从配置文件读取agentid: {agentid}") + print(f"✓ 从配置文件读取receivers: {receivers}") + except Exception as e: + print(f"✗ 读取配置失败: {e}") + return False + + url = f"{self.base_url}/message/send" + params = {"access_token": self.access_token} + + # 构造请求体(使用配置文件中的receivers) + data = { + "touser": receivers, + "msgtype": "text", + "agentid": int(agentid), + "text": { + "content": content + }, + "safe": 0, + "enable_id_trans": 0, + "enable_duplicate_check": 0, + "duplicate_check_interval": 1800 + } + + try: + response = requests.post(url, params=params, json=data, timeout=10) + response_data = response.json() + + # 记录API调用 + request_data = { + "url": url, + "params": {"access_token": "***"}, + "body": data + } + self._log_api_call("send_message", request_data, response_data) + + # 检查返回结果 + if response_data.get("errcode") == 0: + print(f"✓ 消息发送成功") + print(f" 消息内容: {content}") + return True + else: + print(f"✗ 发送失败") + print(f" 错误码: {response_data.get('errcode')}") + print(f" 错误信息: {response_data.get('errmsg')}") + return False + + except Exception as e: + print(f"✗ 请求异常: {str(e)}") + return False + + +class TAPDAPITester: + """TAPD API测试类""" + + def __init__(self): + self.log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'api_test_log.json') + self.base_url = "https://tapd-api.bilibili.co/tapd" + self.api_user = None + self.api_password = None + self.workspace_id = None + + def _init_auth(self): + """初始化TAPD认证信息""" + # 从环境变量读取认证信息 + self.api_user = os.environ.get('TAPD_API_USER') + self.api_password = os.environ.get('TAPD_API_PASSWORD') + + if not self.api_user or not self.api_password: + print("✗ TAPD认证信息未设置") + print(" 请设置环境变量:") + print(" - TAPD_API_USER") + print(" - TAPD_API_PASSWORD") + return False + + print(f"✓ TAPD认证信息已加载 (用户: {self.api_user})") + return True + + def _init_workspace_id(self): + """从配置文件读取workspace_id""" + try: + config_manager = ConfigManager() + tapd_config = config_manager.get_tapd_config() + self.workspace_id = tapd_config['workspace_id'] + print(f"✓ 从配置文件读取workspace_id: {self.workspace_id}") + return True + except Exception as e: + print(f"✗ 读取workspace_id失败: {e}") + return False + + def _log_api_call(self, operation, request_data, response_data): + """记录API调用到JSON文件""" + try: + # 读取现有记录 + with open(self.log_file, 'r', encoding='utf-8') as f: + log_data = json.load(f) + + # 添加新记录 + record = { + "operation": operation, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "request": request_data, + "response": response_data + } + log_data["records"].append(record) + + # 写回文件 + with open(self.log_file, 'w', encoding='utf-8') as f: + json.dump(log_data, f, ensure_ascii=False, indent=2) + + print(f"✓ API调用已记录到日志文件") + except Exception as e: + print(f"✗ 记录日志失败: {str(e)}") + + def get_bug_custom_fields(self): + """ + 获取TAPD缺陷的所有字段配置及候选值 + + Returns: + dict: 字段配置信息,失败返回None + """ + print("\n=== 获取TAPD缺陷字段配置 ===") + + # 初始化认证信息 + if not self._init_auth(): + return None + + # 初始化workspace_id + if not self._init_workspace_id(): + return None + + url = f"{self.base_url}/bugs/get_fields_info" + params = { + "workspace_id": self.workspace_id + } + + try: + from requests.auth import HTTPBasicAuth + auth = HTTPBasicAuth(self.api_user, self.api_password) + + print(f"\n正在请求TAPD API...") + print(f" URL: {url}") + print(f" workspace_id: {self.workspace_id}") + + response = requests.get(url, params=params, auth=auth, timeout=30) + response_data = response.json() + + # 记录API调用 + request_data = { + "url": url, + "method": "GET", + "params": params, + "auth_user": self.api_user + } + self._log_api_call("get_bug_custom_fields", request_data, response_data) + + # 检查返回结果 + if response_data.get("status") == 1: + data = response_data.get("data", {}) + print(f"\n✓ 成功获取字段配置") + + # 显示字段统计 + if isinstance(data, dict): + field_count = len(data) + print(f" 共 {field_count} 个字段配置") + + # 保存到单独的文件 + output_file = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + 'logs', + 'tapd_bug_fields.json' + ) + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + print(f" ✓ 字段配置已保存到: {output_file}") + + # 显示部分字段信息 + print(f"\n字段列表预览:") + print("-" * 80) + for idx, (field_name, field_info) in enumerate(list(data.items())[:10], 1): + field_label = field_info.get('label', '(无标签)') + html_type = field_info.get('html_type', '(未知类型)') + print(f" {idx}. {field_name} ({field_label}) - 类型: {html_type}") + + # 如果有候选值,显示 + options = field_info.get('options', []) + if options: + # options可能是字典或列表 + if isinstance(options, dict): + option_list = list(options.values())[:5] + total_count = len(options) + elif isinstance(options, list): + option_list = options[:5] + total_count = len(options) + else: + option_list = [] + total_count = 0 + + if option_list: + print(f" 候选值: {', '.join(str(o) for o in option_list)}" + + (f" ...等{total_count}个" if total_count > 5 else "")) + + if field_count > 10: + print(f" ... 还有 {field_count - 10} 个字段,详见输出文件") + print("-" * 80) + + return data + else: + print(f"\n✗ 获取失败") + print(f" 状态码: {response_data.get('status')}") + print(f" 错误信息: {response_data.get('info', '未知错误')}") + return None + + except Exception as e: + print(f"\n✗ 请求异常: {str(e)}") + import traceback + traceback.print_exc() + return None + def print_menu(): """打印菜单""" print("\n" + "="*50) - print("企业微信智能表格API测试工具") + print("API测试工具") print("="*50) + print("【企业微信API】") print("1. 获取access_token") print("2. 新建文档") print("3. 重命名文档") print("4. 删除文档") - print("5. 查看日志文件") + print("5. 发送应用消息") + print("\n【TAPD API】") + print("6. 获取缺陷字段配置") + print("\n【其他】") + print("7. 查看日志文件") print("0. 退出") print("="*50) def main(): """主函数""" - tester = WeWorkAPITester() + wework_tester = WeWorkAPITester() + tapd_tester = TAPDAPITester() while True: print_menu() - choice = input("\n请选择操作 (0-5): ").strip() + choice = input("\n请选择操作 (0-7): ").strip() if choice == "0": print("\n感谢使用,再见!") break elif choice == "1": - print("\n请输入企业微信认证信息:") - corpid = input("企业ID (corpid): ").strip() - corpsecret = input("应用密钥 (corpsecret): ").strip() + print("\n=== 获取access_token ===") + print("1. 从API获取(需要输入corpid和corpsecret)") + print("2. 从缓存读取") + sub_choice = input("请选择 (1/2): ").strip() - if corpid and corpsecret: - tester.get_access_token(corpid, corpsecret) + if sub_choice == "1": + print("\n请输入企业微信认证信息:") + corpid = input("企业ID (corpid): ").strip() + corpsecret = input("应用密钥 (corpsecret): ").strip() + + if corpid and corpsecret: + wework_tester.get_access_token(corpid, corpsecret) + else: + print("✗ corpid和corpsecret不能为空") + elif sub_choice == "2": + wework_tester.load_token_from_cache() else: - print("✗ corpid和corpsecret不能为空") + print("✗ 无效的选择") elif choice == "2": doc_name = input("\n请输入文档名称: ").strip() @@ -317,14 +652,14 @@ def main(): doc_type_input = input("请选择文档类型 (直接回车默认为10): ").strip() doc_type = int(doc_type_input) if doc_type_input else 10 - tester.create_doc(doc_name, doc_type) + wework_tester.create_doc(doc_name, doc_type) elif choice == "3": docid = input("\n请输入文档ID: ").strip() new_name = input("请输入新的文档名称: ").strip() if docid and new_name: - tester.rename_doc(docid, new_name) + wework_tester.rename_doc(docid, new_name) else: print("✗ 文档ID和新名称不能为空") @@ -334,16 +669,29 @@ def main(): if docid: confirm = input(f"确认要删除文档 {docid} 吗? (y/n): ").strip().lower() if confirm == 'y': - tester.delete_doc(docid) + wework_tester.delete_doc(docid) else: print("已取消删除操作") else: print("✗ 文档ID不能为空") elif choice == "5": + # 发送应用消息 + content = input("\n请输入消息内容: ").strip() + if not content: + print("✗ 消息内容不能为空") + continue + + wework_tester.send_message(content) + + elif choice == "6": + # 获取TAPD缺陷字段配置 + tapd_tester.get_bug_custom_fields() + + elif choice == "7": print("\n=== 查看日志文件 ===") try: - with open(tester.log_file, 'r', encoding='utf-8') as f: + with open(wework_tester.log_file, 'r', encoding='utf-8') as f: log_data = json.load(f) records = log_data.get("records", []) @@ -355,12 +703,21 @@ def main(): print(f"记录 {i}:") print(f" 操作: {record.get('operation')}") print(f" 时间: {record.get('timestamp')}") - print(f" 响应: errcode={record.get('response', {}).get('errcode')}, " - f"errmsg={record.get('response', {}).get('errmsg')}") + + # 根据不同的操作类型显示不同的响应信息 + response = record.get('response', {}) + if 'errcode' in response: + # 企业微信API响应 + print(f" 响应: errcode={response.get('errcode')}, " + f"errmsg={response.get('errmsg')}") + elif 'status' in response: + # TAPD API响应 + print(f" 响应: status={response.get('status')}, " + f"info={response.get('info', 'ok')}") print() if len(records) > 10: - print(f"(仅显示最近10条,完整日志请查看: {tester.log_file})") + print(f"(仅显示最近10条,完整日志请查看: {wework_tester.log_file})") except Exception as e: print(f"✗ 读取日志文件失败: {str(e)}") diff --git a/src/main.py b/src/main.py index e61d4cd..35fa2c5 100644 --- a/src/main.py +++ b/src/main.py @@ -18,6 +18,7 @@ from src.validator import RecordValidator from src.tapd_api import TAPDApi from src.mapper import FieldMapper, BugCreationResult from src.token_manager import TokenManager +from src.status_mapper import BugStatusMapper def parse_arguments(): @@ -192,8 +193,13 @@ def scan_and_validate_records(access_token: str, docid: str, verbose: bool = Fal if len(records) == 0: print(" ✓ 没有待开单的记录") return { - 'valid_records': [], - 'invalid_records': [] + 'validation_result': { + 'valid_records': [], + 'invalid_records': [] + }, + 'fields': fields, + 'field_mapping': field_mapping, + 'sheet_id': sheet_id } # 5. 提取记录数据并校验 @@ -288,13 +294,14 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test # 生成bug URL bug_url = tapd_api.get_bug_url(bug_id) - # 获取bug状态 - bug_status = bug_info.get('status', '新建') + # 获取bug状态(英文)并映射为中文 + bug_status_en = bug_info.get('status', 'new') + bug_status_cn = BugStatusMapper.to_chinese(bug_status_en) print(f" ✓ 创建成功!") print(f" Bug ID: {bug_id}") print(f" Bug URL: {bug_url}") - print(f" Bug 状态: {bug_status}") + print(f" Bug 状态: {bug_status_cn} ({bug_status_en})") # 记录成功结果 result = BugCreationResult( @@ -303,8 +310,8 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test bug_id=bug_id, bug_url=bug_url ) - # 添加bug状态信息 - result.bug_status = bug_status + # 添加bug状态信息(存储中文) + result.bug_status = bug_status_cn success_results.append(result) except ValueError as e: @@ -415,7 +422,7 @@ def write_back_results(access_token: str, docid: str, sheet_id: str, "text": result.bug_id, "link": result.bug_url }], - "bug状态": [{"type": "text", "text": getattr(result, 'bug_status', '新建')}] + "bug状态": [{"type": "text", "text": getattr(result, 'bug_status', '新')}] } } update_records.append(record_update) @@ -525,7 +532,10 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F if len(validation_result['valid_records']) > 0: validator.print_valid_records_summary(validation_result['valid_records']) - # 3. 创建TAPD bug单 + # 3. 创建TAPD bug单(仅针对校验通过的记录) + creation_success_results = [] + creation_failed_results = [] + if len(validation_result['valid_records']) > 0: creation_result = create_tapd_bugs( validation_result['valid_records'], @@ -538,20 +548,39 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F result['bugs_created'] = len(creation_result['success_results']) result['bugs_failed'] = len(creation_result['failed_results']) - # 4. 回写结果到智能表格 - sheet_id = scan_result.get('sheet_id') - if sheet_id: - writeback_result = write_back_results( - access_token, - all_config['smartsheet']['docid'], - sheet_id, - creation_result['success_results'], - creation_result['failed_results'], - test_mode - ) + creation_success_results = creation_result['success_results'] + creation_failed_results = creation_result['failed_results'] - # 更新统计信息 - result['writeback_success'] = writeback_result.get('success_count', 0) + writeback_result.get('failed_count', 0) + # 4. 处理校验失败的记录(转换为BugCreationResult格式) + validation_failed_results = [] + for invalid_record in validation_result['invalid_records']: + record_data = invalid_record['record_data'] + missing_fields = invalid_record['missing_fields'] + + validation_failed_result = BugCreationResult( + record_id=record_data.get('record_id', '未知'), + success=False, + error_message=f"校验失败,缺失字段: {', '.join(missing_fields)}" + ) + validation_failed_results.append(validation_failed_result) + + # 5. 回写结果到智能表格(包括校验失败的记录) + sheet_id = scan_result.get('sheet_id') + if sheet_id and (len(creation_success_results) > 0 or len(creation_failed_results) > 0 or len(validation_failed_results) > 0): + # 合并创建失败和校验失败的记录 + all_failed_results = creation_failed_results + validation_failed_results + + writeback_result = write_back_results( + access_token, + all_config['smartsheet']['docid'], + sheet_id, + creation_success_results, + all_failed_results, + test_mode + ) + + # 更新统计信息 + result['writeback_success'] = writeback_result.get('success_count', 0) + writeback_result.get('failed_count', 0) # 显示最终统计 print("\n" + "=" * 60) @@ -561,9 +590,12 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F print(f"✓ 成功获取字段信息") print(f"✓ 成功筛选待开单记录") print(f"✓ 成功校验必填项") - print(f"✓ 成功创建 {result['bugs_created']} 个TAPD bug单") + if result['bugs_created'] > 0: + print(f"✓ 成功创建 {result['bugs_created']} 个TAPD bug单") if result['bugs_failed'] > 0: print(f"⚠ {result['bugs_failed']} 个bug单创建失败") + if len(validation_failed_results) > 0: + print(f"⚠ {len(validation_failed_results)} 条记录校验失败") if result['writeback_success'] > 0: print(f"✓ 成功回写 {result['writeback_success']} 条记录到智能表格") print("=" * 60) diff --git a/src/mapper.py b/src/mapper.py index af429ab..390fcc3 100644 --- a/src/mapper.py +++ b/src/mapper.py @@ -86,7 +86,7 @@ class FieldMapper: priority = record_data.get('优先级', '').strip() if not priority: raise ValueError("优先级不能为空") - tapd_data['priority'] = priority + tapd_data['priority_label'] = priority # 4. 严重程度(必填) severity = record_data.get('严重程度', '').strip() diff --git a/src/smartsheet.py b/src/smartsheet.py index f3f885e..9b18ed3 100644 --- a/src/smartsheet.py +++ b/src/smartsheet.py @@ -5,6 +5,7 @@ import requests from typing import Dict, List, Optional, Any +from src.api_logger import get_logger class SmartSheetAPI: @@ -26,6 +27,7 @@ class SmartSheetAPI: self.docid = docid self.test_mode = test_mode self.session = requests.Session() + self.logger = get_logger() def _make_request(self, endpoint: str, method: str = "POST", data: Optional[Dict] = None) -> Dict: """ @@ -44,6 +46,13 @@ class SmartSheetAPI: """ url = f"{self.BASE_URL}/{endpoint}?access_token={self.access_token}" + # 准备日志记录的请求数据(隐藏access_token) + log_request_data = { + "url": f"{self.BASE_URL}/{endpoint}", + "method": method, + "data": data + } + # 测试模式:显示请求信息 if self.test_mode: print("\n" + "=" * 80) @@ -65,12 +74,20 @@ class SmartSheetAPI: response.raise_for_status() result = response.json() + # 记录API调用日志(成功) + self.logger.log_api_call( + api_type="smartsheet", + operation=endpoint, + request_data=log_request_data, + response_data=result, + success=True + ) + # 测试模式:显示响应信息 if self.test_mode: import json print(f"\n响应状态码: {response.status_code}") print(f"响应数据:") - # 测试模式下显示完整的响应数据 print(json.dumps(result, ensure_ascii=False, indent=2)) print("=" * 80) @@ -82,9 +99,29 @@ class SmartSheetAPI: return result except requests.exceptions.Timeout: - raise RuntimeError(f"API请求超时: {endpoint}") + error_msg = f"API请求超时: {endpoint}" + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="smartsheet", + operation=endpoint, + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + raise RuntimeError(error_msg) except requests.exceptions.RequestException as e: - raise RuntimeError(f"API请求失败: {e}") + error_msg = f"API请求失败: {e}" + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="smartsheet", + operation=endpoint, + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + raise RuntimeError(error_msg) def get_sheet_list(self) -> List[Dict]: """ @@ -299,12 +336,12 @@ class SmartSheetAPI: print(f" ✓ 过滤后找到 {len(all_records)} 条已开单记录") - # 过滤掉终态的记录(bug状态为rejected、closed或空) + # 过滤掉终态的记录(bug状态为"取消"、"验证通过"或空) filtered_records = [] skipped_count = 0 - # 定义终态列表 - terminal_statuses = ['rejected', 'closed'] + # 定义终态列表(中文) + terminal_statuses = ['取消', '验证通过'] for record in all_records: bug_status = self.get_field_value_by_title(record, 'bug状态') diff --git a/src/status_mapper.py b/src/status_mapper.py new file mode 100644 index 0000000..0fc9d90 --- /dev/null +++ b/src/status_mapper.py @@ -0,0 +1,130 @@ +""" +TAPD Bug状态映射模块 +负责将TAPD API返回的英文状态映射为中文显示值 +""" + +from typing import Optional + + +class BugStatusMapper: + """Bug状态映射器""" + + # TAPD Bug状态映射表:英文key -> 中文显示值 + STATUS_MAP = { + "new": "新", + "in_progress": "处理中", + "resolved": "已解决 待验证", + "reopened": "重新打开", + "rejected": "取消", + "closed": "验证通过" + } + + @classmethod + def to_chinese(cls, english_status: str) -> str: + """ + 将英文状态转换为中文 + + Args: + english_status: TAPD API返回的英文状态(如 "new", "in_progress") + + Returns: + str: 中文状态(如 "新", "处理中") + 如果状态未知,返回原始值 + """ + if not english_status: + return "未知" + + # 转换为小写并去除空格 + status_key = english_status.strip().lower() + + # 查找映射 + chinese_status = cls.STATUS_MAP.get(status_key) + + if chinese_status: + return chinese_status + else: + # 如果找不到映射,返回原始值(可能是新增的状态) + return english_status + + @classmethod + def to_english(cls, chinese_status: str) -> Optional[str]: + """ + 将中文状态转换为英文(反向映射) + + Args: + chinese_status: 中文状态(如 "新", "处理中") + + Returns: + Optional[str]: 英文状态(如 "new", "in_progress") + 如果状态未知,返回None + """ + if not chinese_status: + return None + + # 反向查找 + for english, chinese in cls.STATUS_MAP.items(): + if chinese == chinese_status.strip(): + return english + + return None + + @classmethod + def is_terminal_status(cls, status: str) -> bool: + """ + 判断是否为终态状态(不需要继续同步的状态) + + Args: + status: 状态值(支持中文或英文) + + Returns: + bool: 是否为终态 + """ + # 终态列表(英文) + terminal_statuses_en = ["rejected", "closed"] + + # 终态列表(中文) + terminal_statuses_cn = ["取消", "验证通过"] + + status_lower = status.strip().lower() + + return (status_lower in terminal_statuses_en or + status.strip() in terminal_statuses_cn) + + @classmethod + def get_all_statuses(cls) -> dict: + """ + 获取所有状态映射 + + Returns: + dict: 完整的状态映射字典 + """ + return cls.STATUS_MAP.copy() + + +if __name__ == "__main__": + # 测试代码 + print("=== TAPD Bug状态映射测试 ===\n") + + # 测试英文转中文 + print("【英文 -> 中文】") + test_statuses = ["new", "in_progress", "resolved", "reopened", "rejected", "closed", "unknown"] + for status in test_statuses: + chinese = BugStatusMapper.to_chinese(status) + print(f" {status:<15} -> {chinese}") + + print("\n【中文 -> 英文】") + test_statuses_cn = ["新", "处理中", "已解决 待验证", "重新打开", "取消", "验证通过", "未知状态"] + for status in test_statuses_cn: + english = BugStatusMapper.to_english(status) + print(f" {status:<20} -> {english}") + + print("\n【终态判断】") + test_all = ["new", "新", "rejected", "取消", "closed", "验证通过", "in_progress", "处理中"] + for status in test_all: + is_terminal = BugStatusMapper.is_terminal_status(status) + print(f" {status:<20} -> {'终态' if is_terminal else '非终态'}") + + print("\n【所有状态映射】") + all_statuses = BugStatusMapper.get_all_statuses() + for en, cn in all_statuses.items(): + print(f" {en:<15} -> {cn}") diff --git a/src/sync_status.py b/src/sync_status.py index 4d258d2..5f1c559 100644 --- a/src/sync_status.py +++ b/src/sync_status.py @@ -13,6 +13,7 @@ sys.path.insert(0, str(project_root)) from src.smartsheet import SmartSheetAPI from src.tapd_api import TAPDApi +from src.status_mapper import BugStatusMapper class BugStatusSyncer: @@ -135,19 +136,22 @@ class BugStatusSyncer: # 查询TAPD的最新状态 print(f" → 正在查询TAPD最新状态...") bug_info = self.tapd_api.get_bug(bug_id) - latest_status = bug_info.get('status', '') + latest_status_en = bug_info.get('status', '') - print(f" ✓ TAPD最新状态: {latest_status}") + # 将英文状态映射为中文 + latest_status_cn = BugStatusMapper.to_chinese(latest_status_en) - # 对比状态是否变化 - if latest_status != current_status: - print(f" ⚠ 状态已变化: {current_status} → {latest_status}") + print(f" ✓ TAPD最新状态: {latest_status_cn} ({latest_status_en})") - # 添加到更新列表 + # 对比状态是否变化(使用中文状态对比) + if latest_status_cn != current_status: + print(f" ⚠ 状态已变化: {current_status} → {latest_status_cn}") + + # 添加到更新列表(回写中文状态) update_record = { "record_id": record_id, "values": { - "bug状态": [{"type": "text", "text": latest_status}] + "bug状态": [{"type": "text", "text": latest_status_cn}] } } updates.append(update_record) diff --git a/src/tapd_api.py b/src/tapd_api.py index 8fd5fba..7d767e0 100644 --- a/src/tapd_api.py +++ b/src/tapd_api.py @@ -7,6 +7,7 @@ import os import requests from typing import Dict, Optional, Any from requests.auth import HTTPBasicAuth +from src.api_logger import get_logger class TAPDApi: @@ -44,6 +45,9 @@ class TAPDApi: # 设置Basic Auth self.auth = HTTPBasicAuth(self.api_user, self.api_password) + # 初始化日志记录器 + self.logger = get_logger() + print(f" ✓ TAPD API初始化完成 (workspace_id: {workspace_id})") if test_mode: print(f" ⚠ 测试模式已启用:将显示所有TAPD API调用的详细信息") @@ -67,6 +71,15 @@ class TAPDApi: """ url = f"{self.BASE_URL}/{endpoint}" + # 准备日志记录的请求数据(隐藏认证信息) + log_request_data = { + "url": url, + "method": method, + "params": params, + "data": data, + "auth_user": self.api_user + } + # 测试模式:显示请求信息 if self.test_mode: print("\n" + "=" * 80) @@ -129,21 +142,78 @@ class TAPDApi: # 检查TAPD API返回的状态 if result.get('status') != 1: error_msg = result.get('info', '未知错误') + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="tapd", + operation=endpoint, + request_data=log_request_data, + response_data=result, + success=False, + error_message=error_msg + ) raise RuntimeError(f"TAPD API调用失败: {error_msg}") + # 记录API调用日志(成功) + self.logger.log_api_call( + api_type="tapd", + operation=endpoint, + request_data=log_request_data, + response_data=result, + success=True + ) + return result except requests.exceptions.Timeout: - raise RuntimeError(f"TAPD API请求超时: {endpoint}") + error_msg = f"TAPD API请求超时: {endpoint}" + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="tapd", + operation=endpoint, + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + raise RuntimeError(error_msg) except requests.exceptions.HTTPError as e: error_msg = f"TAPD API HTTP错误: {e}" if hasattr(e.response, 'text'): error_msg += f"\n响应内容: {e.response.text[:200]}" + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="tapd", + operation=endpoint, + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) raise RuntimeError(error_msg) except requests.exceptions.RequestException as e: - raise RuntimeError(f"TAPD API请求失败: {e}") + error_msg = f"TAPD API请求失败: {e}" + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="tapd", + operation=endpoint, + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + raise RuntimeError(error_msg) except ValueError as e: - raise RuntimeError(f"TAPD API响应解析失败: {e}") + error_msg = f"TAPD API响应解析失败: {e}" + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="tapd", + operation=endpoint, + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + raise RuntimeError(error_msg) def create_bug(self, bug_data: Dict[str, Any]) -> Dict: """