diff --git a/.gitignore b/.gitignore index 2002ece18..7764ac44e 100644 --- a/.gitignore +++ b/.gitignore @@ -125,3 +125,7 @@ __pycache__/ /DOC/marketing/email_outreach/exports/*.mbox /DOC/marketing/email_outreach/exports/*.eml /DOC/marketing/email_outreach/reports/*first_pass_triage* + +# Dashboard private runtime config. Community-monitor sqlite is intentionally tracked for now. +/Tools/Dashboard/private/*.env +/Tools/Dashboard/data/community_monitor/twitter-runs/ diff --git a/Tools/Dashboard/community_monitor.env.example b/Tools/Dashboard/community_monitor.env.example new file mode 100644 index 000000000..075412af6 --- /dev/null +++ b/Tools/Dashboard/community_monitor.env.example @@ -0,0 +1,22 @@ +OPENROUTER_API_KEY= +APP_ID=3774440 +PRODUCT_NAME=帝国幻想乡~TOHOTOPIA +DATABASE_PATH=data/community_monitor/tohotopia_monitor.sqlite3 +SYNC_INTERVAL_MINUTES=30 +AUTO_SYNC_ENABLED=true +TWITTER_ENABLED=false +TWITTER_USERNAME=Tohotopia +TWITTER_BROWSER_PROVIDER=existing +TWITTER_OUTPUT_DIR=data/community_monitor/twitter-runs +TWITTER_FULL_MAX_NO_NEW=6 +TWITTER_INCREMENTAL_MAX_NO_NEW=2 +TWITTER_THREAD_MAX_NO_NEW=3 +TWITTER_COMMAND_TIMEOUT_SECONDS=900 +TWITTER_FULL_REPLY_POST_LIMIT=0 +TWITTER_INCREMENTAL_REPLY_PARENT_LIMIT=20 +DISCUSSION_FULL_SCAN_MAX_PAGES=500 +DISCUSSION_INCREMENTAL_MAX_PAGES=5 +FULL_SCAN_TIME_LIMIT_SECONDS=7200 +OPENROUTER_MODEL=deepseek/deepseek-v4-pro +OPENROUTER_REFERER=http://localhost:8080 +OPENROUTER_TITLE=TH1 Dashboard Community Monitor diff --git a/Tools/Dashboard/community_monitor/__init__.py b/Tools/Dashboard/community_monitor/__init__.py new file mode 100644 index 000000000..5be934020 --- /dev/null +++ b/Tools/Dashboard/community_monitor/__init__.py @@ -0,0 +1 @@ +"""TOHOTOPIA community monitor.""" diff --git a/Tools/Dashboard/community_monitor/api.py b/Tools/Dashboard/community_monitor/api.py new file mode 100644 index 000000000..f9bfe001d --- /dev/null +++ b/Tools/Dashboard/community_monitor/api.py @@ -0,0 +1,393 @@ +from __future__ import 
def _sync_loop() -> None:
    """Run an incremental sync every configured interval until stopped.

    The loop sleeps on ``_stop_event`` so that setting the event ends the
    thread promptly. A tick is skipped when ``sync_lock`` is already held
    (for example by a manually triggered sync).

    Failures of a single sync run are logged with their traceback and the
    loop carries on with the next interval; one bad run must not kill the
    background thread.
    """
    # Function-scope import keeps this block self-contained; the module's
    # top-level import list is left untouched.
    import logging

    logger = logging.getLogger(__name__)
    settings = get_settings()
    # Clamp to at least one minute so a zero/negative setting cannot spin.
    interval_seconds = max(settings.sync_interval_minutes, 1) * 60
    while not _stop_event.wait(interval_seconds):
        if not sync_lock.acquire(blocking=False):
            # Another sync is in progress; try again on the next tick.
            continue
        try:
            # Imported lazily, as in the rest of this module.
            from .sync import run_sync

            with session(settings.database_path) as conn:
                run_sync(conn, settings, full=False)
        except Exception:
            # Top-level boundary of a daemon thread: record the failure
            # instead of silently discarding it, then keep looping.
            logger.exception(
                "Scheduled community-monitor sync failed; retrying on the next interval"
            )
        finally:
            sync_lock.release()
overview() -> dict[str, Any]: + settings = get_settings() + ensure_database() + with session(settings.database_path) as conn: + metrics = conn.execute( + """ + SELECT + COUNT(*) AS total, + SUM(CASE WHEN w.status = 'new' THEN 1 ELSE 0 END) AS new_count, + SUM(CASE WHEN a.is_negative = 1 THEN 1 ELSE 0 END) AS negative_count, + SUM(CASE WHEN a.has_actionable_feedback = 1 THEN 1 ELSE 0 END) AS actionable_count, + SUM(CASE WHEN a.reply_recommended = 1 THEN 1 ELSE 0 END) AS reply_count, + SUM(CASE WHEN a.priority = 'high' THEN 1 ELSE 0 END) AS high_count, + SUM(CASE WHEN r.analysis_status = 'done' THEN 1 ELSE 0 END) AS analyzed_count, + SUM(CASE WHEN r.analysis_status = 'pending' THEN 1 ELSE 0 END) AS pending_count, + SUM(CASE WHEN r.analysis_status = 'error' THEN 1 ELSE 0 END) AS error_count + FROM raw_items r + LEFT JOIN analysis_results a ON a.raw_item_id = r.id + LEFT JOIN work_items w ON w.raw_item_id = r.id + """ + ).fetchone() + by_source = conn.execute( + """ + SELECT source, COUNT(*) AS count + FROM raw_items + GROUP BY source + ORDER BY count DESC + """ + ).fetchall() + runs = conn.execute( + "SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT 8" + ).fetchall() + return { + "status": status(), + "metrics": _metrics_dict(metrics), + "bySource": [_row_to_dict(row) for row in by_source], + "runs": [_public_run(row) for row in runs], + "statusLabels": STATUS_LABELS, + } + + +def list_items(filters: dict[str, str]) -> dict[str, Any]: + settings = get_settings() + ensure_database() + page = max(_int(filters.get("page"), 1), 1) + page_size = min(max(_int(filters.get("pageSize"), 80), 20), 200) + offset = (page - 1) * page_size + where, params = _where(filters) + with session(settings.database_path) as conn: + total = conn.execute( + f""" + SELECT COUNT(*) AS total + FROM raw_items r + LEFT JOIN analysis_results a ON a.raw_item_id = r.id + LEFT JOIN work_items w ON w.raw_item_id = r.id + {where} + """, + params, + ).fetchone()["total"] + rows = conn.execute( + 
def update_work(raw_item_id: int, payload: dict[str, Any]) -> dict[str, Any]:
    """Update the triage state (status, owner, notes) of one work item.

    Args:
        raw_item_id: Value matched against ``work_items.raw_item_id``.
        payload: Request data. Reads the optional ``status``, ``owner`` and
            ``notes`` keys. A missing or unknown status falls back to
            ``"new"``; missing owner/notes are stored as empty strings.

    Returns:
        ``{"success": True}`` when a row was updated, otherwise
        ``{"success": False, "error": <message>}`` when no ``work_items``
        row exists for ``raw_item_id``.
    """
    status_value = str(payload.get("status") or "new")
    if status_value not in STATUS_LABELS:
        status_value = "new"
    owner = str(payload.get("owner") or "")
    notes = str(payload.get("notes") or "")
    now = int(time.time())

    settings = get_settings()
    ensure_database()
    with session(settings.database_path) as conn:
        # last_handled_at only moves forward when the item leaves the
        # "new" state; otherwise the stored value is kept.
        cursor = conn.execute(
            """
            UPDATE work_items
            SET status = ?, owner = ?, notes = ?, updated_at = ?,
                last_handled_at = CASE WHEN ? != 'new' THEN ? ELSE last_handled_at END
            WHERE raw_item_id = ?
            """,
            (status_value, owner, notes, now, status_value, now, raw_item_id),
        )
        # NOTE(review): assumes ``session`` yields a sqlite3 connection whose
        # ``execute`` returns a cursor exposing ``rowcount`` - confirm in db.py.
        updated = cursor.rowcount > 0

    if not updated:
        # Previously this reported success even though nothing was written,
        # so the caller believed the edit had been saved.
        return {"success": False, "error": "未找到对应的工作项"}
    return {"success": True}
def _where(filters: dict[str, str]) -> tuple[str, list[Any]]:
    """Translate request filters into a SQL ``WHERE`` clause and its parameters.

    The clause refers to the aliases ``r`` (raw_items), ``a``
    (analysis_results) and ``w`` (work_items) used by the calling queries.

    Args:
        filters: Recognised keys are ``source``, ``contentType``,
            ``sentiment``, ``status``, ``analysisStatus`` (exact match),
            ``reply`` / ``actionable`` (active only when equal to ``"1"``)
            and ``q`` (substring search over content, title, summary and
            author name). Empty or missing values are ignored.

    Returns:
        ``(clause, params)``. ``clause`` is an empty string when no filter
        is active, otherwise it starts with ``"WHERE "``. ``params`` holds
        the positional values in placeholder order.
    """
    # (filter key, SQL condition) pairs for simple equality filters, kept in
    # the order in which the conditions are emitted.
    equality_filters = (
        ("source", "r.source = ?"),
        ("contentType", "r.content_type = ?"),
        ("sentiment", "a.sentiment = ?"),
        ("status", "w.status = ?"),
        ("analysisStatus", "r.analysis_status = ?"),
    )
    conditions: list[str] = []
    params: list[Any] = []
    for key, condition in equality_filters:
        value = filters.get(key)
        if value:
            conditions.append(condition)
            params.append(value)

    if filters.get("reply") == "1":
        conditions.append("a.reply_recommended = 1")
    if filters.get("actionable") == "1":
        conditions.append("a.has_actionable_feedback = 1")

    query = filters.get("q")
    if query:
        # Escape LIKE metacharacters so user text such as "100%" or "a_b"
        # is matched literally instead of acting as a wildcard. The
        # backslash itself must be escaped first.
        escaped = (
            query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        conditions.append(
            "(r.content LIKE ? ESCAPE '\\' OR r.title LIKE ? ESCAPE '\\' "
            "OR a.summary LIKE ? ESCAPE '\\' OR r.author_name LIKE ? ESCAPE '\\')"
        )
        params.extend([f"%{escaped}%"] * 4)

    clause = "WHERE " + " AND ".join(conditions) if conditions else ""
    return clause, params
def _bool_env(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is unset or blank.

    Returns:
        ``True`` when the trimmed, lower-cased value is one of ``1``, ``true``,
        ``yes`` or ``on``; ``default`` when the variable is missing or contains
        only whitespace; ``False`` for any other text.
    """
    value = os.getenv(name)
    # A blank entry such as ``AUTO_SYNC_ENABLED=`` in the env file means "not
    # configured" (the same way `_int_env` treats it), not an explicit False.
    if value is None or not value.strip():
        return default
    return value.strip().lower() in {"1", "true", "yes", "on"}
"social-media-scraper" / "scraper.py"), + ) + ) + if not twitter_scraper_path.is_absolute(): + twitter_scraper_path = ROOT_DIR / twitter_scraper_path + twitter_output_dir = Path(os.getenv("TWITTER_OUTPUT_DIR", "任务/社媒数据/twitter-monitor")) + if not twitter_output_dir.is_absolute(): + twitter_output_dir = ROOT_DIR / twitter_output_dir + return Settings( + app_id=os.getenv("APP_ID", "3774440"), + product_name=os.getenv("PRODUCT_NAME", "帝国幻想乡~TOHOTOPIA"), + database_path=database_path, + sync_interval_minutes=_int_env("SYNC_INTERVAL_MINUTES", 30), + auto_sync_enabled=_bool_env("AUTO_SYNC_ENABLED", True), + twitter_enabled=_bool_env("TWITTER_ENABLED", False), + twitter_username=os.getenv("TWITTER_USERNAME", "Tohotopia"), + twitter_scraper_path=twitter_scraper_path, + twitter_output_dir=twitter_output_dir, + twitter_browser_provider=os.getenv("TWITTER_BROWSER_PROVIDER", "existing"), + twitter_full_max_no_new=_int_env("TWITTER_FULL_MAX_NO_NEW", 6), + twitter_incremental_max_no_new=_int_env("TWITTER_INCREMENTAL_MAX_NO_NEW", 2), + twitter_thread_max_no_new=_int_env("TWITTER_THREAD_MAX_NO_NEW", 3), + twitter_command_timeout_seconds=_int_env("TWITTER_COMMAND_TIMEOUT_SECONDS", 900), + twitter_full_reply_post_limit=_int_env("TWITTER_FULL_REPLY_POST_LIMIT", 0), + twitter_incremental_reply_parent_limit=_int_env("TWITTER_INCREMENTAL_REPLY_PARENT_LIMIT", 20), + discussion_full_scan_max_pages=_int_env("DISCUSSION_FULL_SCAN_MAX_PAGES", 500), + discussion_incremental_max_pages=_int_env("DISCUSSION_INCREMENTAL_MAX_PAGES", 5), + full_scan_time_limit_seconds=_int_env("FULL_SCAN_TIME_LIMIT_SECONDS", 7200), + openrouter_api_key=os.getenv("OPENROUTER_API_KEY"), + openrouter_model=os.getenv("OPENROUTER_MODEL", "deepseek/deepseek-v4-pro"), + openrouter_referer=os.getenv("OPENROUTER_REFERER", "http://localhost:8080"), + openrouter_title=os.getenv("OPENROUTER_TITLE", "TH1 Dashboard Community Monitor"), + ) diff --git a/Tools/Dashboard/community_monitor/db.py 
def connect(database_path: Path) -> sqlite3.Connection:
    """Open the monitor database, creating its parent directory if needed.

    The connection yields ``sqlite3.Row`` rows, uses the rollback-journal
    (``DELETE``) mode and enforces foreign keys.

    Args:
        database_path: Location of the SQLite file.

    Returns:
        A configured, open connection. The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If the file cannot be opened or configured. The
            half-initialised connection is closed before the error propagates.
    """
    database_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(database_path)
    try:
        conn.row_factory = sqlite3.Row
        # Keep community monitor data in the main sqlite file while it is git-backed.
        conn.execute("PRAGMA journal_mode=DELETE")
        conn.execute("PRAGMA foreign_keys=ON")
    except Exception:
        # Do not leak the file handle when configuration fails.
        conn.close()
        raise
    return conn
NULL, + reason TEXT NOT NULL, + model_json TEXT NOT NULL, + analyzed_at INTEGER NOT NULL, + FOREIGN KEY(raw_item_id) REFERENCES raw_items(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS work_items ( + raw_item_id INTEGER PRIMARY KEY, + status TEXT NOT NULL DEFAULT 'new', + owner TEXT NOT NULL DEFAULT '', + notes TEXT NOT NULL DEFAULT '', + last_handled_at INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY(raw_item_id) REFERENCES raw_items(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS sync_state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS sync_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at INTEGER NOT NULL, + finished_at INTEGER, + mode TEXT NOT NULL, + status TEXT NOT NULL, + message TEXT NOT NULL DEFAULT '', + stats_json TEXT NOT NULL DEFAULT '{}' + ); + + CREATE INDEX IF NOT EXISTS idx_raw_items_collected_at ON raw_items(collected_at DESC); + CREATE INDEX IF NOT EXISTS idx_raw_items_content_type ON raw_items(content_type); + CREATE INDEX IF NOT EXISTS idx_raw_items_analysis_status ON raw_items(analysis_status); + CREATE INDEX IF NOT EXISTS idx_work_items_status ON work_items(status); + """ + ) + + +def encode_json(value: Any) -> str: + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) + + +def decode_json(value: str | None, default: Any = None) -> Any: + if value is None: + return default + try: + return json.loads(value) + except json.JSONDecodeError: + return default diff --git a/Tools/Dashboard/community_monitor/models.py b/Tools/Dashboard/community_monitor/models.py new file mode 100644 index 000000000..b8a472b78 --- /dev/null +++ b/Tools/Dashboard/community_monitor/models.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class RawItem: + source: str + source_item_id: str + source_url: str + content_type: str 
# Baseline analysis record. `OpenRouterClient._normalize` copies this dict and
# overlays the model's parsed reply on top of it, so any key the model leaves
# out keeps the neutral value listed here. The keys match the properties
# declared in `SCHEMA`.
DEFAULT_ANALYSIS = {
    "sentiment": "neutral",
    "is_positive": False,
    "is_negative": False,
    "has_actionable_feedback": False,
    "feedback_types": [],
    "reply_recommended": False,
    "reply_priority": "none",
    "reply_suggestion": "",
    "summary": "",
    "priority": "low",
    "confidence": 0.0,
    "reason": "",
}
"required": [ + "sentiment", + "is_positive", + "is_negative", + "has_actionable_feedback", + "feedback_types", + "reply_recommended", + "reply_priority", + "reply_suggestion", + "summary", + "priority", + "confidence", + "reason", + ], + "additionalProperties": False, +} + + +class OpenRouterClient: + def __init__(self, settings: Settings) -> None: + self.settings = settings + self.enabled = bool(settings.openrouter_api_key) + self.client = httpx.Client(timeout=60) + + def close(self) -> None: + self.client.close() + + def analyze(self, item: RawItem) -> dict[str, Any]: + if not self.enabled: + raise MissingOpenRouterKey("OPENROUTER_API_KEY is not configured") + + payload = { + "model": self.settings.openrouter_model, + "messages": [ + { + "role": "system", + "content": ( + "你是独立游戏《帝国幻想乡~TOHOTOPIA》的社区运营助手。" + "请判断 Steam、Twitter/X 等社区内容的情绪、是否包含具体可处理反馈、" + "以及是否建议制作人回复。summary、reason、reply_suggestion 必须使用中文。" + "只输出符合 JSON Schema 的 JSON。" + ), + }, + { + "role": "user", + "content": self._prompt(item), + }, + ], + "temperature": 0.1, + "response_format": { + "type": "json_schema", + "json_schema": { + "name": "community_item_analysis", + "strict": True, + "schema": SCHEMA, + }, + }, + } + headers = { + "Authorization": f"Bearer {self.settings.openrouter_api_key}", + "HTTP-Referer": self.settings.openrouter_referer, + "X-Title": self.settings.openrouter_title, + } + response = self.client.post( + "https://openrouter.ai/api/v1/chat/completions", + headers=headers, + json=payload, + ) + response.raise_for_status() + data = response.json() + content = data["choices"][0]["message"]["content"] + parsed = self._parse_json(content) + return self._normalize(parsed) + + def translate_to_chinese(self, content: str) -> str: + if not self.enabled: + raise MissingOpenRouterKey("OPENROUTER_API_KEY is not configured") + + payload = { + "model": self.settings.openrouter_model, + "messages": [ + { + "role": "system", + "content": ( + "你是独立游戏社区运营翻译助手。" + 
"把用户提供的社区内容准确翻译成简体中文,保留原意、语气、问题细节、游戏术语、链接和编号。" + "不要添加解释。只输出符合 JSON Schema 的 JSON。" + ), + }, + { + "role": "user", + "content": content[:6000], + }, + ], + "temperature": 0, + "response_format": { + "type": "json_schema", + "json_schema": { + "name": "manual_item_translation", + "strict": True, + "schema": TRANSLATION_SCHEMA, + }, + }, + } + headers = { + "Authorization": f"Bearer {self.settings.openrouter_api_key}", + "HTTP-Referer": self.settings.openrouter_referer, + "X-Title": self.settings.openrouter_title, + } + response = self.client.post( + "https://openrouter.ai/api/v1/chat/completions", + headers=headers, + json=payload, + ) + response.raise_for_status() + data = response.json() + parsed = self._parse_json(data["choices"][0]["message"]["content"]) + translated = str(parsed.get("translated_content") or "").strip() + return translated or content + + def _prompt(self, item: RawItem) -> str: + metadata = { + "source": item.source, + "content_type": item.content_type, + "source_url": item.source_url, + "author": item.author_name, + "title": item.title, + "steam_review_voted_up": item.raw.get("voted_up"), + "language": item.raw.get("language"), + "in_reply_to": item.raw.get("parent_url") or item.raw.get("in_reply_to"), + "likes": item.raw.get("likes"), + "replies": item.raw.get("replies"), + "retweets": item.raw.get("retweets"), + "views": item.raw.get("views"), + } + return ( + "请分析以下社区内容。\n\n" + f"元数据:{json.dumps(metadata, ensure_ascii=False)}\n\n" + f"正文:\n{item.content[:6000]}" + ) + + def _parse_json(self, content: str) -> dict[str, Any]: + try: + return json.loads(content) + except json.JSONDecodeError: + match = re.search(r"\{.*\}", content, re.S) + if not match: + raise + return json.loads(match.group(0)) + + def _normalize(self, value: dict[str, Any]) -> dict[str, Any]: + result = dict(DEFAULT_ANALYSIS) + result.update(value) + result["feedback_types"] = list(result.get("feedback_types") or []) + result["is_positive"] = bool(result.get("is_positive")) 
+ result["is_negative"] = bool(result.get("is_negative")) + result["has_actionable_feedback"] = bool(result.get("has_actionable_feedback")) + result["reply_recommended"] = bool(result.get("reply_recommended")) + try: + result["confidence"] = float(result.get("confidence", 0.0)) + except (TypeError, ValueError): + result["confidence"] = 0.0 + return result + + +class MissingOpenRouterKey(RuntimeError): + pass diff --git a/Tools/Dashboard/community_monitor/steam.py b/Tools/Dashboard/community_monitor/steam.py new file mode 100644 index 000000000..e6f41b504 --- /dev/null +++ b/Tools/Dashboard/community_monitor/steam.py @@ -0,0 +1,321 @@ +from __future__ import annotations + +from hashlib import sha1 +import re +import time +from typing import Any, Iterable +from urllib.parse import parse_qs, quote, urljoin, urlparse + +from bs4 import BeautifulSoup +import httpx + +from .models import RawItem + + +STEAM_STORE = "https://store.steampowered.com" +STEAM_COMMUNITY = "https://steamcommunity.com" + + +HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/125.0 Safari/537.36", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,ja;q=0.7", +} + + +def content_hash(text: str) -> str: + return sha1(text.encode("utf-8", errors="ignore")).hexdigest() + + +def _text(node: Any) -> str: + return node.get_text(separator="\n", strip=True) if node else "" + + +def _abs_url(url: str) -> str: + return urljoin(STEAM_COMMUNITY, url) + + +def _topic_id_from_url(url: str) -> str: + match = re.search(r"/discussions/[^/]+/(\d+)", url) + if match: + return match.group(1) + return content_hash(url) + + +def _reply_id(comment: Any, topic_id: str, author: str, timestamp: str, text: str) -> str: + node_id = comment.get("id", "") + if node_id: + return node_id + data_id = comment.get("data-commentid", "") + if data_id: + return data_id + return f"{topic_id}:{content_hash(author + timestamp + text)}" + + +def parse_steam_time(text: str | 
None, now: int | None = None) -> int | None: + if not text: + return None + value = text.strip() + now_ts = now or int(time.time()) + relative = re.match(r"^(\d+)\s*(分钟|小时|天|minute|minutes|hour|hours|day|days)\s*(以前|ago)?$", value, re.I) + if relative: + amount = int(relative.group(1)) + unit = relative.group(2).lower() + seconds = { + "分钟": 60, + "minute": 60, + "minutes": 60, + "小时": 3600, + "hour": 3600, + "hours": 3600, + "天": 86400, + "day": 86400, + "days": 86400, + }[unit] + return now_ts - amount * seconds + + absolute = re.match( + r"^(\d{1,2})\s*月\s*(\d{1,2})\s*日\s*(上午|下午)\s*(\d{1,2}):(\d{2})$", + value, + ) + if absolute: + current = time.localtime(now_ts) + return _make_ts( + current.tm_year, + int(absolute.group(1)), + int(absolute.group(2)), + absolute.group(3), + int(absolute.group(4)), + int(absolute.group(5)), + ) + + absolute_with_year = re.match( + r"^(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日\s*(上午|下午)\s*(\d{1,2}):(\d{2})$", + value, + ) + if absolute_with_year: + return _make_ts( + int(absolute_with_year.group(1)), + int(absolute_with_year.group(2)), + int(absolute_with_year.group(3)), + absolute_with_year.group(4), + int(absolute_with_year.group(5)), + int(absolute_with_year.group(6)), + ) + return None + + +def _make_ts(year: int, month: int, day: int, ampm: str, hour: int, minute: int) -> int: + if ampm == "下午" and hour != 12: + hour += 12 + if ampm == "上午" and hour == 12: + hour = 0 + return int(time.mktime((year, month, day, hour, minute, 0, -1, -1, -1))) + + +class SteamClient: + def __init__(self, app_id: str) -> None: + self.app_id = app_id + self.client = httpx.Client(headers=HEADERS, timeout=30, follow_redirects=True) + self.client.cookies.set("birthtime", "568022401", domain="steamcommunity.com") + + def close(self) -> None: + self.client.close() + + def fetch_reviews(self, max_pages: int | None = None) -> list[RawItem]: + cursor = "*" + page = 0 + items: list[RawItem] = [] + while True: + params = { + "json": "1", + "num_per_page": 
"100", + "language": "all", + "filter": "recent", + "purchase_type": "all", + "cursor": cursor, + } + response = self.client.get(f"{STEAM_STORE}/appreviews/{self.app_id}", params=params) + response.raise_for_status() + data = response.json() + reviews = data.get("reviews") or [] + if not reviews: + break + for review in reviews: + items.append(self._review_to_item(review)) + new_cursor = data.get("cursor") or cursor + page += 1 + if new_cursor == cursor: + break + if max_pages and page >= max_pages: + break + cursor = new_cursor + time.sleep(0.25) + return items + + def fetch_discussions(self, full: bool, max_pages: int, time_limit_seconds: int) -> list[RawItem]: + started = time.monotonic() + topic_urls: list[str] = [] + seen_urls: set[str] = set() + for page in range(1, max_pages + 1): + if time.monotonic() - started > time_limit_seconds: + break + url = f"{STEAM_COMMUNITY}/app/{self.app_id}/discussions/" + if page > 1: + url = f"{url}?fp={page}" + html = self._get_text(url) + urls = self._extract_topic_urls(html) + new_urls = [u for u in urls if u not in seen_urls] + if not new_urls: + break + topic_urls.extend(new_urls) + seen_urls.update(new_urls) + if not full and page >= max_pages: + break + time.sleep(0.25) + + items: list[RawItem] = [] + for url in topic_urls: + if time.monotonic() - started > time_limit_seconds: + break + items.extend(self.fetch_discussion_topic(url)) + time.sleep(0.35) + return items + + def fetch_discussion_topic(self, url: str) -> list[RawItem]: + html = self._get_text(url) + soup = BeautifulSoup(html, "html.parser") + topic_id = _topic_id_from_url(url) + title = _text(soup.select_one("div.topic")) or _text(soup.select_one(".forum_topic_name")) + items: list[RawItem] = [] + + op = soup.select_one(".forum_op") + if op: + author_el = op.select_one(".authorline a") + date_el = op.select_one(".date") + date_text = _text(date_el) + content_el = op.select_one(".content") + author = _text(author_el) + content = _text(content_el) + source_url 
= url + if content: + items.append( + RawItem( + source="steam_discussions", + source_item_id=f"topic:{topic_id}", + source_url=source_url, + content_type="discussion_topic", + author_id=self._steam_id_from_author(author_el), + author_name=author, + title=title, + published_at=parse_steam_time(date_text), + published_at_text=date_text, + updated_at_source=None, + content=content, + raw={ + "topic_id": topic_id, + "topic_url": url, + "title": title, + "author": author, + "date": date_text, + "content": content, + }, + ) + ) + + for comment in soup.select(".commentthread_comment"): + author_el = comment.select_one(".commentthread_author_link") + date_el = comment.select_one(".commentthread_comment_timestamp") + text_el = comment.select_one(".commentthread_comment_text") + text = _text(text_el) + if not text: + continue + author = _text(author_el) + timestamp = _text(date_el) + reply_id = _reply_id(comment, topic_id, author, timestamp, text) + reply_url = f"{url}#{reply_id}" if reply_id else url + items.append( + RawItem( + source="steam_discussions", + source_item_id=f"reply:{topic_id}:{reply_id}", + source_url=reply_url, + content_type="discussion_reply", + author_id=self._steam_id_from_author(author_el), + author_name=author, + title=title, + published_at=parse_steam_time(timestamp), + published_at_text=timestamp, + updated_at_source=None, + content=text, + raw={ + "topic_id": topic_id, + "topic_url": url, + "reply_id": reply_id, + "reply_url": reply_url, + "title": title, + "reply_author": author, + "reply_time_text": timestamp, + "reply_content": text, + }, + ) + ) + return items + + def _review_to_item(self, review: dict[str, Any]) -> RawItem: + author = review.get("author") or {} + steam_id = str(author.get("steamid") or "") + recommendation_id = str(review.get("recommendationid")) + source_url = f"{STEAM_COMMUNITY}/profiles/{steam_id}/recommended/{self.app_id}/" + raw = dict(review) + raw["source_url"] = source_url + return RawItem( + source="steam_reviews", + 
source_item_id=f"review:{recommendation_id}", + source_url=source_url, + content_type="review", + author_id=steam_id or None, + author_name=author.get("personaname"), + title=None, + published_at=review.get("timestamp_created"), + published_at_text=None, + updated_at_source=review.get("timestamp_updated"), + content=review.get("review") or "", + raw=raw, + ) + + def _get_text(self, url: str) -> str: + response = self.client.get(url) + response.raise_for_status() + response.encoding = "utf-8" + return response.text + + def _extract_topic_urls(self, html: str) -> list[str]: + soup = BeautifulSoup(html, "html.parser") + urls: list[str] = [] + for link in soup.select("a.forum_topic_overlay, a.forum_topic_name"): + href = link.get("href") + if not href: + continue + url = _abs_url(href).split("?")[0] + if f"/app/{self.app_id}/discussions/" in url and url not in urls: + urls.append(url) + return urls + + def _steam_id_from_author(self, author_el: Any) -> str | None: + if not author_el: + return None + href = author_el.get("href") or "" + parsed = urlparse(href) + if "/profiles/" in parsed.path: + return parsed.path.rstrip("/").split("/")[-1] + if "/id/" in parsed.path: + return parsed.path.rstrip("/").split("/")[-1] + query = parse_qs(parsed.query) + steam_id = query.get("steamid") + return steam_id[0] if steam_id else None + + +def iter_nonempty(items: Iterable[RawItem]) -> Iterable[RawItem]: + for item in items: + if item.content.strip(): + yield item diff --git a/Tools/Dashboard/community_monitor/sync.py b/Tools/Dashboard/community_monitor/sync.py new file mode 100644 index 000000000..d7fe9bd6b --- /dev/null +++ b/Tools/Dashboard/community_monitor/sync.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +from collections import Counter +from hashlib import sha1 +import sqlite3 +import time +from typing import Any + +from .config import Settings +from .db import decode_json, encode_json, init_db +from .models import RawItem +from .openrouter import 
def upsert_raw_item(conn: sqlite3.Connection, item: RawItem) -> tuple[int, bool]:
    """Store a collected item, keyed by ``(source, source_item_id)``.

    * Unknown key: insert the item and a matching ``work_items`` row in
      status ``'new'``.
    * Known key with different content hash: refresh the stored fields and
      reset ``analysis_status`` to ``'pending'``.
    * Known key with the same content hash: leave the row untouched.

    Nothing is committed here; the caller owns the transaction.

    Returns:
        ``(raw_item_id, inserted)`` where ``inserted`` is ``True`` only when a
        new row was created.
    """
    timestamp = _now()
    digest = _hash(item.content)

    found = conn.execute(
        "SELECT id, content_hash FROM raw_items WHERE source = ? AND source_item_id = ?",
        (item.source, item.source_item_id),
    ).fetchone()

    if found:
        known_id = int(found["id"])
        if found["content_hash"] == digest:
            # Same content as last time: nothing to refresh.
            return known_id, False
        refreshed_values = (
            item.source_url,
            item.author_id,
            item.author_name,
            item.title,
            item.published_at,
            item.published_at_text,
            item.updated_at_source,
            item.content,
            encode_json(item.raw),
            digest,
            timestamp,
            found["id"],
        )
        conn.execute(
            """
            UPDATE raw_items
            SET source_url = ?, author_id = ?, author_name = ?, title = ?,
                published_at = ?, published_at_text = ?, updated_at_source = ?,
                content = ?, raw_json = ?, content_hash = ?, analysis_status = 'pending',
                collected_at = ?
            WHERE id = ?
            """,
            refreshed_values,
        )
        return known_id, False

    new_values = (
        item.source,
        item.source_item_id,
        item.source_url,
        item.content_type,
        item.author_id,
        item.author_name,
        item.title,
        item.published_at,
        item.published_at_text,
        timestamp,
        item.updated_at_source,
        item.content,
        encode_json(item.raw),
        digest,
    )
    created = conn.execute(
        """
        INSERT INTO raw_items (
            source, source_item_id, source_url, content_type, author_id, author_name,
            title, published_at, published_at_text, collected_at, updated_at_source,
            content, raw_json, content_hash, analysis_status
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')
        """,
        new_values,
    )
    new_id = int(created.lastrowid)
    # Every new raw item starts with an empty triage record.
    conn.execute(
        """
        INSERT INTO work_items (raw_item_id, status, owner, notes, created_at, updated_at)
        VALUES (?, 'new', '', '', ?, ?)
        """,
        (new_id, timestamp, timestamp),
    )
    return new_id, True
def run_sync(
    conn: sqlite3.Connection,
    settings: Settings,
    full: bool = False,
    platforms: list[str] | None = None,
) -> dict[str, Any]:
    """Collect community items, store them and analyse the newly inserted ones.

    A ``sync_runs`` row is created (and committed) in status ``'running'`` and
    finalised as ``'success'``, ``'partial'`` (Twitter failed, the rest was
    processed) or ``'failed'`` (any other exception, which is re-raised).

    Args:
        conn: Open database connection; the schema is created if missing.
        settings: Runtime configuration.
        full: ``True`` for a full scan, ``False`` for an incremental one.
        platforms: Subset of ``["steam", "twitter"]``; ``None`` or empty means both.

    Returns:
        Counter values gathered during the run, as a plain dict.

    Raises:
        Exception: Whatever aborted the run, after the run row has been
            durably marked ``'failed'``.
    """
    init_db(conn)
    started = _now()
    mode = "full" if full else "incremental"
    run_id = conn.execute(
        "INSERT INTO sync_runs (started_at, mode, status) VALUES (?, ?, 'running')",
        (started, mode),
    ).lastrowid
    conn.commit()

    stats: Counter[str] = Counter()
    messages: list[str] = []
    try:
        enabled_platforms = platforms or ["steam", "twitter"]
        if "twitter" in enabled_platforms and not settings.twitter_enabled:
            stats["twitter_skipped"] += 1
        raw_items: list[RawItem] = []
        if "steam" in enabled_platforms:
            steam = SteamClient(settings.app_id)
            try:
                review_pages = None if full else 2
                review_items = steam.fetch_reviews(max_pages=review_pages)
                discussion_pages = (
                    settings.discussion_full_scan_max_pages
                    if full
                    else settings.discussion_incremental_max_pages
                )
                discussion_items = steam.fetch_discussions(
                    full=full,
                    max_pages=discussion_pages,
                    time_limit_seconds=settings.full_scan_time_limit_seconds,
                )
                steam_items = list(iter_nonempty([*review_items, *discussion_items]))
                raw_items.extend(steam_items)
                stats["steam_fetched"] = len(steam_items)
            finally:
                steam.close()

        if "twitter" in enabled_platforms and settings.twitter_enabled:
            try:
                since_ts = None if full else _twitter_high_watermark_ts(conn)
                existing_urls = _recent_twitter_post_urls(
                    conn,
                    settings.twitter_incremental_reply_parent_limit,
                )
                twitter = TwitterClient(_twitter_options(settings))
                twitter_items = twitter.fetch_items(
                    full=full,
                    since_ts=since_ts,
                    existing_post_urls=existing_urls,
                )
                raw_items.extend(twitter_items)
                stats["twitter_fetched"] = len(twitter_items)
            except Exception as exc:  # noqa: BLE001 - keep Steam and old Twitter data intact
                stats["twitter_errors"] += 1
                stats[f"twitter_error:{type(exc).__name__}"] += 1
                messages.append(f"twitter: {exc}")

        stats["fetched"] = len(raw_items)
        analyzer = OpenRouterClient(settings)
        try:
            for item in raw_items:
                raw_item_id, inserted = upsert_raw_item(conn, item)
                prefix = item.source.split("_", 1)[0]
                stats["inserted" if inserted else "seen"] += 1
                stats[f"{prefix}_{'inserted' if inserted else 'seen'}"] += 1
                if inserted:
                    try:
                        analysis = analyzer.analyze(item)
                        save_analysis(conn, raw_item_id, settings.openrouter_model, analysis)
                        stats["analyzed"] += 1
                    except Exception as exc:  # noqa: BLE001 - flag as 'error'; analyze_pending retries it
                        conn.execute(
                            "UPDATE raw_items SET analysis_status = 'error' WHERE id = ?",
                            (raw_item_id,),
                        )
                        stats["analysis_errors"] += 1
                        stats[f"analysis_error:{type(exc).__name__}"] += 1
                # Persist item by item so a late failure keeps earlier work.
                conn.commit()
        finally:
            analyzer.close()

        finished = _now()
        status = "partial" if messages else "success"
        conn.execute(
            """
            UPDATE sync_runs
            SET finished_at = ?, status = ?, message = ?, stats_json = ?
            WHERE id = ?
            """,
            (finished, status, "\n".join(messages), encode_json(dict(stats)), run_id),
        )
        if status == "success":
            conn.execute(
                """
                INSERT INTO sync_state (key, value, updated_at)
                VALUES ('last_sync_mode', ?, ?)
                ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at
                """,
                (mode, finished),
            )
        return dict(stats)
    except Exception as exc:
        # Drop any half-applied item writes, then persist the failure marker
        # here. Callers wrap the connection in `session(...)`, which rolls back
        # when this exception propagates; without an explicit commit the UPDATE
        # below would be undone and the run would stay 'running' forever.
        conn.rollback()
        finished = _now()
        conn.execute(
            """
            UPDATE sync_runs
            SET finished_at = ?, status = 'failed', message = ?, stats_json = ?
            WHERE id = ?
            """,
            (finished, str(exc), encode_json(dict(stats)), run_id),
        )
        conn.commit()
        raise
+ """, + params, + ).fetchall() + for row in rows: + item = RawItem( + source=row["source"], + source_item_id=row["source_item_id"], + source_url=row["source_url"], + content_type=row["content_type"], + author_id=row["author_id"], + author_name=row["author_name"], + title=row["title"], + published_at=row["published_at"], + published_at_text=row["published_at_text"], + updated_at_source=row["updated_at_source"], + content=row["content"], + raw=decode_json(row["raw_json"], {}), + ) + try: + analysis = analyzer.analyze(item) + save_analysis(conn, int(row["id"]), settings.openrouter_model, analysis) + stats["analyzed"] += 1 + conn.commit() + except Exception as exc: # noqa: BLE001 + stats["analysis_errors"] += 1 + stats[f"analysis_error:{type(exc).__name__}"] += 1 + return dict(stats) + finally: + analyzer.close() diff --git a/Tools/Dashboard/community_monitor/twitter.py b/Tools/Dashboard/community_monitor/twitter.py new file mode 100644 index 000000000..729b6a990 --- /dev/null +++ b/Tools/Dashboard/community_monitor/twitter.py @@ -0,0 +1,246 @@ +from __future__ import annotations + +from dataclasses import dataclass +import calendar +import json +from pathlib import Path +import re +import subprocess +import sys +import time +from typing import Any, Iterable + +from .models import RawItem + + +TWITTER_EPOCH_FORMAT = "%a %b %d %H:%M:%S +0000 %Y" +NORMALIZED_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + + +@dataclass(frozen=True) +class TwitterScrapeOptions: + username: str + scraper_path: Path + output_dir: Path + browser_provider: str + full_max_no_new: int + incremental_max_no_new: int + thread_max_no_new: int + command_timeout_seconds: int + full_reply_post_limit: int + incremental_reply_parent_limit: int + + +def parse_twitter_time(value: str | None) -> int | None: + if not value: + return None + text = value.strip() + for fmt in (NORMALIZED_DATE_FORMAT, TWITTER_EPOCH_FORMAT): + try: + parsed = time.strptime(text, fmt) + return calendar.timegm(parsed) + except ValueError: + 
continue + return None + + +def _author_from_url(url: str | None) -> str | None: + if not url: + return None + match = re.search(r"(?:x\.com|twitter\.com)/([^/?#]+)/status/\d+", url) + if not match: + return None + value = match.group(1) + return value if value and value.lower() != "i" else None + + +def _tweet_id_from_item(item: dict[str, Any]) -> str | None: + value = item.get("id") + if value: + return str(value) + url = str(item.get("url") or "") + match = re.search(r"/status/(\d+)", url) + return match.group(1) if match else None + + +def _tweet_url(username: str, tweet_id: str) -> str: + return f"https://x.com/{username}/status/{tweet_id}" + + +def _is_original_post(item: dict[str, Any]) -> bool: + return not bool(item.get("is_retweet")) + + +class TwitterClient: + def __init__(self, options: TwitterScrapeOptions) -> None: + self.options = options + + def fetch_items( + self, + *, + full: bool, + since_ts: int | None, + existing_post_urls: Iterable[str] = (), + ) -> list[RawItem]: + run_dir = self._new_run_dir() + timeline = self._fetch_timeline(run_dir, full=full) + timeline_items = [ + self._post_to_item(item) + for item in timeline + if self._include_by_time(item, since_ts) + ] + + reply_parent_urls = self._reply_parent_urls( + timeline=timeline, + full=full, + existing_post_urls=existing_post_urls, + ) + reply_items: list[RawItem] = [] + for parent_url in reply_parent_urls: + thread = self._fetch_thread(run_dir, parent_url) + parent_id = str(thread.get("main_tweet", {}).get("id") or self._id_from_url(parent_url) or "") + for reply in thread.get("replies") or []: + if self._include_by_time(reply, since_ts): + reply_items.append(self._reply_to_item(reply, parent_id=parent_id, parent_url=parent_url)) + + return [item for item in [*timeline_items, *reply_items] if item.content.strip()] + + def _new_run_dir(self) -> Path: + path = self.options.output_dir / time.strftime("%Y%m%d_%H%M%S") + path.mkdir(parents=True, exist_ok=True) + return path + + def 
_fetch_timeline(self, run_dir: Path, *, full: bool) -> list[dict[str, Any]]: + max_no_new = self.options.full_max_no_new if full else self.options.incremental_max_no_new + self._run_scraper(self.options.username, run_dir, max_no_new=max_no_new) + path = run_dir / f"{self.options.username}_posts.json" + return self._read_json(path, expected="timeline posts") + + def _fetch_thread(self, run_dir: Path, parent_url: str) -> dict[str, Any]: + tweet_id = self._id_from_url(parent_url) + if not tweet_id: + return {"main_tweet": None, "replies": [], "total_replies": 0} + self._run_scraper(parent_url, run_dir, max_no_new=self.options.thread_max_no_new) + path = run_dir / f"thread_{tweet_id}.json" + return self._read_json(path, expected=f"thread {tweet_id}") + + def _run_scraper(self, target: str, run_dir: Path, *, max_no_new: int) -> None: + command = [ + sys.executable, + str(self.options.scraper_path), + target, + "--max-no-new", + str(max_no_new), + "--output-dir", + str(run_dir), + "--browser-provider", + self.options.browser_provider, + ] + result = subprocess.run( + command, + cwd=Path.cwd(), + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + timeout=self.options.command_timeout_seconds, + ) + output = "\n".join(part for part in [result.stdout, result.stderr] if part).strip() + if result.returncode != 0: + raise RuntimeError(f"Twitter scraper failed for {target}: {output[-1200:]}") + if "登录提示" in output or "未登录" in output or "login" in output.lower(): + raise RuntimeError( + "Twitter scraper requires an authenticated X.com browser profile. " + "Run the configured social-media-scraper once with --keep-browser-open, " + "log in to X.com, then retry." 
+ ) + + def _read_json(self, path: Path, *, expected: str) -> Any: + if not path.exists(): + raise RuntimeError(f"Twitter scraper did not produce {expected}: {path}") + return json.loads(path.read_text(encoding="utf-8")) + + def _reply_parent_urls( + self, + *, + timeline: list[dict[str, Any]], + full: bool, + existing_post_urls: Iterable[str], + ) -> list[str]: + urls: list[str] = [] + for item in timeline: + tweet_id = _tweet_id_from_item(item) + url = item.get("url") or (_tweet_url(self.options.username, tweet_id) if tweet_id else "") + if url and _is_original_post(item): + urls.append(str(url)) + + if not full: + urls.extend(str(url) for url in existing_post_urls if url) + + seen: set[str] = set() + unique_urls: list[str] = [] + for url in urls: + if url not in seen: + seen.add(url) + unique_urls.append(url) + + limit = self.options.full_reply_post_limit if full else self.options.incremental_reply_parent_limit + if limit > 0: + return unique_urls[:limit] + return unique_urls + + def _post_to_item(self, item: dict[str, Any]) -> RawItem: + tweet_id = _tweet_id_from_item(item) or "" + url = item.get("url") or _tweet_url(self.options.username, tweet_id) + author = _author_from_url(str(url)) or self.options.username + raw = dict(item) + raw["source_url"] = url + return RawItem( + source="twitter_posts", + source_item_id=f"post:{tweet_id}", + source_url=str(url), + content_type="twitter_post", + author_id=author, + author_name=author, + title=None, + published_at=parse_twitter_time(item.get("date")), + published_at_text=item.get("date"), + updated_at_source=None, + content=str(item.get("text") or ""), + raw=raw, + ) + + def _reply_to_item(self, item: dict[str, Any], *, parent_id: str, parent_url: str) -> RawItem: + tweet_id = _tweet_id_from_item(item) or "" + url = item.get("url") or _tweet_url(_author_from_url(parent_url) or self.options.username, tweet_id) + author = _author_from_url(str(url)) or str(item.get("in_reply_to") or "") + raw = dict(item) + 
raw["parent_tweet_id"] = parent_id + raw["parent_url"] = parent_url + raw["source_url"] = url + return RawItem( + source="twitter_replies", + source_item_id=f"reply:{tweet_id}", + source_url=str(url), + content_type="twitter_reply", + author_id=author or None, + author_name=author or None, + title=f"Reply to {parent_id}" if parent_id else None, + published_at=parse_twitter_time(item.get("date")), + published_at_text=item.get("date"), + updated_at_source=None, + content=str(item.get("text") or ""), + raw=raw, + ) + + def _include_by_time(self, item: dict[str, Any], since_ts: int | None) -> bool: + if since_ts is None: + return True + published_at = parse_twitter_time(item.get("date")) + if published_at is None: + return True + return published_at >= since_ts + + def _id_from_url(self, url: str) -> str | None: + match = re.search(r"/status/(\d+)", url) + return match.group(1) if match else None diff --git a/Tools/Dashboard/css/style.css b/Tools/Dashboard/css/style.css index 79beafc44..caf65032d 100644 --- a/Tools/Dashboard/css/style.css +++ b/Tools/Dashboard/css/style.css @@ -571,6 +571,31 @@ body::after { color: var(--accent-red); } +.manual-corner-link { + position: fixed; + right: 18px; + bottom: 18px; + z-index: 950; + display: inline-flex; + align-items: center; + justify-content: center; + min-height: 34px; + padding: 0 12px; + border: 1px solid var(--border-color); + border-radius: 6px; + background: var(--bg-card); + color: var(--text-secondary); + font-size: 12px; + font-weight: 700; + text-decoration: none; + box-shadow: var(--shadow-sm); +} + +.manual-corner-link:hover { + color: var(--accent-blue); + border-color: rgba(79,140,255,0.45); +} + @keyframes toastIn { from { opacity: 0; transform: translateY(-10px); } to { opacity: 1; transform: translateY(0); } @@ -5520,6 +5545,326 @@ body::after { font-size: 15px; } +/* ========== Community Monitor ========== */ +.cm-module .module-body { + display: flex; + flex-direction: column; + gap: 14px; +} + 
+.cm-topbar, +.cm-filter-bar, +.cm-work, +.cm-pager, +.cm-config { + display: flex; + align-items: center; + flex-wrap: wrap; + gap: 8px; +} + +.cm-actions, +.cm-platforms { + display: flex; + align-items: center; + flex-wrap: wrap; + gap: 8px; +} + +.cm-topbar { + justify-content: space-between; +} + +.cm-config { + color: var(--text-muted); + font-size: 12px; +} + +.cm-btn { + min-height: 32px; + padding: 0 12px; + border: 1px solid var(--border-color); + border-radius: 6px; + background: var(--bg-card); + color: var(--text-primary); + font-size: 12px; + font-weight: 700; + cursor: pointer; +} + +.cm-btn.primary { + border-color: var(--accent-blue); + background: var(--accent-blue); + color: #fff; +} + +.cm-btn:disabled { + cursor: default; + opacity: 0.45; +} + +.cm-summary-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(118px, 1fr)); + gap: 10px; +} + +.cm-metric { + padding: 12px; + border: 1px solid var(--border-color); + border-radius: 8px; + background: var(--bg-card); +} + +.cm-metric span { + display: block; + color: var(--text-muted); + font-size: 12px; +} + +.cm-metric strong { + display: block; + margin-top: 4px; + font-size: 22px; + color: var(--text-primary); +} + +.cm-runs-card { + border: 1px solid var(--border-color); + border-radius: 8px; + background: var(--bg-primary); + padding: 12px; +} + +.cm-section-title { + margin-bottom: 8px; + font-size: 13px; + font-weight: 800; + color: var(--text-secondary); +} + +.cm-run-row { + display: grid; + grid-template-columns: 132px 84px 76px minmax(0, 1fr); + gap: 8px; + align-items: center; + min-height: 28px; + font-size: 12px; + color: var(--text-secondary); +} + +.cm-run-status { + font-weight: 800; +} + +.cm-run-status.ok { color: var(--accent-green); } +.cm-run-status.warn { color: var(--accent-orange); } +.cm-run-status.bad { color: var(--accent-red); } + +.cm-list { + display: grid; + gap: 12px; +} + +.cm-item { + border: 1px solid var(--border-color); + border-radius: 8px; 
+ background: var(--bg-card); + padding: 14px; +} + +.cm-item.urgent { + border-color: rgba(239,68,68,0.45); + box-shadow: inset 4px 0 0 rgba(239,68,68,0.85); +} + +.cm-item-head { + display: flex; + align-items: flex-start; + justify-content: space-between; + gap: 12px; +} + +.cm-item-title-block { + min-width: 0; +} + +.cm-item h3 { + margin: 0 0 8px; + font-size: 15px; + line-height: 1.45; +} + +.cm-meta, +.cm-submeta { + display: flex; + align-items: center; + flex-wrap: wrap; + gap: 6px; +} + +.cm-submeta { + margin-top: 8px; + color: var(--text-muted); + font-size: 12px; +} + +.cm-content, +.cm-reason, +.cm-reply { + margin: 10px 0 0; + color: var(--text-secondary); + font-size: 13px; + line-height: 1.65; + white-space: pre-wrap; +} + +.cm-reason { + color: var(--text-muted); +} + +.cm-reply { + padding: 10px 12px; + border-radius: 6px; + background: rgba(79,140,255,0.08); + color: var(--text-primary); +} + +.cm-link { + flex: 0 0 auto; + color: var(--accent-blue); + font-size: 12px; + font-weight: 700; + text-decoration: none; +} + +.cm-link.disabled { + color: var(--text-muted); + pointer-events: none; +} + +.cm-work { + margin-top: 12px; +} + +.cm-work select, +.cm-work input { + min-height: 32px; + padding: 6px 10px; + border: 1px solid var(--border-color); + border-radius: 6px; + background: var(--bg-card); + color: var(--text-primary); + font-size: 12px; +} + +.cm-work input { + min-width: 180px; +} + +.cm-check { + display: inline-flex; + align-items: center; + gap: 5px; + min-height: 32px; + color: var(--text-secondary); + font-size: 12px; + font-weight: 700; +} + +.cm-empty, +.cm-empty-small, +.cm-error { + padding: 22px; + border: 1px dashed var(--border-color); + border-radius: 8px; + color: var(--text-muted); + text-align: center; +} + +.cm-error { + color: var(--accent-red); + background: rgba(239,68,68,0.06); +} + +.cm-modal { + position: fixed; + inset: 0; + z-index: 2200; + display: flex; + align-items: center; + justify-content: center; + 
padding: 20px; + background: rgba(15, 23, 42, 0.58); +} + +.cm-modal-card { + width: min(760px, 96vw); + max-height: min(760px, 92vh); + display: flex; + flex-direction: column; + border: 1px solid var(--border-color); + border-radius: 8px; + background: var(--bg-card); + box-shadow: var(--shadow); +} + +.cm-modal-head, +.cm-modal-foot { + display: flex; + align-items: center; + justify-content: space-between; + gap: 10px; + padding: 14px 16px; + border-bottom: 1px solid var(--border-color); +} + +.cm-modal-foot { + justify-content: flex-end; + border-top: 1px solid var(--border-color); + border-bottom: 0; +} + +.cm-modal-title { + font-size: 15px; + font-weight: 800; +} + +.cm-modal-close { + width: 30px; + height: 30px; + border: 0; + border-radius: 6px; + background: transparent; + color: var(--text-muted); + font-size: 22px; + cursor: pointer; +} + +.cm-modal-body { + overflow-y: auto; + padding: 16px; +} + +.cm-form-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); + gap: 10px; +} + +.cm-textarea { + width: 100%; + min-height: 160px; + margin: 10px 0; + padding: 10px 12px; + border: 1px solid var(--border-color); + border-radius: 6px; + background: var(--bg-card); + color: var(--text-primary); + font: inherit; + resize: vertical; +} + /* ========== SNS Assistant ========== */ /* 子选项更大更好点击 */ diff --git a/Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3 b/Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3 new file mode 100644 index 000000000..89349237c Binary files /dev/null and b/Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3 differ diff --git a/Tools/Dashboard/docs/dashboard_manual.html b/Tools/Dashboard/docs/dashboard_manual.html new file mode 100644 index 000000000..4731fe20d --- /dev/null +++ b/Tools/Dashboard/docs/dashboard_manual.html @@ -0,0 +1,175 @@ + + + + + + Dashboard 使用手册 + + + +
+

