367 lines
13 KiB
Python
367 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import Counter
|
|
from hashlib import sha1
|
|
import sqlite3
|
|
import time
|
|
from typing import Any
|
|
|
|
from .config import Settings
|
|
from .db import decode_json, encode_json, init_db
|
|
from .models import RawItem
|
|
from .openrouter import OpenRouterClient
|
|
from .steam import SteamClient, iter_nonempty
|
|
from .twitter import TwitterClient, TwitterScrapeOptions
|
|
|
|
|
|
def _now() -> int:
    """Return the current Unix time truncated to whole seconds."""
    current = time.time()
    return int(current)
|
|
|
|
|
|
def _hash(text: str) -> str:
    """Return the hexadecimal SHA-1 digest of *text* encoded as UTF-8.

    Characters that cannot be encoded (for example lone surrogates) are
    dropped before hashing instead of raising.
    """
    payload = text.encode("utf-8", errors="ignore")
    hasher = sha1()
    hasher.update(payload)
    return hasher.hexdigest()
|
|
|
|
|
|
def upsert_raw_item(conn: sqlite3.Connection, item: RawItem) -> tuple[int, bool]:
    """Insert *item* into ``raw_items`` or refresh the stored copy.

    Rows are matched on ``(source, source_item_id)``.  When a match exists
    and its stored ``content_hash`` differs from the hash of ``item.content``,
    the row is overwritten and its ``analysis_status`` is reset to
    ``'pending'``; when the hash is unchanged nothing is written.  When no
    match exists, a new ``raw_items`` row is inserted together with a
    ``work_items`` row in status ``'new'``.

    Rows returned by *conn* must be addressable by column name.  No commit
    is issued here.

    Returns:
        ``(raw_item_id, inserted)`` where ``inserted`` is ``True`` only when
        a new ``raw_items`` row was created.
    """
    timestamp = _now()
    digest = _hash(item.content)

    lookup_sql = "SELECT id, content_hash FROM raw_items WHERE source = ? AND source_item_id = ?"
    match = conn.execute(lookup_sql, (item.source, item.source_item_id)).fetchone()

    if match:
        if match["content_hash"] != digest:
            # Content changed upstream: overwrite and queue for re-analysis.
            refreshed_values = (
                item.source_url,
                item.author_id,
                item.author_name,
                item.title,
                item.published_at,
                item.published_at_text,
                item.updated_at_source,
                item.content,
                encode_json(item.raw),
                digest,
                timestamp,
                match["id"],
            )
            conn.execute(
                """
                UPDATE raw_items
                SET source_url = ?, author_id = ?, author_name = ?, title = ?,
                    published_at = ?, published_at_text = ?, updated_at_source = ?,
                    content = ?, raw_json = ?, content_hash = ?, analysis_status = 'pending',
                    collected_at = ?
                WHERE id = ?
                """,
                refreshed_values,
            )
        return int(match["id"]), False

    new_values = (
        item.source,
        item.source_item_id,
        item.source_url,
        item.content_type,
        item.author_id,
        item.author_name,
        item.title,
        item.published_at,
        item.published_at_text,
        timestamp,
        item.updated_at_source,
        item.content,
        encode_json(item.raw),
        digest,
    )
    insert_cursor = conn.execute(
        """
        INSERT INTO raw_items (
            source, source_item_id, source_url, content_type, author_id, author_name,
            title, published_at, published_at_text, collected_at, updated_at_source,
            content, raw_json, content_hash, analysis_status
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')
        """,
        new_values,
    )
    raw_item_id = int(insert_cursor.lastrowid)

    # Every new raw item gets a companion work item in its initial state.
    conn.execute(
        """
        INSERT INTO work_items (raw_item_id, status, owner, notes, created_at, updated_at)
        VALUES (?, 'new', '', '', ?, ?)
        """,
        (raw_item_id, timestamp, timestamp),
    )
    return raw_item_id, True
