'fix_bug_status_and_auto'

This commit is contained in:
zelong 2025-12-24 16:28:39 +08:00
parent e566d4d93a
commit 9c448ed206
8 changed files with 814 additions and 57 deletions

127
src/api_logger.py Normal file
View File

@ -0,0 +1,127 @@
"""
API调用日志记录模块
负责记录所有API调用的请求和响应到JSON文件
"""
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional
from pathlib import Path
class APILogger:
"""API调用日志记录器"""
def __init__(self, log_file: Optional[str] = None):
"""
初始化日志记录器
Args:
log_file: 日志文件路径如果为None则使用默认路径
"""
if log_file is None:
# 默认路径:项目根目录/logs/api_log.json
project_root = Path(__file__).parent.parent
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
self.log_file = log_dir / "api_log.json"
else:
self.log_file = Path(log_file)
# 确保日志文件存在
self._init_log_file()
def _init_log_file(self):
"""初始化日志文件"""
if not self.log_file.exists():
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump({"records": []}, f, ensure_ascii=False, indent=2)
def log_api_call(self, api_type: str, operation: str,
request_data: Dict[str, Any],
response_data: Dict[str, Any],
success: bool = True,
error_message: Optional[str] = None):
"""
记录API调用
Args:
api_type: API类型 "smartsheet", "tapd", "wework"
operation: 操作名称 "get_records", "create_bug"
request_data: 请求数据包含urlmethodparams等
response_data: 响应数据
success: 是否成功
error_message: 错误信息如果失败
"""
try:
# 读取现有记录
with open(self.log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
# 添加新记录
record = {
"api_type": api_type,
"operation": operation,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"success": success,
"request": request_data,
"response": response_data
}
if error_message:
record["error_message"] = error_message
log_data["records"].append(record)
# 写回文件
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump(log_data, f, ensure_ascii=False, indent=2)
except Exception as e:
# 日志记录失败不应该影响主流程
print(f"⚠ API日志记录失败: {str(e)}")
# 全局单例
_global_logger = None
def get_logger() -> APILogger:
"""
获取全局API日志记录器单例
Returns:
APILogger: 日志记录器实例
"""
global _global_logger
if _global_logger is None:
_global_logger = APILogger()
return _global_logger
if __name__ == "__main__":
# 测试代码
print("=== API日志记录器测试 ===\n")
logger = APILogger()
# 测试记录一个成功的API调用
logger.log_api_call(
api_type="smartsheet",
operation="get_records",
request_data={
"url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_records",
"method": "POST",
"params": {"docid": "test123", "sheet_id": "sheet456"}
},
response_data={
"errcode": 0,
"errmsg": "ok",
"records": []
},
success=True
)
print("✓ 成功记录API调用")
print(f"日志文件: {logger.log_file}")

View File

@ -8,7 +8,15 @@
import requests
import json
import os
import sys
from datetime import datetime
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.config import ConfigManager
class WeWorkAPITester:
@ -52,6 +60,66 @@ class WeWorkAPITester:
except Exception as e:
print(f"✗ 记录日志失败: {str(e)}")
def load_token_from_cache(self):
"""
从缓存文件读取access_token
Returns:
bool: 是否成功读取token
"""
print("\n=== 从缓存读取access_token ===")
# token缓存文件路径
cache_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'config',
'token_cache.json'
)
try:
# 检查文件是否存在
if not os.path.exists(cache_file):
print(f"✗ 缓存文件不存在: {cache_file}")
return False
# 读取缓存文件
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
access_token = cache_data.get('access_token')
fetch_time = cache_data.get('fetch_time')
if not access_token:
print("✗ 缓存文件中没有access_token")
return False
# 检查token是否过期7200秒 = 2小时
import time
current_time = time.time()
elapsed_time = current_time - fetch_time
remaining_time = 7200 - elapsed_time
if remaining_time <= 0:
print("✗ 缓存的token已过期")
print(f" 获取时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fetch_time))}")
print(f" 已过期 {int(-remaining_time)}")
return False
# token有效加载到内存
self.access_token = access_token
print(f"✓ 成功从缓存读取access_token")
print(f" Token: {self.access_token[:20]}...")
print(f" 获取时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fetch_time))}")
print(f" 剩余有效期: {int(remaining_time)}秒 ({int(remaining_time//60)}分钟)")
return True
except json.JSONDecodeError:
print(f"✗ 缓存文件格式错误")
return False
except Exception as e:
print(f"✗ 读取缓存失败: {str(e)}")
return False
def get_access_token(self, corpid, corpsecret):
"""
获取access_token
@ -267,42 +335,309 @@ class WeWorkAPITester:
print(f"✗ 请求异常: {str(e)}")
return False
def send_message(self, content):
"""
发送应用消息
Args:
content: 消息内容
Returns:
bool: 是否成功
"""
print("\n=== 发送应用消息 ===")
if not self.access_token:
print("✗ 请先获取access_token")
return False
# 从配置文件读取agentid和receivers
try:
config_manager = ConfigManager()
config = config_manager.config
if not config.has_section('wework'):
print("✗ 配置文件缺少[wework]节")
return False
if not config.has_option('wework', 'agentid'):
print("✗ 配置文件[wework]节缺少agentid配置项")
return False
if not config.has_option('wework', 'receivers'):
print("✗ 配置文件[wework]节缺少receivers配置项")
return False
agentid = config.get('wework', 'agentid').strip()
receivers = config.get('wework', 'receivers').strip()
print(f"✓ 从配置文件读取agentid: {agentid}")
print(f"✓ 从配置文件读取receivers: {receivers}")
except Exception as e:
print(f"✗ 读取配置失败: {e}")
return False
url = f"{self.base_url}/message/send"
params = {"access_token": self.access_token}
# 构造请求体使用配置文件中的receivers
data = {
"touser": receivers,
"msgtype": "text",
"agentid": int(agentid),
"text": {
"content": content
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0,
"duplicate_check_interval": 1800
}
try:
response = requests.post(url, params=params, json=data, timeout=10)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"params": {"access_token": "***"},
"body": data
}
self._log_api_call("send_message", request_data, response_data)
# 检查返回结果
if response_data.get("errcode") == 0:
print(f"✓ 消息发送成功")
print(f" 消息内容: {content}")
return True
else:
print(f"✗ 发送失败")
print(f" 错误码: {response_data.get('errcode')}")
print(f" 错误信息: {response_data.get('errmsg')}")
return False
except Exception as e:
print(f"✗ 请求异常: {str(e)}")
return False
class TAPDAPITester:
"""TAPD API测试类"""
def __init__(self):
self.log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'api_test_log.json')
self.base_url = "https://tapd-api.bilibili.co/tapd"
self.api_user = None
self.api_password = None
self.workspace_id = None
def _init_auth(self):
"""初始化TAPD认证信息"""
# 从环境变量读取认证信息
self.api_user = os.environ.get('TAPD_API_USER')
self.api_password = os.environ.get('TAPD_API_PASSWORD')
if not self.api_user or not self.api_password:
print("✗ TAPD认证信息未设置")
print(" 请设置环境变量:")
print(" - TAPD_API_USER")
print(" - TAPD_API_PASSWORD")
return False
print(f"✓ TAPD认证信息已加载 (用户: {self.api_user})")
return True
def _init_workspace_id(self):
"""从配置文件读取workspace_id"""
try:
config_manager = ConfigManager()
tapd_config = config_manager.get_tapd_config()
self.workspace_id = tapd_config['workspace_id']
print(f"✓ 从配置文件读取workspace_id: {self.workspace_id}")
return True
except Exception as e:
print(f"✗ 读取workspace_id失败: {e}")
return False
def _log_api_call(self, operation, request_data, response_data):
"""记录API调用到JSON文件"""
try:
# 读取现有记录
with open(self.log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
# 添加新记录
record = {
"operation": operation,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"request": request_data,
"response": response_data
}
log_data["records"].append(record)
# 写回文件
with open(self.log_file, 'w', encoding='utf-8') as f:
json.dump(log_data, f, ensure_ascii=False, indent=2)
print(f"✓ API调用已记录到日志文件")
except Exception as e:
print(f"✗ 记录日志失败: {str(e)}")
def get_bug_custom_fields(self):
"""
获取TAPD缺陷的所有字段配置及候选值
Returns:
dict: 字段配置信息失败返回None
"""
print("\n=== 获取TAPD缺陷字段配置 ===")
# 初始化认证信息
if not self._init_auth():
return None
# 初始化workspace_id
if not self._init_workspace_id():
return None
url = f"{self.base_url}/bugs/get_fields_info"
params = {
"workspace_id": self.workspace_id
}
try:
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth(self.api_user, self.api_password)
print(f"\n正在请求TAPD API...")
print(f" URL: {url}")
print(f" workspace_id: {self.workspace_id}")
response = requests.get(url, params=params, auth=auth, timeout=30)
response_data = response.json()
# 记录API调用
request_data = {
"url": url,
"method": "GET",
"params": params,
"auth_user": self.api_user
}
self._log_api_call("get_bug_custom_fields", request_data, response_data)
# 检查返回结果
if response_data.get("status") == 1:
data = response_data.get("data", {})
print(f"\n✓ 成功获取字段配置")
# 显示字段统计
if isinstance(data, dict):
field_count = len(data)
print(f"{field_count} 个字段配置")
# 保存到单独的文件
output_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
'logs',
'tapd_bug_fields.json'
)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" ✓ 字段配置已保存到: {output_file}")
# 显示部分字段信息
print(f"\n字段列表预览:")
print("-" * 80)
for idx, (field_name, field_info) in enumerate(list(data.items())[:10], 1):
field_label = field_info.get('label', '(无标签)')
html_type = field_info.get('html_type', '(未知类型)')
print(f" {idx}. {field_name} ({field_label}) - 类型: {html_type}")
# 如果有候选值,显示
options = field_info.get('options', [])
if options:
# options可能是字典或列表
if isinstance(options, dict):
option_list = list(options.values())[:5]
total_count = len(options)
elif isinstance(options, list):
option_list = options[:5]
total_count = len(options)
else:
option_list = []
total_count = 0
if option_list:
print(f" 候选值: {', '.join(str(o) for o in option_list)}" +
(f" ...等{total_count}" if total_count > 5 else ""))
if field_count > 10:
print(f" ... 还有 {field_count - 10} 个字段,详见输出文件")
print("-" * 80)
return data
else:
print(f"\n✗ 获取失败")
print(f" 状态码: {response_data.get('status')}")
print(f" 错误信息: {response_data.get('info', '未知错误')}")
return None
except Exception as e:
print(f"\n✗ 请求异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def print_menu():
"""打印菜单"""
print("\n" + "="*50)
print("企业微信智能表格API测试工具")
print("API测试工具")
print("="*50)
print("【企业微信API】")
print("1. 获取access_token")
print("2. 新建文档")
print("3. 重命名文档")
print("4. 删除文档")
print("5. 查看日志文件")
print("5. 发送应用消息")
print("\n【TAPD API】")
print("6. 获取缺陷字段配置")
print("\n【其他】")
print("7. 查看日志文件")
print("0. 退出")
print("="*50)
def main():
"""主函数"""
tester = WeWorkAPITester()
wework_tester = WeWorkAPITester()
tapd_tester = TAPDAPITester()
while True:
print_menu()
choice = input("\n请选择操作 (0-5): ").strip()
choice = input("\n请选择操作 (0-7): ").strip()
if choice == "0":
print("\n感谢使用,再见!")
break
elif choice == "1":
print("\n请输入企业微信认证信息:")
corpid = input("企业ID (corpid): ").strip()
corpsecret = input("应用密钥 (corpsecret): ").strip()
print("\n=== 获取access_token ===")
print("1. 从API获取需要输入corpid和corpsecret")
print("2. 从缓存读取")
sub_choice = input("请选择 (1/2): ").strip()
if corpid and corpsecret:
tester.get_access_token(corpid, corpsecret)
if sub_choice == "1":
print("\n请输入企业微信认证信息:")
corpid = input("企业ID (corpid): ").strip()
corpsecret = input("应用密钥 (corpsecret): ").strip()
if corpid and corpsecret:
wework_tester.get_access_token(corpid, corpsecret)
else:
print("✗ corpid和corpsecret不能为空")
elif sub_choice == "2":
wework_tester.load_token_from_cache()
else:
print("✗ corpid和corpsecret不能为空")
print("无效的选择")
elif choice == "2":
doc_name = input("\n请输入文档名称: ").strip()
@ -317,14 +652,14 @@ def main():
doc_type_input = input("请选择文档类型 (直接回车默认为10): ").strip()
doc_type = int(doc_type_input) if doc_type_input else 10
tester.create_doc(doc_name, doc_type)
wework_tester.create_doc(doc_name, doc_type)
elif choice == "3":
docid = input("\n请输入文档ID: ").strip()
new_name = input("请输入新的文档名称: ").strip()
if docid and new_name:
tester.rename_doc(docid, new_name)
wework_tester.rename_doc(docid, new_name)
else:
print("✗ 文档ID和新名称不能为空")
@ -334,16 +669,29 @@ def main():
if docid:
confirm = input(f"确认要删除文档 {docid} 吗? (y/n): ").strip().lower()
if confirm == 'y':
tester.delete_doc(docid)
wework_tester.delete_doc(docid)
else:
print("已取消删除操作")
else:
print("✗ 文档ID不能为空")
elif choice == "5":
# 发送应用消息
content = input("\n请输入消息内容: ").strip()
if not content:
print("✗ 消息内容不能为空")
continue
wework_tester.send_message(content)
elif choice == "6":
# 获取TAPD缺陷字段配置
tapd_tester.get_bug_custom_fields()
elif choice == "7":
print("\n=== 查看日志文件 ===")
try:
with open(tester.log_file, 'r', encoding='utf-8') as f:
with open(wework_tester.log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
records = log_data.get("records", [])
@ -355,12 +703,21 @@ def main():
print(f"记录 {i}:")
print(f" 操作: {record.get('operation')}")
print(f" 时间: {record.get('timestamp')}")
print(f" 响应: errcode={record.get('response', {}).get('errcode')}, "
f"errmsg={record.get('response', {}).get('errmsg')}")
# 根据不同的操作类型显示不同的响应信息
response = record.get('response', {})
if 'errcode' in response:
# 企业微信API响应
print(f" 响应: errcode={response.get('errcode')}, "
f"errmsg={response.get('errmsg')}")
elif 'status' in response:
# TAPD API响应
print(f" 响应: status={response.get('status')}, "
f"info={response.get('info', 'ok')}")
print()
if len(records) > 10:
print(f"(仅显示最近10条完整日志请查看: {tester.log_file})")
print(f"(仅显示最近10条完整日志请查看: {wework_tester.log_file})")
except Exception as e:
print(f"✗ 读取日志文件失败: {str(e)}")

View File

@ -18,6 +18,7 @@ from src.validator import RecordValidator
from src.tapd_api import TAPDApi
from src.mapper import FieldMapper, BugCreationResult
from src.token_manager import TokenManager
from src.status_mapper import BugStatusMapper
def parse_arguments():
@ -192,8 +193,13 @@ def scan_and_validate_records(access_token: str, docid: str, verbose: bool = Fal
if len(records) == 0:
print(" ✓ 没有待开单的记录")
return {
'valid_records': [],
'invalid_records': []
'validation_result': {
'valid_records': [],
'invalid_records': []
},
'fields': fields,
'field_mapping': field_mapping,
'sheet_id': sheet_id
}
# 5. 提取记录数据并校验
@ -288,13 +294,14 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
# 生成bug URL
bug_url = tapd_api.get_bug_url(bug_id)
# 获取bug状态
bug_status = bug_info.get('status', '新建')
# 获取bug状态英文并映射为中文
bug_status_en = bug_info.get('status', 'new')
bug_status_cn = BugStatusMapper.to_chinese(bug_status_en)
print(f" ✓ 创建成功!")
print(f" Bug ID: {bug_id}")
print(f" Bug URL: {bug_url}")
print(f" Bug 状态: {bug_status}")
print(f" Bug 状态: {bug_status_cn} ({bug_status_en})")
# 记录成功结果
result = BugCreationResult(
@ -303,8 +310,8 @@ def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test
bug_id=bug_id,
bug_url=bug_url
)
# 添加bug状态信息
result.bug_status = bug_status
# 添加bug状态信息(存储中文)
result.bug_status = bug_status_cn
success_results.append(result)
except ValueError as e:
@ -415,7 +422,7 @@ def write_back_results(access_token: str, docid: str, sheet_id: str,
"text": result.bug_id,
"link": result.bug_url
}],
"bug状态": [{"type": "text", "text": getattr(result, 'bug_status', '')}]
"bug状态": [{"type": "text", "text": getattr(result, 'bug_status', '')}]
}
}
update_records.append(record_update)
@ -525,7 +532,10 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
if len(validation_result['valid_records']) > 0:
validator.print_valid_records_summary(validation_result['valid_records'])
# 3. 创建TAPD bug单
# 3. 创建TAPD bug单仅针对校验通过的记录
creation_success_results = []
creation_failed_results = []
if len(validation_result['valid_records']) > 0:
creation_result = create_tapd_bugs(
validation_result['valid_records'],
@ -538,20 +548,39 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
result['bugs_created'] = len(creation_result['success_results'])
result['bugs_failed'] = len(creation_result['failed_results'])
# 4. 回写结果到智能表格
sheet_id = scan_result.get('sheet_id')
if sheet_id:
writeback_result = write_back_results(
access_token,
all_config['smartsheet']['docid'],
sheet_id,
creation_result['success_results'],
creation_result['failed_results'],
test_mode
)
creation_success_results = creation_result['success_results']
creation_failed_results = creation_result['failed_results']
# 更新统计信息
result['writeback_success'] = writeback_result.get('success_count', 0) + writeback_result.get('failed_count', 0)
# 4. 处理校验失败的记录转换为BugCreationResult格式
validation_failed_results = []
for invalid_record in validation_result['invalid_records']:
record_data = invalid_record['record_data']
missing_fields = invalid_record['missing_fields']
validation_failed_result = BugCreationResult(
record_id=record_data.get('record_id', '未知'),
success=False,
error_message=f"校验失败,缺失字段: {', '.join(missing_fields)}"
)
validation_failed_results.append(validation_failed_result)
# 5. 回写结果到智能表格(包括校验失败的记录)
sheet_id = scan_result.get('sheet_id')
if sheet_id and (len(creation_success_results) > 0 or len(creation_failed_results) > 0 or len(validation_failed_results) > 0):
# 合并创建失败和校验失败的记录
all_failed_results = creation_failed_results + validation_failed_results
writeback_result = write_back_results(
access_token,
all_config['smartsheet']['docid'],
sheet_id,
creation_success_results,
all_failed_results,
test_mode
)
# 更新统计信息
result['writeback_success'] = writeback_result.get('success_count', 0) + writeback_result.get('failed_count', 0)
# 显示最终统计
print("\n" + "=" * 60)
@ -561,9 +590,12 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F
print(f"✓ 成功获取字段信息")
print(f"✓ 成功筛选待开单记录")
print(f"✓ 成功校验必填项")
print(f"✓ 成功创建 {result['bugs_created']} 个TAPD bug单")
if result['bugs_created'] > 0:
print(f"✓ 成功创建 {result['bugs_created']} 个TAPD bug单")
if result['bugs_failed'] > 0:
print(f"{result['bugs_failed']} 个bug单创建失败")
if len(validation_failed_results) > 0:
print(f"{len(validation_failed_results)} 条记录校验失败")
if result['writeback_success'] > 0:
print(f"✓ 成功回写 {result['writeback_success']} 条记录到智能表格")
print("=" * 60)

View File

@ -86,7 +86,7 @@ class FieldMapper:
priority = record_data.get('优先级', '').strip()
if not priority:
raise ValueError("优先级不能为空")
tapd_data['priority'] = priority
tapd_data['priority_label'] = priority
# 4. 严重程度(必填)
severity = record_data.get('严重程度', '').strip()

View File

@ -5,6 +5,7 @@
import requests
from typing import Dict, List, Optional, Any
from src.api_logger import get_logger
class SmartSheetAPI:
@ -26,6 +27,7 @@ class SmartSheetAPI:
self.docid = docid
self.test_mode = test_mode
self.session = requests.Session()
self.logger = get_logger()
def _make_request(self, endpoint: str, method: str = "POST", data: Optional[Dict] = None) -> Dict:
"""
@ -44,6 +46,13 @@ class SmartSheetAPI:
"""
url = f"{self.BASE_URL}/{endpoint}?access_token={self.access_token}"
# 准备日志记录的请求数据隐藏access_token
log_request_data = {
"url": f"{self.BASE_URL}/{endpoint}",
"method": method,
"data": data
}
# 测试模式:显示请求信息
if self.test_mode:
print("\n" + "=" * 80)
@ -65,12 +74,20 @@ class SmartSheetAPI:
response.raise_for_status()
result = response.json()
# 记录API调用日志成功
self.logger.log_api_call(
api_type="smartsheet",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=True
)
# 测试模式:显示响应信息
if self.test_mode:
import json
print(f"\n响应状态码: {response.status_code}")
print(f"响应数据:")
# 测试模式下显示完整的响应数据
print(json.dumps(result, ensure_ascii=False, indent=2))
print("=" * 80)
@ -82,9 +99,29 @@ class SmartSheetAPI:
return result
except requests.exceptions.Timeout:
raise RuntimeError(f"API请求超时: {endpoint}")
error_msg = f"API请求超时: {endpoint}"
# 记录API调用日志失败
self.logger.log_api_call(
api_type="smartsheet",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
except requests.exceptions.RequestException as e:
raise RuntimeError(f"API请求失败: {e}")
error_msg = f"API请求失败: {e}"
# 记录API调用日志失败
self.logger.log_api_call(
api_type="smartsheet",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
def get_sheet_list(self) -> List[Dict]:
"""
@ -299,12 +336,12 @@ class SmartSheetAPI:
print(f" ✓ 过滤后找到 {len(all_records)} 条已开单记录")
# 过滤掉终态的记录bug状态为rejected、closed或空)
# 过滤掉终态的记录bug状态为"取消"、"验证通过"或空)
filtered_records = []
skipped_count = 0
# 定义终态列表
terminal_statuses = ['rejected', 'closed']
# 定义终态列表(中文)
terminal_statuses = ['取消', '验证通过']
for record in all_records:
bug_status = self.get_field_value_by_title(record, 'bug状态')

