'phase10-attachment-and-html-describe'

This commit is contained in:
zelong 2026-01-05 16:11:40 +08:00
parent e958f73af0
commit 8b9a96118a
18 changed files with 2381 additions and 46 deletions

View File

@ -1,6 +1,6 @@
**文档内地址为tapd-api.bilibili.co/tapd请修改为公司地址 https://tapd-api.bilibili.co/tapd**
**文档内地址为tapd-api.bilibili.co/tapd请修改为公司地址 https://tapd-api.bilibili.co/tapd/**
# 过滤器FilterSpec
@ -572,32 +572,32 @@ curl -u 'api_user:api_password' -d 'name=story_created_by_api&workspace_id=10158
返回缺陷所有字段及候选值(枚举值),即通常理解的字段的 "英文Key" 和 "中文值".
# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#url)url
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#url)url
```
https://tapd-api.bilibili.co/bugs/get_fields_info
```
# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#支持格式)支持格式
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#支持格式)支持格式
JSON/XML默认JSON格式
# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#http请求方式)HTTP请求方式
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#http请求方式)HTTP请求方式
GET
# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求数限制)请求数限制
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求数限制)请求数限制
默认返回所有数据
# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求参数)请求参数
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#请求参数)请求参数
| 字段名 | 必选 | 类型及范围 | 说明 | 特殊规则 |
| :----------: | :--: | :--------: | :----------------------------------------------------------: | :------: |
| workspace_id | `是` | integer | 项目ID | |
| all_options | 否 | integer | 是否也返回已关闭的选项。all_options=1 则返回。默认是 0不返回与TAPD界面对齐 | |
# [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#调用示例及返回结果)调用示例及返回结果
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#调用示例及返回结果)调用示例及返回结果
## [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/bug/get_bug_fields_info.html#获取项目下的缺陷字段)获取项目下的缺陷字段
@ -2527,4 +2527,194 @@ curl -u 'api_user:api_password' 'https://tapd-api.bilibili.co/tapd/bugs?workspac
],
"info": "success"
}
```
```
# 附件
## 获取附件
### 说明
返回符合查询条件的所有的附件
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#url)url
```
https://tapd-api.bilibili.co/tapd/attachments
```
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#支持格式)支持格式
JSON/XML默认JSON格式
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#http请求方式)HTTP请求方式
GET
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#请求数限制)请求数限制
默认返回 30 条。可通过传 limit 参数设置,最大取 200。也可以传 page 参数翻页
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#请求参数)请求参数
| 字段名 | 必选 | 类型及范围 | 说明 | 特殊规则 |
| :----------: | :--: | :--------: | :--------: | :------: |
| workspace_id | `是` | integer | 项目ID | |
| id | 否 | integer | ID | |
| type | 否 | string | 类型 | |
| entry_id | 否 | integer | 依赖对象ID | |
| filename | 否 | integer | 附件名称 | |
| owner | 否 | string | 上传人 | |
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#调用示例及返回结果)调用示例及返回结果
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#curl-使用-basic-auth-鉴权调用示例)curl 使用 Basic Auth 鉴权调用示例
```
curl -u 'api_user:api_password' 'https://api.tapd.cn/attachments?workspace_id=10104801'
```
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#返回结果)返回结果
```json
{
"status": 1,
"data": [
{
"Attachment": {
"id": "1210104801000028203",
"type": "wiki_description",
"entry_id": "6100014242115511668",
"filename": "OneDrive.mp4",
"content_type": "video/mp4",
"created": "2021-04-08 15:51:27",
"workspace_id": "10104801",
"owner": "anyechen"
}
},
{
"Attachment": {
"id": "1210104801000002153",
"type": "wiki",
"entry_id": "1210104801000017645",
"filename": "raingeek.jpg",
"content_type": "image/jpeg",
"created": "2017-11-23 17:01:36",
"workspace_id": "10104801",
"owner": "anyechen"
}
}
],
"info": "success"
}
```
### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#附件字段说明)附件字段说明
#### [#](https://open.tapd.cn/document/api-doc/API文档/api_reference/attachment/get_attachments.html#附件重要字段说明)附件重要字段说明
| 字段 | 说明 |
| :----------: | :--------: |
| id | 附件ID |
| type | 类型 |
| entry_id | 依赖对象ID |
| created | 创建时间 |
| filename | 附件名称 |
| content_type | 内容类型 |
| workspace_id | 项目ID |
| owner | 上传人 |
.
## 上传附件
### 接口概览
- **API 地址**: `https://tapd-api.bilibili.co/tapd/files/upload_attachment`
- **请求方式**: `POST`
- **Content-Type**: `multipart/form-data`
### **请求参数**
| 字段名 | 是否必填 | 类型及范围 | 说明 |
| ------------ | -------- | ---------- | ---------------- |
| workspace_id | 是 | integer | 项目ID |
| file | 是 | 文件 | 文件 |
| type | 是 | string | story/bug/task |
| entry_id | 是 | integer | 需求/缺陷/任务ID |
| owner | 否 | string | 附件创建人昵称 |
| overwrite | 否 | bool | 同名文件是否覆盖 |
### **调用示例及返回结果**
#### **把本地的 uu.jpg 上传到项目 755**
### **curl 使用 Basic Auth 鉴权调用示例**
```
curl -u 'api_user:api_password' -F 'workspace_id=755' -F 'type=task' -F 'entry_id=1000000755859140551' -F 'file=@uu.jpg' 'api.tapd.cn/files/upload_attachment'
```
### **curl 使用 OAuth Access Token 鉴权调用示例**
```
curl -H 'Authorization: Bearer ACCESS_TOKEN' -F 'workspace_id=755' -F 'type=task' -F 'entry_id=1000000755859140551' -F 'file=@uu.jpg' 'api.tapd.cn/files/upload_attachment'
```
### **返回结果**
```
{
"status": 1,
"data": {
"Attachment": {
"id": "1000000755503455439",
"type": "task",
"entry_id": 1000000755859140551,
"filename": "uu.jpg",
"description": "",
"content_type": "image/jpeg",
"created": "2021-09-07 21:36:08",
"workspace_id": 755,
"owner": ""
}
},
"info": "success"
}
```
### **字段说明**
#### **返回字段说明**
| 字段 | 说明 |
| ------------ | ---------------------------- |
| type | 业务对象类型stroy/bug/task |
| entry_id | 业务对象ID |
| filename | 文件名 |
| description | 附件描述 |
| content_type | 文件类型 |
| created | 创建时间 |
| workspace_id | 项目ID |
| owner | 附件上传人 |

View File

@ -14,6 +14,20 @@ docid = dcRybSHojZR9-b5ePgDp33yr29bQy6BtQiVJ-nSGUM-ot6FSpq-TGW9jEn_f7ORLcFWRj9zv
[Schedule]
# 开单扫描频率(分钟)
scan_interval = 60
scan_interval = 30
# bug状态同步频率分钟
sync_interval = 60
sync_interval = 30
[Attachment]
# 临时文件目录(相对于项目根目录)
temp_dir = temp/attachments
# 单个文件大小限制MB
max_file_size = 150
# 下载超时时间(秒)
download_timeout = 60
# 上传超时时间(秒)
upload_timeout = 120
# 下载失败重试次数
download_retry = 3
# 上传失败重试次数
upload_retry = 3

View File

@ -1,4 +1,4 @@
{
"access_token": "t7JYQBuy7p3Wg_hRj6xSchih9NRzuactFsxY7U3CMJG3Tkt5cm-BUvO0-e74oMUbZG5s4ouahAdB-AHT2eSljlrD1YzjOnxAvzbOjBFrGT-yEArT9fU6mli8uhBmBk7BteKfHZStZ7d3cs1bGSTnjkNS5vVpyxUu4gcLT8Ya1S9DcMdCBrKnBwmLDcoI2_TkLvKhW67b_zo2EVa0erZhHlL2klT0ajyVhwfB0aeXhF8",
"fetch_time": 1766572400.099746
"access_token": "SPFfB9jMxleGiJn76FQH6v5pploseAJXTCVkTVVxl1_PEmHZJiqXsNGpEyGAK4qmYe3WRxYJ57xgCQLRCHopVfeoDfP87IgxVCytCqQABGES5ndG05SVkrI-9evg8Z4kbstlsiRMmfPGGGoNUgL1kUoZc2No0FYytm8FTfulnAXfiTExzoF8OCTdEPc9mA0g8JKFhlkiS2F0agBESS_2_ewbcZvA0i44-ChTKRBdRa0",
"fetch_time": 1767599732.4166858
}

View File

