G41_TAPD_BUG_SYNC/src/smartsheet.py
2025-12-18 16:23:16 +08:00

398 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
智能表格API调用模块
负责与企业微信智能表格API交互
"""
import requests
from typing import Dict, List, Optional, Any
class SmartSheetAPI:
"""智能表格API封装类"""
# API基础URL
BASE_URL = "https://qyapi.weixin.qq.com/cgi-bin/wedoc"
def __init__(self, access_token: str, docid: str, test_mode: bool = False):
"""
初始化智能表格API
Args:
access_token: 企业微信access_token
docid: 智能表格文档ID
test_mode: 是否启用测试模式显示API返回结果
"""
self.access_token = access_token
self.docid = docid
self.test_mode = test_mode
self.session = requests.Session()
def _make_request(self, endpoint: str, method: str = "POST", data: Optional[Dict] = None) -> Dict:
"""
发起API请求的通用方法
Args:
endpoint: API端点
method: HTTP方法GET或POST
data: 请求数据
Returns:
Dict: API响应数据
Raises:
RuntimeError: API调用失败时抛出
"""
url = f"{self.BASE_URL}/{endpoint}?access_token={self.access_token}"
# 测试模式:显示请求信息
if self.test_mode:
print("\n" + "=" * 80)
print(f"【测试模式】API调用: {endpoint}")
print("=" * 80)
print(f"请求方法: {method}")
print(f"请求URL: {self.BASE_URL}/{endpoint}")
if data:
import json
print(f"请求数据:")
print(json.dumps(data, ensure_ascii=False, indent=2))
try:
if method.upper() == "POST":
response = self.session.post(url, json=data, timeout=30)
else:
response = self.session.get(url, params=data, timeout=30)
response.raise_for_status()
result = response.json()
# 测试模式:显示响应信息
if self.test_mode:
import json
print(f"\n响应状态码: {response.status_code}")
print(f"响应数据:")
# 测试模式下显示完整的响应数据
print(json.dumps(result, ensure_ascii=False, indent=2))
print("=" * 80)
# 检查企业微信API返回的错误码
if result.get('errcode', 0) != 0:
error_msg = result.get('errmsg', '未知错误')
raise RuntimeError(f"API调用失败: errcode={result['errcode']}, errmsg={error_msg}")
return result
except requests.exceptions.Timeout:
raise RuntimeError(f"API请求超时: {endpoint}")
except requests.exceptions.RequestException as e:
raise RuntimeError(f"API请求失败: {e}")
def get_sheet_list(self) -> List[Dict]:
"""
获取文档的所有子表信息
Returns:
List[Dict]: 子表列表每个子表包含sheet_id和title
Raises:
RuntimeError: API调用失败时抛出
"""
print(f"正在获取文档子表列表...")
data = {
"docid": self.docid
}
result = self._make_request("smartsheet/get_sheet", data=data)
sheet_list = result.get('sheet_list', [])
print(f" ✓ 成功获取 {len(sheet_list)} 个子表")
return sheet_list
def get_fields(self, sheet_id: str) -> List[Dict]:
"""
获取子表的所有字段信息
Args:
sheet_id: 子表ID
Returns:
List[Dict]: 字段列表每个字段包含field_id、field_title、field_type等信息
Raises:
RuntimeError: API调用失败时抛出
"""
print(f"正在获取子表字段信息...")
data = {
"docid": self.docid,
"sheet_id": sheet_id
}
result = self._make_request("smartsheet/get_fields", data=data)
fields = result.get('fields', [])
print(f" ✓ 成功获取 {len(fields)} 个字段")
return fields
def build_field_mapping(self, fields: List[Dict]) -> Dict[str, str]:
"""
构建字段名称到字段ID的映射
Args:
fields: 字段列表
Returns:
Dict[str, str]: 字段名称到字段ID的映射字典
"""
mapping = {}
for field in fields:
field_title = field.get('field_title', '')
field_id = field.get('field_id', '')
if field_title and field_id:
mapping[field_title] = field_id
print(f" ✓ 构建字段映射完成,共 {len(mapping)} 个字段")
return mapping
def get_records(self, sheet_id: str, filter_spec: Optional[Dict] = None,
limit: int = 100, offset: int = 0) -> Dict:
"""
查询子表记录
Args:
sheet_id: 子表ID
filter_spec: 过滤条件
limit: 返回记录数量限制
offset: 偏移量
Returns:
Dict: 包含records和total的字典
Raises:
RuntimeError: API调用失败时抛出
"""
data = {
"docid": self.docid,
"sheet_id": sheet_id,
"limit": limit,
"offset": offset
}
if filter_spec:
data["filter_spec"] = filter_spec
result = self._make_request("smartsheet/get_records", data=data)
records = result.get('records', [])
total = result.get('total', 0)
return {
'records': records,
'total': total
}
def get_empty_status_records(self, sheet_id: str, status_field_id: str) -> List[Dict]:
"""
获取"开单状态"字段为空的记录
Args:
sheet_id: 子表ID
status_field_id: "开单状态"字段的field_id
Returns:
List[Dict]: 符合条件的记录列表
Raises:
RuntimeError: API调用失败时抛出
"""
print(f"正在查询\"开单状态\"为空的记录...")
# 构造过滤条件:开单状态字段为空
filter_spec = {
"conjunction": "CONJUNCTION_AND",
"conditions": [
{
"field_id": status_field_id,
"operator": "OPERATOR_IS_EMPTY"
}
]
}
all_records = []
offset = 0
limit = 100
while True:
result = self.get_records(sheet_id, filter_spec, limit, offset)
records = result['records']
total = result['total']
all_records.extend(records)
print(f" - 已获取 {len(all_records)}/{total} 条记录")
# 如果已获取所有记录,退出循环
if len(all_records) >= total:
break
offset += limit
print(f" ✓ 共找到 {len(all_records)} 条待开单记录")
return all_records
def get_field_value_by_title(self, record: Dict, field_title: str) -> Any:
"""
从记录中根据字段标题获取字段值
Args:
record: 记录对象
field_title: 字段标题(字段名称)
Returns:
Any: 字段值如果字段不存在或为空则返回None
对于多选字段,返回包含所有选项文本的列表
"""
# API返回的数据结构是 record.values[字段名]
values = record.get('values', {})
if field_title not in values:
return None
field_value_list = values[field_title]
# 如果字段值为空列表返回None
if not field_value_list or len(field_value_list) == 0:
return None
# 获取第一个值来判断字段类型
first_value = field_value_list[0]
# 根据值的结构判断字段类型并提取
# 注意:判断顺序很重要!先判断更具体的类型,再判断通用类型
# 1. 人员字段(包含 user_id
if 'user_id' in first_value:
return first_value.get('user_id', '')
# 2. 单选/多选字段(同时包含 id 和 text
elif 'id' in first_value and 'text' in first_value:
# 如果有多个值说明是多选字段返回所有选项的text列表
if len(field_value_list) > 1:
# 多选字段返回所有选项的text列表
return [item.get('text', '') for item in field_value_list if 'text' in item]
else:
# 单选字段返回单个text值
return first_value.get('text', '')
# 3. 纯文本字段(只包含 text没有 id
elif 'text' in first_value:
return first_value.get('text', '')
# 4. 数字字段
elif 'number' in first_value:
return first_value.get('number')
# 5. 其他未知类型
else:
return first_value
return None
def get_field_value(self, record: Dict, field_id: str) -> Any:
"""
从记录中根据字段ID获取字段值保留用于向后兼容
Args:
record: 记录对象
field_id: 字段ID
Returns:
Any: 字段值如果字段不存在或为空则返回None
"""
fields = record.get('fields', [])
for field in fields:
if field.get('field_id') == field_id:
# 根据字段类型返回相应的值
field_type = field.get('field_type')
if field_type == 'text':
return field.get('value', {}).get('text', '')
elif field_type == 'number':
return field.get('value', {}).get('number')
elif field_type == 'select':
# 单选字段
options = field.get('value', {}).get('options', [])
if options:
return options[0].get('text', '')
return ''
elif field_type == 'user':
# 人员字段
users = field.get('value', {}).get('members', [])
if users:
return users[0].get('userid', '')
return ''
else:
# 其他类型返回原始value
return field.get('value')
return None
def extract_record_data(self, record: Dict, field_mapping: Dict[str, str] = None) -> Dict[str, Any]:
"""
从记录中提取所有需要的字段数据
注意现在直接根据字段标题field_title匹配不再需要field_mapping
Args:
record: 记录对象
field_mapping: 字段名称到字段ID的映射已废弃保留用于向后兼容
Returns:
Dict[str, Any]: 提取的字段数据key为字段标题value为字段值
"""
data = {
'record_id': record.get('record_id', ''),
}
# API返回的数据结构是 record.valueskey就是字段标题
values = record.get('values', {})
# 遍历所有字段标题
for field_title in values.keys():
# 使用新方法根据字段标题获取值
data[field_title] = self.get_field_value_by_title(record, field_title)
return data
def update_records(self, sheet_id: str, records: List[Dict]) -> Dict:
"""
更新智能表格记录
Args:
sheet_id: 子表ID
records: 需要更新的记录列表每个记录包含record_id和values
Returns:
Dict: 包含更新结果的字典
Raises:
RuntimeError: API调用失败时抛出
"""
print(f"正在更新 {len(records)} 条记录...")
data = {
"docid": self.docid,
"sheet_id": sheet_id,
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"records": records
}
result = self._make_request("smartsheet/update_records", data=data)
updated_records = result.get('records', [])
print(f" ✓ 成功更新 {len(updated_records)} 条记录")
return result