# File-viewer residue (not part of the module): 114 lines, 4.2 KiB, Python

"""任务三配置管理"""
import sys
from pathlib import Path
# Add the project root directory (the parent of this file's directory) to the
# front of the Python path. This must run before the `src` import below --
# presumably so that the `src` package can be resolved; verify against the
# project layout.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.config import ConfigManager
class Task3ConfigManager(ConfigManager):
    """Configuration manager for task three.

    Loads ``config3/config.ini`` (located under the parent of this file's
    directory) and exposes accessors for the ``[TAPD]``, ``[Schedule]``,
    ``[WeWork]`` and ``[Smartsheet]`` sections.

    NOTE(review): every accessor assumes ``self.config`` (set up by
    ``ConfigManager``) behaves like a ``configparser.ConfigParser`` --
    confirm against ``src.config``.
    """

    # Defaults used when the optional [Schedule] section or its options are absent.
    _DEFAULT_PUSH_TIME = "09:30"
    _DEFAULT_SKIP_WEEKEND = True

    def __init__(self):
        """Initialise the base manager with the path to ``config3/config.ini``."""
        config_path = Path(__file__).parent.parent / "config3" / "config.ini"
        super().__init__(config_path)

    def _require_option(self, section: str, option: str) -> str:
        """Return the whitespace-stripped value of a mandatory option.

        The returned value may still be an empty string; callers decide how
        to report that, because the "must not be empty" messages differ.

        Raises:
            ValueError: If ``section`` or ``option`` is missing from the file.
        """
        if not self.config.has_section(section):
            raise ValueError(f"配置文件缺少[{section}]节")
        if not self.config.has_option(section, option):
            raise ValueError(f"配置文件[{section}]节缺少{option}配置项")
        return self.config.get(section, option).strip()

    def get_workspace_id(self) -> str:
        """Return the TAPD workspace ID from ``[TAPD] workspace_id``.

        Raises:
            ValueError: If the section or option is missing, or the value is
                blank.
        """
        workspace_id = self._require_option('TAPD', 'workspace_id')
        if not workspace_id:
            raise ValueError("workspace_id配置项不能为空")
        return workspace_id

    def get_push_time(self) -> str:
        """Return the push time from ``[Schedule] push_time``.

        Falls back to ``"09:30"`` when the section or option is missing, and
        also when the option is present but blank (previously a blank value
        was returned as an empty string).
        """
        # `fallback=` already covers a missing section as well as a missing
        # option, so no separate has_section() check is needed.
        push_time = self.config.get(
            'Schedule', 'push_time', fallback=self._DEFAULT_PUSH_TIME
        ).strip()
        return push_time or self._DEFAULT_PUSH_TIME

    def get_skip_weekend(self) -> bool:
        """Return whether weekends are skipped (``[Schedule] skip_weekend``).

        Defaults to ``True`` when the section or option is missing.
        """
        return self.config.getboolean(
            'Schedule', 'skip_weekend', fallback=self._DEFAULT_SKIP_WEEKEND
        )

    def get_webhook_url(self) -> str:
        """Return the WeWork webhook URL from ``[WeWork] webhook_url``.

        Raises:
            ValueError: If the section or option is missing, or the value is
                blank.
        """
        webhook_url = self._require_option('WeWork', 'webhook_url')
        if not webhook_url:
            raise ValueError("webhook_url配置项不能为空")
        return webhook_url

    def get_smartsheet_config(self) -> dict:
        """Return the smart-sheet identifiers from the ``[Smartsheet]`` section.

        Returns:
            A dict with the keys ``'docid'`` and ``'sheet_id'``.

        Raises:
            ValueError: If the section or either option is missing, or either
                value is blank.
        """
        docid = self._require_option('Smartsheet', 'docid')
        sheet_id = self._require_option('Smartsheet', 'sheet_id')
        if not docid or not sheet_id:
            raise ValueError("docid和sheet_id配置项不能为空")
        return {'docid': docid, 'sheet_id': sheet_id}

    def get_tech_team_config(self) -> dict:
        """Load the tech-team member configuration from the smart sheet.

        Reads the records of the configured sheet and keeps every row that
        passes the "enabled" check and has both a TAPD user name and a WeWork
        user ID.

        Returns:
            A dict of the form::

                {
                    'member_list': ['<TAPD name>', ...],
                    'user_mapping': {'<TAPD name>': '<WeWork UserID>', ...},
                }

            ``member_list`` holds each TAPD name once, in first-seen order.
            If a name occurs on several rows, the last row's WeWork ID wins.

        Raises:
            ValueError: If the smart-sheet settings are invalid or no row
                qualifies as an enabled member.
        """
        # Kept as function-scope imports, as in the original.
        from src.token_manager import TokenManager
        from src.smartsheet import SmartSheetAPI

        smartsheet_config = self.get_smartsheet_config()
        access_token = TokenManager().get_token()

        api = SmartSheetAPI(access_token, smartsheet_config['docid'])
        records = api.get_records(smartsheet_config['sheet_id'])['records']

        user_mapping = {}
        for record in records:
            tapd_name = api.get_field_value_by_title(record, 'TAPD用户名')
            wework_id = api.get_field_value_by_title(record, '企微UserID')
            enabled = api.get_field_value_by_title(record, '是否启用')
            # NOTE(review): a row counts as enabled only when the field is the
            # empty string. That reads oddly for an "enabled" column -- the
            # literal may have been lost in transit; confirm the intended
            # marker value against the sheet before changing it.
            if enabled == '' and tapd_name and wework_id:
                user_mapping[tapd_name] = wework_id

        if not user_mapping:
            raise ValueError("智能表格中没有启用的技术组成员")

        # Derive the list from the mapping so a TAPD name repeated in the
        # sheet cannot appear twice in member_list.
        return {
            'member_list': list(user_mapping),
            'user_mapping': user_mapping,
        }