Dashboard 使用手册

+

本文记录 TH1 Dashboard 的启动、数据存储、社区监控集成和多电脑使用规则。当前 Dashboard 的固定入口是 http://127.0.0.1:8080

+ +

日常启动

+

重启电脑后,直接运行:

+
Tools/Dashboard/启动Dashboard.bat
+

脚本会寻找本机 Python,然后执行 serve.py 8080。Dashboard 只能使用 8080 端口,避免与其它临时服务混在一起。

+ +

社区监控是什么

+

社区监控来自 TH01_maintenance 工程,已整合为 Dashboard 的一级模块。它用于抓取《帝国幻想乡~TOHOTOPIA》的社区内容,统一入库、调用大模型分析,再形成处理队列。

+
平台采集器 -> RawItem -> SQLite raw_items -> OpenRouter 分析 -> analysis_results -> work_items -> Dashboard 页面
+

当前支持 Steam 评测、Steam 讨论区主题、Steam 讨论区回复。Twitter/X 采集代码已保留,但默认关闭,需要本机登录态和外部 scraper。

+ +

JSON 与 SQLite 分工

+ + + + + + + + + +
数据类型 | 推荐存储 | 原因
Unity 导出展示数据、设计索引、小型配置 | JSON | 便于 Git diff、人工审阅、静态加载。
社媒、邮件、玩家反馈、持续增长的处理队列 | SQLite | 适合去重、分页、筛选、排序、并发写入和状态流转。
运行日志、同步批次、补跑状态 | SQLite | 需要记录历史与失败原因,JSON 容易产生冲突和性能问题。
+ +

