diff --git a/TAPD接口文档.md b/TAPD接口文档.md index cfcc281..4d40c82 100644 --- a/TAPD接口文档.md +++ b/TAPD接口文档.md @@ -1,6 +1,6 @@ -**文档内地址为tapd-api.bilibili.co/tapd请修改为公司地址 https://tapd-api.bilibili.co/tapd** +**文档内地址为tapd-api.bilibili.co/tapd请修改为公司地址 https://tapd-api.bilibili.co/tapd/** # 过滤器FilterSpec @@ -572,32 +572,32 @@ curl -u 'api_user:api_password' -d 'name=story_created_by_api&workspace_id=10158 返回缺陷所有字段及候选值(枚举值),即通常理解的字段的 "英文Key" 和 "中文值". -# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#url)url +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#url)url ``` https://tapd-api.bilibili.co/bugs/get_fields_info ``` -# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#支持格式)支持格式 +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#支持格式)支持格式 JSON/XML(默认JSON格式) -# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#http请求方式)HTTP请求方式 +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#http请求方式)HTTP请求方式 GET -# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求数限制)请求数限制 +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求数限制)请求数限制 默认返回所有数据 -# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求参数)请求参数 +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求参数)请求参数 | 字段名 | 必选 | 类型及范围 | 说明 | 特殊规则 | | :----------: | :--: | :--------: | :----------------------------------------------------------: | :------: | | workspace_id | `是` | integer | 项目ID | | | all_options | 否 | integer | 是否也返回已关闭的选项。all_options=1 则返回。默认是 0,不返回,与TAPD界面对齐 | | -# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#调用示例及返回结果)调用示例及返回结果 +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#调用示例及返回结果)调用示例及返回结果 ## [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#获取项目下的缺陷字段)获取项目下的缺陷字段 @@ -2527,4 +2527,194 @@ curl -u 'api_user:api_password' 'https://tapd-api.bilibili.co/tapd/bugs?workspac ], "info": "success" } -``` \ No newline at end of file +``` + + + +# 附件 + +## 获取附件 + +### 说明 + +返回符合查询条件的所有的附件 + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#url)url + +``` +https://tapd-api.bilibili.co/tapd/attachments +``` + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#支持格式)支持格式 + +JSON/XML(默认JSON格式) + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#http请求方式)HTTP请求方式 + +GET + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#请求数限制)请求数限制 + +默认返回 30 条。可通过传 limit 参数设置,最大取 200。也可以传 page 参数翻页 + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#请求参数)请求参数 + +| 字段名 | 必选 | 类型及范围 | 说明 | 特殊规则 | +| :----------: | :--: | :--------: | :--------: | :------: | +| workspace_id | `是` | integer | 项目ID | | +| id | 否 | integer | ID | | +| type | 否 | string | 类型 | | +| entry_id | 否 | integer | 依赖对象ID | | +| filename | 否 | integer | 附件名称 | | +| owner | 否 | string | 上传人 | | + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#调用示例及返回结果)调用示例及返回结果 + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#curl-使用-basic-auth-鉴权调用示例)curl 使用 Basic Auth 鉴权调用示例 + +``` +curl -u 'api_user:api_password' 'https://api.tapd.cn/attachments?workspace_id=10104801' +``` + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#返回结果)返回结果 + +```json +{ + "status": 1, + "data": [ + { + "Attachment": { + "id": "1210104801000028203", + "type": "wiki_description", + "entry_id": "6100014242115511668", + "filename": "OneDrive.mp4", + "content_type": "video/mp4", + "created": "2021-04-08 15:51:27", + "workspace_id": "10104801", + "owner": "anyechen" + } + }, + { + "Attachment": { + "id": "1210104801000002153", + "type": "wiki", + "entry_id": "1210104801000017645", + "filename": "raingeek.jpg", + "content_type": "image/jpeg", + "created": "2017-11-23 17:01:36", + "workspace_id": "10104801", + "owner": "anyechen" + } + } + ], + "info": "success" +} +``` + +### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#附件字段说明)附件字段说明 + +#### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#附件重要字段说明)附件重要字段说明 + +| 字段 | 说明 | +| :----------: | :--------: | +| id | 附件ID | +| type | 类型 | +| entry_id | 依赖对象ID | +| created | 创建时间 | +| filename | 附件名称 | +| content_type | 内容类型 | +| workspace_id | 项目ID | +| owner | 上传人 | + +. + +## 上传附件 + +### 接口概览 + +- **API 地址**: `https://tapd-api.bilibili.co/tapd/files/upload_attachment` +- **请求方式**: `POST` +- **Content-Type**: `multipart/form-data` + +### **请求参数** + +| 字段名 | 是否必填 | 类型及范围 | 说明 | +| ------------ | -------- | ---------- | ---------------- | +| workspace_id | 是 | integer | 项目ID | +| file | 是 | 文件 | 文件 | +| type | 是 | string | story/bug/task | +| entry_id | 是 | integer | 需求/缺陷/任务ID | +| owner | 否 | string | 附件创建人昵称 | +| overwrite | 否 | bool | 同名文件是否覆盖 | + + + +### **调用示例及返回结果** + +#### **把本地的 uu.jpg 上传到项目 755** + +### **curl 使用 Basic Auth 鉴权调用示例** + + + +``` +curl -u 'api_user:api_password' -F 'workspace_id=755' -F 'type=task' -F 'entry_id=1000000755859140551' -F 'file=@uu.jpg' 'api.tapd.cn/files/upload_attachment' +``` + + + + + + + +### **curl 使用 OAuth Access Token 鉴权调用示例** + + + +``` +curl -H 'Authorization: Bearer ACCESS_TOKEN' -F 'workspace_id=755' -F 'type=task' -F 'entry_id=1000000755859140551' -F 'file=@uu.jpg' 'api.tapd.cn/files/upload_attachment' +``` + + + + + +### **返回结果** + + + +``` +{ + "status": 1, + "data": { + "Attachment": { + "id": "1000000755503455439", + "type": "task", + "entry_id": 1000000755859140551, + "filename": "uu.jpg", + "description": "", + "content_type": "image/jpeg", + "created": "2021-09-07 21:36:08", + "workspace_id": 755, + "owner": "" + } + }, + "info": "success" +} + +``` + +### **字段说明** + +#### **返回字段说明** + +| 字段 | 说明 | +| ------------ | ---------------------------- | +| type | 业务对象类型:stroy/bug/task | +| entry_id | 业务对象ID | +| filename | 文件名 | +| description | 附件描述 | +| content_type | 文件类型 | +| created | 创建时间 | +| workspace_id | 项目ID | +| owner | 附件上传人 | \ No newline at end of file diff --git a/config/config.ini b/config/config.ini index cad2c68..1f69308 100644 --- a/config/config.ini +++ b/config/config.ini @@ -14,6 +14,20 @@ docid = dcRybSHojZR9-b5ePgDp33yr29bQy6BtQiVJ-nSGUM-ot6FSpq-TGW9jEn_f7ORLcFWRj9zv [Schedule] # 开单扫描频率(分钟) -scan_interval = 60 +scan_interval = 30 # bug状态同步频率(分钟) -sync_interval = 60 +sync_interval = 30 + +[Attachment] +# 临时文件目录(相对于项目根目录) +temp_dir = temp/attachments +# 单个文件大小限制(MB) +max_file_size = 150 +# 下载超时时间(秒) +download_timeout = 60 +# 上传超时时间(秒) +upload_timeout = 120 +# 下载失败重试次数 +download_retry = 3 +# 上传失败重试次数 +upload_retry = 3 diff --git a/config/token_cache.json b/config/token_cache.json index 791a953..3a16428 100644 --- a/config/token_cache.json +++ b/config/token_cache.json @@ -1,4 +1,4 @@ { - "access_token": "t7JYQBuy7p3Wg_hRj6xSchih9NRzuactFsxY7U3CMJG3Tkt5cm-BUvO0-e74oMUbZG5s4ouahAdB-AHT2eSljlrD1YzjOnxAvzbOjBFrGT-yEArT9fU6mli8uhBmBk7BteKfHZStZ7d3cs1bGSTnjkNS5vVpyxUu4gcLT8Ya1S9DcMdCBrKnBwmLDcoI2_TkLvKhW67b_zo2EVa0erZhHlL2klT0ajyVhwfB0aeXhF8", - "fetch_time": 1766572400.099746 + "access_token": "SPFfB9jMxleGiJn76FQH6v5pploseAJXTCVkTVVxl1_PEmHZJiqXsNGpEyGAK4qmYe3WRxYJ57xgCQLRCHopVfeoDfP87IgxVCytCqQABGES5ndG05SVkrI-9evg8Z4kbstlsiRMmfPGGGoNUgL1kUoZc2No0FYytm8FTfulnAXfiTExzoF8OCTdEPc9mA0g8JKFhlkiS2F0agBESS_2_ewbcZvA0i44-ChTKRBdRa0", + "fetch_time": 1767599732.4166858 } \ No newline at end of file diff --git a/src/api_logger.py b/src/api_logger.py index 3fecf2b..6f25a1d 100644 --- a/src/api_logger.py +++ b/src/api_logger.py @@ -13,38 +13,47 @@ from pathlib import Path class APILogger: """API调用日志记录器""" - def __init__(self, log_file: Optional[str] = None): + def __init__(self, log_dir: Optional[str] = None): """ 初始化日志记录器 Args: - log_file: 日志文件路径,如果为None则使用默认路径 + log_dir: 日志目录路径,如果为None则使用默认路径 """ - if log_file is None: - # 默认路径:项目根目录/logs/api_log.json + if log_dir is None: + # 默认路径:项目根目录/logs/ project_root = Path(__file__).parent.parent - log_dir = project_root / "logs" - log_dir.mkdir(exist_ok=True) - self.log_file = log_dir / "api_log.json" + self.log_dir = project_root / "logs" else: - self.log_file = Path(log_file) + self.log_dir = Path(log_dir) - # 确保日志文件存在 - self._init_log_file() + # 确保日志目录存在 + self.log_dir.mkdir(exist_ok=True) - def _init_log_file(self): - """初始化日志文件""" - if not self.log_file.exists(): - with open(self.log_file, 'w', encoding='utf-8') as f: + def _get_today_log_file(self) -> Path: + """ + 获取今天的日志文件路径 + + Returns: + Path: 今天的日志文件路径(格式:api_log_YYYY-MM-DD.json) + """ + today = datetime.now().strftime("%Y-%m-%d") + log_file = self.log_dir / f"api_log_{today}.json" + + # 如果文件不存在,初始化 + if not log_file.exists(): + with open(log_file, 'w', encoding='utf-8') as f: json.dump({"records": []}, f, ensure_ascii=False, indent=2) + return log_file + def log_api_call(self, api_type: str, operation: str, request_data: Dict[str, Any], response_data: Dict[str, Any], success: bool = True, error_message: Optional[str] = None): """ - 记录API调用 + 记录API调用(使用追加式写入,避免读取整个文件) Args: api_type: API类型(如 "smartsheet", "tapd", "wework") @@ -55,11 +64,10 @@ class APILogger: error_message: 错误信息(如果失败) """ try: - # 读取现有记录 - with open(self.log_file, 'r', encoding='utf-8') as f: - log_data = json.load(f) + # 获取今天的日志文件 + log_file = self._get_today_log_file() - # 添加新记录 + # 构造新记录 record = { "api_type": api_type, "operation": operation, @@ -72,11 +80,24 @@ class APILogger: if error_message: record["error_message"] = error_message - log_data["records"].append(record) + # 使用追加式写入 + with open(log_file, 'r+', encoding='utf-8') as f: + # 定位到文件末尾 + f.seek(0, 2) + file_size = f.tell() - # 写回文件 - with open(self.log_file, 'w', encoding='utf-8') as f: - json.dump(log_data, f, ensure_ascii=False, indent=2) + if file_size == 0: + # 空文件,写入初始结构 + f.write('{"records": [\n') + f.write(json.dumps(record, ensure_ascii=False, indent=2)) + f.write('\n]}') + else: + # 回退到最后的 ]} + f.seek(file_size - 3) + # 添加逗号和新记录 + f.write(',\n') + f.write(json.dumps(record, ensure_ascii=False, indent=2)) + f.write('\n]}') except Exception as e: # 日志记录失败不应该影响主流程 @@ -107,6 +128,7 @@ if __name__ == "__main__": logger = APILogger() # 测试记录一个成功的API调用 + print("测试1: 记录成功的API调用...") logger.log_api_call( api_type="smartsheet", operation="get_records", @@ -122,6 +144,27 @@ if __name__ == "__main__": }, success=True ) - print("✓ 成功记录API调用") - print(f"日志文件: {logger.log_file}") + + # 测试记录一个失败的API调用 + print("\n测试2: 记录失败的API调用...") + logger.log_api_call( + api_type="tapd", + operation="create_bug", + request_data={ + "url": "https://api.tapd.cn/bugs", + "method": "POST", + "data": {"title": "测试bug"} + }, + response_data={ + "status": 0, + "info": "参数错误" + }, + success=False, + error_message="缺少必填参数workspace_id" + ) + print("✓ 成功记录失败的API调用") + + log_file = logger._get_today_log_file() + print(f"\n日志文件: {log_file}") + print(f"日志目录: {logger.log_dir}") diff --git a/src/api_test.py b/src/api_test.py index 9617e71..d0d5bcb 100644 --- a/src/api_test.py +++ b/src/api_test.py @@ -586,6 +586,333 @@ class TAPDAPITester: traceback.print_exc() return None + def upload_attachment(self, file_path, entry_type, entry_id, owner=None, overwrite=False): + """ + 上传附件到TAPD + + 根据TAPD官方文档实现:https://tapd-api.bilibili.co/tapd/files/upload_attachment + + Args: + file_path: 本地文件路径 + entry_type: 业务对象类型,必填,可选值: story/bug/task + entry_id: 业务对象ID(需求/缺陷/任务ID),必填 + owner: 附件创建人昵称 (可选) + overwrite: 同名文件是否覆盖 (可选,默认False) + + Returns: + dict: 上传成功返回附件信息,失败返回None + """ + print("\n=== 上传附件到TAPD ===") + + # 初始化认证信息 + if not self._init_auth(): + return None + + # 初始化workspace_id + if not self._init_workspace_id(): + return None + + # 验证必需参数 + if not entry_type: + print("✗ entry_type 参数不能为空") + print(" 可选值: story/bug/task") + return None + + if entry_type not in ['story', 'bug', 'task']: + print(f"✗ entry_type 参数值无效: {entry_type}") + print(" 可选值: story/bug/task") + return None + + if not entry_id: + print("✗ entry_id 参数不能为空") + print(" 需要提供需求/缺陷/任务的ID") + return None + + # 检查文件是否存在 + if not os.path.exists(file_path): + print(f"✗ 文件不存在: {file_path}") + return None + + # 获取文件信息 + filename = os.path.basename(file_path) + file_size = os.path.getsize(file_path) + content_type = self._get_content_type(filename) + + print(f"\n【文件信息】") + print(f" 文件名: {filename}") + print(f" 文件路径: {file_path}") + print(f" 文件大小: {file_size} 字节 ({file_size/1024:.2f} KB)") + print(f" Content-Type: {content_type}") + + # 使用正确的API端点 + url = f"{self.base_url}/files/upload_attachment" + + # 准备表单数据(按照文档要求) + data = { + 'workspace_id': str(self.workspace_id), + 'type': entry_type, + 'entry_id': str(entry_id) + } + + # 添加可选参数 + if owner: + data['owner'] = owner + if overwrite: + data['overwrite'] = 'true' if overwrite else 'false' + + try: + from requests.auth import HTTPBasicAuth + auth = HTTPBasicAuth(self.api_user, self.api_password) + + print(f"\n【请求信息】") + print(f" 请求URL: {url}") + print(f" 请求方法: POST") + print(f" Content-Type: multipart/form-data") + print(f" 认证方式: Basic Auth") + print(f" 认证用户: {self.api_user}") + print(f"\n【表单数据】") + print(f" workspace_id: {self.workspace_id}") + print(f" type: {entry_type}") + print(f" entry_id: {entry_id}") + if owner: + print(f" owner: {owner}") + if overwrite: + print(f" overwrite: {overwrite}") + print(f" file: {filename} ({content_type})") + + # 打开文件并上传 + print(f"\n【正在上传】") + print(f" 请稍候...") + + with open(file_path, 'rb') as f: + files = { + 'file': (filename, f, content_type) + } + + response = requests.post(url, auth=auth, files=files, data=data, timeout=60) + + # 输出HTTP响应信息 + print(f"\n【HTTP响应】") + print(f" 状态码: {response.status_code}") + print(f" 原因: {response.reason}") + print(f" 响应头:") + for key, value in response.headers.items(): + print(f" {key}: {value}") + + # 输出原始响应内容 + print(f"\n【原始响应内容】") + print(f" {response.text}") + + response_data = response.json() + + # 输出解析后的JSON响应 + print(f"\n【解析后的JSON响应】") + print(json.dumps(response_data, ensure_ascii=False, indent=2)) + + # 记录API调用 + request_data = { + "url": url, + "method": "POST", + "data": { + "workspace_id": self.workspace_id, + "type": entry_type, + "entry_id": entry_id, + "owner": owner, + "overwrite": overwrite, + "filename": filename, + "file_size": file_size, + "content_type": content_type + }, + "auth_user": self.api_user + } + self._log_api_call("upload_attachment", request_data, response_data) + + # 检查返回结果 + print(f"\n【结果分析】") + if response_data.get("status") == 1: + data = response_data.get("data", {}) + print(f"\n✓ 文件上传成功") + + # 显示上传结果 + if isinstance(data, dict): + attachment_info = data.get('Attachment', {}) + att_id = attachment_info.get('id', '(无ID)') + att_filename = attachment_info.get('filename', '(无文件名)') + att_type = attachment_info.get('type', '(无类型)') + att_entry_id = attachment_info.get('entry_id', '(无entry_id)') + att_owner = attachment_info.get('owner', '(未知)') + att_created = attachment_info.get('created', '(未知)') + + print(f" 附件ID: {att_id}") + print(f" 文件名: {att_filename}") + print(f" 业务对象类型: {att_type}") + print(f" 业务对象ID: {att_entry_id}") + print(f" 上传人: {att_owner}") + print(f" 创建时间: {att_created}") + + # 保存到单独的文件 + output_file = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + 'logs', + 'tapd_upload_result.json' + ) + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + print(f" ✓ 上传结果已保存到: {output_file}") + + return data + else: + print(f"\n✗ 上传失败") + print(f" 状态码: {response_data.get('status')}") + print(f" 错误信息: {response_data.get('info', '未知错误')}") + return None + + except Exception as e: + print(f"\n✗ 请求异常: {str(e)}") + import traceback + traceback.print_exc() + return None + + def _get_content_type(self, filename): + """ + 根据文件扩展名获取Content-Type + + Args: + filename: 文件名 + + Returns: + str: Content-Type + """ + import mimetypes + content_type, _ = mimetypes.guess_type(filename) + return content_type or 'application/octet-stream' + + def get_attachments(self, entry_id=None, attachment_type=None, filename=None, owner=None, limit=30): + """ + 获取TAPD附件列表 + + Args: + entry_id: 依赖对象ID (可选) + attachment_type: 附件类型 (可选) + filename: 附件名称 (可选) + owner: 上传人 (可选) + limit: 返回数量限制,默认30,最大200 + + Returns: + list: 附件列表,失败返回None + """ + print("\n=== 获取TAPD附件列表 ===") + + # 初始化认证信息 + if not self._init_auth(): + return None + + # 初始化workspace_id + if not self._init_workspace_id(): + return None + + # 使用公司地址 + url = f"{self.base_url}/attachments" + params = { + "workspace_id": self.workspace_id, + "limit": limit + } + + # 添加可选参数 + if entry_id: + params["entry_id"] = entry_id + if attachment_type: + params["type"] = attachment_type + if filename: + params["filename"] = filename + if owner: + params["owner"] = owner + + try: + from requests.auth import HTTPBasicAuth + auth = HTTPBasicAuth(self.api_user, self.api_password) + + print(f"\n正在请求TAPD API...") + print(f" URL: {url}") + print(f" workspace_id: {self.workspace_id}") + if entry_id: + print(f" entry_id: {entry_id}") + if attachment_type: + print(f" type: {attachment_type}") + if filename: + print(f" filename: {filename}") + if owner: + print(f" owner: {owner}") + + response = requests.get(url, params=params, auth=auth, timeout=30) + response_data = response.json() + + # 记录API调用 + request_data = { + "url": url, + "method": "GET", + "params": params, + "auth_user": self.api_user + } + self._log_api_call("get_attachments", request_data, response_data) + + # 检查返回结果 + if response_data.get("status") == 1: + data = response_data.get("data", []) + print(f"\n✓ 成功获取附件列表") + + # 显示附件统计 + if isinstance(data, list): + attachment_count = len(data) + print(f" 共 {attachment_count} 个附件") + + if attachment_count > 0: + # 保存到单独的文件 + output_file = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + 'logs', + 'tapd_attachments.json' + ) + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + print(f" ✓ 附件列表已保存到: {output_file}") + + # 显示附件信息 + print(f"\n附件列表预览:") + print("-" * 80) + for idx, item in enumerate(data[:10], 1): + attachment = item.get('Attachment', {}) + att_id = attachment.get('id', '(无ID)') + att_filename = attachment.get('filename', '(无文件名)') + att_type = attachment.get('type', '(无类型)') + att_content_type = attachment.get('content_type', '(未知)') + att_owner = attachment.get('owner', '(未知)') + att_created = attachment.get('created', '(未知)') + + print(f" {idx}. {att_filename}") + print(f" ID: {att_id}") + print(f" 类型: {att_type} | 内容类型: {att_content_type}") + print(f" 上传人: {att_owner} | 创建时间: {att_created}") + + if attachment_count > 10: + print(f" ... 还有 {attachment_count - 10} 个附件,详见输出文件") + print("-" * 80) + else: + print(" 没有找到符合条件的附件") + + return data + else: + print(f"\n✗ 获取失败") + print(f" 状态码: {response_data.get('status')}") + print(f" 错误信息: {response_data.get('info', '未知错误')}") + return None + + except Exception as e: + print(f"\n✗ 请求异常: {str(e)}") + import traceback + traceback.print_exc() + return None + def print_menu(): """打印菜单""" @@ -600,8 +927,10 @@ def print_menu(): print("5. 发送应用消息") print("\n【TAPD API】") print("6. 获取缺陷字段配置") + print("7. 获取附件列表") + print("8. 上传附件") print("\n【其他】") - print("7. 查看日志文件") + print("9. 查看日志文件") print("0. 退出") print("="*50) @@ -613,7 +942,7 @@ def main(): while True: print_menu() - choice = input("\n请选择操作 (0-7): ").strip() + choice = input("\n请选择操作 (0-9): ").strip() if choice == "0": print("\n感谢使用,再见!") @@ -689,6 +1018,88 @@ def main(): tapd_tester.get_bug_custom_fields() elif choice == "7": + # 获取TAPD附件列表 + print("\n=== 获取附件列表 ===") + print("是否需要添加筛选条件?") + print("1. 获取所有附件(默认)") + print("2. 按依赖对象ID筛选") + print("3. 按附件类型筛选") + print("4. 按文件名筛选") + print("5. 按上传人筛选") + + filter_choice = input("请选择 (直接回车默认为1): ").strip() + + entry_id = None + attachment_type = None + filename = None + owner = None + + if filter_choice == "2": + entry_id = input("请输入依赖对象ID: ").strip() + elif filter_choice == "3": + attachment_type = input("请输入附件类型: ").strip() + elif filter_choice == "4": + filename = input("请输入文件名: ").strip() + elif filter_choice == "5": + owner = input("请输入上传人: ").strip() + + # 询问返回数量 + limit_input = input("请输入返回数量限制 (直接回车默认为30,最大200): ").strip() + limit = int(limit_input) if limit_input else 30 + + tapd_tester.get_attachments( + entry_id=entry_id, + attachment_type=attachment_type, + filename=filename, + owner=owner, + limit=limit + ) + + elif choice == "8": + # 上传附件到TAPD + print("\n=== 上传附件 ===") + file_path = input("请输入文件路径: ").strip() + + if not file_path: + print("✗ 文件路径不能为空") + continue + + # 询问业务对象类型(必填) + print("\n业务对象类型:") + print(" story - 需求") + print(" bug - 缺陷") + print(" task - 任务") + entry_type = input("请选择业务对象类型 (story/bug/task): ").strip().lower() + + if entry_type not in ['story', 'bug', 'task']: + print("✗ 业务对象类型无效,必须是 story/bug/task 之一") + continue + + # 询问业务对象ID(必填) + type_name_map = {'story': '需求', 'bug': '缺陷', 'task': '任务'} + entry_id = input(f"请输入{type_name_map[entry_type]}ID: ").strip() + + if not entry_id: + print("✗ 业务对象ID不能为空") + continue + + # 询问是否指定上传人(可选) + owner_input = input("请输入上传人用户名 (直接回车使用API调用者): ").strip() + owner = owner_input if owner_input else None + + # 询问是否覆盖同名文件(可选) + overwrite_input = input("是否覆盖同名文件? (y/n,直接回车默认为n): ").strip().lower() + overwrite = True if overwrite_input == 'y' else False + + tapd_tester.upload_attachment( + file_path=file_path, + entry_type=entry_type, + entry_id=entry_id, + owner=owner, + overwrite=overwrite + ) + + elif choice == "9": print("\n=== 查看日志文件 ===") try: with open(wework_tester.log_file, 'r', encoding='utf-8') as f: diff --git a/src/attachment_handler.py b/src/attachment_handler.py new file mode 100644 index 0000000..f5de907 --- /dev/null +++ b/src/attachment_handler.py @@ -0,0 +1,300 @@ +""" +附件处理模块 +负责从智能表格下载附件到本地临时目录 +""" + +import os +import requests +import time +import shutil +from pathlib import Path +from typing import List, Dict, Optional, Tuple +from src.api_logger import get_logger + + +class AttachmentHandler: + """附件处理器""" + + def __init__(self, access_token: str, config: Dict): + """ + 初始化附件处理器 + + Args: + access_token: 企业微信access_token + config: 附件配置字典 + """ + self.access_token = access_token + self.config = config + self.logger = get_logger() + + # 从配置中读取参数 + self.temp_dir = config.get('temp_dir', 'temp/attachments') + self.max_file_size = config.get('max_file_size', 150) * 1024 * 1024 # 转换为字节 + self.download_timeout = config.get('download_timeout', 60) + self.download_retry = config.get('download_retry', 3) + + # 确保临时目录存在 + self._ensure_temp_dir() + + print(f" ✓ 附件处理器初始化完成") + print(f" 临时目录: {self.temp_dir}") + print(f" 最大文件大小: {self.max_file_size / 1024 / 1024:.0f}MB") + print(f" 下载超时: {self.download_timeout}秒") + print(f" 下载重试次数: {self.download_retry}") + + def _ensure_temp_dir(self): + """确保临时目录存在""" + temp_path = Path(self.temp_dir) + if not temp_path.exists(): + temp_path.mkdir(parents=True, exist_ok=True) + print(f" ✓ 创建临时目录: {self.temp_dir}") + + def _get_record_temp_dir(self, record_id: str) -> Path: + """ + 获取记录的临时目录 + + Args: + record_id: 记录ID + + Returns: + Path: 记录的临时目录路径 + """ + record_dir = Path(self.temp_dir) / record_id + if not record_dir.exists(): + record_dir.mkdir(parents=True, exist_ok=True) + return record_dir + + def _check_file_size(self, file_url: str) -> Tuple[bool, int]: + """ + 检查文件大小(通过HEAD请求) + + Args: + file_url: 文件URL + + Returns: + Tuple[bool, int]: (是否通过检查, 文件大小) + """ + try: + response = requests.head(file_url, timeout=10) + content_length = response.headers.get('Content-Length') + + if content_length: + file_size = int(content_length) + if file_size > self.max_file_size: + return (False, file_size) + return (True, file_size) + else: + # 如果无法获取文件大小,允许下载(在下载时再检查) + return (True, 0) + except Exception as e: + print(f" ⚠ 无法检查文件大小: {e}") + # 检查失败,允许下载(在下载时再检查) + return (True, 0) + + def _download_single_file(self, file_url: str, save_path: Path, filename: str) -> Tuple[bool, str]: + """ + 下载单个文件(带重试机制) + + Args: + file_url: 文件URL + save_path: 保存路径 + filename: 文件名 + + Returns: + Tuple[bool, str]: (是否成功, 错误信息或文件路径) + """ + for attempt in range(1, self.download_retry + 1): + try: + if attempt > 1: + print(f" → 重试下载 ({attempt}/{self.download_retry}): {filename}") + time.sleep(attempt * 2) # 递增等待时间 + + # 发起下载请求 + response = requests.get(file_url, stream=True, timeout=self.download_timeout) + response.raise_for_status() + + # 写入文件 + with open(save_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + # 检查下载后的文件大小 + downloaded_size = save_path.stat().st_size + if downloaded_size > self.max_file_size: + save_path.unlink() # 删除超大文件 + error_msg = f"文件大小超过限制: {downloaded_size / 1024 / 1024:.2f}MB > {self.max_file_size / 1024 / 1024:.0f}MB" + return (False, error_msg) + + return (True, str(save_path)) + + except requests.exceptions.Timeout: + error_msg = f"下载超时 (>{self.download_timeout}秒)" + if attempt == self.download_retry: + return (False, error_msg) + except requests.exceptions.RequestException as e: + error_msg = f"下载失败: {e}" + if attempt == self.download_retry: + return (False, error_msg) + except Exception as e: + error_msg = f"未预期的错误: {e}" + if attempt == self.download_retry: + return (False, error_msg) + + return (False, "下载失败(已达最大重试次数)") + + def download_attachments(self, record_data: Dict, record_id: str) -> Dict: + """ + 批量下载附件 + + Args: + record_data: 记录数据 + record_id: 记录ID + + Returns: + Dict: 下载结果,包含success、failed_files、downloaded_files + """ + print(f"\n → 开始下载附件...") + print(f" [DEBUG] 记录ID: {record_id}") + + # 获取附件字段的值 + attachment_value = record_data.get('附件(截图、录屏、日志)') + print(f" [DEBUG] 附件字段值类型: {type(attachment_value)}") + print(f" [DEBUG] 附件字段值: {attachment_value}") + + # 如果附件字段为空,返回成功(无附件需要下载) + if not attachment_value: + print(f" ✓ 附件字段为空,跳过下载") + return { + 'success': True, + 'downloaded_files': [], + 'failed_files': [], + 'error_message': None + } + + # 解析附件字段值,提取文件URL列表 + # 根据智能表格API文档,附件字段的值是一个列表 + file_urls = [] + if isinstance(attachment_value, list): + print(f" [DEBUG] 附件字段是列表,长度: {len(attachment_value)}") + for idx, item in enumerate(attachment_value): + print(f" [DEBUG] 附件项 {idx}: 类型={type(item)}, 值={item}") + # 提取文件URL(具体字段名需要根据实际API返回确认) + if isinstance(item, dict): + file_url = item.get('url') or item.get('link') or item.get('file_url') + filename = item.get('name') or item.get('filename') or f"attachment_{len(file_urls)}" + print(f" [DEBUG] 提取结果: url={file_url}, filename={filename}") + if file_url: + file_urls.append({'url': file_url, 'name': filename}) + else: + print(f" [DEBUG] 附件字段不是列表,无法解析") + + if not file_urls: + print(f" ⚠ 无法解析附件字段,未找到文件URL") + print(f" [DEBUG] 附件字段完整内容: {attachment_value}") + return { + 'success': False, + 'downloaded_files': [], + 'failed_files': [], + 'error_message': "无法解析附件字段" + } + + print(f" 找到 {len(file_urls)} 个附件") + + # 获取记录的临时目录 + record_dir = self._get_record_temp_dir(record_id) + + # 下载所有附件 + downloaded_files = [] + failed_files = [] + + for idx, file_info in enumerate(file_urls, 1): + file_url = file_info['url'] + filename = file_info['name'] + + print(f"\n [{idx}/{len(file_urls)}] 下载: {filename}") + + # 检查文件大小 + size_ok, file_size = self._check_file_size(file_url) + if not size_ok: + error_msg = f"文件大小超过限制: {file_size / 1024 / 1024:.2f}MB" + print(f" ✗ {error_msg}") + failed_files.append({'filename': filename, 'error': error_msg}) + continue + + if file_size > 0: + print(f" 文件大小: {file_size / 1024:.2f}KB") + + # 处理文件名冲突 + save_path = record_dir / filename + counter = 1 + while save_path.exists(): + name_parts = filename.rsplit('.', 1) + if len(name_parts) == 2: + new_filename = f"{name_parts[0]}({counter}).{name_parts[1]}" + else: + new_filename = f"{filename}({counter})" + save_path = record_dir / new_filename + counter += 1 + + # 下载文件 + success, result = self._download_single_file(file_url, save_path, filename) + + if success: + print(f" ✓ 下载成功: {save_path.name}") + downloaded_files.append(str(save_path)) + + # 记录API调用日志 + self.logger.log_api_call( + api_type="smartsheet", + operation="download_attachment", + request_data={'url': file_url, 'filename': filename}, + response_data={'saved_path': str(save_path), 'size': save_path.stat().st_size}, + success=True + ) + else: + print(f" ✗ 下载失败: {result}") + failed_files.append({'filename': filename, 'error': result}) + + # 记录API调用日志 + self.logger.log_api_call( + api_type="smartsheet", + operation="download_attachment", + request_data={'url': file_url, 'filename': filename}, + response_data={}, + success=False, + error_message=result + ) + + # 返回结果 + if len(failed_files) > 0: + print(f"\n ⚠ 下载完成: 成功 {len(downloaded_files)} 个, 失败 {len(failed_files)} 个") + return { + 'success': False, + 'downloaded_files': downloaded_files, + 'failed_files': failed_files, + 'error_message': f"部分附件下载失败 ({len(failed_files)}/{len(file_urls)})" + } + else: + print(f"\n ✓ 所有附件下载成功 ({len(downloaded_files)} 个)") + return { + 'success': True, + 'downloaded_files': downloaded_files, + 'failed_files': [], + 'error_message': None + } + + def cleanup_temp_files(self, record_id: str): + """ + 清理记录的临时文件 + + Args: + record_id: 记录ID + """ + try: + record_dir = Path(self.temp_dir) / record_id + if record_dir.exists(): + shutil.rmtree(record_dir) + print(f" ✓ 清理临时文件: {record_dir}") + except Exception as e: + print(f" ⚠ 清理临时文件失败: {e}") diff --git a/src/config.py b/src/config.py index 7a9007e..f3bdbbe 100644 --- a/src/config.py +++ b/src/config.py @@ -185,6 +185,112 @@ class ConfigManager: return result + def get_attachment_config(self): + """ + 获取附件配置 + + Returns: + dict: 包含附件相关配置的字典 + + Raises: + ValueError: 配置值非法时抛出 + """ + # 默认值 + default_temp_dir = "temp/attachments" + default_max_file_size = 150 + default_download_timeout = 60 + default_upload_timeout = 120 + default_download_retry = 3 + default_upload_retry = 3 + + # 如果没有Attachment节,返回默认值 + if not self.config.has_section('Attachment'): + return { + 'temp_dir': default_temp_dir, + 'max_file_size': default_max_file_size, + 'download_timeout': default_download_timeout, + 'upload_timeout': default_upload_timeout, + 'download_retry': default_download_retry, + 'upload_retry': default_upload_retry + } + + # 读取temp_dir配置 + if not self.config.has_option('Attachment', 'temp_dir'): + temp_dir = default_temp_dir + else: + temp_dir = self.config.get('Attachment', 'temp_dir').strip() + if not temp_dir: + temp_dir = default_temp_dir + + # 读取max_file_size配置 + if not self.config.has_option('Attachment', 'max_file_size'): + max_file_size = default_max_file_size + else: + max_file_size_str = self.config.get('Attachment', 'max_file_size').strip() + try: + max_file_size = int(max_file_size_str) + except ValueError: + raise ValueError(f"max_file_size配置项必须为整数,当前值: {max_file_size_str}") + if max_file_size <= 0: + raise ValueError(f"max_file_size配置项必须为正整数,当前值: {max_file_size}") + + # 读取download_timeout配置 + if not self.config.has_option('Attachment', 'download_timeout'): + download_timeout = default_download_timeout + else: + download_timeout_str = self.config.get('Attachment', 'download_timeout').strip() + try: + download_timeout = int(download_timeout_str) + except ValueError: + raise ValueError(f"download_timeout配置项必须为整数,当前值: {download_timeout_str}") + if download_timeout <= 0: + raise ValueError(f"download_timeout配置项必须为正整数,当前值: {download_timeout}") + + # 读取upload_timeout配置 + if not self.config.has_option('Attachment', 'upload_timeout'): + upload_timeout = default_upload_timeout + else: + upload_timeout_str = self.config.get('Attachment', 'upload_timeout').strip() + try: + upload_timeout = int(upload_timeout_str) + except ValueError: + raise ValueError(f"upload_timeout配置项必须为整数,当前值: {upload_timeout_str}") + if upload_timeout <= 0: + raise ValueError(f"upload_timeout配置项必须为正整数,当前值: {upload_timeout}") + + # 读取download_retry配置 + if not self.config.has_option('Attachment', 'download_retry'): + download_retry = default_download_retry + else: + download_retry_str = self.config.get('Attachment', 'download_retry').strip() + try: + download_retry = int(download_retry_str) + except ValueError: + raise ValueError(f"download_retry配置项必须为整数,当前值: {download_retry_str}") + if download_retry <= 0: + raise ValueError(f"download_retry配置项必须为正整数,当前值: {download_retry}") + + # 读取upload_retry配置 + if not self.config.has_option('Attachment', 'upload_retry'): + upload_retry = default_upload_retry + else: + upload_retry_str = self.config.get('Attachment', 'upload_retry').strip() + try: + upload_retry = int(upload_retry_str) + except ValueError: + raise ValueError(f"upload_retry配置项必须为整数,当前值: {upload_retry_str}") + if upload_retry <= 0: + raise ValueError(f"upload_retry配置项必须为正整数,当前值: {upload_retry}") + + return { + 'temp_dir': temp_dir, + 'max_file_size': max_file_size, + 'download_timeout': download_timeout, + 'upload_timeout': upload_timeout, + 'download_retry': download_retry, + 'upload_retry': upload_retry + } + def get_all_config(self): """ 获取所有配置 @@ -196,7 +302,8 @@ class ConfigManager: 'tapd': self.get_tapd_config(), 'smartsheet': self.get_smartsheet_config(), 'schedule': self.get_schedule_config(), - 'wework': self.get_wework_config() + 'wework': self.get_wework_config(), + 'attachment': self.get_attachment_config() } def print_config(self): diff --git a/src/log_viewer.py b/src/log_viewer.py new file mode 100644 index 0000000..f70204a --- /dev/null +++ b/src/log_viewer.py @@ -0,0 +1,320 @@ +""" +日志查询工具 +提供交互式界面查询API调用日志 +""" + +import json +import os +from datetime import datetime, timedelta +from pathlib import Path +from typing import List, Dict, Any, Optional + + +class LogViewer: + """日志查询工具类""" + + def __init__(self, log_dir: Optional[str] = None): + """ + 初始化日志查询工具 + + Args: + log_dir: 日志目录路径,如果为None则使用默认路径 + """ + if log_dir is None: + # 默认路径:项目根目录/logs/ + project_root = Path(__file__).parent.parent + self.log_dir = project_root / "logs" + else: + self.log_dir = Path(log_dir) + + if not self.log_dir.exists(): + raise FileNotFoundError(f"日志目录不存在: {self.log_dir}") + + def list_available_dates(self) -> List[str]: + """ + 列出所有可用的日志文件日期 + + Returns: + 日期列表,格式为YYYY-MM-DD,按时间倒序排列 + """ + log_files = list(self.log_dir.glob("api_log_*.json")) + dates = [] + + for log_file in log_files: + # 从文件名提取日期:api_log_2025-12-25.json -> 2025-12-25 + filename = log_file.stem # 去掉.json后缀 + if filename.startswith("api_log_") and not "archive" in filename: + date_str = filename.replace("api_log_", "") + dates.append(date_str) + + # 按日期倒序排列(最新的在前) + return sorted(dates, reverse=True) + + def load_log(self, date: str) -> List[Dict[str, Any]]: + """ + 加载指定日期的日志 + + Args: + date: 日期字符串,格式为YYYY-MM-DD + + Returns: + 日志记录列表 + """ + log_file = self.log_dir / f"api_log_{date}.json" + + if not log_file.exists(): + raise FileNotFoundError(f"日志文件不存在: {log_file}") + + with open(log_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + return data.get("records", []) + + def get_operation_stats(self, records: List[Dict[str, Any]]) -> Dict[str, int]: + """ + 统计每种操作类型的数量 + + Args: + records: 日志记录列表 + + Returns: + 操作类型统计字典,格式为 {operation: count} + """ + stats = {} + for record in records: + op = record.get('operation', 'unknown') + stats[op] = stats.get(op, 0) + 1 + + return stats + + def filter_by_operation(self, records: List[Dict[str, Any]], operation: str) -> List[Dict[str, Any]]: + """ + 按操作类型过滤记录 + + Args: + records: 日志记录列表 + operation: 操作类型 + + Returns: + 过滤后的记录列表 + """ + return [r for r in records if r.get('operation') == operation] + + def display_record(self, record: Dict[str, Any], index: int, total: int): + """ + 显示单条记录的详细信息 + + Args: + record: 日志记录 + index: 当前记录索引(从1开始) + total: 总记录数 + """ + status = "成功 ✓" if record.get('success', True) else "失败 ✗" + timestamp = record.get('timestamp', 'unknown') + + print(f"\n[{index}/{total}] {timestamp} - {status}") + print("-" * 60) + + print("\n请求内容:") + print(json.dumps(record.get('request', {}), ensure_ascii=False, indent=2)) + + print("\n返回内容:") + print(json.dumps(record.get('response', {}), ensure_ascii=False, indent=2)) + + if 'error_message' in record: + print(f"\n错误信息: {record['error_message']}") + + def browse_records(self, records: List[Dict[str, Any]]): + """ + 交互式浏览记录(从最新的开始) + + Args: + records: 日志记录列表 + """ + if not records: + print("\n没有找到任何记录") + return + + # 从最后一条开始 + index = len(records) - 1 + show_failed_only = False + + while index >= 0: + # 如果只看失败的,跳过成功的记录 + if show_failed_only and records[index].get('success', True): + index -= 1 + continue + + # 显示当前记录 + self.display_record(records[index], index + 1, len(records)) + + # 等待用户输入 + cmd = input("\n[Enter]上一条 [q]退出 [f]只看失败 [a]查看全部: ").strip().lower() + + if cmd == 'q': + break + elif cmd == 'f': + show_failed_only = True + print("\n已切换到只看失败的记录") + elif cmd == 'a': + show_failed_only = False + print("\n已切换到查看全部记录") + + index -= 1 + + if index < 0: + print("\n已经是第一条记录了") + + def run(self): + """主流程:日期选择 → 操作选择 → 浏览记录""" + print("\n" + "=" * 60) + print("=== TAPD 日志查询工具 ===") + print("=" * 60) + + # 第一步:选择日期 + selected_date = self._select_date() + if not selected_date: + return + + # 第二步:加载日志 + try: + print(f"\n正在加载 {selected_date} 的日志...") + records = self.load_log(selected_date) + print(f"共找到 {len(records)} 条API调用记录") + except Exception as e: + print(f"\n✗ 加载日志失败: {str(e)}") + return + + if not records: + print("\n该日期没有日志记录") + return + + # 第三步:选择操作类型 + filtered_records = self._select_operation(records) + if filtered_records is None: + return + + # 第四步:浏览记录 + self.browse_records(filtered_records) + + def _select_date(self) -> Optional[str]: + """ + 日期选择界面 + + Returns: + 选择的日期字符串,如果取消则返回None + """ + dates = self.list_available_dates() + + if not dates: + print("\n没有找到任何日志文件") + return None + + print("\n请选择日期:") + + # 显示最近3天的快捷选项 + today = datetime.now().strftime("%Y-%m-%d") + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + day_before = (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d") + + options = [] + if today in dates: + options.append(("1", f"今天 ({today})", today)) + if yesterday in dates: + options.append(("2", f"昨天 ({yesterday})", yesterday)) + if day_before in dates: + options.append(("3", f"前天 ({day_before})", day_before)) + + # 显示选项 + for opt_num, opt_label, _ in options: + print(f"{opt_num}. {opt_label}") + + next_num = len(options) + 1 + print(f"{next_num}. 输入指定日期 (YYYY-MM-DD)") + + # 获取用户输入 + choice = input(f"\n请输入选项 [1-{next_num}]: ").strip() + + # 处理快捷选项 + for opt_num, _, date_value in options: + if choice == opt_num: + return date_value + + # 处理手动输入日期 + if choice == str(next_num): + date_input = input("请输入日期 (YYYY-MM-DD): ").strip() + if date_input in dates: + return date_input + else: + print(f"\n✗ 日志文件不存在: api_log_{date_input}.json") + return None + + print("\n✗ 无效的选项") + return None + + def _select_operation(self, records: List[Dict[str, Any]]) -> Optional[List[Dict[str, Any]]]: + """ + 操作类型选择界面 + + Args: + records: 日志记录列表 + + Returns: + 过滤后的记录列表,如果取消则返回None + """ + # 统计操作类型 + stats = self.get_operation_stats(records) + + # 按数量排序 + sorted_ops = sorted(stats.items(), key=lambda x: x[1], reverse=True) + + print("\n请选择要查询的操作类型:") + + # 显示操作类型列表 + for i, (op, count) in enumerate(sorted_ops, 1): + print(f"{i}. {op} ({count}条)") + + next_num = len(sorted_ops) + 1 + print(f"{next_num}. 查看所有记录") + print(f"{next_num + 1}. 返回上一级") + + # 获取用户输入 + choice = input(f"\n请输入选项 [1-{next_num + 1}]: ").strip() + + try: + choice_num = int(choice) + + if 1 <= choice_num <= len(sorted_ops): + # 选择了特定操作类型 + selected_op = sorted_ops[choice_num - 1][0] + filtered = self.filter_by_operation(records, selected_op) + print(f"\n=== {selected_op} - 共{len(filtered)}条 ===") + return filtered + + elif choice_num == next_num: + # 查看所有记录 + print(f"\n=== 所有记录 - 共{len(records)}条 ===") + return records + + elif choice_num == next_num + 1: + # 返回上一级 + return None + + else: + print("\n✗ 无效的选项") + return None + + except ValueError: + print("\n✗ 请输入数字") + return None + + +if __name__ == "__main__": + try: + viewer = LogViewer() + viewer.run() + except KeyboardInterrupt: + print("\n\n程序已退出") + except Exception as e: + print(f"\n✗ 发生错误: {str(e)}") + diff --git a/src/main.py b/src/main.py index de907ce..1a10f70 100644 --- a/src/main.py +++ b/src/main.py @@ -167,6 +167,11 @@ def scan_and_validate_records_for_sheet(access_token: str, docid: str, sheet_id: if missing_fields: raise RuntimeError(f"子表 {sheet_title} 缺少必需字段: {', '.join(missing_fields)}") + # 检查附件字段是否存在(附件字段必须存在,但可以为空) + has_attachment_field, attachment_field_id = api.check_attachment_field(fields) + if not has_attachment_field: + raise RuntimeError(f"子表 {sheet_title} 缺少附件字段: 附件(截图、录屏、日志)") + # 3. 获取"开单状态"为空的记录 status_field_id = field_mapping['开单状态'] records = api.get_empty_status_records(sheet_id, status_field_id) @@ -181,7 +186,9 @@ def scan_and_validate_records_for_sheet(access_token: str, docid: str, sheet_id: 'fields': fields, 'field_mapping': field_mapping, 'sheet_id': sheet_id, - 'sheet_title': sheet_title + 'sheet_title': sheet_title, + 'has_attachment_field': has_attachment_field, + 'attachment_field_id': attachment_field_id } # 4. 提取记录数据并校验 @@ -199,13 +206,15 @@ def scan_and_validate_records_for_sheet(access_token: str, docid: str, sheet_id: if len(validation_result['invalid_records']) > 0: print(f" - 校验失败: {len(validation_result['invalid_records'])} 条") - # 返回结果,包含字段信息和sheet_id + # 返回结果,包含字段信息、sheet_id和附件字段信息 return { 'validation_result': validation_result, 'fields': fields, 'field_mapping': field_mapping, 'sheet_id': sheet_id, - 'sheet_title': sheet_title + 'sheet_title': sheet_title, + 'has_attachment_field': has_attachment_field, + 'attachment_field_id': attachment_field_id } diff --git a/src/mapper.py b/src/mapper.py index 390fcc3..bc9aaf0 100644 --- a/src/mapper.py +++ b/src/mapper.py @@ -33,6 +33,71 @@ class FieldMapper: """ self.reporter = reporter + def _convert_multiline_text(self, value: Any) -> str: + """ + 将多行文本字段值转换为字符串 + + 智能表格的多行文本字段返回格式: + [ + {"text": "第一行\n", "type": "text"}, + {"text": "第二行\n", "type": "text"}, + {"text": "第三行", "type": "text"} + ] + + Args: + value: 字段值,可能是字符串、列表或字典数组 + + Returns: + str: 拼接后的完整文本 + """ + if isinstance(value, str): + # 如果已经是字符串,直接返回 + return value.strip() + elif isinstance(value, list): + # 如果是列表,检查是否是多行文本格式 + if value and isinstance(value[0], dict) and 'text' in value[0]: + # 多行文本格式:提取所有text字段并拼接 + text_parts = [item.get('text', '') for item in value if isinstance(item, dict)] + return ''.join(text_parts).strip() + else: + # 普通列表,转换为字符串 + return str(value).strip() + else: + # 其他类型转换为字符串 + return str(value).strip() if value else '' + + def _convert_text_to_html(self, text: str) -> str: + """ + 将纯文本转换为HTML格式 + + 处理: + 1. 换行符转换为
+ 2. 自动识别URL并转换为链接 + + Args: + text: 纯文本 + + Returns: + str: HTML格式的文本 + """ + if not text: + return "" + + import re + + # 1. HTML转义特殊字符(除了即将处理的URL) + # 暂时不转义,因为TAPD可能支持HTML + + # 2. 识别URL并转换为链接 + # 匹配http://或https://开头的URL + url_pattern = r'(https?://[^\s<>"]+)' + text = re.sub(url_pattern, r'\1', text) + + # 3. 将换行符转换为
+ text = text.replace('\n', '
') + + return text + def _convert_multiselect_to_string(self, value: Any) -> str: """ 将多选字段值转换为分号分隔的字符串 @@ -54,6 +119,73 @@ class FieldMapper: # 其他类型转换为字符串 return str(value).strip() if value else '' + def _format_attachment_info(self, attachment_value: Any) -> str: + """ + 格式化附件信息为HTML格式 + + Args: + attachment_value: 附件字段值 + + Returns: + str: HTML格式的附件信息 + """ + if not attachment_value: + return "" + + attachment_html = "