|
|
|
|
|
|
def save_analysis(
    conn: sqlite3.Connection,
    raw_item_id: int,
    model: str,
    analysis: dict[str, Any],
) -> None:
    """Store the analysis of one raw item and mark that item as done.

    The ``analysis_results`` row is keyed on ``raw_item_id``: an existing row
    for the same item is overwritten with the new values.  Afterwards the
    item's ``analysis_status`` in ``raw_items`` is set to ``'done'``.  No
    commit is issued here.

    Args:
        conn: Open database connection.
        raw_item_id: ``raw_items.id`` the analysis belongs to.
        model: Identifier of the model that produced the analysis.
        analysis: Model output; every key read below must be present.

    Raises:
        KeyError: If *analysis* lacks one of the expected keys.
    """
    analyzed_at = _now()

    def as_flag(key: str) -> int:
        # Boolean-like fields are stored as integers.
        return int(analysis[key])

    values = (
        raw_item_id,
        model,
        analysis["sentiment"],
        as_flag("is_positive"),
        as_flag("is_negative"),
        as_flag("has_actionable_feedback"),
        encode_json(analysis["feedback_types"]),
        as_flag("reply_recommended"),
        analysis["reply_priority"],
        analysis["reply_suggestion"],
        analysis["summary"],
        analysis["priority"],
        analysis["confidence"],
        analysis["reason"],
        encode_json(analysis),  # full payload kept alongside the split columns
        analyzed_at,
    )
    conn.execute(
        """
        INSERT INTO analysis_results (
            raw_item_id, model, sentiment, is_positive, is_negative,
            has_actionable_feedback, feedback_types, reply_recommended, reply_priority,
            reply_suggestion, summary, priority, confidence, reason, model_json, analyzed_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(raw_item_id) DO UPDATE SET
            model = excluded.model,
            sentiment = excluded.sentiment,
            is_positive = excluded.is_positive,
            is_negative = excluded.is_negative,
            has_actionable_feedback = excluded.has_actionable_feedback,
            feedback_types = excluded.feedback_types,
            reply_recommended = excluded.reply_recommended,
            reply_priority = excluded.reply_priority,
            reply_suggestion = excluded.reply_suggestion,
            summary = excluded.summary,
            priority = excluded.priority,
            confidence = excluded.confidence,
            reason = excluded.reason,
            model_json = excluded.model_json,
            analyzed_at = excluded.analyzed_at
        """,
        values,
    )

    done_sql = "UPDATE raw_items SET analysis_status = 'done' WHERE id = ?"
    conn.execute(done_sql, (raw_item_id,))
|
|
|
|
|
|
def _twitter_high_watermark_ts(conn: sqlite3.Connection) -> int | None:
    """Return the newest timestamp among stored Twitter posts and replies.

    For each row the timestamp is ``published_at`` when set, otherwise
    ``collected_at``.  Returns ``None`` when no Twitter rows exist or the
    maximum is falsy (``NULL`` or ``0``).
    """
    query = """
        SELECT MAX(COALESCE(published_at, collected_at)) AS watermark
        FROM raw_items
        WHERE source IN ('twitter_posts', 'twitter_replies')
        """
    result = conn.execute(query).fetchone()
    if not result:
        return None
    watermark = result["watermark"]
    return int(watermark) if watermark else None
|
|
|
|
|
|
def _recent_twitter_post_urls(conn: sqlite3.Connection, limit: int) -> list[str]:
    """Return URLs of the most recent stored Twitter posts, newest first.

    At most *limit* rows are read, ordered by ``published_at`` (falling back
    to ``collected_at``) and then by ``collected_at``.  Rows whose
    ``source_url`` is empty or ``NULL`` are skipped after the limit is
    applied, so fewer than *limit* URLs may be returned.  A non-positive
    *limit* yields an empty list without querying.
    """
    if limit <= 0:
        return []

    query = """
        SELECT source_url
        FROM raw_items
        WHERE source = 'twitter_posts'
        ORDER BY COALESCE(published_at, collected_at) DESC, collected_at DESC
        LIMIT ?
        """
    urls: list[str] = []
    for record in conn.execute(query, (limit,)).fetchall():
        url = record["source_url"]
        if url:
            urls.append(str(url))
    return urls
|
|
|
|
|
|
def _twitter_options(settings: Settings) -> TwitterScrapeOptions:
    """Build the scraper options from the ``twitter_*`` fields of *settings*.

    Each option is copied from the settings attribute that carries the same
    name with a ``twitter_`` prefix.
    """
    option_names = (
        "username",
        "scraper_path",
        "output_dir",
        "browser_provider",
        "full_max_no_new",
        "incremental_max_no_new",
        "thread_max_no_new",
        "command_timeout_seconds",
        "full_reply_post_limit",
        "incremental_reply_parent_limit",
    )
    option_values = {name: getattr(settings, f"twitter_{name}") for name in option_names}
    return TwitterScrapeOptions(**option_values)
