G41_TAPD_BUG_SYNC/src/api_test.py

1462 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
企业微信智能表格API测试工具
用于测试和熟悉企业微信文档API接口
"""
import requests
import json
import os
import sys
from datetime import datetime
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.config import ConfigManager
class WeWorkAPITester:
"""企业微信API测试类"""
def __init__(self, auto_load_token=True):
"""
初始化企业微信API测试类
Args:
auto_load_token: 是否自动加载token默认为True
"""
self.access_token = None
self.log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'api_test_log.json')
self.base_url = "https://qyapi.weixin.qq.com/cgi-bin"
# 确保日志文件存在
self._init_log_file()
# 自动加载token
if auto_load_token:
self._auto_load_token()
def _init_log_file(self):
"""初始化日志文件"""
if not os.path.exists(self.log_file):
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump({"records": []}, f, ensure_ascii=False, indent=2)
def _auto_load_token(self):
"""
自动加载access_token
优先从缓存读取如果缓存不存在或已过期则尝试从API获取新token
"""
print("\n=== 自动加载access_token ===")
# 先尝试从缓存读取
if self._load_token_from_cache_silent():
return
# 缓存无效尝试从API获取
print(" 尝试从API获取新token...")
try:
from src.token_manager import TokenManager
token_manager = TokenManager()
self.access_token = token_manager.get_token()
print(f" ✓ 成功获取access_token")
print(f" Token: {self.access_token[:20]}...")
except ValueError as e:
print(f" ⚠ 环境变量未配置: {e}")
print(" 请手动选择菜单选项1获取token")
except Exception as e:
print(f" ⚠ 自动获取token失败: {e}")
print(" 请手动选择菜单选项1获取token")
def _load_token_from_cache_silent(self):
"""
静默从缓存读取token不打印标题
Returns:
bool: 是否成功读取有效token
"""
cache_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'config',
'token_cache.json'
)
try:
if not os.path.exists(cache_file):
print(" 缓存文件不存在")
return False
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
access_token = cache_data.get('access_token')
fetch_time = cache_data.get('fetch_time')
if not access_token:
print(" 缓存文件中没有access_token")
return False
# 检查token是否过期7200秒 = 2小时提前5分钟刷新
import time
current_time = time.time()
elapsed_time = current_time - fetch_time
remaining_time = 7200 - elapsed_time
if remaining_time <= 300: # 剩余不足5分钟视为过期
print(f" 缓存的token已过期或即将过期")
return False
# token有效
self.access_token = access_token
print(f" ✓ 从缓存读取access_token成功")
print(f" Token: {self.access_token[:20]}...")
print(f" 剩余有效期: {int(remaining_time)}秒 ({int(remaining_time//60)}分钟)")
return True
except Exception as e:
print(f" 读取缓存失败: {e}")
return False
def _log_api_call(self, operation, request_data, response_data):
"""记录API调用到JSON文件"""
try:
# 读取现有记录
with open(self.log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
# 添加新记录
record = {
"operation": operation,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"request": request_data,
"response": response_data
}
log_data["records"].append(record)
# 写回文件
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump(log_data, f, ensure_ascii=False, indent=2)
print(f"✓ API调用已记录到日志文件")
except Exception as e:
print(f"✗ 记录日志失败: {str(e)}")
def load_token_from_cache(self):
"""
从缓存文件读取access_token
Returns:
bool: 是否成功读取token
"""
print("\n=== 从缓存读取access_token ===")
# token缓存文件路径
cache_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'config',
'token_cache.json'
)
try:
# 检查文件是否存在
if not os.path.exists(cache_file):
print(f"✗ 缓存文件不存在: {cache_file}")
return False
# 读取缓存文件
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
access_token = cache_data.get('access_token')
fetch_time = cache_data.get('fetch_time')
if not access_token:
print("✗ 缓存文件中没有access_token")
return False
# 检查token是否过期7200秒 = 2小时
import time
current_time = time.time()
elapsed_time = current_time - fetch_time
remaining_time = 7200 - elapsed_time
if remaining_time <= 0:
print("✗ 缓存的token已过期")
print(f" 获取时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fetch_time))}")
print(f" 已过期 {int(-remaining_time)}")
return False
# token有效加载到内存
self.access_token = access_token
print(f"✓ 成功从缓存读取access_token")
print(f" Token: {self.access_token[:20]}...")
print(f" 获取时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fetch_time))}")
print(f" 剩余有效期: {int(remaining_time)}秒 ({int(remaining_time//60)}分钟)")
return True
except json.JSONDecodeError:
print(f"✗ 缓存文件格式错误")
return False
except Exception as e:
print(f"✗ 读取缓存失败: {str(e)}")
return False
def get_access_token(self, corpid, corpsecret):
"""
获取access_token
Args:
corpid: 企业ID
corpsecret: 应用的凭证密钥
Returns:
bool: 是否成功获取token
"""
print("\n=== 获取access_token ===")
url = f"{self.base_url}/gettoken"
params = {
"corpid": corpid,
"corpsecret": corpsecret
}
try:
response = requests.get(url, params=params, timeout=10)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"params": {
"corpid": corpid,
"corpsecret": "***" # 隐藏敏感信息
}
}
self._log_api_call("get_access_token", request_data, response_data)
# 检查返回结果
if response_data.get("errcode") == 0:
self.access_token = response_data.get("access_token")
expires_in = response_data.get("expires_in")
print(f"✓ 成功获取access_token")
print(f" Token: {self.access_token[:20]}...")
print(f" 有效期: {expires_in}秒 ({expires_in//60}分钟)")
return True
else:
print(f"✗ 获取失败")
print(f" 错误码: {response_data.get('errcode')}")
print(f" 错误信息: {response_data.get('errmsg')}")
return False
except Exception as e:
print(f"✗ 请求异常: {str(e)}")
return False
def create_doc(self, doc_name, doc_type=10, spaceid=None, fatherid=None):
"""
新建文档的方法
该方法用于在系统中创建新的文档,支持多种文档类型,并可以指定文档所在的空间和父目录。
通过调用企业微信文档的API接口实现文档创建功能。
Args:
doc_name: 文档名称
doc_type: 文档类型 (3:文档 4:表格 10:智能表格)
spaceid: 空间ID (可选)
fatherid: 父目录ID (可选)
Returns:
dict: 包含docid和url的字典失败返回None
"""
print("\n=== 新建文档 ===")
if not self.access_token:
print("✗ 请先获取access_token")
return None
url = f"{self.base_url}/wedoc/create_doc"
params = {"access_token": self.access_token}
# 构造请求体
data = {
"doc_type": doc_type,
"doc_name": doc_name
}
if spaceid:
data["spaceid"] = spaceid
if fatherid:
data["fatherid"] = fatherid
try:
response = requests.post(url, params=params, json=data, timeout=10)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"params": {"access_token": "***"},
"body": data
}
self._log_api_call("create_doc", request_data, response_data)
# 检查返回结果
if response_data.get("errcode") == 0:
docid = response_data.get("docid")
doc_url = response_data.get("url")
print(f"✓ 文档创建成功")
print(f" 文档名称: {doc_name}")
print(f" 文档ID: {docid}")
print(f" 访问链接: {doc_url}")
return {"docid": docid, "url": doc_url}
else:
print(f"✗ 创建失败")
print(f" 错误码: {response_data.get('errcode')}")
print(f" 错误信息: {response_data.get('errmsg')}")
return None
except Exception as e:
print(f"✗ 请求异常: {str(e)}")
return None
def rename_doc(self, docid, new_name):
"""
重命名文档
Args:
docid: 文档ID
new_name: 新的文档名称
Returns:
bool: 是否成功
"""
print("\n=== 重命名文档 ===")
if not self.access_token:
print("✗ 请先获取access_token")
return False
url = f"{self.base_url}/wedoc/rename_doc"
params = {"access_token": self.access_token}
data = {
"docid": docid,
"new_name": new_name
}
try:
response = requests.post(url, params=params, json=data, timeout=10)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"params": {"access_token": "***"},
"body": data
}
self._log_api_call("rename_doc", request_data, response_data)
# 检查返回结果
if response_data.get("errcode") == 0:
print(f"✓ 重命名成功")
print(f" 文档ID: {docid}")
print(f" 新名称: {new_name}")
return True
else:
print(f"✗ 重命名失败")
print(f" 错误码: {response_data.get('errcode')}")
print(f" 错误信息: {response_data.get('errmsg')}")
return False
except Exception as e:
print(f"✗ 请求异常: {str(e)}")
return False
def delete_doc(self, docid):
"""
删除文档
Args:
docid: 文档ID
Returns:
bool: 是否成功
"""
print("\n=== 删除文档 ===")
if not self.access_token:
print("✗ 请先获取access_token")
return False
url = f"{self.base_url}/wedoc/del_doc"
params = {"access_token": self.access_token}
data = {"docid": docid}
try:
response = requests.post(url, params=params, json=data, timeout=10)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"params": {"access_token": "***"},
"body": data
}
self._log_api_call("delete_doc", request_data, response_data)
# 检查返回结果
if response_data.get("errcode") == 0:
print(f"✓ 删除成功")
print(f" 文档ID: {docid}")
return True
else:
print(f"✗ 删除失败")
print(f" 错误码: {response_data.get('errcode')}")
print(f" 错误信息: {response_data.get('errmsg')}")
return False
except Exception as e:
print(f"✗ 请求异常: {str(e)}")
return False
def send_message(self, content):
"""
发送应用消息
Args:
content: 消息内容
Returns:
bool: 是否成功
"""
print("\n=== 发送应用消息 ===")
if not self.access_token:
print("✗ 请先获取access_token")
return False
# 从配置文件读取agentid和receivers
try:
config_manager = ConfigManager()
config = config_manager.config
if not config.has_section('wework'):
print("✗ 配置文件缺少[wework]节")
return False
if not config.has_option('wework', 'agentid'):
print("✗ 配置文件[wework]节缺少agentid配置项")
return False
if not config.has_option('wework', 'receivers'):
print("✗ 配置文件[wework]节缺少receivers配置项")
return False
agentid = config.get('wework', 'agentid').strip()
receivers = config.get('wework', 'receivers').strip()
print(f"✓ 从配置文件读取agentid: {agentid}")
print(f"✓ 从配置文件读取receivers: {receivers}")
except Exception as e:
print(f"✗ 读取配置失败: {e}")
return False
url = f"{self.base_url}/message/send"
params = {"access_token": self.access_token}
# 构造请求体使用配置文件中的receivers
data = {
"touser": receivers,
"msgtype": "text",
"agentid": int(agentid),
"text": {
"content": content
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0,
"duplicate_check_interval": 1800
}
try:
response = requests.post(url, params=params, json=data, timeout=10)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"params": {"access_token": "***"},
"body": data
}
self._log_api_call("send_message", request_data, response_data)
# 检查返回结果
if response_data.get("errcode") == 0:
print(f"✓ 消息发送成功")
print(f" 消息内容: {content}")
return True
else:
print(f"✗ 发送失败")
print(f" 错误码: {response_data.get('errcode')}")
print(f" 错误信息: {response_data.get('errmsg')}")
return False
except Exception as e:
print(f"✗ 请求异常: {str(e)}")
return False
class TAPDAPITester:
"""TAPD API测试类"""
def __init__(self):
self.log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'api_test_log.json')
self.base_url = "https://tapd-api.bilibili.co/tapd"
self.api_user = None
self.api_password = None
self.workspace_id = None
def _init_auth(self):
"""初始化TAPD认证信息"""
# 从环境变量读取认证信息
self.api_user = os.environ.get('TAPD_API_USER')
self.api_password = os.environ.get('TAPD_API_PASSWORD')
if not self.api_user or not self.api_password:
print("✗ TAPD认证信息未设置")
print(" 请设置环境变量:")
print(" - TAPD_API_USER")
print(" - TAPD_API_PASSWORD")
return False
print(f"✓ TAPD认证信息已加载 (用户: {self.api_user})")
return True
def _init_workspace_id(self):
"""从配置文件读取workspace_id"""
try:
config_manager = ConfigManager()
tapd_config = config_manager.get_tapd_config()
self.workspace_id = tapd_config['workspace_id']
print(f"✓ 从配置文件读取workspace_id: {self.workspace_id}")
return True
except Exception as e:
print(f"✗ 读取workspace_id失败: {e}")
return False
def _log_api_call(self, operation, request_data, response_data):
"""记录API调用到JSON文件"""
try:
# 读取现有记录
with open(self.log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
# 添加新记录
record = {
"operation": operation,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"request": request_data,
"response": response_data
}
log_data["records"].append(record)
# 写回文件
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump(log_data, f, ensure_ascii=False, indent=2)
print(f"✓ API调用已记录到日志文件")
except Exception as e:
print(f"✗ 记录日志失败: {str(e)}")
def get_story_fields_info(self):
"""
获取TAPD需求的所有字段配置及候选值
Returns:
dict: 字段配置信息失败返回None
"""
print("\n=== 获取TAPD需求字段配置 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
url = f"{self.base_url}/stories/get_fields_info"
params = {
"workspace_id": self.workspace_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_story_fields_info", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", {})
print(f"\n✓ 成功获取需求字段配置")
# 显示字段统计
if isinstance(data, dict):
field_count = len(data)
print(f"{field_count} 个字段配置")
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_story_fields.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 字段配置已保存到: {output_file}")
# 显示部分字段信息
print(f"\n需求字段列表预览:")
print("-" * 80)
for idx, (field_name, field_info) in enumerate(list(data.items())[:10], 1):
field_label = field_info.get('label', '(无标签)')
html_type = field_info.get('html_type', '(未知类型)')
print(f" {idx}. {field_name} ({field_label}) - 类型: {html_type}")
# 如果有候选值,显示
options = field_info.get('options', [])
if options:
if isinstance(options, dict):
option_list = list(options.values())[:5]
total_count = len(options)
elif isinstance(options, list):
option_list = options[:5]
total_count = len(options)
else:
option_list = []
total_count = 0
if option_list:
print(f" 候选值: {', '.join(str(o) for o in option_list)}" +
(f" ...等{total_count}" if total_count > 5 else ""))
if field_count > 10:
print(f" ... 还有 {field_count - 10} 个字段,详见输出文件")
print("-" * 80)
return data
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def get_bug_custom_fields(self):
"""
获取TAPD缺陷的所有字段配置及候选值
Returns:
dict: 字段配置信息失败返回None
"""
print("\n=== 获取TAPD缺陷字段配置 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
url = f"{self.base_url}/bugs/get_fields_info"
params = {
"workspace_id": self.workspace_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_bug_custom_fields", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", {})
print(f"\n✓ 成功获取字段配置")
# 显示字段统计
if isinstance(data, dict):
field_count = len(data)
print(f"{field_count} 个字段配置")
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_bug_fields.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 字段配置已保存到: {output_file}")
# 显示部分字段信息
print(f"\n字段列表预览:")
print("-" * 80)
for idx, (field_name, field_info) in enumerate(list(data.items())[:10], 1):
field_label = field_info.get('label', '(无标签)')
html_type = field_info.get('html_type', '(未知类型)')
print(f" {idx}. {field_name} ({field_label}) - 类型: {html_type}")
# 如果有候选值,显示
options = field_info.get('options', [])
if options:
# options可能是字典或列表
if isinstance(options, dict):
option_list = list(options.values())[:5]
total_count = len(options)
elif isinstance(options, list):
option_list = options[:5]
total_count = len(options)
else:
option_list = []
total_count = 0
if option_list:
print(f" 候选值: {', '.join(str(o) for o in option_list)}" +
(f" ...等{total_count}" if total_count > 5 else ""))
if field_count > 10:
print(f" ... 还有 {field_count - 10} 个字段,详见输出文件")
print("-" * 80)
return data
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def get_story(self, story_id):
"""
根据需求ID获取需求详情
Args:
story_id: 需求ID
Returns:
dict: 需求信息失败返回None
"""
print("\n=== 获取TAPD需求 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
# 验证story_id
if not story_id:
print("✗ 需求ID不能为空")
return None
url = f"{self.base_url}/stories"
params = {
"workspace_id": self.workspace_id,
"id": story_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
print(f" story_id: {story_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_story", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", [])
if not data:
print(f"\n✗ 未找到需求ID为 {story_id} 的需求")
return None
# TAPD返回的是列表取第一个
story_data = data[0] if isinstance(data, list) else data
story_info = story_data.get("Story", {})
print(f"\n✓ 成功获取需求信息")
print(f"\n需求详情:")
print("=" * 80)
# 显示关键字段
key_fields = [
("id", "ID"),
("name", "标题"),
("status", "状态"),
("priority", "优先级"),
("owner", "处理人"),
("creator", "创建人"),
("created", "创建时间"),
("modified", "最后修改时间"),
("iteration_id", "迭代ID"),
("description", "详细描述")
]
for field_name, field_label in key_fields:
value = story_info.get(field_name, '')
if field_name == "description" and value:
# 描述字段可能很长只显示前100个字符
if len(str(value)) > 100:
value = str(value)[:100] + "..."
print(f" {field_label}: {value}")
print("=" * 80)
# 保存完整数据到文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
f'tapd_story_{story_id}.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(story_info, f, ensure_ascii=False, indent=2)
print(f"\n✓ 完整需求信息已保存到: {output_file}")
return story_info
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def upload_attachment(self, file_path, entry_type, entry_id, owner=None, overwrite=False):
"""
上传附件到TAPD
根据TAPD官方文档实现https://tapd-api.bilibili.co/tapd/files/upload_attachment
Args:
file_path: 本地文件路径
entry_type: 业务对象类型,必填,可选值: story/bug/task
entry_id: 业务对象ID需求/缺陷/任务ID必填
owner: 附件创建人昵称 (可选)
overwrite: 同名文件是否覆盖 (可选默认False)
Returns:
dict: 上传成功返回附件信息失败返回None
"""
print("\n=== 上传附件到TAPD ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
# 验证必需参数
if not entry_type:
print("✗ entry_type 参数不能为空")
print(" 可选值: story/bug/task")
return None
if entry_type not in ['story', 'bug', 'task']:
print(f"✗ entry_type 参数值无效: {entry_type}")
print(" 可选值: story/bug/task")
return None
if not entry_id:
print("✗ entry_id 参数不能为空")
print(" 需要提供需求/缺陷/任务的ID")
return None
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"✗ 文件不存在: {file_path}")
return None
# 获取文件信息
filename = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
content_type = self._get_content_type(filename)
print(f"\n【文件信息】")
print(f" 文件名: {filename}")
print(f" 文件路径: {file_path}")
print(f" 文件大小: {file_size} 字节 ({file_size/1024:.2f} KB)")
print(f" Content-Type: {content_type}")
# 使用正确的API端点
url = f"{self.base_url}/files/upload_attachment"
# 准备表单数据(按照文档要求)
data = {
'workspace_id': str(self.workspace_id),
'type': entry_type,
'entry_id': str(entry_id)
}
# 添加可选参数
if owner:
data['owner'] = owner
if overwrite:
data['overwrite'] = 'true' if overwrite else 'false'
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n【请求信息】")
print(f" 请求URL: {url}")
print(f" 请求方法: POST")
print(f" Content-Type: multipart/form-data")
print(f" 认证方式: Basic Auth")
print(f" 认证用户: {self.api_user}")
print(f"\n【表单数据】")
print(f" workspace_id: {self.workspace_id}")
print(f" type: {entry_type}")
print(f" entry_id: {entry_id}")
if owner:
print(f" owner: {owner}")
if overwrite:
print(f" overwrite: {overwrite}")
print(f" file: {filename} ({content_type})")
# 打开文件并上传
print(f"\n【正在上传】")
print(f" 请稍候...")
with open(file_path, 'rb') as f:
files = {
'file': (filename, f, content_type)
}
response = requests.post(url, auth=auth, files=files, data=data, timeout=60)
# 输出HTTP响应信息
print(f"\n【HTTP响应】")
print(f" 状态码: {response.status_code}")
print(f" 原因: {response.reason}")
print(f" 响应头:")
for key, value in response.headers.items():
print(f" {key}: {value}")
# 输出原始响应内容
print(f"\n【原始响应内容】")
print(f" {response.text}")
response_data = response.json()
# 输出解析后的JSON响应
print(f"\n【解析后的JSON响应】")
print(json.dumps(response_data, ensure_ascii=False, indent=2))
# 记录API调用
request_data = {
"url": url,
"method": "POST",
"data": {
"workspace_id": self.workspace_id,
"type": entry_type,
"entry_id": entry_id,
"owner": owner,
"overwrite": overwrite,
"filename": filename,
"file_size": file_size,
"content_type": content_type
},
"auth_user": self.api_user
}
self._log_api_call("upload_attachment", request_data, response_data)
# 检查返回结果
print(f"\n【结果分析】")
if response_data.get("status") == 1:
data = response_data.get("data", {})
print(f"\n✓ 文件上传成功")
# 显示上传结果
if isinstance(data, dict):
attachment_info = data.get('Attachment', {})
att_id = attachment_info.get('id', '(无ID)')
att_filename = attachment_info.get('filename', '(无文件名)')
att_type = attachment_info.get('type', '(无类型)')
att_entry_id = attachment_info.get('entry_id', '(无entry_id)')
att_owner = attachment_info.get('owner', '(未知)')
att_created = attachment_info.get('created', '(未知)')
print(f" 附件ID: {att_id}")
print(f" 文件名: {att_filename}")
print(f" 业务对象类型: {att_type}")
print(f" 业务对象ID: {att_entry_id}")
print(f" 上传人: {att_owner}")
print(f" 创建时间: {att_created}")
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_upload_result.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 上传结果已保存到: {output_file}")
return data
else:
print(f"\n✗ 上传失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def _get_content_type(self, filename):
"""
根据文件扩展名获取Content-Type
Args:
filename: 文件名
Returns:
str: Content-Type
"""
import mimetypes
content_type, _ = mimetypes.guess_type(filename)
return content_type or 'application/octet-stream'
def get_attachments(self, entry_id=None, attachment_type=None, filename=None, owner=None, limit=30):
"""
获取TAPD附件列表
Args:
entry_id: 依赖对象ID (可选)
attachment_type: 附件类型 (可选)
filename: 附件名称 (可选)
owner: 上传人 (可选)
limit: 返回数量限制默认30最大200
Returns:
list: 附件列表失败返回None
"""
print("\n=== 获取TAPD附件列表 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
# 使用公司地址
url = f"{self.base_url}/attachments"
params = {
"workspace_id": self.workspace_id,
"limit": limit
}
# 添加可选参数
if entry_id:
params["entry_id"] = entry_id
if attachment_type:
params["type"] = attachment_type
if filename:
params["filename"] = filename
if owner:
params["owner"] = owner
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
if entry_id:
print(f" entry_id: {entry_id}")
if attachment_type:
print(f" type: {attachment_type}")
if filename:
print(f" filename: {filename}")
if owner:
print(f" owner: {owner}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_attachments", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", [])
print(f"\n✓ 成功获取附件列表")
# 显示附件统计
if isinstance(data, list):
attachment_count = len(data)
print(f"{attachment_count} 个附件")
if attachment_count > 0:
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_attachments.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 附件列表已保存到: {output_file}")
# 显示附件信息
print(f"\n附件列表预览:")
print("-" * 80)
for idx, item in enumerate(data[:10], 1):
attachment = item.get('Attachment', {})
att_id = attachment.get('id', '(无ID)')
att_filename = attachment.get('filename', '(无文件名)')
att_type = attachment.get('type', '(无类型)')
att_content_type = attachment.get('content_type', '(未知)')
att_owner = attachment.get('owner', '(未知)')
att_created = attachment.get('created', '(未知)')
print(f" {idx}. {att_filename}")
print(f" ID: {att_id}")
print(f" 类型: {att_type} | 内容类型: {att_content_type}")
print(f" 上传人: {att_owner} | 创建时间: {att_created}")
if attachment_count > 10:
print(f" ... 还有 {attachment_count - 10} 个附件,详见输出文件")
print("-" * 80)
else:
print(" 没有找到符合条件的附件")
return data
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def print_menu():
"""打印菜单"""
print("\n" + "="*50)
print("API测试工具")
print("="*50)
print("【企业微信API】")
print("1. 获取access_token")
print("2. 新建文档")
print("3. 重命名文档")
print("4. 删除文档")
print("5. 发送应用消息")
print("\n【TAPD API】")
print("6. 获取缺陷字段配置")
print("7. 获取需求字段配置")
print("8. 获取需求")
print("9. 获取附件列表")
print("10. 上传附件")
print("\n【其他】")
print("11. 查看日志文件")
print("0. 退出")
print("="*50)
def main():
"""主函数"""
wework_tester = WeWorkAPITester()
tapd_tester = TAPDAPITester()
while True:
print_menu()
choice = input("\n请选择操作 (0-11): ").strip()
if choice == "0":
print("\n感谢使用,再见!")
break
elif choice == "1":
print("\n=== 获取access_token ===")
print("1. 从API获取需要输入corpid和corpsecret")
print("2. 从缓存读取")
sub_choice = input("请选择 (1/2): ").strip()
if sub_choice == "1":
print("\n请输入企业微信认证信息:")
corpid = input("企业ID (corpid): ").strip()
corpsecret = input("应用密钥 (corpsecret): ").strip()
if corpid and corpsecret:
wework_tester.get_access_token(corpid, corpsecret)
else:
print("✗ corpid和corpsecret不能为空")
elif sub_choice == "2":
wework_tester.load_token_from_cache()
else:
print("✗ 无效的选择")
elif choice == "2":
doc_name = input("\n请输入文档名称: ").strip()
if not doc_name:
print("✗ 文档名称不能为空")
continue
print("\n文档类型:")
print(" 3 - 文档")
print(" 4 - 表格")
print(" 10 - 智能表格 (默认)")
doc_type_input = input("请选择文档类型 (直接回车默认为10): ").strip()
doc_type = int(doc_type_input) if doc_type_input else 10
wework_tester.create_doc(doc_name, doc_type)
elif choice == "3":
docid = input("\n请输入文档ID: ").strip()
new_name = input("请输入新的文档名称: ").strip()
if docid and new_name:
wework_tester.rename_doc(docid, new_name)
else:
print("✗ 文档ID和新名称不能为空")
elif choice == "4":
docid = input("\n请输入要删除的文档ID: ").strip()
if docid:
confirm = input(f"确认要删除文档 {docid} 吗? (y/n): ").strip().lower()
if confirm == 'y':
wework_tester.delete_doc(docid)
else:
print("已取消删除操作")
else:
print("✗ 文档ID不能为空")
elif choice == "5":
# 发送应用消息
content = input("\n请输入消息内容: ").strip()
if not content:
print("✗ 消息内容不能为空")
continue
wework_tester.send_message(content)
elif choice == "6":
# 获取TAPD缺陷字段配置
tapd_tester.get_bug_custom_fields()
elif choice == "7":
# 获取TAPD需求字段配置
tapd_tester.get_story_fields_info()
elif choice == "8":
# 获取TAPD需求
story_id = input("\n请输入需求ID: ").strip()
if not story_id:
print("✗ 需求ID不能为空")
continue
tapd_tester.get_story(story_id)
elif choice == "9":
# 获取TAPD附件列表
print("\n=== 获取附件列表 ===")
print("是否需要添加筛选条件?")
print("1. 获取所有附件(默认)")
print("2. 按依赖对象ID筛选")
print("3. 按附件类型筛选")
print("4. 按文件名筛选")
print("5. 按上传人筛选")
filter_choice = input("请选择 (直接回车默认为1): ").strip()
entry_id = None
attachment_type = None
filename = None
owner = None
if filter_choice == "2":
entry_id = input("请输入依赖对象ID: ").strip()
elif filter_choice == "3":
attachment_type = input("请输入附件类型: ").strip()
elif filter_choice == "4":
filename = input("请输入文件名: ").strip()
elif filter_choice == "5":
owner = input("请输入上传人: ").strip()
# 询问返回数量
limit_input = input("请输入返回数量限制 (直接回车默认为30最大200): ").strip()
limit = int(limit_input) if limit_input else 30
tapd_tester.get_attachments(
entry_id=entry_id,
attachment_type=attachment_type,
filename=filename,
owner=owner,
limit=limit
)
elif choice == "10":
# 上传附件到TAPD
print("\n=== 上传附件 ===")
file_path = input("请输入文件路径: ").strip()
if not file_path:
print("✗ 文件路径不能为空")
continue
# 询问业务对象类型(必填)
print("\n业务对象类型:")
print(" story - 需求")
print(" bug - 缺陷")
print(" task - 任务")
entry_type = input("请选择业务对象类型 (story/bug/task): ").strip().lower()
if entry_type not in ['story', 'bug', 'task']:
print("✗ 业务对象类型无效,必须是 story/bug/task 之一")
continue
# 询问业务对象ID必填
type_name_map = {'story': '需求', 'bug': '缺陷', 'task': '任务'}
entry_id = input(f"请输入{type_name_map[entry_type]}ID: ").strip()
if not entry_id:
print("✗ 业务对象ID不能为空")
continue
# 询问是否指定上传人(可选)
owner_input = input("请输入上传人用户名 (直接回车使用API调用者): ").strip()
owner = owner_input if owner_input else None
# 询问是否覆盖同名文件(可选)
overwrite_input = input("是否覆盖同名文件? (y/n直接回车默认为n): ").strip().lower()
overwrite = True if overwrite_input == 'y' else False
tapd_tester.upload_attachment(
file_path=file_path,
entry_type=entry_type,
entry_id=entry_id,
owner=owner,
overwrite=overwrite
)
elif choice == "11":
print("\n=== 查看日志文件 ===")
try:
with open(wework_tester.log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
records = log_data.get("records", [])
if not records:
print("日志文件为空")
else:
print(f"\n共有 {len(records)} 条记录\n")
for i, record in enumerate(records[-10:], 1): # 只显示最近10条
print(f"记录 {i}:")
print(f" 操作: {record.get('operation')}")
print(f" 时间: {record.get('timestamp')}")
# 根据不同的操作类型显示不同的响应信息
response = record.get('response', {})
if 'errcode' in response:
# 企业微信API响应
print(f" 响应: errcode={response.get('errcode')}, "
f"errmsg={response.get('errmsg')}")
elif 'status' in response:
# TAPD API响应
print(f" 响应: status={response.get('status')}, "
f"info={response.get('info', 'ok')}")
print()
if len(records) > 10:
print(f"(仅显示最近10条完整日志请查看: {wework_tester.log_file})")
except Exception as e:
print(f"✗ 读取日志文件失败: {str(e)}")
else:
print("✗ 无效的选择,请重新输入")
input("\n按回车键继续...")
if __name__ == "__main__":
main()