2026-04-01 16:01:58 +08:00

194 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""任务三主流程"""
import sys
from pathlib import Path
from datetime import datetime
from typing import Dict, List
# Add the project root directory to the Python path.
# This has to run before the `src3` imports below so that they can be resolved
# when this file is executed directly as a script.
project_root = Path(__file__).parent.parent  # directory two levels above this file
sys.path.insert(0, str(project_root))
from src3.logger import get_task3_logger
from src3.config import Task3ConfigManager
from src3.overdue_fetcher import OverdueFetcher
from src3.message_formatter import MessageFormatter
from src3.webhook_sender import WebhookSender
def _post_simple_markdown(webhook_url: str, content: str) -> bool:
    """Send a lightweight markdown notification to a webhook.

    The message is posted as JSON of the form
    ``{"msgtype": "markdown", "markdown": {"content": content}}`` with a
    10-second timeout. This is a best-effort helper: it never raises for
    delivery problems and reports the outcome through its return value.

    Args:
        webhook_url: Target webhook URL.
        content: Markdown text to send.

    Returns:
        True when the request completed with a non-error HTTP status and the
        response body did not report a non-zero ``errcode``; False otherwise.
    """
    import requests

    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": content,
        },
    }
    try:
        response = requests.post(webhook_url, json=payload, timeout=10)
        # A 4xx/5xx answer means the message was not accepted; previously any
        # response at all was reported as success.
        response.raise_for_status()
        try:
            body = response.json()
        except ValueError:
            # Body is not JSON: nothing more to inspect, rely on the HTTP status.
            return True
        # NOTE(review): the payload shape matches the WeCom group-robot webhook,
        # which signals rejected messages via a non-zero "errcode" inside a
        # 200 response - confirm the target service. Responses without an
        # "errcode" field are treated as success.
        if isinstance(body, dict) and body.get("errcode", 0) not in (0, "0"):
            return False
        return True
    except Exception:
        # Deliberately broad: a failed notification must not crash the caller.
        return False
def _send_error_notification(webhook_url: str, group_name: str) -> bool:
    """Notify a group that fetching today's overdue items failed.

    Args:
        webhook_url: Webhook URL of the group to notify.
        group_name: Group name interpolated into the message text.

    Returns:
        The delivery status reported by ``_post_simple_markdown`` (previously
        this status was discarded and the function returned None).
    """
    content = f"⚠️ 今日{group_name}组过期单提醒获取失败,请人工检查"
    return _post_simple_markdown(webhook_url, content)
def _send_no_overdue_message(webhook_url: str, group_name: str) -> bool:
    """Tell a group that it has no overdue items today.

    Args:
        webhook_url: Webhook URL of the group to notify.
        group_name: Group name interpolated into the message text.

    Returns:
        The delivery status reported by ``_post_simple_markdown`` (previously
        this status was discarded and the function returned None).
    """
    content = f"✅ 今日{group_name}组无过期单,大家保持!"
    return _post_simple_markdown(webhook_url, content)
def _collect_unique_members(group_team_configs) -> List:
    """Return the members of all groups, de-duplicated in first-seen order."""
    # dict.fromkeys keeps insertion order and de-duplicates in O(n) instead of
    # the O(n^2) "if member not in list" scan. Members must be hashable, which
    # the per-group filter (it builds a set from member_list) requires anyway.
    return list(
        dict.fromkeys(
            member
            for group in group_team_configs
            for member in group["member_list"]
        )
    )


def _dispatch_group(group: Dict, items, today: str, logger) -> Dict:
    """Push one group's overdue items and return that group's stats entry.

    Args:
        group: Group config; the keys "group_name", "webhook_url",
            "member_list" and "user_mapping" are read.
        items: All fetched overdue items; each item's "owner" is matched
            against the group's member list.
        today: Date string passed through to the message formatter.
        logger: Logger handed to MessageFormatter and WebhookSender.

    Returns:
        A dict with the keys "group_name", "overdue_count", "push_success"
        and "skipped".
    """
    group_name = group["group_name"]
    webhook_url = group["webhook_url"]
    member_list = group["member_list"]
    user_mapping = group["user_mapping"]

    # Empty whitelist: nothing can match, so the group is skipped silently
    # (no webhook message is sent).
    if not member_list:
        print(f"⚠️ {group_name}组白名单为空,跳过该组推送")
        return {"group_name": group_name, "overdue_count": 0, "push_success": True, "skipped": True}

    member_set = set(member_list)
    group_items = [item for item in items if item["owner"] in member_set]

    if not group_items:
        print(f"{group_name}组无过期单")
        # NOTE(review): the delivery result of this message is not tracked;
        # push_success is recorded as True regardless of whether it was sent.
        _send_no_overdue_message(webhook_url, group_name)
        return {"group_name": group_name, "overdue_count": 0, "push_success": True, "skipped": False}

    # A fresh formatter per group, so unmapped_users is read from an instance
    # that has only formatted this group's items.
    formatter = MessageFormatter(logger)
    content, mentioned_list = formatter.format_message(
        group_items,
        user_mapping,
        today,
        group_name=group_name,
    )
    if formatter.unmapped_users:
        unmapped = ", ".join(sorted(formatter.unmapped_users))
        print(f"⚠️ {group_name}组未映射用户: {unmapped}")

    sender = WebhookSender(webhook_url, logger)
    success = sender.send_markdown(content, mentioned_list)
    if success:
        print(f"{group_name}组推送成功({len(group_items)}条)")
    else:
        print(f"{group_name}组推送失败")

    return {
        "group_name": group_name,
        "overdue_count": len(group_items),
        "push_success": success,
        "skipped": False,
    }


def run_once():
    """Run one overdue-item push cycle.

    Steps:
        1. Load the base configuration (workspace id).
        2. Load the per-group configuration (name, members, webhook, mapping).
        3. Merge all members so overdue items are fetched with a single call.
        4. Fetch the overdue items; on failure every group receives an error
           notification.
        5. Push each group's items to its webhook and collect per-group stats.

    Every exit path closes the sync record opened by ``logger.start_sync``
    through ``logger.end_sync_with_stats``. The function returns None and does
    not propagate exceptions derived from ``Exception``.
    """
    logger = get_task3_logger()
    sync_id = logger.start_sync("manual")
    try:
        print("任务三TAPD过期单推送")

        # 1. Load the base configuration.
        try:
            config = Task3ConfigManager()
            workspace_id = config.get_workspace_id()
        except ValueError as e:
            print(f"✗ 配置错误: {e}")
            logger.end_sync_with_stats({}, False, f"配置错误: {e}", sync_id=sync_id)
            return

        # 2. Load the multi-group configuration (group name / member sheet / webhook).
        try:
            group_team_configs = config.get_group_team_configs(logger=logger)
        except ValueError as e:
            print(f"✗ 组配置错误: {e}")
            logger.end_sync_with_stats({}, False, f"组配置错误: {e}", sync_id=sync_id)
            return
        except Exception as e:  # pragma: no cover
            print(f"✗ 获取组配置失败: {e}")
            logger.end_sync_with_stats({}, False, f"配置加载失败: {e}", sync_id=sync_id)
            return

        for group in group_team_configs:
            group_name = group["group_name"]
            member_count = len(group["member_list"])
            print(f"{group_name}组成员数: {member_count}")

        # 3. Merge members across groups so the overdue items are fetched once.
        all_members = _collect_unique_members(group_team_configs)
        if not all_members:
            print("⚠️ 所有组白名单均为空,跳过本次推送")
            logger.end_sync_with_stats(
                {"group_count": len(group_team_configs), "overdue_count": 0},
                True,
                sync_id=sync_id,
            )
            return

        # 4. Fetch the overdue items.
        try:
            fetcher = OverdueFetcher(workspace_id, logger)
            items = fetcher.fetch_all_overdue(all_members)
        except Exception as e:
            print(f"✗ TAPD API调用失败: {e}")
            # Tell every group the reminder could not be produced today.
            for group in group_team_configs:
                _send_error_notification(group["webhook_url"], group["group_name"])
            logger.end_sync_with_stats({}, False, f"TAPD API失败: {e}", sync_id=sync_id)
            return
        print(f"获取到 {len(items)} 条过期单")

        # 5. Dispatch per group.
        today = datetime.now().strftime('%Y-%m-%d')
        group_stats: List[Dict] = []
        failed_groups: List[str] = []
        skipped_empty_groups: List[str] = []
        for group in group_team_configs:
            stat = _dispatch_group(group, items, today, logger)
            group_stats.append(stat)
            if stat["skipped"]:
                skipped_empty_groups.append(stat["group_name"])
            elif not stat["push_success"]:
                failed_groups.append(stat["group_name"])

        stats = {
            "group_count": len(group_team_configs),
            "overdue_count": len(items),
            "group_stats": group_stats,
            "skipped_empty_groups": skipped_empty_groups,
        }
        if not failed_groups:
            logger.end_sync_with_stats(stats, True, sync_id=sync_id)
        else:
            logger.end_sync_with_stats(
                stats,
                False,
                f"推送失败分组: {', '.join(failed_groups)}",
                sync_id=sync_id,
            )
    except Exception as e:
        # Top-level boundary: record the failure instead of crashing the job.
        print(f"✗ 未知错误: {e}")
        logger.end_sync_with_stats({}, False, f"未知错误: {e}", sync_id=sync_id)
# Script entry point: run a single push cycle when this file is executed directly.
if __name__ == "__main__":
    run_once()