@ -13,38 +13,47 @@ from pathlib import Path
class APILogger:
"""API调用日志记录器"""
def __init__(self, log_file: Optional[str] = None):
def __init__(self, log_dir: Optional[str] = None):
"""
初始化日志记录器
Args:
log_file: 日志文件路径如果为None则使用默认路径
log_dir: 日志目录路径如果为None则使用默认路径
"""
if log_file is None:
# 默认路径:项目根目录/logs/api_log.json
if log_dir is None:
# 默认路径:项目根目录/logs/
project_root = Path(__file__).parent.parent
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
self.log_file = log_dir / "api_log.json"
self.log_dir = project_root / "logs"
else:
self.log_file = Path(log_file)
self.log_dir = Path(log_dir)
# 确保日志文件存在
self._init_log_file()
# 确保日志目录存在
self.log_dir.mkdir(exist_ok=True)
def _init_log_file(self):
"""初始化日志文件"""
if not self.log_file.exists():
with open(self.log_file, 'w', encoding='utf-8') as f:
def _get_today_log_file(self) -> Path:
"""
获取今天的日志文件路径
Returns:
Path: 今天的日志文件路径格式api_log_YYYY-MM-DD.json
"""
today = datetime.now().strftime("%Y-%m-%d")
log_file = self.log_dir / f"api_log_{today}.json"
# 如果文件不存在,初始化
if not log_file.exists():
with open(log_file, 'w', encoding='utf-8') as f:
json.dump({"records": []}, f, ensure_ascii=False, indent=2)
return log_file
def log_api_call(self, api_type: str, operation: str,
request_data: Dict[str, Any],
response_data: Dict[str, Any],
success: bool = True,
error_message: Optional[str] = None):
"""
记录API调用
记录API调用使用追加式写入避免读取整个文件
Args:
api_type: API类型 "smartsheet", "tapd", "wework"
@ -55,11 +64,10 @@ class APILogger:
error_message: 错误信息如果失败
"""
try:
# 读取现有记录
with open(self.log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
# 获取今天的日志文件
log_file = self._get_today_log_file()
# 添加新记录
# 构造新记录
record = {
"api_type": api_type,
"operation": operation,
@ -72,11 +80,24 @@ class APILogger:
if error_message:
record["error_message"] = error_message
log_data["records"].append(record)
# 使用追加式写入
with open(log_file, 'r+', encoding='utf-8') as f:
# 定位到文件末尾
f.seek(0, 2)
file_size = f.tell()
# 写回文件
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump(log_data, f, ensure_ascii=False, indent=2)
if file_size == 0:
# 空文件,写入初始结构
f.write('{"records": [\n')
f.write(json.dumps(record, ensure_ascii=False, indent=2))
f.write('\n]}')
else:
# 回退到最后的 ]}
f.seek(file_size - 3)
# 添加逗号和新记录
f.write(',\n')
f.write(json.dumps(record, ensure_ascii=False, indent=2))
f.write('\n]}')
except Exception as e:
# 日志记录失败不应该影响主流程
@ -107,6 +128,7 @@ if __name__ == "__main__":
logger = APILogger()
# 测试记录一个成功的API调用
print("测试1: 记录成功的API调用...")
logger.log_api_call(
api_type="smartsheet",
operation="get_records",
@ -122,6 +144,27 @@ if __name__ == "__main__":
},
success=True
)
print("✓ 成功记录API调用")
print(f"日志文件: {logger.log_file}")
# 测试记录一个失败的API调用
print("\n测试2: 记录失败的API调用...")
logger.log_api_call(
api_type="tapd",
operation="create_bug",
request_data={
"url": "https://api.tapd.cn/bugs",
"method": "POST",
"data": {"title": "测试bug"}
},
response_data={
"status": 0,
"info": "参数错误"
},
success=False,
error_message="缺少必填参数workspace_id"
)
print("✓ 成功记录失败的API调用")
log_file = logger._get_today_log_file()
print(f"\n日志文件: {log_file}")
print(f"日志目录: {logger.log_dir}")

View File

@ -586,6 +586,333 @@ class TAPDAPITester:
traceback.print_exc()
return None
def upload_attachment(self, file_path, entry_type, entry_id, owner=None, overwrite=False):
"""
上传附件到TAPD
根据TAPD官方文档实现https://tapd-api.bilibili.co/tapd/files/upload_attachment
Args:
file_path: 本地文件路径
entry_type: 业务对象类型必填可选值: story/bug/task
entry_id: 业务对象ID需求/缺陷/任务ID必填
owner: 附件创建人昵称 (可选)
overwrite: 同名文件是否覆盖 (可选默认False)
Returns:
dict: 上传成功返回附件信息失败返回None
"""
print("\n=== 上传附件到TAPD ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
# 验证必需参数
if not entry_type:
print("✗ entry_type 参数不能为空")
print(" 可选值: story/bug/task")
return None
if entry_type not in ['story', 'bug', 'task']:
print(f"✗ entry_type 参数值无效: {entry_type}")
print(" 可选值: story/bug/task")
return None
if not entry_id:
print("✗ entry_id 参数不能为空")
print(" 需要提供需求/缺陷/任务的ID")
return None
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"✗ 文件不存在: {file_path}")
return None
# 获取文件信息
filename = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
content_type = self._get_content_type(filename)
print(f"\n【文件信息】")
print(f" 文件名: {filename}")
print(f" 文件路径: {file_path}")
print(f" 文件大小: {file_size} 字节 ({file_size/1024:.2f} KB)")
print(f" Content-Type: {content_type}")
# 使用正确的API端点
url = f"{self.base_url}/files/upload_attachment"
# 准备表单数据(按照文档要求)
data = {
'workspace_id': str(self.workspace_id),
'type': entry_type,
'entry_id': str(entry_id)
}
# 添加可选参数
if owner:
data['owner'] = owner
if overwrite:
data['overwrite'] = 'true' if overwrite else 'false'
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n【请求信息】")
print(f" 请求URL: {url}")
print(f" 请求方法: POST")
print(f" Content-Type: multipart/form-data")
print(f" 认证方式: Basic Auth")
print(f" 认证用户: {self.api_user}")
print(f"\n【表单数据】")
print(f" workspace_id: {self.workspace_id}")
print(f" type: {entry_type}")
print(f" entry_id: {entry_id}")
if owner:
print(f" owner: {owner}")
if overwrite:
print(f" overwrite: {overwrite}")
print(f" file: {filename} ({content_type})")
# 打开文件并上传
print(f"\n【正在上传】")
print(f" 请稍候...")
with open(file_path, 'rb') as f:
files = {
'file': (filename, f, content_type)
}
response = requests.post(url, auth=auth, files=files, data=data, timeout=60)
# 输出HTTP响应信息
print(f"\n【HTTP响应】")
print(f" 状态码: {response.status_code}")
print(f" 原因: {response.reason}")
print(f" 响应头:")
for key, value in response.headers.items():
print(f" {key}: {value}")
# 输出原始响应内容
print(f"\n【原始响应内容】")
print(f" {response.text}")
response_data = response.json()
# 输出解析后的JSON响应
print(f"\n【解析后的JSON响应】")
print(json.dumps(response_data, ensure_ascii=False, indent=2))
# 记录API调用
request_data = {
"url": url,
"method": "POST",
"data": {
"workspace_id": self.workspace_id,
"type": entry_type,
"entry_id": entry_id,
"owner": owner,
"overwrite": overwrite,
"filename": filename,
"file_size": file_size,
"content_type": content_type
},
"auth_user": self.api_user
}
self._log_api_call("upload_attachment", request_data, response_data)
# 检查返回结果
print(f"\n【结果分析】")
if response_data.get("status") == 1:
data = response_data.get("data", {})
print(f"\n✓ 文件上传成功")
# 显示上传结果
if isinstance(data, dict):
attachment_info = data.get('Attachment', {})
att_id = attachment_info.get('id', '(无ID)')
att_filename = attachment_info.get('filename', '(无文件名)')
att_type = attachment_info.get('type', '(无类型)')
att_entry_id = attachment_info.get('entry_id', '(无entry_id)')
att_owner = attachment_info.get('owner', '(未知)')
att_created = attachment_info.get('created', '(未知)')
print(f" 附件ID: {att_id}")
print(f" 文件名: {att_filename}")
print(f" 业务对象类型: {att_type}")
print(f" 业务对象ID: {att_entry_id}")
print(f" 上传人: {att_owner}")
print(f" 创建时间: {att_created}")
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_upload_result.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 上传结果已保存到: {output_file}")
return data
else:
print(f"\n✗ 上传失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def _get_content_type(self, filename):
"""
根据文件扩展名获取Content-Type
Args:
filename: 文件名
Returns:
str: Content-Type
"""
import mimetypes
content_type, _ = mimetypes.guess_type(filename)
return content_type or 'application/octet-stream'
def get_attachments(self, entry_id=None, attachment_type=None, filename=None, owner=None, limit=30):
"""
获取TAPD附件列表
Args:
entry_id: 依赖对象ID (可选)
attachment_type: 附件类型 (可选)
filename: 附件名称 (可选)
owner: 上传人 (可选)
limit: 返回数量限制默认30最大200
Returns:
list: 附件列表失败返回None
"""
print("\n=== 获取TAPD附件列表 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
# 使用公司地址
url = f"{self.base_url}/attachments"
params = {
"workspace_id": self.workspace_id,
"limit": limit
}
# 添加可选参数
if entry_id:
params["entry_id"] = entry_id
if attachment_type:
params["type"] = attachment_type
if filename:
params["filename"] = filename
if owner:
params["owner"] = owner
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
if entry_id:
print(f" entry_id: {entry_id}")
if attachment_type:
print(f" type: {attachment_type}")
if filename:
print(f" filename: {filename}")
if owner:
print(f" owner: {owner}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_attachments", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", [])
print(f"\n✓ 成功获取附件列表")
# 显示附件统计
if isinstance(data, list):
attachment_count = len(data)
print(f"{attachment_count} 个附件")
if attachment_count > 0:
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_attachments.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 附件列表已保存到: {output_file}")
# 显示附件信息
print(f"\n附件列表预览:")
print("-" * 80)
for idx, item in enumerate(data[:10], 1):
attachment = item.get('Attachment', {})
att_id = attachment.get('id', '(无ID)')
att_filename = attachment.get('filename', '(无文件名)')
att_type = attachment.get('type', '(无类型)')
att_content_type = attachment.get('content_type', '(未知)')
att_owner = attachment.get('owner', '(未知)')
att_created = attachment.get('created', '(未知)')
print(f" {idx}. {att_filename}")
print(f" ID: {att_id}")
print(f" 类型: {att_type} | 内容类型: {att_content_type}")
print(f" 上传人: {att_owner} | 创建时间: {att_created}")
if attachment_count > 10:
print(f" ... 还有 {attachment_count - 10} 个附件,详见输出文件")
print("-" * 80)
else:
print(" 没有找到符合条件的附件")
return data
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def print_menu():
"""打印菜单"""
@ -600,8 +927,10 @@ def print_menu():
print("5. 发送应用消息")
print("\n【TAPD API】")
print("6. 获取缺陷字段配置")
print("7. 获取附件列表")
print("8. 上传附件")
print("\n【其他】")
print("7. 查看日志文件")
print("9. 查看日志文件")
print("0. 退出")
print("="*50)
@ -613,7 +942,7 @@ def main():
while True:
print_menu()
choice = input("\n请选择操作 (0-7): ").strip()
choice = input("\n请选择操作 (0-9): ").strip()
if choice == "0":
print("\n感谢使用,再见!")
@ -689,6 +1018,88 @@ def main():
tapd_tester.get_bug_custom_fields()
elif choice == "7":
# 获取TAPD附件列表
print("\n=== 获取附件列表 ===")
print("是否需要添加筛选条件?")
print("1. 获取所有附件(默认)")
print("2. 按依赖对象ID筛选")
print("3. 按附件类型筛选")
print("4. 按文件名筛选")
print("5. 按上传人筛选")
filter_choice = input("请选择 (直接回车默认为1): ").strip()
entry_id = None
attachment_type = None
filename = None
owner = None
if filter_choice == "2":
entry_id = input("请输入依赖对象ID: ").strip()
elif filter_choice == "3":
attachment_type = input("请输入附件类型: ").strip()
elif filter_choice == "4":
filename = input("请输入文件名: ").strip()
elif filter_choice == "5":
owner = input("请输入上传人: ").strip()
# 询问返回数量
limit_input = input("请输入返回数量限制 (直接回车默认为30最大200): ").strip()
limit = int(limit_input) if limit_input else 30
tapd_tester.get_attachments(
entry_id=entry_id,
attachment_type=attachment_type,
filename=filename,
owner=owner,
limit=limit
)
elif choice == "8":
# 上传附件到TAPD
print("\n=== 上传附件 ===")
file_path = input("请输入文件路径: ").strip()
if not file_path:
print("✗ 文件路径不能为空")
continue
# 询问业务对象类型(必填)
print("\n业务对象类型:")
print(" story - 需求")
print(" bug - 缺陷")
print(" task - 任务")
entry_type = input("请选择业务对象类型 (story/bug/task): ").strip().lower()
if entry_type not in ['story', 'bug', 'task']:
print("✗ 业务对象类型无效,必须是 story/bug/task 之一")
continue
# 询问业务对象ID必填
type_name_map = {'story': '需求', 'bug': '缺陷', 'task': '任务'}
entry_id = input(f"请输入{type_name_map[entry_type]}ID: ").strip()
if not entry_id:
print("✗ 业务对象ID不能为空")
continue
# 询问是否指定上传人(可选)
owner_input = input("请输入上传人用户名 (直接回车使用API调用者): ").strip()
owner = owner_input if owner_input else None
# 询问是否覆盖同名文件(可选)
overwrite_input = input("是否覆盖同名文件? (y/n直接回车默认为n): ").strip().lower()
overwrite = True if overwrite_input == 'y' else False
tapd_tester.upload_attachment(
file_path=file_path,
entry_type=entry_type,
entry_id=entry_id,
owner=owner,
overwrite=overwrite
)
elif choice == "9":
print("\n=== 查看日志文件 ===")
try:
with open(wework_tester.log_file, 'r', encoding='utf-8') as f:

300
src/attachment_handler.py Normal file
View File

@ -0,0 +1,300 @@
"""
附件处理模块
负责从智能表格下载附件到本地临时目录
"""
import os
import requests
import time
import shutil
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from src.api_logger import get_logger
class AttachmentHandler:
"""附件处理器"""
def __init__(self, access_token: str, config: Dict):
"""
初始化附件处理器
Args:
access_token: 企业微信access_token
config: 附件配置字典
"""
self.access_token = access_token
self.config = config
self.logger = get_logger()
# 从配置中读取参数
self.temp_dir = config.get('temp_dir', 'temp/attachments')
self.max_file_size = config.get('max_file_size', 150) * 1024 * 1024 # 转换为字节
self.download_timeout = config.get('download_timeout', 60)
self.download_retry = config.get('download_retry', 3)
# 确保临时目录存在
self._ensure_temp_dir()
print(f" ✓ 附件处理器初始化完成")
print(f" 临时目录: {self.temp_dir}")
print(f" 最大文件大小: {self.max_file_size / 1024 / 1024:.0f}MB")
print(f" 下载超时: {self.download_timeout}")
print(f" 下载重试次数: {self.download_retry}")
def _ensure_temp_dir(self):
"""确保临时目录存在"""
temp_path = Path(self.temp_dir)
if not temp_path.exists():
temp_path.mkdir(parents=True, exist_ok=True)
print(f" ✓ 创建临时目录: {self.temp_dir}")
def _get_record_temp_dir(self, record_id: str) -> Path:
"""
获取记录的临时目录
Args:
record_id: 记录ID
Returns:
Path: 记录的临时目录路径
"""
record_dir = Path(self.temp_dir) / record_id
if not record_dir.exists():
record_dir.mkdir(parents=True, exist_ok=True)
return record_dir
def _check_file_size(self, file_url: str) -> Tuple[bool, int]:
"""
检查文件大小通过HEAD请求
Args:
file_url: 文件URL
Returns:
Tuple[bool, int]: (是否通过检查, 文件大小)
"""
try:
response = requests.head(file_url, timeout=10)
content_length = response.headers.get('Content-Length')
if content_length:
file_size = int(content_length)
if file_size > self.max_file_size:
return (False, file_size)
return (True, file_size)
else:
# 如果无法获取文件大小,允许下载(在下载时再检查)
return (True, 0)
except Exception as e:
print(f" ⚠ 无法检查文件大小: {e}")
# 检查失败,允许下载(在下载时再检查)
return (True, 0)
def _download_single_file(self, file_url: str, save_path: Path, filename: str) -> Tuple[bool, str]:
"""
下载单个文件带重试机制
Args:
file_url: 文件URL
save_path: 保存路径
filename: 文件名
Returns:
Tuple[bool, str]: (是否成功, 错误信息或文件路径)
"""
for attempt in range(1, self.download_retry + 1):
try:
if attempt > 1:
print(f" → 重试下载 ({attempt}/{self.download_retry}): {filename}")
time.sleep(attempt * 2) # 递增等待时间
# 发起下载请求
response = requests.get(file_url, stream=True, timeout=self.download_timeout)
response.raise_for_status()
# 写入文件
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# 检查下载后的文件大小
downloaded_size = save_path.stat().st_size
if downloaded_size > self.max_file_size:
save_path.unlink() # 删除超大文件
error_msg = f"文件大小超过限制: {downloaded_size / 1024 / 1024:.2f}MB > {self.max_file_size / 1024 / 1024:.0f}MB"
return (False, error_msg)
return (True, str(save_path))
except requests.exceptions.Timeout:
error_msg = f"下载超时 (>{self.download_timeout}秒)"
if attempt == self.download_retry:
return (False, error_msg)
except requests.exceptions.RequestException as e:
error_msg = f"下载失败: {e}"
if attempt == self.download_retry:
return (False, error_msg)
except Exception as e:
error_msg = f"未预期的错误: {e}"
if attempt == self.download_retry:
return (False, error_msg)
return (False, "下载失败(已达最大重试次数)")
def download_attachments(self, record_data: Dict, record_id: str) -> Dict:
"""
批量下载附件
Args:
record_data: 记录数据
record_id: 记录ID
Returns:
Dict: 下载结果包含successfailed_filesdownloaded_files
"""
print(f"\n → 开始下载附件...")
print(f" [DEBUG] 记录ID: {record_id}")
# 获取附件字段的值
attachment_value = record_data.get('附件(截图、录屏、日志)')
print(f" [DEBUG] 附件字段值类型: {type(attachment_value)}")
print(f" [DEBUG] 附件字段值: {attachment_value}")
# 如果附件字段为空,返回成功(无附件需要下载)
if not attachment_value:
print(f" ✓ 附件字段为空,跳过下载")
return {
'success': True,
'downloaded_files': [],
'failed_files': [],
'error_message': None
}
# 解析附件字段值提取文件URL列表
# 根据智能表格API文档附件字段的值是一个列表
file_urls = []
if isinstance(attachment_value, list):
print(f" [DEBUG] 附件字段是列表,长度: {len(attachment_value)}")
for idx, item in enumerate(attachment_value):
print(f" [DEBUG] 附件项 {idx}: 类型={type(item)}, 值={item}")
# 提取文件URL具体字段名需要根据实际API返回确认
if isinstance(item, dict):
file_url = item.get('url') or item.get('link') or item.get('file_url')
filename = item.get('name') or item.get('filename') or f"attachment_{len(file_urls)}"
print(f" [DEBUG] 提取结果: url={file_url}, filename={filename}")
if file_url:
file_urls.append({'url': file_url, 'name': filename})
else:
print(f" [DEBUG] 附件字段不是列表,无法解析")
if not file_urls:
print(f" ⚠ 无法解析附件字段未找到文件URL")
print(f" [DEBUG] 附件字段完整内容: {attachment_value}")
return {
'success': False,
'downloaded_files': [],
'failed_files': [],
'error_message': "无法解析附件字段"
}
print(f" 找到 {len(file_urls)} 个附件")
# 获取记录的临时目录
record_dir = self._get_record_temp_dir(record_id)
# 下载所有附件
downloaded_files = []
failed_files = []
for idx, file_info in enumerate(file_urls, 1):
file_url = file_info['url']
filename = file_info['name']
print(f"\n [{idx}/{len(file_urls)}] 下载: {filename}")
# 检查文件大小
size_ok, file_size = self._check_file_size(file_url)
if not size_ok:
error_msg = f"文件大小超过限制: {file_size / 1024 / 1024:.2f}MB"
print(f"{error_msg}")
failed_files.append({'filename': filename, 'error': error_msg})
continue
if file_size > 0:
print(f" 文件大小: {file_size / 1024:.2f}KB")
# 处理文件名冲突
save_path = record_dir / filename
counter = 1
while save_path.exists():
name_parts = filename.rsplit('.', 1)
if len(name_parts) == 2:
new_filename = f"{name_parts[0]}({counter}).{name_parts[1]}"
else:
new_filename = f"{filename}({counter})"
save_path = record_dir / new_filename
counter += 1
# 下载文件
success, result = self._download_single_file(file_url, save_path, filename)
if success:
print(f" ✓ 下载成功: {save_path.name}")
downloaded_files.append(str(save_path))
# 记录API调用日志
self.logger.log_api_call(
api_type="smartsheet",
operation="download_attachment",
request_data={'url': file_url, 'filename': filename},
response_data={'saved_path': str(save_path), 'size': save_path.stat().st_size},
success=True
)
else:
print(f" ✗ 下载失败: {result}")
failed_files.append({'filename': filename, 'error': result})
# 记录API调用日志
self.logger.log_api_call(
api_type="smartsheet",
operation="download_attachment",
request_data={'url': file_url, 'filename': filename},
response_data={},
success=False,
error_message=result
)
# 返回结果
if len(failed_files) > 0:
print(f"\n ⚠ 下载完成: 成功 {len(downloaded_files)} 个, 失败 {len(failed_files)}")
return {
'success': False,
'downloaded_files': downloaded_files,
'failed_files': failed_files,
'error_message': f"部分附件下载失败 ({len(failed_files)}/{len(file_urls)})"
}
else:
print(f"\n ✓ 所有附件下载成功 ({len(downloaded_files)} 个)")
return {
'success': True,
'downloaded_files': downloaded_files,
'failed_files': [],
'error_message': None
}
def cleanup_temp_files(self, record_id: str):
"""
清理记录的临时文件
Args:
record_id: 记录ID
"""
try:
record_dir = Path(self.temp_dir) / record_id
if record_dir.exists():
shutil.rmtree(record_dir)
print(f" ✓ 清理临时文件: {record_dir}")
except Exception as e:
print(f" ⚠ 清理临时文件失败: {e}")

View File

@ -185,6 +185,112 @@ class ConfigManager:
return result
def get_attachment_config(self):
"""
获取附件配置
Returns:
dict: 包含附件相关配置的字典
Raises:
ValueError: 配置值非法时抛出
"""
# 默认值
default_temp_dir = "temp/attachments"
default_max_file_size = 150
default_download_timeout = 60
default_upload_timeout = 120
default_download_retry = 3
default_upload_retry = 3
# 如果没有Attachment节返回默认值
if not self.config.has_section('Attachment'):
return {
'temp_dir': default_temp_dir,
'max_file_size': default_max_file_size,
'download_timeout': default_download_timeout,
'upload_timeout': default_upload_timeout,
'download_retry': default_download_retry,
'upload_retry': default_upload_retry
}
# 读取temp_dir配置
if not self.config.has_option('Attachment', 'temp_dir'):
temp_dir = default_temp_dir
else:
temp_dir = self.config.get('Attachment', 'temp_dir').strip()
if not temp_dir:
temp_dir = default_temp_dir
# 读取max_file_size配置
if not self.config.has_option('Attachment', 'max_file_size'):
max_file_size = default_max_file_size
else:
max_file_size_str = self.config.get('Attachment', 'max_file_size').strip()
try:
max_file_size = int(max_file_size_str)
except ValueError:
raise ValueError(f"max_file_size配置项必须为整数当前值: {max_file_size_str}")
if max_file_size <= 0:
raise ValueError(f"max_file_size配置项必须为正整数当前值: {max_file_size}")
# 读取download_timeout配置
if not self.config.has_option('Attachment', 'download_timeout'):
download_timeout = default_download_timeout
else:
download_timeout_str = self.config.get('Attachment', 'download_timeout').strip()
try:
download_timeout = int(download_timeout_str)
except ValueError:
raise ValueError(f"download_timeout配置项必须为整数当前值: {download_timeout_str}")
if download_timeout <= 0:
raise ValueError(f"download_timeout配置项必须为正整数当前值: {download_timeout}")
# 读取upload_timeout配置
if not self.config.has_option('Attachment', 'upload_timeout'):
upload_timeout = default_upload_timeout
else:
upload_timeout_str = self.config.get('Attachment', 'upload_timeout').strip()
try:
upload_timeout = int(upload_timeout_str)
except ValueError:
raise ValueError(f"upload_timeout配置项必须为整数当前值: {upload_timeout_str}")
if upload_timeout <= 0:
raise ValueError(f"upload_timeout配置项必须为正整数当前值: {upload_timeout}")
# 读取download_retry配置
if not self.config.has_option('Attachment', 'download_retry'):
download_retry = default_download_retry
else:
download_retry_str = self.config.get('Attachment', 'download_retry').strip()
try:
download_retry = int(download_retry_str)
except ValueError:
raise ValueError(f"download_retry配置项必须为整数当前值: {download_retry_str}")
if download_retry <= 0:
raise ValueError(f"download_retry配置项必须为正整数当前值: {download_retry}")
# 读取upload_retry配置
if not self.config.has_option('Attachment', 'upload_retry'):
upload_retry = default_upload_retry
else:
upload_retry_str = self.config.get('Attachment', 'upload_retry').strip()
try:
upload_retry = int(upload_retry_str)
except ValueError:
raise ValueError(f"upload_retry配置项必须为整数当前值: {upload_retry_str}")
if upload_retry <= 0:
raise ValueError(f"upload_retry配置项必须为正整数当前值: {upload_retry}")
return {
'temp_dir': temp_dir,
'max_file_size': max_file_size,
'download_timeout': download_timeout,
'upload_timeout': upload_timeout,
'download_retry': download_retry,
'upload_retry': upload_retry
}
def get_all_config(self):
"""
获取所有配置
@ -196,7 +302,8 @@ class ConfigManager:
'tapd': self.get_tapd_config(),
'smartsheet': self.get_smartsheet_config(),
'schedule': self.get_schedule_config(),
'wework': self.get_wework_config()
'wework': self.get_wework_config(),
'attachment': self.get_attachment_config()
}
def print_config(self):

320
src/log_viewer.py Normal file
View File

@ -0,0 +1,320 @@
"""
日志查询工具
提供交互式界面查询API调用日志
"""
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional
class LogViewer:
"""日志查询工具类"""
def __init__(self, log_dir: Optional[str] = None):
"""
初始化日志查询工具
Args:
log_dir: 日志目录路径如果为None则使用默认路径
"""
if log_dir is None:
# 默认路径:项目根目录/logs/
project_root = Path(__file__).parent.parent
self.log_dir = project_root / "logs"
else:
self.log_dir = Path(log_dir)
if not self.log_dir.exists():
raise FileNotFoundError(f"日志目录不存在: {self.log_dir}")
def list_available_dates(self) -> List[str]:
"""
列出所有可用的日志文件日期
Returns:
日期列表格式为YYYY-MM-DD按时间倒序排列
"""
log_files = list(self.log_dir.glob("api_log_*.json"))
dates = []
for log_file in log_files:
# 从文件名提取日期api_log_2025-12-25.json -> 2025-12-25
filename = log_file.stem # 去掉.json后缀
if filename.startswith("api_log_") and not "archive" in filename:
date_str = filename.replace("api_log_", "")
dates.append(date_str)
# 按日期倒序排列(最新的在前)
return sorted(dates, reverse=True)
def load_log(self, date: str) -> List[Dict[str, Any]]:
"""
加载指定日期的日志
Args:
date: 日期字符串格式为YYYY-MM-DD
Returns:
日志记录列表
"""
log_file = self.log_dir / f"api_log_{date}.json"
if not log_file.exists():
raise FileNotFoundError(f"日志文件不存在: {log_file}")
with open(log_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get("records", [])
def get_operation_stats(self, records: List[Dict[str, Any]]) -> Dict[str, int]:
"""
统计每种操作类型的数量
Args:
records: 日志记录列表
Returns:
操作类型统计字典格式为 {operation: count}
"""
stats = {}
for record in records:
op = record.get('operation', 'unknown')
stats[op] = stats.get(op, 0) + 1
return stats
def filter_by_operation(self, records: List[Dict[str, Any]], operation: str) -> List[Dict[str, Any]]:
"""
按操作类型过滤记录
Args:
records: 日志记录列表
operation: 操作类型
Returns:
过滤后的记录列表
"""
return [r for r in records if r.get('operation') == operation]
def display_record(self, record: Dict[str, Any], index: int, total: int):
"""
显示单条记录的详细信息
Args:
record: 日志记录
index: 当前记录索引从1开始
total: 总记录数
"""
status = "成功 ✓" if record.get('success', True) else "失败 ✗"
timestamp = record.get('timestamp', 'unknown')
print(f"\n[{index}/{total}] {timestamp} - {status}")
print("-" * 60)
print("\n请求内容:")
print(json.dumps(record.get('request', {}), ensure_ascii=False, indent=2))
print("\n返回内容:")
print(json.dumps(record.get('response', {}), ensure_ascii=False, indent=2))
if 'error_message' in record:
print(f"\n错误信息: {record['error_message']}")
def browse_records(self, records: List[Dict[str, Any]]):
"""
交互式浏览记录从最新的开始
Args:
records: 日志记录列表
"""
if not records:
print("\n没有找到任何记录")
return
# 从最后一条开始
index = len(records) - 1
show_failed_only = False
while index >= 0:
# 如果只看失败的,跳过成功的记录
if show_failed_only and records[index].get('success', True):
index -= 1
continue
# 显示当前记录
self.display_record(records[index], index + 1, len(records))
# 等待用户输入
cmd = input("\n[Enter]上一条 [q]退出 [f]只看失败 [a]查看全部: ").strip().lower()
if cmd == 'q':
break
elif cmd == 'f':
show_failed_only = True
print("\n已切换到只看失败的记录")
elif cmd == 'a':
show_failed_only = False
print("\n已切换到查看全部记录")
index -= 1
if index < 0:
print("\n已经是第一条记录了")
def run(self):
"""主流程:日期选择 → 操作选择 → 浏览记录"""
print("\n" + "=" * 60)
print("=== TAPD 日志查询工具 ===")
print("=" * 60)
# 第一步:选择日期
selected_date = self._select_date()
if not selected_date:
return
# 第二步:加载日志
try:
print(f"\n正在加载 {selected_date} 的日志...")
records = self.load_log(selected_date)
print(f"共找到 {len(records)} 条API调用记录")
except Exception as e:
print(f"\n✗ 加载日志失败: {str(e)}")
return
if not records:
print("\n该日期没有日志记录")
return
# 第三步:选择操作类型
filtered_records = self._select_operation(records)
if filtered_records is None:
return
# 第四步:浏览记录
self.browse_records(filtered_records)
def _select_date(self) -> Optional[str]:
"""
日期选择界面
Returns:
选择的日期字符串如果取消则返回None
"""
dates = self.list_available_dates()
if not dates:
print("\n没有找到任何日志文件")
return None
print("\n请选择日期:")
# 显示最近3天的快捷选项
today = datetime.now().strftime("%Y-%m-%d")
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
day_before = (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d")
options = []
if today in dates:
options.append(("1", f"今天 ({today})", today))
if yesterday in dates:
options.append(("2", f"昨天 ({yesterday})", yesterday))
if day_before in dates:
options.append(("3", f"前天 ({day_before})", day_before))
# 显示选项
for opt_num, opt_label, _ in options:
print(f"{opt_num}. {opt_label}")
next_num = len(options) + 1
print(f"{next_num}. 输入指定日期 (YYYY-MM-DD)")
# 获取用户输入
choice = input(f"\n请输入选项 [1-{next_num}]: ").strip()
# 处理快捷选项
for opt_num, _, date_value in options:
if choice == opt_num:
return date_value
# 处理手动输入日期
if choice == str(next_num):
date_input = input("请输入日期 (YYYY-MM-DD): ").strip()
if date_input in dates:
return date_input
else:
print(f"\n✗ 日志文件不存在: api_log_{date_input}.json")
return None
print("\n✗ 无效的选项")
return None
def _select_operation(self, records: List[Dict[str, Any]]) -> Optional[List[Dict[str, Any]]]:
"""
操作类型选择界面
Args:
records: 日志记录列表
Returns:
过滤后的记录列表如果取消则返回None
"""
# 统计操作类型
stats = self.get_operation_stats(records)
# 按数量排序
sorted_ops = sorted(stats.items(), key=lambda x: x[1], reverse=True)
print("\n请选择要查询的操作类型:")
# 显示操作类型列表
for i, (op, count) in enumerate(sorted_ops, 1):
print(f"{i}. {op} ({count}条)")
next_num = len(sorted_ops) + 1
print(f"{next_num}. 查看所有记录")
print(f"{next_num + 1}. 返回上一级")
# 获取用户输入
choice = input(f"\n请输入选项 [1-{next_num + 1}]: ").strip()
try:
choice_num = int(choice)
if 1 <= choice_num <= len(sorted_ops):
# 选择了特定操作类型
selected_op = sorted_ops[choice_num - 1][0]
filtered = self.filter_by_operation(records, selected_op)
print(f"\n=== {selected_op} - 共{len(filtered)}条 ===")
return filtered
elif choice_num == next_num:
# 查看所有记录
print(f"\n=== 所有记录 - 共{len(records)}条 ===")
return records
elif choice_num == next_num + 1:
# 返回上一级
return None
else:
print("\n✗ 无效的选项")
return None
except ValueError:
print("\n✗ 请输入数字")
return None
if __name__ == "__main__":
try:
viewer = LogViewer()
viewer.run()
except KeyboardInterrupt:
print("\n\n程序已退出")
except Exception as e:
print(f"\n✗ 发生错误: {str(e)}")

View File

@ -167,6 +167,11 @@ def scan_and_validate_records_for_sheet(access_token: str, docid: str, sheet_id:
if missing_fields:
raise RuntimeError(f"子表 {sheet_title} 缺少必需字段: {', '.join(missing_fields)}")
# 检查附件字段是否存在(附件字段必须存在,但可以为空)
has_attachment_field, attachment_field_id = api.check_attachment_field(fields)
if not has_attachment_field:
raise RuntimeError(f"子表 {sheet_title} 缺少附件字段: 附件(截图、录屏、日志)")
# 3. 获取"开单状态"为空的记录
status_field_id = field_mapping['开单状态']
records = api.get_empty_status_records(sheet_id, status_field_id)
@ -181,7 +186,9 @@ def scan_and_validate_records_for_sheet(access_token: str, docid: str, sheet_id:
'fields': fields,
'field_mapping': field_mapping,
'sheet_id': sheet_id,
'sheet_title': sheet_title
'sheet_title': sheet_title,
'has_attachment_field': has_attachment_field,
'attachment_field_id': attachment_field_id
}
# 4. 提取记录数据并校验
@ -199,13 +206,15 @@ def scan_and_validate_records_for_sheet(access_token: str, docid: str, sheet_id:
if len(validation_result['invalid_records']) > 0:
print(f" - 校验失败: {len(validation_result['invalid_records'])}")
# 返回结果,包含字段信息和sheet_id
# 返回结果,包含字段信息、sheet_id和附件字段信息
return {
'validation_result': validation_result,
'fields': fields,
'field_mapping': field_mapping,
'sheet_id': sheet_id,
'sheet_title': sheet_title
'sheet_title': sheet_title,
'has_attachment_field': has_attachment_field,
'attachment_field_id': attachment_field_id
}

View File

@ -33,6 +33,71 @@ class FieldMapper:
"""
self.reporter = reporter
def _convert_multiline_text(self, value: Any) -> str:
"""
将多行文本字段值转换为字符串
智能表格的多行文本字段返回格式
[
{"text": "第一行\n", "type": "text"},
{"text": "第二行\n", "type": "text"},
{"text": "第三行", "type": "text"}
]
Args:
value: 字段值可能是字符串列表或字典数组
Returns:
str: 拼接后的完整文本
"""
if isinstance(value, str):
# 如果已经是字符串,直接返回
return value.strip()
elif isinstance(value, list):
# 如果是列表,检查是否是多行文本格式
if value and isinstance(value[0], dict) and 'text' in value[0]:
# 多行文本格式提取所有text字段并拼接
text_parts = [item.get('text', '') for item in value if isinstance(item, dict)]
return ''.join(text_parts).strip()
else:
# 普通列表,转换为字符串
return str(value).strip()
else:
# 其他类型转换为字符串
return str(value).strip() if value else ''
def _convert_text_to_html(self, text: str) -> str:
"""
将纯文本转换为HTML格式
处理
1. 换行符转换为<br/>
2. 自动识别URL并转换为链接
Args:
text: 纯文本
Returns:
str: HTML格式的文本
"""
if not text:
return ""
import re
# 1. HTML转义特殊字符除了即将处理的URL
# 暂时不转义因为TAPD可能支持HTML
# 2. 识别URL并转换为链接
# 匹配http://或https://开头的URL
url_pattern = r'(https?://[^\s<>"]+)'
text = re.sub(url_pattern, r'<a href="\1">\1</a>', text)
# 3. 将换行符转换为<br/>
text = text.replace('\n', '<br/>')
return text
def _convert_multiselect_to_string(self, value: Any) -> str:
"""
将多选字段值转换为分号分隔的字符串
@ -54,6 +119,73 @@ class FieldMapper:
# 其他类型转换为字符串
return str(value).strip() if value else ''
def _format_attachment_info(self, attachment_value: Any) -> str:
"""
格式化附件信息为HTML格式
Args:
attachment_value: 附件字段值
Returns:
str: HTML格式的附件信息
"""
if not attachment_value:
return ""
attachment_html = "<br/><br/><hr/><strong>【附件】</strong><br/>"
# 处理单个附件(字典)
if isinstance(attachment_value, dict):
name = attachment_value.get('name', '未知文件')
url = attachment_value.get('file_url', '')
size = attachment_value.get('size', 0)
file_ext = attachment_value.get('file_ext', '')
# 格式化文件大小
if size > 1024 * 1024:
size_str = f"{size / 1024 / 1024:.2f}MB"
elif size > 1024:
size_str = f"{size / 1024:.2f}KB"
else:
size_str = f"{size}B"
attachment_html += f"{name}"
if file_ext:
attachment_html += f" (.{file_ext})"
if size > 0:
attachment_html += f" [{size_str}]"
if url:
attachment_html += f'<br/>&nbsp;&nbsp;链接: <a href="{url}">{url}</a>'
attachment_html += "<br/>"
# 处理多个附件(列表)
elif isinstance(attachment_value, list):
for idx, item in enumerate(attachment_value, 1):
if isinstance(item, dict):
name = item.get('name', f'附件{idx}')
url = item.get('file_url', '')
size = item.get('size', 0)
file_ext = item.get('file_ext', '')
# 格式化文件大小
if size > 1024 * 1024:
size_str = f"{size / 1024 / 1024:.2f}MB"
elif size > 1024:
size_str = f"{size / 1024:.2f}KB"
else:
size_str = f"{size}B"
attachment_html += f"{name}"
if file_ext:
attachment_html += f" (.{file_ext})"
if size > 0:
attachment_html += f" [{size_str}]"
if url:
attachment_html += f'<br/>&nbsp;&nbsp;链接: <a href="{url}">{url}</a>'
attachment_html += "<br/>"
return attachment_html
def map_record_to_tapd(self, record_data: Dict[str, Any]) -> Dict[str, Any]:
"""
将智能表格记录数据映射为TAPD API参数
@ -76,9 +208,19 @@ class FieldMapper:
tapd_data['title'] = title
# 2. 详细描述(必填)
description = record_data.get('详细描述', '').strip()
description = self._convert_multiline_text(record_data.get('详细描述', ''))
if not description:
raise ValueError("详细描述不能为空")
# 转换为HTML格式处理换行和URL
description = self._convert_text_to_html(description)
# 附加附件信息到详细描述
attachment_value = record_data.get('附件(截图、录屏、日志)')
if attachment_value:
attachment_info = self._format_attachment_info(attachment_value)
description += attachment_info
tapd_data['description'] = description
# 3. 优先级(必填)

View File

@ -193,6 +193,31 @@ class SmartSheetAPI:
print(f" ✓ 构建字段映射完成,共 {len(mapping)} 个字段")
return mapping
def check_attachment_field(self, fields: List[Dict]) -> tuple:
"""
检查是否存在附件字段
Args:
fields: 字段列表
Returns:
tuple: (是否存在, field_id或None)
"""
attachment_field_name = "附件(截图、录屏、日志)"
for field in fields:
field_title = field.get('field_title', '')
if field_title == attachment_field_name:
field_id = field.get('field_id', '')
field_type = field.get('field_type', '')
print(f" ✓ 找到附件字段: {attachment_field_name}")
print(f" 字段ID: {field_id}")
print(f" 字段类型: {field_type}")
return (True, field_id)
print(f" ⚠ 未找到附件字段: {attachment_field_name}")
return (False, None)
def get_records(self, sheet_id: str, filter_spec: Optional[Dict] = None,
limit: int = 100, offset: int = 0) -> Dict:
"""
@ -404,7 +429,13 @@ class SmartSheetAPI:
# 3. 纯文本字段(只包含 text没有 id
elif 'text' in first_value:
return first_value.get('text', '')
# 如果有多个值,说明是多行文本,返回完整的数组供后续处理
if len(field_value_list) > 1:
# 多行文本:返回完整的数组结构
return field_value_list
else:
# 单行文本返回text值
return first_value.get('text', '')
# 4. 数字字段
elif 'number' in first_value:

View File

@ -5,7 +5,7 @@ TAPD API调用模块
import os
import requests
from typing import Dict, Optional, Any
from typing import Dict, Optional, Any, Tuple, List
from requests.auth import HTTPBasicAuth
from src.api_logger import get_logger
@ -311,3 +311,232 @@ class TAPDApi:
"""
# TAPD bug URL格式
return f"https://www.tapd.cn/{self.workspace_id}/bugtrace/bugs/view/{bug_id}"
def _check_file_size_before_upload(self, file_path: str, max_size: int) -> Tuple[bool, str]:
"""
上传前检查文件大小
Args:
file_path: 文件路径
max_size: 最大文件大小字节
Returns:
Tuple[bool, str]: (是否通过检查, 错误信息)
"""
try:
import os
file_size = os.path.getsize(file_path)
if file_size > max_size:
error_msg = f"文件大小超过限制: {file_size / 1024 / 1024:.2f}MB > {max_size / 1024 / 1024:.0f}MB"
return (False, error_msg)
return (True, "")
except Exception as e:
return (False, f"无法获取文件大小: {e}")
def upload_attachment(self, file_path: str, bug_id: str, max_size: int = 150 * 1024 * 1024,
upload_timeout: int = 120, max_retries: int = 3) -> Dict:
"""
上传单个附件到TAPD bug单
Args:
file_path: 文件路径
bug_id: bug ID
max_size: 最大文件大小字节默认150MB
upload_timeout: 上传超时时间默认120秒
max_retries: 最大重试次数默认3次
Returns:
Dict: 上传结果包含successattachment_iderror_message
Raises:
RuntimeError: 上传失败时抛出
"""
import os
import time
filename = os.path.basename(file_path)
print(f" → 上传附件: {filename}")
# 检查文件是否存在
if not os.path.exists(file_path):
error_msg = f"文件不存在: {file_path}"
print(f"{error_msg}")
return {'success': False, 'error_message': error_msg}
# 检查文件大小
size_ok, error_msg = self._check_file_size_before_upload(file_path, max_size)
if not size_ok:
print(f"{error_msg}")
return {'success': False, 'error_message': error_msg}
# 准备上传参数
url = f"{self.BASE_URL}/files/upload_attachment"
data = {
'workspace_id': self.workspace_id,
'type': 'bug',
'entry_id': bug_id
}
# 准备日志记录的请求数据
log_request_data = {
"url": url,
"method": "POST",
"data": data,
"filename": filename,
"file_size": os.path.getsize(file_path),
"auth_user": self.api_user
}
# 重试上传
for attempt in range(1, max_retries + 1):
try:
if attempt > 1:
print(f" → 重试上传 ({attempt}/{max_retries})")
time.sleep(attempt * 2)
# 打开文件并上传
with open(file_path, 'rb') as f:
files = {'file': (filename, f, 'application/octet-stream')}
response = self.session.post(
url,
data=data,
files=files,
auth=self.auth,
timeout=upload_timeout
)
response.raise_for_status()
result = response.json()
# 检查TAPD API返回的状态
if result.get('status') != 1:
error_msg = result.get('info', '未知错误')
if attempt == max_retries:
# 记录API调用日志失败
self.logger.log_api_call(
api_type="tapd",
operation="upload_attachment",
request_data=log_request_data,
response_data=result,
success=False,
error_message=error_msg
)
print(f" ✗ 上传失败: {error_msg}")
return {'success': False, 'error_message': error_msg}
continue
# 上传成功提取附件ID
data_field = result.get('data', {})
attachment_info = data_field.get('Attachment', {})
attachment_id = attachment_info.get('id')
# 记录API调用日志成功
self.logger.log_api_call(
api_type="tapd",
operation="upload_attachment",
request_data=log_request_data,
response_data=result,
success=True
)
print(f" ✓ 上传成功 (附件ID: {attachment_id})")
return {
'success': True,
'attachment_id': attachment_id,
'error_message': None
}
except requests.exceptions.Timeout:
error_msg = f"上传超时 (>{upload_timeout}秒)"
if attempt == max_retries:
self.logger.log_api_call(
api_type="tapd",
operation="upload_attachment",
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
print(f"{error_msg}")
return {'success': False, 'error_message': error_msg}
except requests.exceptions.RequestException as e:
error_msg = f"上传失败: {e}"
if attempt == max_retries:
self.logger.log_api_call(
api_type="tapd",
operation="upload_attachment",
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
print(f"{error_msg}")
return {'success': False, 'error_message': error_msg}
except Exception as e:
error_msg = f"未预期的错误: {e}"
if attempt == max_retries:
self.logger.log_api_call(
api_type="tapd",
operation="upload_attachment",
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
print(f"{error_msg}")
return {'success': False, 'error_message': error_msg}
return {'success': False, 'error_message': "上传失败(已达最大重试次数)"}
def upload_attachments_batch(self, file_paths: List[str], bug_id: str, max_size: int = 150 * 1024 * 1024,
upload_timeout: int = 120, max_retries: int = 3) -> Dict:
"""
批量上传附件到TAPD bug单
Args:
file_paths: 文件路径列表
bug_id: bug ID
max_size: 最大文件大小字节默认150MB
upload_timeout: 上传超时时间默认120秒
max_retries: 最大重试次数默认3次
Returns:
Dict: 上传结果包含successsuccess_filesfailed_files
"""
print(f"\n → 开始批量上传附件 (共 {len(file_paths)} 个)")
success_files = []
failed_files = []
for idx, file_path in enumerate(file_paths, 1):
print(f"\n [{idx}/{len(file_paths)}]")
result = self.upload_attachment(file_path, bug_id, max_size, upload_timeout, max_retries)
if result['success']:
success_files.append({
'file_path': file_path,
'attachment_id': result['attachment_id']
})
else:
failed_files.append({
'file_path': file_path,
'error': result['error_message']
})
# 返回结果
if len(failed_files) > 0:
print(f"\n ⚠ 批量上传完成: 成功 {len(success_files)} 个, 失败 {len(failed_files)}")
return {
'success': False,
'success_files': success_files,
'failed_files': failed_files,
'error_message': f"部分附件上传失败 ({len(failed_files)}/{len(file_paths)})"
}
else:
print(f"\n ✓ 所有附件上传成功 ({len(success_files)} 个)")
return {
'success': True,
'success_files': success_files,
'failed_files': [],
'error_message': None
}

View File

@ -11,6 +11,7 @@ import time
import requests
from pathlib import Path
from typing import Optional, Dict
from src.api_logger import get_logger
class TokenManager:
@ -44,6 +45,9 @@ class TokenManager:
self.corpid = os.environ.get('CORPID')
self.corpsecret = os.environ.get('CORPSECRET')
# 初始化日志记录器
self.logger = get_logger()
def _validate_env_config(self):
"""
验证环境变量配置是否完整
@ -85,28 +89,86 @@ class TokenManager:
response.raise_for_status()
result = response.json()
# 准备日志记录数据
log_request_data = {
"url": self.TOKEN_API_URL,
"method": "GET",
"params": {"corpid": self.corpid, "corpsecret": "***"}
}
# 检查返回的错误码
errcode = result.get('errcode', 0)
if errcode != 0:
errmsg = result.get('errmsg', '未知错误')
# 记录API调用日志失败
self.logger.log_api_call(
api_type="wework",
operation="wework/get_token",
request_data=log_request_data,
response_data=result,
success=False,
error_message=errmsg
)
raise RuntimeError(f"获取access_token失败: errcode={errcode}, errmsg={errmsg}")
access_token = result.get('access_token')
expires_in = result.get('expires_in', self.TOKEN_EXPIRES_IN)
if not access_token:
# 记录API调用日志失败
self.logger.log_api_call(
api_type="wework",
operation="wework/get_token",
request_data=log_request_data,
response_data=result,
success=False,
error_message="API返回的数据中未找到access_token"
)
raise RuntimeError("API返回的数据中未找到access_token")
print(f" ✓ 成功获取access_token (有效期: {expires_in}秒)")
# 记录API调用日志成功
log_response_data = {
"errcode": result.get('errcode', 0),
"errmsg": result.get('errmsg', 'ok'),
"access_token": access_token[:20] + "...", # 只记录前20个字符
"expires_in": expires_in
}
self.logger.log_api_call(
api_type="wework",
operation="wework/get_token",
request_data=log_request_data,
response_data=log_response_data,
success=True
)
return {
'access_token': access_token,
'expires_in': expires_in
}
except requests.exceptions.Timeout:
# 记录API调用日志超时
self.logger.log_api_call(
api_type="wework",
operation="wework/get_token",
request_data={"url": self.TOKEN_API_URL, "method": "GET"},
response_data={},
success=False,
error_message="请求超时"
)
raise RuntimeError("获取access_token超时请检查网络连接")
except requests.exceptions.RequestException as e:
# 记录API调用日志异常
self.logger.log_api_call(
api_type="wework",
operation="wework/get_token",
request_data={"url": self.TOKEN_API_URL, "method": "GET"},
response_data={},
success=False,
error_message=str(e)
)
raise RuntimeError(f"获取access_token失败: {e}")
def _load_cache(self) -> Optional[Dict]:

261
src/upload_attachment.py Normal file
View File

@ -0,0 +1,261 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
TAPD附件上传脚本
用于上传文件到TAPD系统
"""
import requests
import os
import sys
import time
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.config import ConfigManager
# 禁用SSL警告如果有证书问题
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def upload_attachment(file_path, max_retries=3):
"""
上传附件到TAPD带重试机制
Args:
file_path: 要上传的文件路径
max_retries: 最大重试次数
Returns:
dict: 上传成功返回附件信息失败返回None
"""
print("\n=== TAPD附件上传 ===")
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"✗ 文件不存在: {file_path}")
return None
# 检查文件大小限制150MB
file_size = os.path.getsize(file_path)
max_size = 150 * 1024 * 1024 # 150MB
if file_size > max_size:
print(f"✗ 文件大小超过限制: {file_size / 1024 / 1024:.2f}MB (最大150MB)")
return None
print(f"✓ 文件路径: {file_path}")
print(f"✓ 文件大小: {file_size / 1024:.2f}KB")
# 从环境变量读取认证信息
api_user = os.environ.get('TAPD_API_USER')
api_password = os.environ.get('TAPD_API_PASSWORD')
if not api_user or not api_password:
print("✗ TAPD认证信息未设置")
print(" 请设置环境变量:")
print(" - TAPD_API_USER")
print(" - TAPD_API_PASSWORD")
return None
print(f"✓ TAPD认证信息已加载 (用户: {api_user})")
# 从配置文件读取workspace_id
try:
config_manager = ConfigManager()
tapd_config = config_manager.get_tapd_config()
workspace_id = tapd_config['workspace_id']
print(f"✓ 从配置文件读取workspace_id: {workspace_id}")
except Exception as e:
print(f"✗ 读取workspace_id失败: {e}")
return None
# 构造请求
url = "https://tapd-api.bilibili.co/tapd/upload_attachment"
# 必选参数
data = {
'workspace_id': workspace_id,
'type': 'bug_description', # 固定为bug_description
'custom_field': 'attachment' # 字段英文名
}
# 重试上传
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(api_user, api_password)
for attempt in range(1, max_retries + 1):
try:
print(f"\n{'='*60}")
print(f"{'[重试 ' + str(attempt) + '/' + str(max_retries) + '] ' if attempt > 1 else ''}开始上传文件")
print(f"{'='*60}")
# 显示请求详情
print(f"\n【请求信息】")
print(f" URL: {url}")
print(f" 方法: POST")
print(f" 认证: Basic Auth (用户: {api_user})")
print(f"\n【请求参数】")
print(f" workspace_id: {workspace_id}")
print(f" type: {data['type']}")
print(f" custom_field: {data['custom_field']}")
print(f"\n【上传文件】")
print(f" 文件名: {os.path.basename(file_path)}")
print(f" 文件大小: {file_size / 1024:.2f}KB")
print(f" Content-Type: application/octet-stream")
# 每次重试都重新打开文件
with open(file_path, 'rb') as f:
files = {
'file': (os.path.basename(file_path), f, 'application/octet-stream')
}
# 发送请求,增加超时时间和禁用代理
print(f"\n正在发送请求...")
response = requests.post(
url,
data=data,
files=files,
auth=auth,
timeout=120, # 增加超时时间到2分钟
verify=False, # 如果有SSL证书问题可以禁用验证
proxies={'http': None, 'https': None} # 禁用代理
)
# 显示响应信息
print(f"\n{'='*60}")
print(f"【响应信息】")
print(f"{'='*60}")
print(f" HTTP状态码: {response.status_code}")
print(f" 响应头:")
for key, value in response.headers.items():
print(f" {key}: {value}")
print(f"\n【响应内容】")
print(f" 原始响应文本 (前500字符):")
print(f" {response.text[:500]}")
if len(response.text) > 500:
print(f" ... (总长度: {len(response.text)} 字符)")
# 尝试解析JSON
try:
response_data = response.json()
print(f"\n JSON解析成功!")
print(f" 响应数据类型: {type(response_data)}")
print(f" 响应数据结构:")
import json
print(f" {json.dumps(response_data, indent=2, ensure_ascii=False)}")
except Exception as e:
print(f"\n ✗ JSON解析失败: {e}")
print(f" 完整响应文本:")
print(f" {response.text}")
if attempt < max_retries:
print(f"\n 等待 {attempt * 2} 秒后重试...")
time.sleep(attempt * 2)
continue
return None
# 检查返回结果
if response_data.get("status") == 1:
# 获取data字段
data_field = response_data.get("data", {})
# 检查data是否为字典
if isinstance(data_field, dict):
attachment_data = data_field.get("Attachment", {})
elif isinstance(data_field, str):
# 如果data是字符串可能需要再次解析
print(f"[调试] data字段是字符串尝试解析...")
import json
try:
attachment_data = json.loads(data_field).get("Attachment", {})
except:
attachment_data = {}
else:
attachment_data = {}
print(f"\n✓ 文件上传成功!")
print(f" 附件ID: {attachment_data.get('id')}")
print(f" 文件名: {attachment_data.get('filename')}")
print(f" 类型: {attachment_data.get('type')}")
print(f" 内容类型: {attachment_data.get('content_type')}")
print(f" 创建时间: {attachment_data.get('created')}")
print(f" 工作项ID: {attachment_data.get('entry_id', '(未关联)')}")
return attachment_data
else:
print(f"\n✗ 上传失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
if attempt < max_retries:
print(f" 等待 {attempt * 2} 秒后重试...")
time.sleep(attempt * 2)
continue
return None
except requests.exceptions.ConnectionError as e:
print(f"\n✗ 连接错误: {str(e)}")
if attempt < max_retries:
wait_time = attempt * 3
print(f" 等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
continue
else:
print(f" 已达到最大重试次数 ({max_retries}),上传失败")
return None
except requests.exceptions.Timeout as e:
print(f"\n✗ 请求超时: {str(e)}")
if attempt < max_retries:
wait_time = attempt * 3
print(f" 等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
continue
else:
print(f" 已达到最大重试次数 ({max_retries}),上传失败")
return None
except Exception as e:
print(f"\n✗ 上传异常: {str(e)}")
if attempt < max_retries:
wait_time = attempt * 2
print(f" 等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
continue
else:
print(f" 已达到最大重试次数 ({max_retries}),上传失败")
import traceback
traceback.print_exc()
return None
return None
def main():
"""主函数"""
# 要上传的文件路径根目录下的test.png
project_root = Path(__file__).parent.parent
file_path = project_root / "test.png"
print("=" * 60)
print("TAPD附件上传脚本")
print("=" * 60)
# 执行上传
result = upload_attachment(str(file_path))
if result:
print("\n" + "=" * 60)
print("上传完成!")
print("=" * 60)
else:
print("\n" + "=" * 60)
print("上传失败,请检查错误信息")
print("=" * 60)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -8,6 +8,7 @@
import requests
from typing import List, Dict
from datetime import datetime
from src.api_logger import get_logger
class WeWorkNotifier:
@ -26,6 +27,7 @@ class WeWorkNotifier:
self.agentid = agentid
self.receivers = receivers
self.base_url = "https://qyapi.weixin.qq.com/cgi-bin"
self.logger = get_logger()
def send_validation_failure_notification(self, invalid_records: List[Dict]) -> bool:
"""
@ -137,16 +139,54 @@ class WeWorkNotifier:
response = requests.post(url, params=params, json=data, timeout=10)
response_data = response.json()
# 准备日志记录数据
log_request_data = {
"url": url,
"method": "POST",
"data": {
"touser": self.receivers,
"msgtype": "text",
"agentid": int(self.agentid),
"text": {"content": content[:100] + "..." if len(content) > 100 else content}
}
}
# 检查返回结果
if response_data.get("errcode") == 0:
print(f" ✓ 企业微信消息发送成功")
# 记录API调用日志成功
self.logger.log_api_call(
api_type="wework",
operation="wework/send_message",
request_data=log_request_data,
response_data=response_data,
success=True
)
return True
else:
print(f" ✗ 企业微信消息发送失败")
print(f" 错误码: {response_data.get('errcode')}")
print(f" 错误信息: {response_data.get('errmsg')}")
# 记录API调用日志失败
self.logger.log_api_call(
api_type="wework",
operation="wework/send_message",
request_data=log_request_data,
response_data=response_data,
success=False,
error_message=response_data.get('errmsg', '未知错误')
)
return False
except Exception as e:
print(f" ✗ 企业微信消息发送异常: {str(e)}")
# 记录API调用日志异常
self.logger.log_api_call(
api_type="wework",
operation="wework/send_message",
request_data={"url": url, "method": "POST"},
response_data={},
success=False,
error_message=str(e)
)
return False

BIN
test.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 672 KiB

147
test_attachment.py Normal file
View File

@ -0,0 +1,147 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
附件功能测试脚本
用于测试附件下载和上传功能
"""
import sys
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from src.config import ConfigManager
from src.token_manager import TokenManager
from src.attachment_handler import AttachmentHandler
from src.tapd_api import TAPDApi
def test_attachment_handler():
"""测试附件处理器"""
print("\n" + "=" * 60)
print("测试1附件处理器初始化")
print("=" * 60)
try:
# 加载配置
config_manager = ConfigManager()
attachment_config = config_manager.get_attachment_config()
print(f"✓ 附件配置加载成功:")
print(f" 临时目录: {attachment_config['temp_dir']}")
print(f" 最大文件大小: {attachment_config['max_file_size']}MB")
print(f" 下载超时: {attachment_config['download_timeout']}")
print(f" 上传超时: {attachment_config['upload_timeout']}")
print(f" 下载重试次数: {attachment_config['download_retry']}")
print(f" 上传重试次数: {attachment_config['upload_retry']}")
# 获取access_token
token_manager = TokenManager()
access_token = token_manager.get_token()
print(f"\n✓ access_token获取成功")
# 初始化附件处理器
handler = AttachmentHandler(access_token, attachment_config)
print(f"\n✓ 附件处理器初始化成功")
return True
except Exception as e:
print(f"\n✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_tapd_upload():
"""测试TAPD附件上传功能"""
print("\n" + "=" * 60)
print("测试2TAPD附件上传API")
print("=" * 60)
try:
# 加载配置
config_manager = ConfigManager()
tapd_config = config_manager.get_tapd_config()
attachment_config = config_manager.get_attachment_config()
# 初始化TAPD API
tapd_api = TAPDApi(tapd_config['workspace_id'], test_mode=False)
print(f"✓ TAPD API初始化成功")
# 检查是否有测试文件
test_file = Path("test.png")
if not test_file.exists():
print(f"\n⚠ 测试文件不存在: {test_file}")
print(f" 请在项目根目录放置一个test.png文件用于测试")
return False
print(f"\n✓ 找到测试文件: {test_file}")
print(f" 文件大小: {test_file.stat().st_size / 1024:.2f}KB")
# 提示用户输入bug_id
print(f"\n请输入一个已存在的bug ID用于测试附件上传:")
print(f"(如果不想测试上传,直接按回车跳过)")
bug_id = input("Bug ID: ").strip()
if not bug_id:
print(f"\n⚠ 跳过上传测试")
return True
# 测试上传
print(f"\n开始测试上传附件到bug {bug_id}...")
result = tapd_api.upload_attachment(
str(test_file),
bug_id,
max_size=attachment_config['max_file_size'] * 1024 * 1024,
upload_timeout=attachment_config['upload_timeout'],
max_retries=attachment_config['upload_retry']
)
if result['success']:
print(f"\n✓ 上传测试成功!")
print(f" 附件ID: {result['attachment_id']}")
return True
else:
print(f"\n✗ 上传测试失败: {result['error_message']}")
return False
except Exception as e:
print(f"\n✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主函数"""
print("=" * 60)
print("附件功能测试")
print("=" * 60)
# 测试1附件处理器
test1_passed = test_attachment_handler()
# 测试2TAPD上传
test2_passed = test_tapd_upload()
# 显示测试结果
print("\n" + "=" * 60)
print("测试结果汇总")
print("=" * 60)
print(f"测试1 - 附件处理器初始化: {'✓ 通过' if test1_passed else '✗ 失败'}")
print(f"测试2 - TAPD附件上传API: {'✓ 通过' if test2_passed else '✗ 失败'}")
print("=" * 60)
if test1_passed and test2_passed:
print("\n✓ 所有测试通过!")
return 0
else:
print("\n✗ 部分测试失败,请检查错误信息")
return 1
if __name__ == "__main__":
sys.exit(main())

29
test_full_flow.py Normal file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
完整流程测试脚本
测试从智能表格读取下载附件创建bug上传附件回写结果的完整流程
"""
import sys
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from src.main import main
if __name__ == "__main__":
print("=" * 80)
print("完整流程测试")
print("=" * 80)
print("\n提示:这将运行一次完整的开单流程")
print("请确保:")
print("1. 智能表格中有待开单的记录")
print("2. 记录包含附件字段")
print("3. 环境变量已正确设置")
print("\n按回车继续或Ctrl+C取消...")
input()
sys.exit(main())