130
src/status_mapper.py Normal file
View File

@ -0,0 +1,130 @@
"""
TAPD Bug状态映射模块
负责将TAPD API返回的英文状态映射为中文显示值
"""
from typing import Optional
class BugStatusMapper:
"""Bug状态映射器"""
# TAPD Bug状态映射表英文key -> 中文显示值
STATUS_MAP = {
"new": "",
"in_progress": "处理中",
"resolved": "已解决 待验证",
"reopened": "重新打开",
"rejected": "取消",
"closed": "验证通过"
}
@classmethod
def to_chinese(cls, english_status: str) -> str:
"""
将英文状态转换为中文
Args:
english_status: TAPD API返回的英文状态 "new", "in_progress"
Returns:
str: 中文状态 "", "处理中"
如果状态未知返回原始值
"""
if not english_status:
return "未知"
# 转换为小写并去除空格
status_key = english_status.strip().lower()
# 查找映射
chinese_status = cls.STATUS_MAP.get(status_key)
if chinese_status:
return chinese_status
else:
# 如果找不到映射,返回原始值(可能是新增的状态)
return english_status
@classmethod
def to_english(cls, chinese_status: str) -> Optional[str]:
"""
将中文状态转换为英文反向映射
Args:
chinese_status: 中文状态 "", "处理中"
Returns:
Optional[str]: 英文状态 "new", "in_progress"
如果状态未知返回None
"""
if not chinese_status:
return None
# 反向查找
for english, chinese in cls.STATUS_MAP.items():
if chinese == chinese_status.strip():
return english
return None
@classmethod
def is_terminal_status(cls, status: str) -> bool:
"""
判断是否为终态状态不需要继续同步的状态
Args:
status: 状态值支持中文或英文
Returns:
bool: 是否为终态
"""
# 终态列表(英文)
terminal_statuses_en = ["rejected", "closed"]
# 终态列表(中文)
terminal_statuses_cn = ["取消", "验证通过"]
status_lower = status.strip().lower()
return (status_lower in terminal_statuses_en or
status.strip() in terminal_statuses_cn)
@classmethod
def get_all_statuses(cls) -> dict:
"""
获取所有状态映射
Returns:
dict: 完整的状态映射字典
"""
return cls.STATUS_MAP.copy()
if __name__ == "__main__":
# 测试代码
print("=== TAPD Bug状态映射测试 ===\n")
# 测试英文转中文
print("【英文 -> 中文】")
test_statuses = ["new", "in_progress", "resolved", "reopened", "rejected", "closed", "unknown"]
for status in test_statuses:
chinese = BugStatusMapper.to_chinese(status)
print(f" {status:<15} -> {chinese}")
print("\n【中文 -> 英文】")
test_statuses_cn = ["", "处理中", "已解决 待验证", "重新打开", "取消", "验证通过", "未知状态"]
for status in test_statuses_cn:
english = BugStatusMapper.to_english(status)
print(f" {status:<20} -> {english}")
print("\n【终态判断】")
test_all = ["new", "", "rejected", "取消", "closed", "验证通过", "in_progress", "处理中"]
for status in test_all:
is_terminal = BugStatusMapper.is_terminal_status(status)
print(f" {status:<20} -> {'终态' if is_terminal else '非终态'}")
print("\n【所有状态映射】")
all_statuses = BugStatusMapper.get_all_statuses()
for en, cn in all_statuses.items():
print(f" {en:<15} -> {cn}")

