From 5be7992bfd9656c2147b7d859c1e6ffc5cfd3e8f Mon Sep 17 00:00:00 2001 From: zelong <2895587166@qq.com> Date: Thu, 25 Dec 2025 14:52:06 +0800 Subject: [PATCH] 'feature:multi-sheet-support' --- src/main.py | 382 ++++++++++++++++++++++++++--------------- src/sync_status.py | 260 +++++++++++++++++----------- src/wework_notifier.py | 55 +++--- 3 files changed, 425 insertions(+), 272 deletions(-) diff --git a/src/main.py b/src/main.py index 2179d72..de907ce 100644 --- a/src/main.py +++ b/src/main.py @@ -118,81 +118,61 @@ def get_or_validate_access_token(access_token_arg): raise RuntimeError(f"自动获取access_token失败: {e}") -def scan_and_validate_records(access_token: str, docid: str, verbose: bool = False, test_mode: bool = False): +def scan_and_validate_records_for_sheet(access_token: str, docid: str, sheet_id: str, sheet_title: str, + verbose: bool = False, test_mode: bool = False): """ - 扫描智能表格并校验记录 + 扫描并校验单个子表的记录 Args: access_token: 企业微信access_token docid: 智能表格文档ID + sheet_id: 子表ID + sheet_title: 子表标题 verbose: 是否显示详细信息 test_mode: 是否启用测试模式(显示所有字段ID和标题) Returns: Dict: 包含valid_records、invalid_records、fields和sheet_id的字典 """ - print("\n" + "=" * 60) - print("开始扫描智能表格") - print("=" * 60) - # 1. 初始化API - print("\n[1/5] 初始化智能表格API...") api = SmartSheetAPI(access_token, docid, test_mode=test_mode) - print(" ✓ API初始化完成") - if test_mode: - print(" ⚠ 测试模式已启用:将显示所有API调用的详细信息") - # 2. 获取子表列表 - print("\n[2/5] 获取子表列表...") - sheet_list = api.get_sheet_list() - - if not sheet_list: - raise RuntimeError("未找到任何子表") - - # 使用第一个子表 - sheet = sheet_list[0] - sheet_id = sheet['sheet_id'] - sheet_title = sheet.get('title', '未命名') - print(f" ✓ 使用子表: {sheet_title} (ID: {sheet_id})") - - # 3. 获取字段信息 - print("\n[3/5] 获取字段信息...") + # 2. 获取字段信息 fields = api.get_fields(sheet_id) field_mapping = api.build_field_mapping(fields) # 测试模式:显示所有字段的ID和标题 if test_mode: print("\n" + "=" * 60) - print("【测试模式】智能表格字段信息") + print(f"【测试模式】子表 {sheet_title} 字段信息") print("=" * 60) print(f"{'序号':<6} {'字段标题':<20} {'字段ID':<30} {'字段类型':<15}") print("-" * 60) for idx, field in enumerate(fields, 1): - field_title = field.get('field_title', '(无标题)') + field_title_display = field.get('field_title', '(无标题)') field_id = field.get('field_id', '(无ID)') field_type = field.get('field_type', '(未知)') - print(f"{idx:<6} {field_title:<20} {field_id:<30} {field_type:<15}") + print(f"{idx:<6} {field_title_display:<20} {field_id:<30} {field_type:<15}") print("=" * 60) print(f"共 {len(fields)} 个字段\n") if verbose: - print(f" 字段列表:") + print(f" 字段列表:") for field_name in field_mapping.keys(): - print(f" - {field_name}") + print(f" - {field_name}") # 检查必需的字段是否存在 required_fields = ['标题', '详细描述', '优先级', '严重程度', '处理人', '验证人', '发现版本', '模块', '开单状态'] missing_fields = [f for f in required_fields if f not in field_mapping] if missing_fields: - raise RuntimeError(f"智能表格缺少必需字段: {', '.join(missing_fields)}") + raise RuntimeError(f"子表 {sheet_title} 缺少必需字段: {', '.join(missing_fields)}") - # 4. 获取"开单状态"为空的记录 - print("\n[4/5] 查询待开单记录...") + # 3. 获取"开单状态"为空的记录 status_field_id = field_mapping['开单状态'] records = api.get_empty_status_records(sheet_id, status_field_id) if len(records) == 0: - print(" ✓ 没有待开单的记录") + print(f" ✓ 没有待开单的记录") return { 'validation_result': { 'valid_records': [], @@ -200,33 +180,112 @@ def scan_and_validate_records(access_token: str, docid: str, verbose: bool = Fal }, 'fields': fields, 'field_mapping': field_mapping, - 'sheet_id': sheet_id + 'sheet_id': sheet_id, + 'sheet_title': sheet_title } - # 5. 提取记录数据并校验 - print("\n[5/5] 提取并校验记录数据...") + # 4. 提取记录数据并校验 records_data = [] for record in records: - # 现在直接根据字段标题提取数据,不再需要field_mapping record_data = api.extract_record_data(record) records_data.append(record_data) - print(f" ✓ 成功提取 {len(records_data)} 条记录数据") - # 校验记录 - print("\n开始校验记录...") validator = RecordValidator() validation_result = validator.validate_records(records_data) + print(f" ✓ 扫描到 {len(records_data)} 条待开单记录") + print(f" - 校验通过: {len(validation_result['valid_records'])} 条") + if len(validation_result['invalid_records']) > 0: + print(f" - 校验失败: {len(validation_result['invalid_records'])} 条") + # 返回结果,包含字段信息和sheet_id return { 'validation_result': validation_result, 'fields': fields, 'field_mapping': field_mapping, - 'sheet_id': sheet_id + 'sheet_id': sheet_id, + 'sheet_title': sheet_title } +def scan_all_sheets(access_token: str, docid: str, verbose: bool = False, test_mode: bool = False): + """ + 扫描智能表格的所有子表并校验记录 + + Args: + access_token: 企业微信access_token + docid: 智能表格文档ID + verbose: 是否显示详细信息 + test_mode: 是否启用测试模式 + + Returns: + List[Dict]: 所有子表的扫描结果列表,每个元素包含: + - sheet_id: 子表ID + - sheet_title: 子表标题 + - validation_result: 校验结果 + - error: 错误信息(如果处理失败) + """ + print("\n" + "=" * 60) + print("开始扫描智能表格") + print("=" * 60) + + # 1. 初始化API并获取子表列表 + print("\n[1/2] 获取子表列表...") + api = SmartSheetAPI(access_token, docid, test_mode=test_mode) + sheet_list = api.get_sheet_list() + + if not sheet_list: + raise RuntimeError("未找到任何子表") + + print(f" ✓ 找到 {len(sheet_list)} 个子表") + + # 2. 遍历所有子表 + print(f"\n[2/2] 扫描所有子表...") + all_results = [] + + for idx, sheet in enumerate(sheet_list, 1): + sheet_id = sheet['sheet_id'] + sheet_title = sheet.get('title', '未命名') + + print(f"\n[子表 {idx}/{len(sheet_list)}] {sheet_title}") + + try: + # 扫描并校验单个子表 + result = scan_and_validate_records_for_sheet( + access_token, docid, sheet_id, sheet_title, verbose, test_mode + ) + result['error'] = None + all_results.append(result) + + except RuntimeError as e: + # 子表处理失败,记录错误并跳过 + print(f" ✗ 处理失败: {e}") + all_results.append({ + 'sheet_id': sheet_id, + 'sheet_title': sheet_title, + 'validation_result': { + 'valid_records': [], + 'invalid_records': [] + }, + 'error': str(e) + }) + except Exception as e: + # 未预期的错误 + print(f" ✗ 未预期的错误: {type(e).__name__}: {e}") + all_results.append({ + 'sheet_id': sheet_id, + 'sheet_title': sheet_title, + 'validation_result': { + 'valid_records': [], + 'invalid_records': [] + }, + 'error': f"{type(e).__name__}: {e}" + }) + + return all_results + + def create_tapd_bugs(valid_records: list, workspace_id: str, reporter: str, test_mode: bool = False) -> Dict: """ 批量创建TAPD bug单 @@ -510,127 +569,166 @@ def run_once(config_manager: ConfigManager, access_token: str, verbose: bool = F # 获取配置信息 all_config = config_manager.get_all_config() - # 1. 扫描智能表格并校验记录 - scan_result = scan_and_validate_records( + # 1. 扫描所有子表并校验记录 + all_sheet_results = scan_all_sheets( access_token, all_config['smartsheet']['docid'], verbose, test_mode ) - validation_result = scan_result['validation_result'] + # 2. 遍历每个子表,分别处理 + all_invalid_records = [] # 收集所有子表的失败记录(用于推送) + sheet_summaries = [] # 收集每个子表的统计摘要 - # 更新统计信息 - result['scanned_count'] = len(validation_result['valid_records']) + len(validation_result['invalid_records']) - result['valid_count'] = len(validation_result['valid_records']) - result['invalid_count'] = len(validation_result['invalid_records']) + for sheet_result in all_sheet_results: + sheet_id = sheet_result['sheet_id'] + sheet_title = sheet_result['sheet_title'] + validation_result = sheet_result['validation_result'] + error = sheet_result.get('error') - # 2. 显示校验结果 - validator = RecordValidator() - validator.print_validation_summary(validation_result) + # 如果子表处理失败,跳过 + if error: + sheet_summaries.append({ + 'sheet_title': sheet_title, + 'error': error, + 'scanned': 0, + 'valid': 0, + 'invalid': 0, + 'created': 0, + 'failed': 0 + }) + continue - # 显示校验通过的记录详情 - if len(validation_result['valid_records']) > 0: - validator.print_valid_records_summary(validation_result['valid_records']) + # 统计当前子表的数据 + scanned_count = len(validation_result['valid_records']) + len(validation_result['invalid_records']) + valid_count = len(validation_result['valid_records']) + invalid_count = len(validation_result['invalid_records']) - # 3. 创建TAPD bug单(仅针对校验通过的记录) - creation_success_results = [] - creation_failed_results = [] + # 收集失败记录(添加子表信息) + for invalid_record in validation_result['invalid_records']: + invalid_record['sheet_title'] = sheet_title + all_invalid_records.append(invalid_record) - if len(validation_result['valid_records']) > 0: - creation_result = create_tapd_bugs( - validation_result['valid_records'], - all_config['tapd']['workspace_id'], - all_config['tapd']['reporter'], - test_mode - ) + # 3. 创建TAPD bug单(仅针对校验通过的记录) + creation_success_results = [] + creation_failed_results = [] - # 更新统计信息 - result['bugs_created'] = len(creation_result['success_results']) - result['bugs_failed'] = len(creation_result['failed_results']) + if valid_count > 0: + creation_result = create_tapd_bugs( + validation_result['valid_records'], + all_config['tapd']['workspace_id'], + all_config['tapd']['reporter'], + test_mode + ) + creation_success_results = creation_result['success_results'] + creation_failed_results = creation_result['failed_results'] - creation_success_results = creation_result['success_results'] - creation_failed_results = creation_result['failed_results'] + # 4. 处理校验失败的记录(转换为BugCreationResult格式) + validation_failed_results = [] + for invalid_record in validation_result['invalid_records']: + record_data = invalid_record['record_data'] + missing_fields = invalid_record['missing_fields'] - # 4. 处理校验失败的记录(转换为BugCreationResult格式) - validation_failed_results = [] - for invalid_record in validation_result['invalid_records']: - record_data = invalid_record['record_data'] - missing_fields = invalid_record['missing_fields'] + validation_failed_result = BugCreationResult( + record_id=record_data.get('record_id', '未知'), + success=False, + error_message=f"校验失败,缺失字段: {', '.join(missing_fields)}" + ) + validation_failed_results.append(validation_failed_result) - validation_failed_result = BugCreationResult( - record_id=record_data.get('record_id', '未知'), - success=False, - error_message=f"校验失败,缺失字段: {', '.join(missing_fields)}" - ) - validation_failed_results.append(validation_failed_result) + # 5. 回写结果到智能表格 + if len(creation_success_results) > 0 or len(creation_failed_results) > 0 or len(validation_failed_results) > 0: + all_failed_results = creation_failed_results + validation_failed_results - # 5. 回写结果到智能表格(包括校验失败的记录) - sheet_id = scan_result.get('sheet_id') - if sheet_id and (len(creation_success_results) > 0 or len(creation_failed_results) > 0 or len(validation_failed_results) > 0): - # 合并创建失败和校验失败的记录 - all_failed_results = creation_failed_results + validation_failed_results + writeback_result = write_back_results( + access_token, + all_config['smartsheet']['docid'], + sheet_id, + creation_success_results, + all_failed_results, + test_mode + ) - writeback_result = write_back_results( - access_token, - all_config['smartsheet']['docid'], - sheet_id, - creation_success_results, - all_failed_results, - test_mode - ) + # 记录当前子表的统计摘要 + sheet_summaries.append({ + 'sheet_title': sheet_title, + 'error': None, + 'scanned': scanned_count, + 'valid': valid_count, + 'invalid': invalid_count, + 'created': len(creation_success_results), + 'failed': len(creation_failed_results) + len(validation_failed_results) + }) - # 更新统计信息 - result['writeback_success'] = writeback_result.get('success_count', 0) + writeback_result.get('failed_count', 0) + # 更新总体统计 + result['scanned_count'] += scanned_count + result['valid_count'] += valid_count + result['invalid_count'] += invalid_count + result['bugs_created'] += len(creation_success_results) + result['bugs_failed'] += len(creation_failed_results) + len(validation_failed_results) - # 6. 发送企业微信推送通知(仅当有校验失败的记录时) - if len(validation_result['invalid_records']) > 0: - print("\n" + "=" * 60) - print("发送企业微信推送通知") - print("=" * 60) - try: - # 从配置文件读取企业微信推送配置 - wework_config = all_config.get('wework', {}) - agentid = wework_config.get('agentid') - receivers = wework_config.get('receivers') - - if agentid and receivers: - notifier = WeWorkNotifier(access_token, agentid, receivers) - notifier.send_validation_failure_notification(validation_result['invalid_records']) - else: - print(" ⚠ 企业微信推送配置不完整,跳过推送") - print(" 请在config.ini的[wework]节中配置agentid和receivers") - except Exception as e: - print(f" ✗ 企业微信推送失败: {e}") - # 推送失败不影响主流程,继续执行 - - # 显示最终统计 + # 6. 发送企业微信推送通知(合并所有子表的失败记录) + if len(all_invalid_records) > 0: print("\n" + "=" * 60) - print("执行完成") - print("=" * 60) - print(f"✓ 成功连接智能表格") - print(f"✓ 成功获取字段信息") - print(f"✓ 成功筛选待开单记录") - print(f"✓ 成功校验必填项") - if result['bugs_created'] > 0: - print(f"✓ 成功创建 {result['bugs_created']} 个TAPD bug单") - if result['bugs_failed'] > 0: - print(f"⚠ {result['bugs_failed']} 个bug单创建失败") - if len(validation_failed_results) > 0: - print(f"⚠ {len(validation_failed_results)} 条记录校验失败") - if result['writeback_success'] > 0: - print(f"✓ 成功回写 {result['writeback_success']} 条记录到智能表格") - print("=" * 60) - else: - print("\n" + "=" * 60) - print("执行完成") - print("=" * 60) - print(f"✓ 成功连接智能表格") - print(f"✓ 成功获取字段信息") - print(f"✓ 成功筛选待开单记录") - print(f"✓ 成功校验必填项") - print(f"ℹ 没有需要创建的bug单") + print("发送企业微信推送通知") print("=" * 60) + try: + wework_config = all_config.get('wework', {}) + agentid = wework_config.get('agentid') + receivers = wework_config.get('receivers') + + if agentid and receivers: + notifier = WeWorkNotifier(access_token, agentid, receivers) + notifier.send_validation_failure_notification(all_invalid_records) + else: + print(" ⚠ 企业微信推送配置不完整,跳过推送") + except Exception as e: + print(f" ✗ 企业微信推送失败: {e}") + + # 7. 显示最终统计(分表统计 + 总体统计) + print("\n" + "=" * 60) + print("执行完成 - 分表统计") + print("=" * 60) + for summary in sheet_summaries: + sheet_title = summary['sheet_title'] + if summary['error']: + print(f"\n[{sheet_title}]") + print(f" ✗ 处理失败: {summary['error']}") + else: + print(f"\n[{sheet_title}]") + print(f" 扫描: {summary['scanned']} 条") + print(f" ✓ 校验通过: {summary['valid']} 条") + if summary['invalid'] > 0: + print(f" ⚠ 校验失败: {summary['invalid']} 条") + if summary['created'] > 0: + print(f" ✓ Bug创建成功: {summary['created']} 条") + if summary['failed'] > 0: + print(f" ⚠ Bug创建失败: {summary['failed']} 条") + + print("\n" + "=" * 60) + print("执行完成 - 总体统计") + print("=" * 60) + print(f"子表统计:") + print(f" 总计: {len(all_sheet_results)} 个子表") + success_sheets = len([s for s in sheet_summaries if not s['error']]) + failed_sheets = len([s for s in sheet_summaries if s['error']]) + if success_sheets > 0: + print(f" ✓ 成功: {success_sheets} 个") + if failed_sheets > 0: + print(f" ✗ 失败: {failed_sheets} 个") + + print(f"\n记录统计:") + print(f" 扫描: {result['scanned_count']} 条") + if result['valid_count'] > 0: + print(f" ✓ 校验通过: {result['valid_count']} 条") + if result['invalid_count'] > 0: + print(f" ⚠ 校验失败: {result['invalid_count']} 条") + if result['bugs_created'] > 0: + print(f" ✓ Bug创建成功: {result['bugs_created']} 条") + if result['bugs_failed'] > 0: + print(f" ⚠ Bug创建失败: {result['bugs_failed']} 条") + print("=" * 60) result['success'] = True return result diff --git a/src/sync_status.py b/src/sync_status.py index 5f1c559..a78961c 100644 --- a/src/sync_status.py +++ b/src/sync_status.py @@ -38,6 +38,87 @@ class BugStatusSyncer: self.smartsheet_api = None self.tapd_api = None + def _sync_sheet_status(self, sheet_id: str, sheet_title: str) -> Dict: + """ + 同步单个子表的bug状态 + + Args: + sheet_id: 子表ID + sheet_title: 子表标题 + + Returns: + Dict: {'checked': int, 'updated': int} + """ + # 1. 获取字段信息 + fields = self.smartsheet_api.get_fields(sheet_id) + field_mapping = self.smartsheet_api.build_field_mapping(fields) + + # 检查必需的字段是否存在 + required_fields = ['开单状态', 'TAPD单号', 'bug状态'] + missing_fields = [f for f in required_fields if f not in field_mapping] + if missing_fields: + raise RuntimeError(f"子表 {sheet_title} 缺少必需字段: {', '.join(missing_fields)}") + + status_field_id = field_mapping['开单状态'] + bug_id_field_id = field_mapping['TAPD单号'] + bug_status_field_id = field_mapping['bug状态'] + + # 2. 查询已开单的记录 + records = self.smartsheet_api.get_created_bugs( + sheet_id, + status_field_id, + bug_id_field_id, + bug_status_field_id + ) + + if len(records) == 0: + print(f" ✓ 没有需要同步状态的记录") + return {'checked': 0, 'updated': 0} + + print(f" → 检查 {len(records)} 个bug的状态...") + + # 3. 逐个查询TAPD bug状态并对比 + updates = [] # 需要更新的记录列表 + + for record in records: + record_id = record.get('record_id', '未知') + + # 提取TAPD单号 + tapd_bug_field = self.smartsheet_api.get_field_value_by_title(record, 'TAPD单号') + bug_id = str(tapd_bug_field) if tapd_bug_field else '' + + # 获取当前智能表格中的bug状态 + current_status = self.smartsheet_api.get_field_value_by_title(record, 'bug状态') + + try: + # 查询TAPD的最新状态 + bug_info = self.tapd_api.get_bug(bug_id) + latest_status_en = bug_info.get('status', '') + latest_status_cn = BugStatusMapper.to_chinese(latest_status_en) + + # 对比状态是否变化 + if latest_status_cn != current_status: + update_record = { + "record_id": record_id, + "values": { + "bug状态": [{"type": "text", "text": latest_status_cn}] + } + } + updates.append(update_record) + + except Exception: + # 单个bug查询失败不影响其他bug的同步 + continue + + # 4. 批量更新智能表格 + if len(updates) > 0: + self.smartsheet_api.update_records(sheet_id, updates) + print(f" ✓ 更新了 {len(updates)} 个bug的状态") + else: + print(f" ✓ 所有bug状态均未变化") + + return {'checked': len(records), 'updated': len(updates)} + def sync_bug_status(self) -> Dict: """ 执行一次bug状态同步 @@ -59,133 +140,102 @@ class BugStatusSyncer: try: # 1. 初始化API - print("\n[1/6] 初始化API...") + print("\n[1/5] 初始化API...") self.smartsheet_api = SmartSheetAPI(self.access_token, self.docid, test_mode=self.test_mode) self.tapd_api = TAPDApi(self.workspace_id, test_mode=self.test_mode) print(" ✓ API初始化完成") - # 2. 获取子表信息 - print("\n[2/6] 获取子表信息...") + # 2. 获取所有子表 + print("\n[2/5] 获取子表列表...") sheet_list = self.smartsheet_api.get_sheet_list() if not sheet_list: raise RuntimeError("未找到任何子表") - sheet = sheet_list[0] - sheet_id = sheet['sheet_id'] - sheet_title = sheet.get('title', '未命名') - print(f" ✓ 使用子表: {sheet_title} (ID: {sheet_id})") + print(f" ✓ 找到 {len(sheet_list)} 个子表") - # 3. 获取字段信息 - print("\n[3/6] 获取字段信息...") - fields = self.smartsheet_api.get_fields(sheet_id) - field_mapping = self.smartsheet_api.build_field_mapping(fields) + # 3. 遍历所有子表,分别同步状态 + print(f"\n[3/5] 同步所有子表的bug状态...") + sheet_summaries = [] # 收集每个子表的统计摘要 + total_checked = 0 + total_updated = 0 - # 检查必需的字段是否存在 - required_fields = ['开单状态', 'TAPD单号', 'bug状态'] - missing_fields = [f for f in required_fields if f not in field_mapping] - if missing_fields: - raise RuntimeError(f"智能表格缺少必需字段: {', '.join(missing_fields)}") + for idx, sheet in enumerate(sheet_list, 1): + sheet_id = sheet['sheet_id'] + sheet_title = sheet.get('title', '未命名') - status_field_id = field_mapping['开单状态'] - bug_id_field_id = field_mapping['TAPD单号'] - bug_status_field_id = field_mapping['bug状态'] - - # 4. 查询已开单的记录 - print("\n[4/6] 查询已开单的记录...") - records = self.smartsheet_api.get_created_bugs( - sheet_id, - status_field_id, - bug_id_field_id, - bug_status_field_id - ) - - result['checked_count'] = len(records) - - if len(records) == 0: - print(" ✓ 没有需要同步状态的记录") - result['success'] = True - return result - - # 5. 逐个查询TAPD bug状态并对比 - print(f"\n[5/6] 查询TAPD bug状态并对比(共 {len(records)} 条)...") - print("-" * 60) - - updates = [] # 需要更新的记录列表 - - for idx, record in enumerate(records, 1): - record_id = record.get('record_id', '未知') - - # 提取TAPD单号(可能是链接格式) - tapd_bug_field = self.smartsheet_api.get_field_value_by_title(record, 'TAPD单号') - - # 如果是链接格式,提取文本部分作为bug_id - if isinstance(tapd_bug_field, str): - bug_id = tapd_bug_field - else: - # 可能是其他格式,尝试转换为字符串 - bug_id = str(tapd_bug_field) - - # 获取当前智能表格中的bug状态 - current_status = self.smartsheet_api.get_field_value_by_title(record, 'bug状态') - - print(f"\n[{idx}/{len(records)}] 记录ID: {record_id}") - print(f" TAPD单号: {bug_id}") - print(f" 当前状态: {current_status}") + print(f"\n[子表 {idx}/{len(sheet_list)}] {sheet_title}") try: - # 查询TAPD的最新状态 - print(f" → 正在查询TAPD最新状态...") - bug_info = self.tapd_api.get_bug(bug_id) - latest_status_en = bug_info.get('status', '') + # 同步单个子表的状态 + sheet_result = self._sync_sheet_status(sheet_id, sheet_title) - # 将英文状态映射为中文 - latest_status_cn = BugStatusMapper.to_chinese(latest_status_en) + # 记录统计摘要 + sheet_summaries.append({ + 'sheet_title': sheet_title, + 'error': None, + 'checked': sheet_result['checked'], + 'updated': sheet_result['updated'] + }) - print(f" ✓ TAPD最新状态: {latest_status_cn} ({latest_status_en})") - - # 对比状态是否变化(使用中文状态对比) - if latest_status_cn != current_status: - print(f" ⚠ 状态已变化: {current_status} → {latest_status_cn}") - - # 添加到更新列表(回写中文状态) - update_record = { - "record_id": record_id, - "values": { - "bug状态": [{"type": "text", "text": latest_status_cn}] - } - } - updates.append(update_record) - else: - print(f" ✓ 状态未变化,无需更新") + # 累加总计 + total_checked += sheet_result['checked'] + total_updated += sheet_result['updated'] except RuntimeError as e: - print(f" ✗ 查询TAPD失败: {e}") - # 单个bug查询失败不影响其他bug的同步 - continue - + print(f" ✗ 处理失败: {e}") + sheet_summaries.append({ + 'sheet_title': sheet_title, + 'error': str(e), + 'checked': 0, + 'updated': 0 + }) except Exception as e: - print(f" ✗ 未预期的错误: {type(e).__name__}: {e}") - continue + print(f" ✗ 未预期的错误: {type(e).__name__}: {e}") + sheet_summaries.append({ + 'sheet_title': sheet_title, + 'error': f"{type(e).__name__}: {e}", + 'checked': 0, + 'updated': 0 + }) - # 6. 批量更新智能表格 - if len(updates) > 0: - print(f"\n[6/6] 批量更新智能表格(共 {len(updates)} 条)...") - try: - self.smartsheet_api.update_records(sheet_id, updates) - result['updated_count'] = len(updates) - print(f" ✓ 成功更新 {len(updates)} 条记录") - except RuntimeError as e: - print(f" ✗ 更新失败: {e}") - result['error_message'] = f"更新智能表格失败: {e}" - return result - else: - print(f"\n[6/6] 所有记录状态均未变化,无需更新") + # 4. 显示最终统计(分表统计 + 总体统计) + result['checked_count'] = total_checked + result['updated_count'] = total_updated print("\n" + "=" * 60) - print("状态同步完成") + print("状态同步完成 - 分表统计") print("=" * 60) - print(f"✓ 检查了 {result['checked_count']} 个bug") - print(f"✓ 更新了 {result['updated_count']} 个bug的状态") + for summary in sheet_summaries: + sheet_title = summary['sheet_title'] + if summary['error']: + print(f"\n[{sheet_title}]") + print(f" ✗ 处理失败: {summary['error']}") + else: + print(f"\n[{sheet_title}]") + print(f" 检查: {summary['checked']} 个bug") + if summary['updated'] > 0: + print(f" ✓ 更新: {summary['updated']} 个bug") + else: + print(f" ✓ 所有bug状态均未变化") + + print("\n" + "=" * 60) + print("状态同步完成 - 总体统计") + print("=" * 60) + print(f"子表统计:") + print(f" 总计: {len(sheet_list)} 个子表") + success_sheets = len([s for s in sheet_summaries if not s['error']]) + failed_sheets = len([s for s in sheet_summaries if s['error']]) + if success_sheets > 0: + print(f" ✓ 成功: {success_sheets} 个") + if failed_sheets > 0: + print(f" ✗ 失败: {failed_sheets} 个") + + print(f"\nBug统计:") + print(f" 检查: {total_checked} 个bug") + if total_updated > 0: + print(f" ✓ 更新: {total_updated} 个bug") + else: + print(f" ✓ 所有bug状态均未变化") print("=" * 60) result['success'] = True diff --git a/src/wework_notifier.py b/src/wework_notifier.py index d16ae99..088ce5c 100644 --- a/src/wework_notifier.py +++ b/src/wework_notifier.py @@ -50,52 +50,57 @@ class WeWorkNotifier: def _build_failure_message(self, invalid_records: List[Dict]) -> str: """ - 构造校验失败消息内容 + 构造校验失败消息内容(支持多子表分组) Args: - invalid_records: 校验失败的记录列表 + invalid_records: 校验失败的记录列表,每条记录包含 sheet_title 字段 Returns: str: 格式化的消息内容 """ + # 按子表分组 + records_by_sheet = {} + for record in invalid_records: + sheet_title = record.get('sheet_title', '未知子表') + if sheet_title not in records_by_sheet: + records_by_sheet[sheet_title] = [] + records_by_sheet[sheet_title].append(record) + # 消息头部 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + total_count = len(invalid_records) + sheet_count = len(records_by_sheet) + lines = [ "【autoTAPD 开单失败通知】", f"时间: {timestamp}", - f"失败数量: {len(invalid_records)} 条", + f"失败数量: {total_count} 条(来自 {sheet_count} 个子表)", "", "以下记录校验失败,请检查并补充缺失字段:", "=" * 40 ] - # 添加每条失败记录的详情 - for idx, invalid_record in enumerate(invalid_records, 1): - record_data = invalid_record['record_data'] - missing_fields = invalid_record['missing_fields'] + # 按子表分组显示失败记录 + global_idx = 1 + for sheet_title, sheet_records in records_by_sheet.items(): + lines.append(f"\n【子表:{sheet_title}】") + lines.append("") - # 获取记录标题(如果有) - title = record_data.get('标题', '(无标题)') - record_id = record_data.get('record_id', '未知') + for record in sheet_records: + record_data = record['record_data'] + missing_fields = record['missing_fields'] - lines.append(f"\n[{idx}] {title}") - lines.append(f"记录ID: {record_id}") - lines.append(f"缺失字段: {', '.join(missing_fields)}") + # 获取记录标题 + title = record_data.get('标题', '(无标题)') + record_id = record_data.get('record_id', '未知') - # 显示已有的字段值(用于参考) - available_fields = [] - for field_name in ['标题', '详细描述', '优先级', '严重程度', '处理人', '验证人', '发现版本', '模块']: - if field_name not in missing_fields and record_data.get(field_name): - value = record_data.get(field_name) - # 截断过长的值 - if isinstance(value, str) and len(value) > 20: - value = value[:20] + "..." - available_fields.append(f"{field_name}: {value}") + lines.append(f"[{global_idx}] {title}") + lines.append(f"记录ID: {record_id}") + lines.append(f"缺失字段: {', '.join(missing_fields)}") + lines.append("") - if available_fields: - lines.append(f"已有字段: {', '.join(available_fields[:3])}") # 只显示前3个 + global_idx += 1 - lines.append("") lines.append("=" * 40) lines.append("请前往智能表格补充缺失字段后,系统将自动重新开单。")