大模型调用规则

+

页面刷新不会调用大模型。只有这些动作会调用 OpenRouter:增量同步或全量同步时分析新入库的内容(包括后台自动同步)、补跑分析、手动添加社区信息。

+ +

如果 OPENROUTER_API_KEY 没配置,内容仍会入库,分析状态会保留为待补跑或错误,不会丢数据。

+ +

私有配置

+

示例配置在:

+
Tools/Dashboard/community_monitor.env.example
+

本机私有配置建议放在:

+
Tools/Dashboard/private/community_monitor.env
+

常用字段:

+ + +

Git 备份规则

+

当前阶段按项目要求,社区监控 SQLite 数据库会先同步到 Git,用于备份。路径是:

+
Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3
+
+ SQLite 是二进制文件,不适合多人同时修改后用 Git 合并。当前约定是暂时只有一台电脑开启自动同步;其它电脑可以拉取备份,但不要同时运行自动同步。 +
+ +

另一台电脑运行

+
    +
  1. 拉取代码。
  2. 确认本机有 Python。
  3. 复制 community_monitor.env.example 为 private/community_monitor.env,填写本机 Key。
  4. 运行 Tools/Dashboard/启动Dashboard.bat。
  5. 如果只是查看 Git 备份数据,建议设置 AUTO_SYNC_ENABLED=false。