【附件】
" + + # 处理单个附件(字典) + if isinstance(attachment_value, dict): + name = attachment_value.get('name', '未知文件') + url = attachment_value.get('file_url', '') + size = attachment_value.get('size', 0) + file_ext = attachment_value.get('file_ext', '') + + # 格式化文件大小 + if size > 1024 * 1024: + size_str = f"{size / 1024 / 1024:.2f}MB" + elif size > 1024: + size_str = f"{size / 1024:.2f}KB" + else: + size_str = f"{size}B" + + attachment_html += f"• {name}" + if file_ext: + attachment_html += f" (.{file_ext})" + if size > 0: + attachment_html += f" [{size_str}]" + if url: + attachment_html += f'
  链接: {url}' + attachment_html += "
" + + # 处理多个附件(列表) + elif isinstance(attachment_value, list): + for idx, item in enumerate(attachment_value, 1): + if isinstance(item, dict): + name = item.get('name', f'附件{idx}') + url = item.get('file_url', '') + size = item.get('size', 0) + file_ext = item.get('file_ext', '') + + # 格式化文件大小 + if size > 1024 * 1024: + size_str = f"{size / 1024 / 1024:.2f}MB" + elif size > 1024: + size_str = f"{size / 1024:.2f}KB" + else: + size_str = f"{size}B" + + attachment_html += f"• {name}" + if file_ext: + attachment_html += f" (.{file_ext})" + if size > 0: + attachment_html += f" [{size_str}]" + if url: + attachment_html += f'
  链接: {url}' + attachment_html += "