View File

@ -13,6 +13,7 @@ sys.path.insert(0, str(project_root))
from src.smartsheet import SmartSheetAPI
from src.tapd_api import TAPDApi
from src.status_mapper import BugStatusMapper
class BugStatusSyncer:
@ -135,19 +136,22 @@ class BugStatusSyncer:
# 查询TAPD的最新状态
print(f" → 正在查询TAPD最新状态...")
bug_info = self.tapd_api.get_bug(bug_id)
latest_status = bug_info.get('status', '')
latest_status_en = bug_info.get('status', '')
print(f" ✓ TAPD最新状态: {latest_status}")
# 将英文状态映射为中文
latest_status_cn = BugStatusMapper.to_chinese(latest_status_en)
# 对比状态是否变化
if latest_status != current_status:
print(f" ⚠ 状态已变化: {current_status}{latest_status}")
print(f" ✓ TAPD最新状态: {latest_status_cn} ({latest_status_en})")
# 添加到更新列表
# 对比状态是否变化(使用中文状态对比)
if latest_status_cn != current_status:
print(f" ⚠ 状态已变化: {current_status}{latest_status_cn}")
# 添加到更新列表(回写中文状态)
update_record = {
"record_id": record_id,
"values": {
"bug状态": [{"type": "text", "text": latest_status}]
"bug状态": [{"type": "text", "text": latest_status_cn}]
}
}
updates.append(update_record)