+ +

如果发生 SQLite Git 冲突

+

不要手工合并 SQLite。最快处理是选择一份保留:

+
# 保留远端数据库
+git checkout --theirs Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3
+git add Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3
+
+# 或保留本地数据库
+git checkout --ours Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3
+git add Tools/Dashboard/data/community_monitor/tohotopia_monitor.sqlite3
+

长期如果需要多电脑同时处理,应改成主机共享服务、云端数据库或专门导入导出流程,不应依赖 Git 合并数据库。

+ +

推荐升级方向

+ +
+ + diff --git a/Tools/Dashboard/index.html b/Tools/Dashboard/index.html index 9b493e7ee..92db9897f 100644 --- a/Tools/Dashboard/index.html +++ b/Tools/Dashboard/index.html @@ -113,6 +113,10 @@ SNS助手 + + + + + + +
+ + +
+ +
+
+
+
最近同步
+
+
+
+ + + + + + + + +
+
+
点击社区监控后加载数据...
+
+
+ + + +
@@ -1312,6 +1396,7 @@ + diff --git a/Tools/Dashboard/js/community_monitor.js b/Tools/Dashboard/js/community_monitor.js new file mode 100644 index 000000000..98ea81c52 --- /dev/null +++ b/Tools/Dashboard/js/community_monitor.js @@ -0,0 +1,427 @@ +/* TH1 Dashboard - Community Monitor */ + +let cmLoaded = false; +let cmState = { + page: 1, + pageSize: 80, + total: 0, + statusLabels: {}, +}; + +const CM_STATUS_LABELS = { + new: '未处理', + read: '已读', + needs_reply: '待回复', + replied: '已回复', + needs_fix: '待修复', + archived: '已归档', +}; + +const CM_SOURCE_LABELS = { + steam_reviews: 'Steam评测', + steam_discussions: 'Steam讨论区', + twitter_posts: 'Twitter帖子', + twitter_replies: 'Twitter回复', + manual: '手动添加', +}; + +const CM_TYPE_LABELS = { + review: '评测', + discussion_topic: '讨论主题', + discussion_reply: '讨论回复', + twitter_post: 'Twitter帖子', + twitter_reply: 'Twitter回复', + manual_note: '手动信息', +}; + +async function cmLoad(force = false) { + if (cmLoaded && !force) return; + cmLoaded = true; + await Promise.all([cmLoadOverview(), cmLoadItems()]); + cmBindOnce(); +} + +async function cmLoadOverview() { + const summaryEl = document.getElementById('cm-summary'); + const runsEl = document.getElementById('cm-runs'); + if (summaryEl) summaryEl.innerHTML = '
正在加载社区监控统计...
'; + try { + const resp = await fetch('/api/community-monitor/overview?t=' + Date.now()); + const data = await resp.json(); + if (!resp.ok || data.error) throw new Error(data.error || `HTTP ${resp.status}`); + cmState.statusLabels = data.statusLabels || CM_STATUS_LABELS; + cmRenderSummary(data); + cmRenderRuns(data.runs || []); + cmRenderConfig(data.status || {}); + } catch (err) { + if (summaryEl) summaryEl.innerHTML = `
加载失败:${cmEsc(err.message)}
`; + if (runsEl) runsEl.innerHTML = ''; + } +} + +async function cmLoadItems() { + const listEl = document.getElementById('cm-list'); + if (listEl) listEl.innerHTML = '
正在加载社区内容...
'; + const params = new URLSearchParams(cmCollectFilters()); + params.set('page', String(cmState.page)); + params.set('pageSize', String(cmState.pageSize)); + try { + const resp = await fetch('/api/community-monitor/items?' + params.toString()); + const data = await resp.json(); + if (!resp.ok || data.error) throw new Error(data.error || `HTTP ${resp.status}`); + cmState.total = data.total || 0; + cmState.page = data.page || 1; + cmState.pageSize = data.pageSize || 80; + cmState.statusLabels = data.statusLabels || CM_STATUS_LABELS; + cmRenderItems(data.items || []); + cmRenderPager(); + } catch (err) { + if (listEl) listEl.innerHTML = `
加载失败:${cmEsc(err.message)}
`; + } +} + +function cmRenderSummary(data) { + const el = document.getElementById('cm-summary'); + if (!el) return; + const m = data.metrics || {}; + const cards = [ + ['总内容', m.total || 0], + ['未处理', m.new_count || 0], + ['负面', m.negative_count || 0], + ['具体反馈', m.actionable_count || 0], + ['建议回复', m.reply_count || 0], + ['高优先级', m.high_count || 0], + ['已分析', m.analyzed_count || 0], + ['待补跑', (m.pending_count || 0) + (m.error_count || 0)], + ]; + el.innerHTML = cards.map(([label, value]) => ` +
+ ${cmEsc(label)} + ${cmEsc(String(value))} +
+ `).join(''); +} + +function cmRenderConfig(status) { + const el = document.getElementById('cm-config'); + if (!el) return; + const openrouter = status.openrouterConfigured ? '已配置' : '未配置'; + const autoSync = status.autoSyncEnabled ? `${status.syncIntervalMinutes || 30} 分钟` : '关闭'; + const twitter = status.twitterEnabled ? '开启' : '关闭'; + el.innerHTML = ` + 数据库:${cmEsc(status.databasePath || '')} + OpenRouter:${openrouter} + 自动同步:${autoSync} + Twitter:${twitter} + `; +} + +function cmRenderRuns(runs) { + const el = document.getElementById('cm-runs'); + if (!el) return; + if (!runs.length) { + el.innerHTML = '
暂无同步记录
'; + return; + } + el.innerHTML = runs.map(run => { + const statusClass = run.status === 'success' ? 'ok' : run.status === 'failed' ? 'bad' : 'warn'; + return `
+ ${cmFormatTs(run.started_at)} + ${cmEsc(run.mode || '')} + ${cmEsc(run.status || '')} + ${cmEsc(cmShortStats(run.stats || {}))} +
`; + }).join(''); +} + +function cmRenderItems(items) { + const el = document.getElementById('cm-list'); + if (!el) return; + if (!items.length) { + el.innerHTML = '
暂无匹配内容。可以先执行增量同步或全量同步。
'; + return; + } + el.innerHTML = items.map(cmRenderItem).join(''); +} + +function cmRenderItem(item) { + const urgent = item.reply_recommended || item.priority === 'high'; + const status = item.status || 'new'; + const summary = item.summary || item.title || item.content || '未分析内容'; + const content = item.content || ''; + const reason = item.reason || ''; + const reply = item.reply_suggestion || ''; + const published = cmFormatTs(item.published_at) || item.published_at_text || cmFormatTs(item.collected_at); + const feedbackTypes = Array.isArray(item.feedbackTypes) ? item.feedbackTypes.join(', ') : ''; + return `
+
+
+

${cmEsc(summary).slice(0, 180)}

+
+ ${cmBadge(CM_SOURCE_LABELS[item.source] || item.source || 'unknown', 'blue')} + ${cmBadge(CM_TYPE_LABELS[item.content_type] || item.content_type || 'unknown', 'gray')} + ${cmBadge(cmSentimentLabel(item.sentiment || item.analysis_status || 'pending'), cmSentimentColor(item.sentiment))} + ${cmBadge(cmPriorityLabel(item.priority), item.priority === 'high' ? 'red' : item.priority === 'medium' ? 'orange' : 'gray')} + ${item.has_actionable_feedback ? cmBadge('具体反馈', 'purple') : ''} + ${item.reply_recommended ? cmBadge('建议回复', 'red') : ''} +
+
+ 原始链接 +
+
+ ${cmEsc(item.author_name || item.author_id || '未知作者')} + ${cmEsc(published || '')} + ${cmEsc(feedbackTypes)} +
+

${cmEsc(cmTruncate(content, 900))}

+ ${reason ? `

${cmEsc(reason)}

` : ''} + ${reply ? `
${cmEsc(reply)}
` : ''} +
+ + + + +
+
`; +} + +function cmRenderPager() { + const el = document.getElementById('cm-pager'); + if (!el) return; + const pages = Math.max(1, Math.ceil(cmState.total / cmState.pageSize)); + el.innerHTML = ` + + 第 ${cmState.page} / ${pages} 页,共 ${cmState.total} 条 + + `; +} + +function cmGoPage(page) { + cmState.page = Math.max(1, page); + cmLoadItems(); +} + +function cmCollectFilters() { + const ids = ['source', 'contentType', 'sentiment', 'status', 'analysisStatus', 'q']; + const result = {}; + ids.forEach(key => { + const el = document.getElementById('cm-filter-' + key); + if (el && el.value) result[key] = el.value; + }); + const reply = document.getElementById('cm-filter-reply'); + const actionable = document.getElementById('cm-filter-actionable'); + if (reply && reply.checked) result.reply = '1'; + if (actionable && actionable.checked) result.actionable = '1'; + return result; +} + +function cmBindOnce() { + const panel = document.getElementById('panel-community-monitor'); + if (!panel || panel.dataset.cmBound) return; + panel.dataset.cmBound = '1'; + panel.querySelectorAll('[data-cm-filter]').forEach(el => { + el.addEventListener(el.type === 'checkbox' ? 
'change' : 'input', cmDebouncedFilter); + if (el.tagName === 'SELECT') el.addEventListener('change', cmApplyFilters); + }); +} + +let cmFilterTimer = null; +function cmDebouncedFilter() { + clearTimeout(cmFilterTimer); + cmFilterTimer = setTimeout(cmApplyFilters, 260); +} + +function cmApplyFilters() { + cmState.page = 1; + cmLoadItems(); +} + +async function cmTriggerSync(full = false) { + const platforms = []; + const steam = document.getElementById('cm-platform-steam'); + const twitter = document.getElementById('cm-platform-twitter'); + if (!steam || steam.checked) platforms.push('steam'); + if (twitter && twitter.checked) platforms.push('twitter'); + try { + const resp = await fetch('/api/community-monitor/sync', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ full, platforms }) + }); + const result = await resp.json(); + if (!resp.ok || !result.success) throw new Error(result.error || `HTTP ${resp.status}`); + showToast(result.message || '同步已开始', 'success'); + setTimeout(() => cmLoadOverview(), 800); + } catch (err) { + showToast('同步启动失败: ' + err.message, 'error'); + } +} + +async function cmTriggerAnalyze() { + try { + const resp = await fetch('/api/community-monitor/analyze-pending', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ limit: 20 }) + }); + const result = await resp.json(); + if (!resp.ok || !result.success) throw new Error(result.error || `HTTP ${resp.status}`); + showToast(result.message || '补跑分析已开始', 'success'); + setTimeout(() => cmLoadOverview(), 800); + } catch (err) { + showToast('补跑失败: ' + err.message, 'error'); + } +} + +async function cmSaveWork(id) { + const statusEl = document.querySelector(`.cm-work-status[data-id="${id}"]`); + const ownerEl = document.querySelector(`.cm-work-owner[data-id="${id}"]`); + const notesEl = document.querySelector(`.cm-work-notes[data-id="${id}"]`); + try { + const resp = await 
fetch(`/api/community-monitor/items/${id}/work`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + status: statusEl ? statusEl.value : 'new', + owner: ownerEl ? ownerEl.value : '', + notes: notesEl ? notesEl.value : '', + }) + }); + const result = await resp.json(); + if (!resp.ok || !result.success) throw new Error(result.error || `HTTP ${resp.status}`); + showToast('处理状态已保存', 'success'); + cmLoadOverview(); + } catch (err) { + showToast('保存失败: ' + err.message, 'error'); + } +} + +function cmShowManualModal() { + const existing = document.getElementById('cm-manual-modal'); + if (existing) existing.remove(); + const overlay = document.createElement('div'); + overlay.id = 'cm-manual-modal'; + overlay.className = 'cm-modal'; + overlay.innerHTML = ` +
+
+
手动添加社区信息
+ +
+
+
+ + + + + + +
+ +
+ + +
+
+
+ + +
+
+ `; + document.body.appendChild(overlay); + overlay.addEventListener('click', e => { + if (e.target === overlay) overlay.remove(); + }); +} + +async function cmSubmitManual() { + const payload = { + sourceName: document.getElementById('cm-manual-source')?.value || '', + sourceUrl: document.getElementById('cm-manual-url')?.value || '', + title: document.getElementById('cm-manual-title')?.value || '', + authorName: document.getElementById('cm-manual-author')?.value || '', + publishedAtText: document.getElementById('cm-manual-time')?.value || '', + content: document.getElementById('cm-manual-content')?.value || '', + status: document.getElementById('cm-manual-status')?.value || 'new', + owner: document.getElementById('cm-manual-owner')?.value || '', + notes: document.getElementById('cm-manual-notes')?.value || '', + }; + try { + const resp = await fetch('/api/community-monitor/manual-items', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(payload) + }); + const result = await resp.json(); + if (!resp.ok || !result.success) throw new Error(result.error || `HTTP ${resp.status}`); + document.getElementById('cm-manual-modal')?.remove(); + showToast(result.analysisError ? 
'已入库,分析待补跑' : '已入库并分析', 'success'); + cmState.page = 1; + await Promise.all([cmLoadOverview(), cmLoadItems()]); + } catch (err) { + showToast('提交失败: ' + err.message, 'error'); + } +} + +function cmBadge(text, color) { + return `${cmEsc(text)}`; +} + +function cmSentimentLabel(value) { + return { positive: '正面', negative: '负面', mixed: '混合', neutral: '中性', pending: '待分析', error: '分析失败', done: '已分析' }[value] || value || '待分析'; +} + +function cmSentimentColor(value) { + return { positive: 'green', negative: 'red', mixed: 'orange', neutral: 'blue' }[value] || 'gray'; +} + +function cmPriorityLabel(value) { + return { high: '高', medium: '中', low: '低' }[value] || value || '低'; +} + +function cmFormatTs(value) { + if (!value) return ''; + const date = new Date(Number(value) * 1000); + if (Number.isNaN(date.getTime())) return ''; + const pad = n => String(n).padStart(2, '0'); + return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}`; +} + +function cmShortStats(stats) { + const parts = Object.entries(stats).slice(0, 5).map(([key, value]) => `${key}=${value}`); + return parts.join(' · '); +} + +function cmTruncate(text, max) { + if (!text || text.length <= max) return text || ''; + return text.slice(0, max) + '...'; +} + +function cmEsc(value) { + const div = document.createElement('div'); + div.textContent = value == null ? 
'' : String(value); + return div.innerHTML; +} + +(function () { + const observer = new MutationObserver(() => { + const panel = document.getElementById('panel-community-monitor'); + if (panel && panel.classList.contains('active')) { + cmLoad(); + } + }); + const panel = document.getElementById('panel-community-monitor'); + if (panel) { + observer.observe(panel, { attributes: true, attributeFilter: ['class'] }); + if (panel.classList.contains('active')) cmLoad(); + } +})(); diff --git a/Tools/Dashboard/serve.py b/Tools/Dashboard/serve.py index c08245b44..dce5c7fc5 100644 --- a/Tools/Dashboard/serve.py +++ b/Tools/Dashboard/serve.py @@ -105,6 +105,7 @@ CODEX_ARCHIVED_SESSIONS_DIR = os.path.join(CODEX_HOME, 'archived_sessions') CODEX_SESSION_INDEX = os.path.join(CODEX_HOME, 'session_index.jsonl') CODEX_JOBS = {} CODEX_JOBS_LOCK = threading.Lock() +COMMUNITY_MONITOR_API = None BALANCE_MODELING_DIR = os.path.join(PROJECT_ROOT, 'Design', 'drafts', 'planning', 'balance_modeling') BALANCE_MODELING_DATA_DIR = os.path.join(BALANCE_MODELING_DIR, 'data') SKILL_DATA_ASSET = os.path.join(PROJECT_ROOT, 'Unity', 'Assets', 'BundleResources', 'DataAssets', 'SkillDataAssets.asset') @@ -180,6 +181,14 @@ DESIGN_DOCS = [ ] +def _community_monitor(): + global COMMUNITY_MONITOR_API + if COMMUNITY_MONITOR_API is None: + from community_monitor import api as monitor_api + COMMUNITY_MONITOR_API = monitor_api + return COMMUNITY_MONITOR_API + + def _load_bugs(): """Load bugs.json from DOC/, return dict with nextId and bugs list.""" path = os.path.normpath(BUGS_FILE) @@ -820,9 +829,10 @@ class DashboardHandler(http.server.SimpleHTTPRequestHandler): extensions_map = { **http.server.SimpleHTTPRequestHandler.extensions_map, - '.js': 'application/javascript', - '.json': 'application/json', - '.css': 'text/css', + '.html': 'text/html; charset=utf-8', + '.js': 'application/javascript; charset=utf-8', + '.json': 'application/json; charset=utf-8', + '.css': 'text/css; charset=utf-8', } def 
end_headers(self): @@ -937,6 +947,9 @@ class DashboardHandler(http.server.SimpleHTTPRequestHandler): if self.path.startswith('/api/codex/jobs'): self._handle_codex_job_get() return + if self.path.startswith('/api/community-monitor/'): + self._handle_community_monitor_get() + return # SNS APIs if self.path.startswith('/api/sns/'): self._handle_sns_get() @@ -1099,6 +1112,8 @@ class DashboardHandler(http.server.SimpleHTTPRequestHandler): self._handle_dashboard_preferences_save() elif self.path == '/api/codex/run': self._handle_codex_run() + elif self.path.startswith('/api/community-monitor/'): + self._handle_community_monitor_post() # SNS APIs elif self.path.startswith('/api/sns/'): self._handle_sns_post() @@ -1116,6 +1131,63 @@ class DashboardHandler(http.server.SimpleHTTPRequestHandler): self.end_headers() self.wfile.write(body) + # ---- Community monitor ---- + + def _read_json_body(self): + length = int(self.headers.get('Content-Length', 0)) + if length <= 0: + return {} + raw = self.rfile.read(length) + if not raw: + return {} + return json.loads(raw.decode('utf-8')) + + def _handle_community_monitor_get(self): + try: + monitor = _community_monitor() + parsed = urlparse(self.path) + if parsed.path == '/api/community-monitor/overview': + self._send_json(monitor.overview()) + return + if parsed.path == '/api/community-monitor/items': + query = parse_qs(parsed.query) + filters = {key: values[0] for key, values in query.items() if values} + self._send_json(monitor.list_items(filters)) + return + if parsed.path == '/api/community-monitor/status': + self._send_json(monitor.status()) + return + self._send_json({'error': 'Unknown community monitor endpoint'}, 404) + except Exception as e: + self._send_json({'error': str(e), 'type': type(e).__name__}, 500) + + def _handle_community_monitor_post(self): + try: + monitor = _community_monitor() + parsed = urlparse(self.path) + path = parsed.path + payload = self._read_json_body() + if path == '/api/community-monitor/sync': + 
platforms = payload.get('platforms') + if not isinstance(platforms, list): + platforms = None + platforms = [str(p) for p in platforms] if platforms else None + self._send_json(monitor.trigger_sync(bool(payload.get('full')), platforms)) + return + if path == '/api/community-monitor/analyze-pending': + self._send_json(monitor.trigger_analyze(int(payload.get('limit') or 20))) + return + if path == '/api/community-monitor/manual-items': + self._send_json(monitor.create_manual_item(payload)) + return + match = re.match(r'^/api/community-monitor/items/(\d+)/work$', path) + if match: + self._send_json(monitor.update_work(int(match.group(1)), payload)) + return + self._send_json({'error': 'Unknown community monitor action'}, 404) + except Exception as e: + self._send_json({'success': False, 'error': str(e), 'type': type(e).__name__}, 500) + # ---- Gmail email processing ---- def _email_db(self): @@ -5164,6 +5236,11 @@ def kill_stale_servers(port): def main(): os.chdir(SCRIPT_DIR) + try: + _community_monitor().start_background_sync() + except Exception as e: + print(f'Community monitor not started: {e}') + print(f'Checking port {PORT} for stale processes...') kill_stale_servers(PORT) diff --git a/Tools/Dashboard/启动Dashboard.bat b/Tools/Dashboard/启动Dashboard.bat index 05761a3f3..48d377526 100644 --- a/Tools/Dashboard/启动Dashboard.bat +++ b/Tools/Dashboard/启动Dashboard.bat @@ -28,5 +28,12 @@ if not defined PYTHON_EXE ( ) echo Using Python: "%PYTHON_EXE%" -"%PYTHON_EXE%" serve.py +echo Checking Dashboard community monitor dependencies... +"%PYTHON_EXE%" -c "import httpx, bs4" >nul 2>nul +if errorlevel 1 ( + echo Installing required Python packages: httpx beautifulsoup4 requests + "%PYTHON_EXE%" -m pip install httpx==0.28.1 beautifulsoup4==4.12.3 requests==2.31.0 +) + +"%PYTHON_EXE%" serve.py 8080 pause