" + + return attachment_html + def map_record_to_tapd(self, record_data: Dict[str, Any]) -> Dict[str, Any]: """ 将智能表格记录数据映射为TAPD API参数 @@ -76,9 +208,19 @@ class FieldMapper: tapd_data['title'] = title # 2. 详细描述(必填) - description = record_data.get('详细描述', '').strip() + description = self._convert_multiline_text(record_data.get('详细描述', '')) if not description: raise ValueError("详细描述不能为空") + + # 转换为HTML格式(处理换行和URL) + description = self._convert_text_to_html(description) + + # 附加附件信息到详细描述 + attachment_value = record_data.get('附件(截图、录屏、日志)') + if attachment_value: + attachment_info = self._format_attachment_info(attachment_value) + description += attachment_info + tapd_data['description'] = description # 3. 优先级(必填) diff --git a/src/smartsheet.py b/src/smartsheet.py index 9b18ed3..0f3a1fc 100644 --- a/src/smartsheet.py +++ b/src/smartsheet.py @@ -193,6 +193,31 @@ class SmartSheetAPI: print(f" ✓ 构建字段映射完成,共 {len(mapping)} 个字段") return mapping + def check_attachment_field(self, fields: List[Dict]) -> tuple: + """ + 检查是否存在附件字段 + + Args: + fields: 字段列表 + + Returns: + tuple: (是否存在, field_id或None) + """ + attachment_field_name = "附件(截图、录屏、日志)" + + for field in fields: + field_title = field.get('field_title', '') + if field_title == attachment_field_name: + field_id = field.get('field_id', '') + field_type = field.get('field_type', '') + print(f" ✓ 找到附件字段: {attachment_field_name}") + print(f" 字段ID: {field_id}") + print(f" 字段类型: {field_type}") + return (True, field_id) + + print(f" ⚠ 未找到附件字段: {attachment_field_name}") + return (False, None) + def get_records(self, sheet_id: str, filter_spec: Optional[Dict] = None, limit: int = 100, offset: int = 0) -> Dict: """ @@ -404,7 +429,13 @@ class SmartSheetAPI: # 3. 纯文本字段(只包含 text,没有 id) elif 'text' in first_value: - return first_value.get('text', '') + # 如果有多个值,说明是多行文本,返回完整的数组供后续处理 + if len(field_value_list) > 1: + # 多行文本:返回完整的数组结构 + return field_value_list + else: + # 单行文本:返回text值 + return first_value.get('text', '') # 4. 数字字段 elif 'number' in first_value: diff --git a/src/tapd_api.py b/src/tapd_api.py index 7d767e0..0b99c3e 100644 --- a/src/tapd_api.py +++ b/src/tapd_api.py @@ -5,7 +5,7 @@ TAPD API调用模块 import os import requests -from typing import Dict, Optional, Any +from typing import Dict, Optional, Any, Tuple, List from requests.auth import HTTPBasicAuth from src.api_logger import get_logger @@ -311,3 +311,232 @@ class TAPDApi: """ # TAPD bug URL格式 return f"https://www.tapd.cn/{self.workspace_id}/bugtrace/bugs/view/{bug_id}" + + def _check_file_size_before_upload(self, file_path: str, max_size: int) -> Tuple[bool, str]: + """ + 上传前检查文件大小 + + Args: + file_path: 文件路径 + max_size: 最大文件大小(字节) + + Returns: + Tuple[bool, str]: (是否通过检查, 错误信息) + """ + try: + import os + file_size = os.path.getsize(file_path) + if file_size > max_size: + error_msg = f"文件大小超过限制: {file_size / 1024 / 1024:.2f}MB > {max_size / 1024 / 1024:.0f}MB" + return (False, error_msg) + return (True, "") + except Exception as e: + return (False, f"无法获取文件大小: {e}") + + def upload_attachment(self, file_path: str, bug_id: str, max_size: int = 150 * 1024 * 1024, + upload_timeout: int = 120, max_retries: int = 3) -> Dict: + """ + 上传单个附件到TAPD bug单 + + Args: + file_path: 文件路径 + bug_id: bug ID + max_size: 最大文件大小(字节),默认150MB + upload_timeout: 上传超时时间(秒),默认120秒 + max_retries: 最大重试次数,默认3次 + + Returns: + Dict: 上传结果,包含success、attachment_id、error_message + + Raises: + RuntimeError: 上传失败时抛出 + """ + import os + import time + + filename = os.path.basename(file_path) + print(f" → 上传附件: {filename}") + + # 检查文件是否存在 + if not os.path.exists(file_path): + error_msg = f"文件不存在: {file_path}" + print(f" ✗ {error_msg}") + return {'success': False, 'error_message': error_msg} + + # 检查文件大小 + size_ok, error_msg = self._check_file_size_before_upload(file_path, max_size) + if not size_ok: + print(f" ✗ {error_msg}") + return {'success': False, 'error_message': error_msg} + + # 准备上传参数 + url = f"{self.BASE_URL}/files/upload_attachment" + data = { + 'workspace_id': self.workspace_id, + 'type': 'bug', + 'entry_id': bug_id + } + + # 准备日志记录的请求数据 + log_request_data = { + "url": url, + "method": "POST", + "data": data, + "filename": filename, + "file_size": os.path.getsize(file_path), + "auth_user": self.api_user + } + + # 重试上传 + for attempt in range(1, max_retries + 1): + try: + if attempt > 1: + print(f" → 重试上传 ({attempt}/{max_retries})") + time.sleep(attempt * 2) + + # 打开文件并上传 + with open(file_path, 'rb') as f: + files = {'file': (filename, f, 'application/octet-stream')} + response = self.session.post( + url, + data=data, + files=files, + auth=self.auth, + timeout=upload_timeout + ) + + response.raise_for_status() + result = response.json() + + # 检查TAPD API返回的状态 + if result.get('status') != 1: + error_msg = result.get('info', '未知错误') + if attempt == max_retries: + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="tapd", + operation="upload_attachment", + request_data=log_request_data, + response_data=result, + success=False, + error_message=error_msg + ) + print(f" ✗ 上传失败: {error_msg}") + return {'success': False, 'error_message': error_msg} + continue + + # 上传成功,提取附件ID + data_field = result.get('data', {}) + attachment_info = data_field.get('Attachment', {}) + attachment_id = attachment_info.get('id') + + # 记录API调用日志(成功) + self.logger.log_api_call( + api_type="tapd", + operation="upload_attachment", + request_data=log_request_data, + response_data=result, + success=True + ) + + print(f" ✓ 上传成功 (附件ID: {attachment_id})") + return { + 'success': True, + 'attachment_id': attachment_id, + 'error_message': None + } + + except requests.exceptions.Timeout: + error_msg = f"上传超时 (>{upload_timeout}秒)" + if attempt == max_retries: + self.logger.log_api_call( + api_type="tapd", + operation="upload_attachment", + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + print(f" ✗ {error_msg}") + return {'success': False, 'error_message': error_msg} + except requests.exceptions.RequestException as e: + error_msg = f"上传失败: {e}" + if attempt == max_retries: + self.logger.log_api_call( + api_type="tapd", + operation="upload_attachment", + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + print(f" ✗ {error_msg}") + return {'success': False, 'error_message': error_msg} + except Exception as e: + error_msg = f"未预期的错误: {e}" + if attempt == max_retries: + self.logger.log_api_call( + api_type="tapd", + operation="upload_attachment", + request_data=log_request_data, + response_data={}, + success=False, + error_message=error_msg + ) + print(f" ✗ {error_msg}") + return {'success': False, 'error_message': error_msg} + + return {'success': False, 'error_message': "上传失败(已达最大重试次数)"} + + def upload_attachments_batch(self, file_paths: List[str], bug_id: str, max_size: int = 150 * 1024 * 1024, + upload_timeout: int = 120, max_retries: int = 3) -> Dict: + """ + 批量上传附件到TAPD bug单 + + Args: + file_paths: 文件路径列表 + bug_id: bug ID + max_size: 最大文件大小(字节),默认150MB + upload_timeout: 上传超时时间(秒),默认120秒 + max_retries: 最大重试次数,默认3次 + + Returns: + Dict: 上传结果,包含success、success_files、failed_files + """ + print(f"\n → 开始批量上传附件 (共 {len(file_paths)} 个)") + + success_files = [] + failed_files = [] + + for idx, file_path in enumerate(file_paths, 1): + print(f"\n [{idx}/{len(file_paths)}]") + result = self.upload_attachment(file_path, bug_id, max_size, upload_timeout, max_retries) + + if result['success']: + success_files.append({ + 'file_path': file_path, + 'attachment_id': result['attachment_id'] + }) + else: + failed_files.append({ + 'file_path': file_path, + 'error': result['error_message'] + }) + + # 返回结果 + if len(failed_files) > 0: + print(f"\n ⚠ 批量上传完成: 成功 {len(success_files)} 个, 失败 {len(failed_files)} 个") + return { + 'success': False, + 'success_files': success_files, + 'failed_files': failed_files, + 'error_message': f"部分附件上传失败 ({len(failed_files)}/{len(file_paths)})" + } + else: + print(f"\n ✓ 所有附件上传成功 ({len(success_files)} 个)") + return { + 'success': True, + 'success_files': success_files, + 'failed_files': [], + 'error_message': None + } diff --git a/src/token_manager.py b/src/token_manager.py index fc06617..5bbf614 100644 --- a/src/token_manager.py +++ b/src/token_manager.py @@ -11,6 +11,7 @@ import time import requests from pathlib import Path from typing import Optional, Dict +from src.api_logger import get_logger class TokenManager: @@ -44,6 +45,9 @@ class TokenManager: self.corpid = os.environ.get('CORPID') self.corpsecret = os.environ.get('CORPSECRET') + # 初始化日志记录器 + self.logger = get_logger() + def _validate_env_config(self): """ 验证环境变量配置是否完整 @@ -85,28 +89,86 @@ class TokenManager: response.raise_for_status() result = response.json() + # 准备日志记录数据 + log_request_data = { + "url": self.TOKEN_API_URL, + "method": "GET", + "params": {"corpid": self.corpid, "corpsecret": "***"} + } + # 检查返回的错误码 errcode = result.get('errcode', 0) if errcode != 0: errmsg = result.get('errmsg', '未知错误') + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="wework", + operation="wework/get_token", + request_data=log_request_data, + response_data=result, + success=False, + error_message=errmsg + ) raise RuntimeError(f"获取access_token失败: errcode={errcode}, errmsg={errmsg}") access_token = result.get('access_token') expires_in = result.get('expires_in', self.TOKEN_EXPIRES_IN) if not access_token: + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="wework", + operation="wework/get_token", + request_data=log_request_data, + response_data=result, + success=False, + error_message="API返回的数据中未找到access_token" + ) raise RuntimeError("API返回的数据中未找到access_token") print(f" ✓ 成功获取access_token (有效期: {expires_in}秒)") + # 记录API调用日志(成功) + log_response_data = { + "errcode": result.get('errcode', 0), + "errmsg": result.get('errmsg', 'ok'), + "access_token": access_token[:20] + "...", # 只记录前20个字符 + "expires_in": expires_in + } + self.logger.log_api_call( + api_type="wework", + operation="wework/get_token", + request_data=log_request_data, + response_data=log_response_data, + success=True + ) + return { 'access_token': access_token, 'expires_in': expires_in } except requests.exceptions.Timeout: + # 记录API调用日志(超时) + self.logger.log_api_call( + api_type="wework", + operation="wework/get_token", + request_data={"url": self.TOKEN_API_URL, "method": "GET"}, + response_data={}, + success=False, + error_message="请求超时" + ) raise RuntimeError("获取access_token超时,请检查网络连接") except requests.exceptions.RequestException as e: + # 记录API调用日志(异常) + self.logger.log_api_call( + api_type="wework", + operation="wework/get_token", + request_data={"url": self.TOKEN_API_URL, "method": "GET"}, + response_data={}, + success=False, + error_message=str(e) + ) raise RuntimeError(f"获取access_token失败: {e}") def _load_cache(self) -> Optional[Dict]: diff --git a/src/upload_attachment.py b/src/upload_attachment.py new file mode 100644 index 0000000..e5c8317 --- /dev/null +++ b/src/upload_attachment.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +TAPD附件上传脚本 +用于上传文件到TAPD系统 +""" + +import requests +import os +import sys +import time +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from src.config import ConfigManager + +# 禁用SSL警告(如果有证书问题) +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +def upload_attachment(file_path, max_retries=3): + """ + 上传附件到TAPD(带重试机制) + + Args: + file_path: 要上传的文件路径 + max_retries: 最大重试次数 + + Returns: + dict: 上传成功返回附件信息,失败返回None + """ + print("\n=== TAPD附件上传 ===") + + # 检查文件是否存在 + if not os.path.exists(file_path): + print(f"✗ 文件不存在: {file_path}") + return None + + # 检查文件大小(限制150MB) + file_size = os.path.getsize(file_path) + max_size = 150 * 1024 * 1024 # 150MB + if file_size > max_size: + print(f"✗ 文件大小超过限制: {file_size / 1024 / 1024:.2f}MB (最大150MB)") + return None + + print(f"✓ 文件路径: {file_path}") + print(f"✓ 文件大小: {file_size / 1024:.2f}KB") + + # 从环境变量读取认证信息 + api_user = os.environ.get('TAPD_API_USER') + api_password = os.environ.get('TAPD_API_PASSWORD') + + if not api_user or not api_password: + print("✗ TAPD认证信息未设置") + print(" 请设置环境变量:") + print(" - TAPD_API_USER") + print(" - TAPD_API_PASSWORD") + return None + + print(f"✓ TAPD认证信息已加载 (用户: {api_user})") + + # 从配置文件读取workspace_id + try: + config_manager = ConfigManager() + tapd_config = config_manager.get_tapd_config() + workspace_id = tapd_config['workspace_id'] + print(f"✓ 从配置文件读取workspace_id: {workspace_id}") + except Exception as e: + print(f"✗ 读取workspace_id失败: {e}") + return None + + # 构造请求 + url = "https://tapd-api.bilibili.co/tapd/upload_attachment" + + # 必选参数 + data = { + 'workspace_id': workspace_id, + 'type': 'bug_description', # 固定为bug_description + 'custom_field': 'attachment' # 字段英文名 + } + + # 重试上传 + from requests.auth import HTTPBasicAuth + auth = HTTPBasicAuth(api_user, api_password) + + for attempt in range(1, max_retries + 1): + try: + print(f"\n{'='*60}") + print(f"{'[重试 ' + str(attempt) + '/' + str(max_retries) + '] ' if attempt > 1 else ''}开始上传文件") + print(f"{'='*60}") + + # 显示请求详情 + print(f"\n【请求信息】") + print(f" URL: {url}") + print(f" 方法: POST") + print(f" 认证: Basic Auth (用户: {api_user})") + print(f"\n【请求参数】") + print(f" workspace_id: {workspace_id}") + print(f" type: {data['type']}") + print(f" custom_field: {data['custom_field']}") + print(f"\n【上传文件】") + print(f" 文件名: {os.path.basename(file_path)}") + print(f" 文件大小: {file_size / 1024:.2f}KB") + print(f" Content-Type: application/octet-stream") + + # 每次重试都重新打开文件 + with open(file_path, 'rb') as f: + files = { + 'file': (os.path.basename(file_path), f, 'application/octet-stream') + } + + # 发送请求,增加超时时间和禁用代理 + print(f"\n正在发送请求...") + response = requests.post( + url, + data=data, + files=files, + auth=auth, + timeout=120, # 增加超时时间到2分钟 + verify=False, # 如果有SSL证书问题,可以禁用验证 + proxies={'http': None, 'https': None} # 禁用代理 + ) + + # 显示响应信息 + print(f"\n{'='*60}") + print(f"【响应信息】") + print(f"{'='*60}") + print(f" HTTP状态码: {response.status_code}") + print(f" 响应头:") + for key, value in response.headers.items(): + print(f" {key}: {value}") + + print(f"\n【响应内容】") + print(f" 原始响应文本 (前500字符):") + print(f" {response.text[:500]}") + if len(response.text) > 500: + print(f" ... (总长度: {len(response.text)} 字符)") + + # 尝试解析JSON + try: + response_data = response.json() + print(f"\n JSON解析成功!") + print(f" 响应数据类型: {type(response_data)}") + print(f" 响应数据结构:") + import json + print(f" {json.dumps(response_data, indent=2, ensure_ascii=False)}") + except Exception as e: + print(f"\n ✗ JSON解析失败: {e}") + print(f" 完整响应文本:") + print(f" {response.text}") + if attempt < max_retries: + print(f"\n 等待 {attempt * 2} 秒后重试...") + time.sleep(attempt * 2) + continue + return None + + # 检查返回结果 + if response_data.get("status") == 1: + # 获取data字段 + data_field = response_data.get("data", {}) + + # 检查data是否为字典 + if isinstance(data_field, dict): + attachment_data = data_field.get("Attachment", {}) + elif isinstance(data_field, str): + # 如果data是字符串,可能需要再次解析 + print(f"[调试] data字段是字符串,尝试解析...") + import json + try: + attachment_data = json.loads(data_field).get("Attachment", {}) + except: + attachment_data = {} + else: + attachment_data = {} + + print(f"\n✓ 文件上传成功!") + print(f" 附件ID: {attachment_data.get('id')}") + print(f" 文件名: {attachment_data.get('filename')}") + print(f" 类型: {attachment_data.get('type')}") + print(f" 内容类型: {attachment_data.get('content_type')}") + print(f" 创建时间: {attachment_data.get('created')}") + print(f" 工作项ID: {attachment_data.get('entry_id', '(未关联)')}") + return attachment_data + else: + print(f"\n✗ 上传失败") + print(f" 状态码: {response_data.get('status')}") + print(f" 错误信息: {response_data.get('info', '未知错误')}") + if attempt < max_retries: + print(f" 等待 {attempt * 2} 秒后重试...") + time.sleep(attempt * 2) + continue + return None + + except requests.exceptions.ConnectionError as e: + print(f"\n✗ 连接错误: {str(e)}") + if attempt < max_retries: + wait_time = attempt * 3 + print(f" 等待 {wait_time} 秒后重试...") + time.sleep(wait_time) + continue + else: + print(f" 已达到最大重试次数 ({max_retries}),上传失败") + return None + + except requests.exceptions.Timeout as e: + print(f"\n✗ 请求超时: {str(e)}") + if attempt < max_retries: + wait_time = attempt * 3 + print(f" 等待 {wait_time} 秒后重试...") + time.sleep(wait_time) + continue + else: + print(f" 已达到最大重试次数 ({max_retries}),上传失败") + return None + + except Exception as e: + print(f"\n✗ 上传异常: {str(e)}") + if attempt < max_retries: + wait_time = attempt * 2 + print(f" 等待 {wait_time} 秒后重试...") + time.sleep(wait_time) + continue + else: + print(f" 已达到最大重试次数 ({max_retries}),上传失败") + import traceback + traceback.print_exc() + return None + + return None + + +def main(): + """主函数""" + # 要上传的文件路径(根目录下的test.png) + project_root = Path(__file__).parent.parent + file_path = project_root / "test.png" + + print("=" * 60) + print("TAPD附件上传脚本") + print("=" * 60) + + # 执行上传 + result = upload_attachment(str(file_path)) + + if result: + print("\n" + "=" * 60) + print("上传完成!") + print("=" * 60) + else: + print("\n" + "=" * 60) + print("上传失败,请检查错误信息") + print("=" * 60) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/wework_notifier.py b/src/wework_notifier.py index 088ce5c..c14aa9a 100644 --- a/src/wework_notifier.py +++ b/src/wework_notifier.py @@ -8,6 +8,7 @@ import requests from typing import List, Dict from datetime import datetime +from src.api_logger import get_logger class WeWorkNotifier: @@ -26,6 +27,7 @@ class WeWorkNotifier: self.agentid = agentid self.receivers = receivers self.base_url = "https://qyapi.weixin.qq.com/cgi-bin" + self.logger = get_logger() def send_validation_failure_notification(self, invalid_records: List[Dict]) -> bool: """ @@ -137,16 +139,54 @@ class WeWorkNotifier: response = requests.post(url, params=params, json=data, timeout=10) response_data = response.json() + # 准备日志记录数据 + log_request_data = { + "url": url, + "method": "POST", + "data": { + "touser": self.receivers, + "msgtype": "text", + "agentid": int(self.agentid), + "text": {"content": content[:100] + "..." if len(content) > 100 else content} + } + } + # 检查返回结果 if response_data.get("errcode") == 0: print(f" ✓ 企业微信消息发送成功") + # 记录API调用日志(成功) + self.logger.log_api_call( + api_type="wework", + operation="wework/send_message", + request_data=log_request_data, + response_data=response_data, + success=True + ) return True else: print(f" ✗ 企业微信消息发送失败") print(f" 错误码: {response_data.get('errcode')}") print(f" 错误信息: {response_data.get('errmsg')}") + # 记录API调用日志(失败) + self.logger.log_api_call( + api_type="wework", + operation="wework/send_message", + request_data=log_request_data, + response_data=response_data, + success=False, + error_message=response_data.get('errmsg', '未知错误') + ) return False except Exception as e: print(f" ✗ 企业微信消息发送异常: {str(e)}") + # 记录API调用日志(异常) + self.logger.log_api_call( + api_type="wework", + operation="wework/send_message", + request_data={"url": url, "method": "POST"}, + response_data={}, + success=False, + error_message=str(e) + ) return False diff --git a/test.png b/test.png new file mode 100644 index 0000000..2277b2c Binary files /dev/null and b/test.png differ diff --git a/test_attachment.py b/test_attachment.py new file mode 100644 index 0000000..6b445c0 --- /dev/null +++ b/test_attachment.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +附件功能测试脚本 +用于测试附件下载和上传功能 +""" + +import sys +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +from src.config import ConfigManager +from src.token_manager import TokenManager +from src.attachment_handler import AttachmentHandler +from src.tapd_api import TAPDApi + + +def test_attachment_handler(): + """测试附件处理器""" + print("\n" + "=" * 60) + print("测试1:附件处理器初始化") + print("=" * 60) + + try: + # 加载配置 + config_manager = ConfigManager() + attachment_config = config_manager.get_attachment_config() + + print(f"✓ 附件配置加载成功:") + print(f" 临时目录: {attachment_config['temp_dir']}") + print(f" 最大文件大小: {attachment_config['max_file_size']}MB") + print(f" 下载超时: {attachment_config['download_timeout']}秒") + print(f" 上传超时: {attachment_config['upload_timeout']}秒") + print(f" 下载重试次数: {attachment_config['download_retry']}") + print(f" 上传重试次数: {attachment_config['upload_retry']}") + + # 获取access_token + token_manager = TokenManager() + access_token = token_manager.get_token() + print(f"\n✓ access_token获取成功") + + # 初始化附件处理器 + handler = AttachmentHandler(access_token, attachment_config) + print(f"\n✓ 附件处理器初始化成功") + + return True + + except Exception as e: + print(f"\n✗ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_tapd_upload(): + """测试TAPD附件上传功能""" + print("\n" + "=" * 60) + print("测试2:TAPD附件上传API") + print("=" * 60) + + try: + # 加载配置 + config_manager = ConfigManager() + tapd_config = config_manager.get_tapd_config() + attachment_config = config_manager.get_attachment_config() + + # 初始化TAPD API + tapd_api = TAPDApi(tapd_config['workspace_id'], test_mode=False) + print(f"✓ TAPD API初始化成功") + + # 检查是否有测试文件 + test_file = Path("test.png") + if not test_file.exists(): + print(f"\n⚠ 测试文件不存在: {test_file}") + print(f" 请在项目根目录放置一个test.png文件用于测试") + return False + + print(f"\n✓ 找到测试文件: {test_file}") + print(f" 文件大小: {test_file.stat().st_size / 1024:.2f}KB") + + # 提示用户输入bug_id + print(f"\n请输入一个已存在的bug ID用于测试附件上传:") + print(f"(如果不想测试上传,直接按回车跳过)") + bug_id = input("Bug ID: ").strip() + + if not bug_id: + print(f"\n⚠ 跳过上传测试") + return True + + # 测试上传 + print(f"\n开始测试上传附件到bug {bug_id}...") + result = tapd_api.upload_attachment( + str(test_file), + bug_id, + max_size=attachment_config['max_file_size'] * 1024 * 1024, + upload_timeout=attachment_config['upload_timeout'], + max_retries=attachment_config['upload_retry'] + ) + + if result['success']: + print(f"\n✓ 上传测试成功!") + print(f" 附件ID: {result['attachment_id']}") + return True + else: + print(f"\n✗ 上传测试失败: {result['error_message']}") + return False + + except Exception as e: + print(f"\n✗ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def main(): + """主函数""" + print("=" * 60) + print("附件功能测试") + print("=" * 60) + + # 测试1:附件处理器 + test1_passed = test_attachment_handler() + + # 测试2:TAPD上传 + test2_passed = test_tapd_upload() + + # 显示测试结果 + print("\n" + "=" * 60) + print("测试结果汇总") + print("=" * 60) + print(f"测试1 - 附件处理器初始化: {'✓ 通过' if test1_passed else '✗ 失败'}") + print(f"测试2 - TAPD附件上传API: {'✓ 通过' if test2_passed else '✗ 失败'}") + print("=" * 60) + + if test1_passed and test2_passed: + print("\n✓ 所有测试通过!") + return 0 + else: + print("\n✗ 部分测试失败,请检查错误信息") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/test_full_flow.py b/test_full_flow.py new file mode 100644 index 0000000..f2db492 --- /dev/null +++ b/test_full_flow.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +完整流程测试脚本 +测试从智能表格读取→下载附件→创建bug→上传附件→回写结果的完整流程 +""" + +import sys +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +from src.main import main + +if __name__ == "__main__": + print("=" * 80) + print("完整流程测试") + print("=" * 80) + print("\n提示:这将运行一次完整的开单流程") + print("请确保:") + print("1. 智能表格中有待开单的记录") + print("2. 记录包含附件字段") + print("3. 环境变量已正确设置") + print("\n按回车继续,或Ctrl+C取消...") + input() + + sys.exit(main())