|
|
|
|
|
|
def _fetch_steam_items(settings: Settings, full: bool) -> list[RawItem]:
    """Fetch Steam reviews and discussions for one sync run.

    Incremental runs request ``max_pages=2`` for reviews and
    ``settings.discussion_incremental_max_pages`` for discussions; full runs
    request ``max_pages=None`` for reviews and
    ``settings.discussion_full_scan_max_pages`` for discussions.  The combined
    results are passed through ``iter_nonempty`` before being returned.  The
    client is closed even when a fetch raises.
    """
    steam = SteamClient(settings.app_id)
    try:
        review_items = steam.fetch_reviews(max_pages=None if full else 2)
        discussion_pages = (
            settings.discussion_full_scan_max_pages
            if full
            else settings.discussion_incremental_max_pages
        )
        discussion_items = steam.fetch_discussions(
            full=full,
            max_pages=discussion_pages,
            time_limit_seconds=settings.full_scan_time_limit_seconds,
        )
        return list(iter_nonempty([*review_items, *discussion_items]))
    finally:
        steam.close()


def _fetch_twitter_items(
    conn: sqlite3.Connection,
    settings: Settings,
    full: bool,
) -> list[RawItem]:
    """Fetch Twitter items, using stored data to bound an incremental scrape.

    Incremental runs pass the newest stored Twitter timestamp as ``since_ts``;
    full runs pass ``None``.  The URLs of the most recently stored posts are
    always handed to the client as ``existing_post_urls``.
    """
    since_ts = None if full else _twitter_high_watermark_ts(conn)
    existing_urls = _recent_twitter_post_urls(
        conn,
        settings.twitter_incremental_reply_parent_limit,
    )
    twitter = TwitterClient(_twitter_options(settings))
    return twitter.fetch_items(
        full=full,
        since_ts=since_ts,
        existing_post_urls=existing_urls,
    )


def _finish_sync_run(
    conn: sqlite3.Connection,
    run_id: int | None,
    finished_at: int,
    status: str,
    message: str,
    stats: Counter[str],
) -> None:
    """Write the terminal state of one ``sync_runs`` row (does not commit)."""
    conn.execute(
        """
        UPDATE sync_runs
        SET finished_at = ?, status = ?, message = ?, stats_json = ?
        WHERE id = ?
        """,
        (finished_at, status, message, encode_json(dict(stats)), run_id),
    )


def run_sync(
    conn: sqlite3.Connection,
    settings: Settings,
    full: bool = False,
    platforms: list[str] | None = None,
) -> dict[str, Any]:
    """Fetch items from the enabled platforms, store them and analyse new ones.

    A ``sync_runs`` row is created (and committed) in status ``'running'`` and
    finished as ``'success'``, ``'partial'`` (a Twitter fetch error was
    recorded) or ``'failed'`` (any other exception).  Each fetched item is
    upserted and committed individually; only newly inserted items are
    analysed here.  On ``'success'`` the ``last_sync_mode`` entry in
    ``sync_state`` is updated.  The terminal state is committed before the
    function returns or re-raises, so it survives even if the caller never
    commits.

    Args:
        conn: Open database connection; rows must be addressable by name.
        settings: Application settings.
        full: Run a full scan instead of an incremental one.
        platforms: Platforms to sync; ``None`` or an empty list means
            ``["steam", "twitter"]``.  Twitter is skipped (and counted as
            ``twitter_skipped``) when ``settings.twitter_enabled`` is false.

    Returns:
        The collected counters as a plain dict.

    Raises:
        Exception: Any error other than a Twitter fetch failure or a per-item
            analysis failure is re-raised after the run is marked ``'failed'``.
    """
    init_db(conn)
    started = _now()
    mode = "full" if full else "incremental"
    run_id = conn.execute(
        "INSERT INTO sync_runs (started_at, mode, status) VALUES (?, ?, 'running')",
        (started, mode),
    ).lastrowid
    conn.commit()

    stats: Counter[str] = Counter()
    messages: list[str] = []
    try:
        enabled_platforms = platforms or ["steam", "twitter"]
        twitter_requested = "twitter" in enabled_platforms
        if twitter_requested and not settings.twitter_enabled:
            stats["twitter_skipped"] += 1

        raw_items: list[RawItem] = []
        if "steam" in enabled_platforms:
            steam_items = _fetch_steam_items(settings, full)
            raw_items.extend(steam_items)
            stats["steam_fetched"] = len(steam_items)

        if twitter_requested and settings.twitter_enabled:
            try:
                twitter_items = _fetch_twitter_items(conn, settings, full)
                raw_items.extend(twitter_items)
                stats["twitter_fetched"] = len(twitter_items)
            except Exception as exc:  # noqa: BLE001 - keep Steam and old Twitter data intact
                stats["twitter_errors"] += 1
                stats[f"twitter_error:{type(exc).__name__}"] += 1
                messages.append(f"twitter: {exc}")

        stats["fetched"] = len(raw_items)
        analyzer = OpenRouterClient(settings)
        try:
            for item in raw_items:
                raw_item_id, inserted = upsert_raw_item(conn, item)
                outcome = "inserted" if inserted else "seen"
                prefix = item.source.split("_", 1)[0]
                stats[outcome] += 1
                stats[f"{prefix}_{outcome}"] += 1
                if inserted:
                    try:
                        analysis = analyzer.analyze(item)
                        save_analysis(conn, raw_item_id, settings.openrouter_model, analysis)
                        stats["analyzed"] += 1
                    except Exception as exc:  # noqa: BLE001 - one bad item must not abort the run
                        # Mark the item 'error'; analyze_pending retries both
                        # 'pending' and 'error' rows later.
                        conn.execute(
                            "UPDATE raw_items SET analysis_status = 'error' WHERE id = ?",
                            (raw_item_id,),
                        )
                        stats["analysis_errors"] += 1
                        stats[f"analysis_error:{type(exc).__name__}"] += 1
                # Persist progress item by item so a later failure loses nothing.
                conn.commit()
        finally:
            analyzer.close()

        finished = _now()
        status = "partial" if messages else "success"
        _finish_sync_run(conn, run_id, finished, status, "\n".join(messages), stats)
        if status == "success":
            conn.execute(
                """
                INSERT INTO sync_state (key, value, updated_at)
                VALUES ('last_sync_mode', ?, ?)
                ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at
                """,
                (mode, finished),
            )
        # Make the terminal state durable; otherwise the run would stay
        # 'running' whenever the caller closes the connection without committing.
        conn.commit()
        return dict(stats)
    except Exception as exc:
        # Discard any half-written item (e.g. a raw_items row whose work_items
        # row was never inserted) before recording the failure.
        conn.rollback()
        _finish_sync_run(conn, run_id, _now(), "failed", str(exc), stats)
        conn.commit()
        raise
