TH01_maintenance/app/twitter.py
2026-05-30 23:30:55 +08:00

247 lines
8.6 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
import calendar
import json
from pathlib import Path
import re
import subprocess
import sys
import time
from typing import Any, Iterable
from .models import RawItem
# Twitter-style timestamp layout, e.g. "Wed Oct 10 20:19:24 +0000 2018".
# The "+0000" offset is matched as literal text, not parsed as a timezone.
TWITTER_EPOCH_FORMAT = "%a %b %d %H:%M:%S +0000 %Y"
# Plain "YYYY-MM-DD HH:MM:SS" layout. parse_twitter_time tries this one first
# and converts the parsed fields with calendar.timegm, i.e. treats them as UTC.
NORMALIZED_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
@dataclass(frozen=True)
class TwitterScrapeOptions:
    """Immutable configuration consumed by ``TwitterClient``.

    Field order is part of the constructor signature and must not change.
    """

    # Account whose timeline is scraped; also used to locate the
    # "<username>_posts.json" output file and as a fallback author/URL handle.
    username: str
    # Scraper script that is executed with the current Python interpreter.
    scraper_path: Path
    # Parent directory; every fetch creates a timestamped sub-directory in it.
    output_dir: Path
    # Forwarded verbatim to the scraper as "--browser-provider".
    browser_provider: str
    # "--max-no-new" value used for a full timeline scrape.
    full_max_no_new: int
    # "--max-no-new" value used for an incremental timeline scrape.
    incremental_max_no_new: int
    # "--max-no-new" value used when scraping a single reply thread.
    thread_max_no_new: int
    # Timeout, in seconds, applied to each scraper subprocess invocation.
    command_timeout_seconds: int
    # Maximum number of parent posts whose threads are fetched in full mode;
    # a value <= 0 disables the cap.
    full_reply_post_limit: int
    # Same cap for incremental mode; a value <= 0 disables the cap.
    incremental_reply_parent_limit: int
def parse_twitter_time(value: str | None) -> int | None:
    """Convert a scraped timestamp string into a Unix epoch (seconds, UTC).

    The stripped text is matched against ``NORMALIZED_DATE_FORMAT`` first and
    ``TWITTER_EPOCH_FORMAT`` second; the first layout that parses wins.

    Args:
        value: Timestamp text, or ``None`` / empty when the date is missing.

    Returns:
        The epoch seconds, or ``None`` when ``value`` is falsy or matches
        neither layout.
    """
    if not value:
        return None

    cleaned = value.strip()
    candidate_layouts = (NORMALIZED_DATE_FORMAT, TWITTER_EPOCH_FORMAT)
    for layout in candidate_layouts:
        try:
            # timegm interprets the parsed fields as UTC (no local-time shift).
            return calendar.timegm(time.strptime(cleaned, layout))
        except ValueError:
            # Not this layout; fall through to the next candidate.
            pass
    return None
def _author_from_url(url: str | None) -> str | None:
    """Extract the account handle from an x.com / twitter.com status URL.

    Returns ``None`` when ``url`` is falsy, is not a ``<handle>/status/<digits>``
    URL on one of those hosts, or uses the placeholder handle ``i`` (compared
    case-insensitively).
    """
    if not url:
        return None

    found = re.search(r"(?:x\.com|twitter\.com)/([^/?#]+)/status/\d+", url)
    if found is None:
        return None

    handle = found.group(1)
    if not handle or handle.lower() == "i":
        return None
    return handle
def _tweet_id_from_item(item: dict[str, Any]) -> str | None:
    """Return the tweet id of a scraped item as a string, if one can be found.

    A truthy ``"id"`` entry takes precedence; otherwise the digits following
    ``/status/`` in the item's ``"url"`` are used. Returns ``None`` when
    neither source yields an id.
    """
    explicit_id = item.get("id")
    if explicit_id:
        return str(explicit_id)

    link = str(item.get("url") or "")
    found = re.search(r"/status/(\d+)", link)
    if found is None:
        return None
    return found.group(1)
def _tweet_url(username: str, tweet_id: str) -> str:
    """Build the x.com status URL for ``tweet_id`` under ``username``."""
    return f"https://x.com/{username}/status/{tweet_id}"
def _is_original_post(item: dict[str, Any]) -> bool:
    """Return ``True`` unless the item carries a truthy ``"is_retweet"`` flag."""
    retweet_flag = item.get("is_retweet")
    return not retweet_flag
