2025-11-24 14:05:30 +08:00

149 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import requests
import json
try:
import markdown
except ImportError:
raise ImportError("错误: 'markdown' 库未安装。请运行: pip install markdown")
# --- 1. 辅助函数与模块常量加载 ---
def _load_asset_file(filename: str) -> str:
"""从 assets 文件夹加载一个文本文件的内容。"""
# 获取当前文件 (analyzer.py) 的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构造 assets 文件的完整路径
file_path = os.path.join(current_dir, 'assets', filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
print(f"CRITICAL ERROR: Asset file not found at '{file_path}'")
raise
except Exception as e:
print(f"CRITICAL ERROR: Failed to read asset file '{file_path}'. Error: {e}")
raise
# 在模块加载时,只执行一次文件读取操作
try:
FULL_SYSTEM_PROMPT = _load_asset_file('main_prompt.md')
HTML_TEMPLATE = _load_asset_file('report_template.html')
except Exception:
# 如果加载失败,程序应中止
exit(1)
# --- 2. 内部功能函数 ---
def _convert_markdown_to_html(markdown_text: str, title: str) -> str:
"""将Markdown文本转换为带样式的完整HTML页面。"""
html_body = markdown.markdown(markdown_text, extensions=['fenced_code', 'tables'])
html_body_styled = html_body.replace('<table>', '<table class="table table-striped table-bordered table-hover">')
# 使用从文件加载的模板,并替换占位符
return HTML_TEMPLATE.format(title=title, html_body_styled=html_body_styled)
def _analyze_chat_file(filepath: str, api_key: str, model_name: str, site_url: str, site_name: str) -> str | None:
"""读取单个聊天文件并调用OpenRouter API进行分析。"""
print(f"-> 正在分析新文件: {os.path.basename(filepath)}...")
try:
with open(filepath, 'r', encoding='utf-8') as file:
chat_content = file.read()
except FileNotFoundError:
print(f" 错误: 文件未找到 {filepath}")
return None
if len(chat_content.strip()) < 50:
print(f" 警告: 文件 {os.path.basename(filepath)} 内容过短,已跳过。")
return "NO_CONTENT"
api_payload = {
"model": model_name,
"messages": [
{"role": "system", "content": FULL_SYSTEM_PROMPT},
{"role": "user", "content": f"请根据系统提示中的需求,分析以下聊天记录:\\n\\n---\\n{chat_content}\\n---"}
],
"temperature": 0.7, "top_p": 1, "max_tokens": 8192
}
try:
response = requests.post(
url="https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"HTTP-Referer": site_url,
"X-Title": site_name,
},
data=json.dumps(api_payload),
timeout=180
)
if response.status_code == 200:
print(f" 成功接收到来自 {model_name} 的分析结果。")
analysis_result = response.json()
return analysis_result['choices'][0]['message']['content']
else:
print(f" API请求失败状态码: {response.status_code}, 错误: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f" API请求异常: {e}")
return None
# --- 3. 主调用函数 (无变动) ---
def run_analysis_on_new_files(input_folder: str, output_folder: str, config: dict) -> int:
"""
扫描输入文件夹对新出现的聊天记录进行分析并生成HTML报告。
"""
if not os.path.exists(input_folder):
print(f"错误:找不到输入文件夹 '{input_folder}'")
return 0
if not os.path.exists(output_folder):
os.makedirs(output_folder)
print(f"已创建输出文件夹: {output_folder}")
newly_analyzed_count = 0
all_log_files = sorted([f for f in os.listdir(input_folder) if f.endswith(".txt")])
if not all_log_files:
return 0
latest_log_filename = all_log_files[-1]
for filename in all_log_files:
base_name = os.path.splitext(filename)[0]
input_path = os.path.join(input_folder, filename)
output_filename = f"{base_name}_analysis.html"
output_path = os.path.join(output_folder, output_filename)
is_latest_file = (filename == latest_log_filename)
if os.path.exists(output_path) and not is_latest_file:
continue
if os.path.exists(output_path) and is_latest_file:
print(f"-> 发现最新日期的报告已存在,将强制重新分析 '{filename}'...")
analysis_markdown = _analyze_chat_file(
filepath=input_path,
api_key=config.get("OPENROUTER_API_KEY"),
model_name=config.get("MODEL_NAME"),
site_url=config.get("YOUR_SITE_URL"),
site_name=config.get("YOUR_SITE_NAME")
)
if analysis_markdown and analysis_markdown != "NO_CONTENT":
report_title = f"玩家社群分析报告 - {base_name}"
html_content = _convert_markdown_to_html(analysis_markdown, title=report_title)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f" ✔ HTML分析报告已成功保存至: {output_path}\\n")
newly_analyzed_count += 1
return newly_analyzed_count