|
|
|
|
|
|
def analyze_pending(
    conn: sqlite3.Connection,
    settings: Settings,
    limit: int = 50,
    since_ts: int | None = None,
) -> dict[str, Any]:
    """Analyse stored items whose analysis is still pending or failed earlier.

    Rows with ``analysis_status`` ``'pending'`` or ``'error'`` are processed
    newest first.  Each item is committed on its own.  An item whose analysis
    or save raises is marked ``'error'`` (as ``run_sync`` does) and counted;
    it does not stop the batch.

    Args:
        conn: Open database connection; rows must be addressable by name.
        settings: Application settings (supplies the analysis model name).
        limit: Maximum number of rows to process.
        since_ts: When given, only rows whose ``published_at`` (or
            ``collected_at`` when that is ``NULL``) is at least this value
            are considered.

    Returns:
        Counters as a plain dict: ``analyzed``, ``analysis_errors`` and one
        ``analysis_error:<ExceptionName>`` entry per error type seen.
    """
    init_db(conn)
    analyzer = OpenRouterClient(settings)
    stats: Counter[str] = Counter()
    try:
        params: list[Any] = []
        since_clause = ""
        if since_ts is not None:
            # Fixed SQL fragment; the timestamp itself is bound as a parameter.
            since_clause = "AND COALESCE(published_at, collected_at) >= ?"
            params.append(since_ts)
        params.append(limit)
        rows = conn.execute(
            f"""
            SELECT * FROM raw_items
            WHERE analysis_status IN ('pending', 'error')
            {since_clause}
            ORDER BY COALESCE(published_at, collected_at) DESC, collected_at DESC, id DESC
            LIMIT ?
            """,
            params,
        ).fetchall()

        for row in rows:
            raw_item_id = int(row["id"])
            item = RawItem(
                source=row["source"],
                source_item_id=row["source_item_id"],
                source_url=row["source_url"],
                content_type=row["content_type"],
                author_id=row["author_id"],
                author_name=row["author_name"],
                title=row["title"],
                published_at=row["published_at"],
                published_at_text=row["published_at_text"],
                updated_at_source=row["updated_at_source"],
                content=row["content"],
                raw=decode_json(row["raw_json"], {}),
            )
            try:
                analysis = analyzer.analyze(item)
                save_analysis(conn, raw_item_id, settings.openrouter_model, analysis)
                conn.commit()
                # Count only after the commit so a failed commit is not
                # reported as both analysed and errored.
                stats["analyzed"] += 1
            except Exception as exc:  # noqa: BLE001 - one bad item must not stop the batch
                # Record the failure on the row, matching run_sync; the row
                # stays eligible for the next call.
                conn.execute(
                    "UPDATE raw_items SET analysis_status = 'error' WHERE id = ?",
                    (raw_item_id,),
                )
                conn.commit()
                stats["analysis_errors"] += 1
                stats[f"analysis_error:{type(exc).__name__}"] += 1
        return dict(stats)
    finally:
        analyzer.close()
|