class TwitterClient:
    """Collect a user's timeline posts and replies through an external scraper.

    The scraper script is executed as a subprocess; the JSON files it writes
    into a per-run directory are read back and converted into ``RawItem``
    records.
    """

    def __init__(self, options: TwitterScrapeOptions) -> None:
        """Store the scrape configuration; no I/O is performed here."""
        self.options = options

    def fetch_items(
        self,
        *,
        full: bool,
        since_ts: int | None,
        existing_post_urls: Iterable[str] = (),
    ) -> list[RawItem]:
        """Scrape the timeline plus reply threads and return non-empty items.

        Args:
            full: Selects the full-scrape settings (``full_max_no_new``,
                ``full_reply_post_limit``) instead of the incremental ones.
            since_ts: Epoch seconds; items with a parseable date older than
                this are dropped. ``None`` disables the filter.
            existing_post_urls: Previously known post URLs whose threads are
                re-checked for replies in incremental mode only.

        Returns:
            Timeline items followed by reply items, excluding any item whose
            content is empty or whitespace.

        Raises:
            RuntimeError: The scraper failed, reported a login problem, or did
                not produce the expected JSON file.
            subprocess.TimeoutExpired: A scraper invocation exceeded
                ``command_timeout_seconds``.
        """
        run_dir = self._new_run_dir()
        timeline = self._fetch_timeline(run_dir, full=full)
        timeline_items = [
            self._post_to_item(item)
            for item in timeline
            if self._include_by_time(item, since_ts)
        ]
        reply_parent_urls = self._reply_parent_urls(
            timeline=timeline,
            full=full,
            existing_post_urls=existing_post_urls,
        )
        reply_items: list[RawItem] = []
        for parent_url in reply_parent_urls:
            thread = self._fetch_thread(run_dir, parent_url)
            parent_id = self._thread_parent_id(thread, parent_url)
            for reply in thread.get("replies") or []:
                if self._include_by_time(reply, since_ts):
                    reply_items.append(
                        self._reply_to_item(reply, parent_id=parent_id, parent_url=parent_url)
                    )
        return [item for item in [*timeline_items, *reply_items] if item.content.strip()]

    def _thread_parent_id(self, thread: dict[str, Any], parent_url: str) -> str:
        """Return the parent tweet id for a thread payload, or ``""`` if unknown.

        ``main_tweet`` may be missing or ``None`` (``_fetch_thread`` returns
        ``None`` for URLs without a status id), so it is only dereferenced
        when it is actually a dict; otherwise the id is taken from the URL.
        """
        main_tweet = thread.get("main_tweet")
        main_id = main_tweet.get("id") if isinstance(main_tweet, dict) else None
        return str(main_id or self._id_from_url(parent_url) or "")

    def _new_run_dir(self) -> Path:
        """Create (if needed) and return a ``YYYYmmdd_HHMMSS`` run directory."""
        path = self.options.output_dir / time.strftime("%Y%m%d_%H%M%S")
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _fetch_timeline(self, run_dir: Path, *, full: bool) -> list[dict[str, Any]]:
        """Run the scraper for the configured user and load its posts file."""
        max_no_new = self.options.full_max_no_new if full else self.options.incremental_max_no_new
        self._run_scraper(self.options.username, run_dir, max_no_new=max_no_new)
        path = run_dir / f"{self.options.username}_posts.json"
        return self._read_json(path, expected="timeline posts")

    def _fetch_thread(self, run_dir: Path, parent_url: str) -> dict[str, Any]:
        """Scrape one reply thread and return its parsed JSON payload.

        When ``parent_url`` contains no ``/status/<id>`` segment, the scraper
        is not run and an empty thread payload is returned instead.
        """
        tweet_id = self._id_from_url(parent_url)
        if not tweet_id:
            return {"main_tweet": None, "replies": [], "total_replies": 0}
        self._run_scraper(parent_url, run_dir, max_no_new=self.options.thread_max_no_new)
        path = run_dir / f"thread_{tweet_id}.json"
        return self._read_json(path, expected=f"thread {tweet_id}")

    def _run_scraper(self, target: str, run_dir: Path, *, max_no_new: int) -> None:
        """Execute the scraper script for ``target`` and validate its outcome.

        Raises:
            RuntimeError: The process exited non-zero (the message carries the
                last 1200 characters of its output), or its output contains a
                login marker.
            subprocess.TimeoutExpired: The process ran longer than
                ``command_timeout_seconds``.
        """
        command = [
            sys.executable,
            str(self.options.scraper_path),
            target,
            "--max-no-new",
            str(max_no_new),
            "--output-dir",
            str(run_dir),
            "--browser-provider",
            self.options.browser_provider,
        ]
        result = subprocess.run(
            command,
            cwd=Path.cwd(),
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=self.options.command_timeout_seconds,
        )
        # stdout and stderr are merged so both failure tails and login hints
        # are visible regardless of which stream the scraper wrote to.
        output = "\n".join(part for part in [result.stdout, result.stderr] if part).strip()
        if result.returncode != 0:
            raise RuntimeError(f"Twitter scraper failed for {target}: {output[-1200:]}")
        # The two Chinese markers mean "login prompt" and "not logged in".
        # NOTE(review): the plain "login" substring check matches any output
        # mentioning that word, including benign messages - confirm intended.
        if "登录提示" in output or "未登录" in output or "login" in output.lower():
            raise RuntimeError(
                "Twitter scraper requires an authenticated X.com browser profile. "
                "Run the configured social-media-scraper once with --keep-browser-open, "
                "log in to X.com, then retry."
            )

    def _read_json(self, path: Path, *, expected: str) -> Any:
        """Load a UTF-8 JSON file, raising ``RuntimeError`` if it is missing.

        ``expected`` is a human-readable label used only in the error message.
        Malformed JSON propagates the decoder's own exception.
        """
        if not path.exists():
            raise RuntimeError(f"Twitter scraper did not produce {expected}: {path}")
        return json.loads(path.read_text(encoding="utf-8"))

    def _reply_parent_urls(
        self,
        *,
        timeline: list[dict[str, Any]],
        full: bool,
        existing_post_urls: Iterable[str],
    ) -> list[str]:
        """Choose which post URLs should have their reply threads scraped.

        Non-retweet timeline posts come first; in incremental mode the
        non-empty ``existing_post_urls`` are appended. Duplicates are removed
        keeping first occurrence, then the mode-specific limit is applied
        (a limit <= 0 means "no cap").
        """
        urls: list[str] = []
        for item in timeline:
            tweet_id = _tweet_id_from_item(item)
            url = item.get("url") or (_tweet_url(self.options.username, tweet_id) if tweet_id else "")
            if url and _is_original_post(item):
                urls.append(str(url))
        if not full:
            urls.extend(str(url) for url in existing_post_urls if url)
        # dict.fromkeys de-duplicates while preserving insertion order.
        unique_urls = list(dict.fromkeys(urls))
        limit = self.options.full_reply_post_limit if full else self.options.incremental_reply_parent_limit
        if limit > 0:
            return unique_urls[:limit]
        return unique_urls

    def _post_to_item(self, item: dict[str, Any]) -> RawItem:
        """Convert one scraped timeline entry into a ``twitter_posts`` item."""
        tweet_id = _tweet_id_from_item(item) or ""
        url = item.get("url") or _tweet_url(self.options.username, tweet_id)
        # Prefer the handle embedded in the URL; fall back to the scraped user.
        author = _author_from_url(str(url)) or self.options.username
        raw = dict(item)  # copy so the caller's dict is not mutated
        raw["source_url"] = url
        return RawItem(
            source="twitter_posts",
            source_item_id=f"post:{tweet_id}",
            source_url=str(url),
            content_type="twitter_post",
            author_id=author,
            author_name=author,
            title=None,
            published_at=parse_twitter_time(item.get("date")),
            published_at_text=item.get("date"),
            updated_at_source=None,
            content=str(item.get("text") or ""),
            raw=raw,
        )

    def _reply_to_item(self, item: dict[str, Any], *, parent_id: str, parent_url: str) -> RawItem:
        """Convert one scraped reply into a ``twitter_replies`` item.

        The parent id and URL are recorded in the item's ``raw`` payload, and
        the title is ``"Reply to <parent_id>"`` when a parent id is known.
        """
        tweet_id = _tweet_id_from_item(item) or ""
        url = item.get("url") or _tweet_url(_author_from_url(parent_url) or self.options.username, tweet_id)
        author = _author_from_url(str(url)) or str(item.get("in_reply_to") or "")
        raw = dict(item)  # copy so the caller's dict is not mutated
        raw["parent_tweet_id"] = parent_id
        raw["parent_url"] = parent_url
        raw["source_url"] = url
        return RawItem(
            source="twitter_replies",
            source_item_id=f"reply:{tweet_id}",
            source_url=str(url),
            content_type="twitter_reply",
            author_id=author or None,
            author_name=author or None,
            title=f"Reply to {parent_id}" if parent_id else None,
            published_at=parse_twitter_time(item.get("date")),
            published_at_text=item.get("date"),
            updated_at_source=None,
            content=str(item.get("text") or ""),
            raw=raw,
        )

    def _include_by_time(self, item: dict[str, Any], since_ts: int | None) -> bool:
        """Return whether ``item`` passes the ``since_ts`` cutoff.

        Items are kept when no cutoff is given or when their date cannot be
        parsed; otherwise only items dated at or after ``since_ts`` are kept.
        """
        if since_ts is None:
            return True
        published_at = parse_twitter_time(item.get("date"))
        if published_at is None:
            return True
        return published_at >= since_ts

    def _id_from_url(self, url: str) -> str | None:
        """Return the digits following ``/status/`` in ``url``, or ``None``."""
        match = re.search(r"/status/(\d+)", url)
        return match.group(1) if match else None