View File

@ -7,6 +7,7 @@ import os
import requests
from typing import Dict, Optional, Any
from requests.auth import HTTPBasicAuth
from src.api_logger import get_logger
class TAPDApi:
@ -44,6 +45,9 @@ class TAPDApi:
# 设置Basic Auth
self.auth = HTTPBasicAuth(self.api_user, self.api_password)
# 初始化日志记录器
self.logger = get_logger()
print(f" ✓ TAPD API初始化完成 (workspace_id: {workspace_id})")
if test_mode:
print(f" ⚠ 测试模式已启用将显示所有TAPD API调用的详细信息")
@ -67,6 +71,15 @@ class TAPDApi:
"""
url = f"{self.BASE_URL}/{endpoint}"
# 准备日志记录的请求数据(隐藏认证信息)
log_request_data = {
"url": url,
"method": method,
"params": params,
"data": data,
"auth_user": self.api_user
}
# 测试模式:显示请求信息
if self.test_mode:
print("\n" + "=" * 80)
@ -129,21 +142,78 @@ class TAPDApi:
# 检查TAPD API返回的状态
if result.get('status') != 1:
error_msg = result.get('info', '未知错误')
# 记录API调用日志失败
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=False,
error_message=error_msg
)
raise RuntimeError(f"TAPD API调用失败: {error_msg}")
# 记录API调用日志成功
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data=result,
success=True
)
return result
except requests.exceptions.Timeout:
raise RuntimeError(f"TAPD API请求超时: {endpoint}")
error_msg = f"TAPD API请求超时: {endpoint}"
# 记录API调用日志失败
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
except requests.exceptions.HTTPError as e:
error_msg = f"TAPD API HTTP错误: {e}"
if hasattr(e.response, 'text'):
error_msg += f"\n响应内容: {e.response.text[:200]}"
# 记录API调用日志失败
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
except requests.exceptions.RequestException as e:
raise RuntimeError(f"TAPD API请求失败: {e}")
error_msg = f"TAPD API请求失败: {e}"
# 记录API调用日志失败
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
except ValueError as e:
raise RuntimeError(f"TAPD API响应解析失败: {e}")
error_msg = f"TAPD API响应解析失败: {e}"
# 记录API调用日志失败
self.logger.log_api_call(
api_type="tapd",
operation=endpoint,
request_data=log_request_data,
response_data={},
success=False,
error_message=error_msg
)
raise RuntimeError(error_msg)
def create_bug(self, bug_data: Dict[str, Any]) -> Dict:
"""