2026-05-30 23:30:55 +08:00

322 lines
11 KiB
Python

from __future__ import annotations
from hashlib import sha1
import re
import time
from typing import Any, Iterable
from urllib.parse import parse_qs, quote, urljoin, urlparse
from bs4 import BeautifulSoup
import httpx
from .models import RawItem
# Base URLs of the two Steam web properties this module requests.
STEAM_STORE = "https://store.steampowered.com"
STEAM_COMMUNITY = "https://steamcommunity.com"

# Default headers sent with every request: a desktop-browser User-Agent and
# an Accept-Language list that ranks Simplified Chinese first.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/125.0 Safari/537.36"
    ),
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,ja;q=0.7",
}
def content_hash(text: str) -> str:
    """Return the hexadecimal SHA-1 digest of *text*.

    The text is encoded as UTF-8 first; characters that cannot be encoded
    (such as lone surrogates) are dropped instead of raising.

    Args:
        text: String to fingerprint.

    Returns:
        The 40-character lowercase hex digest.
    """
    encoded = text.encode("utf-8", errors="ignore")
    digest = sha1(encoded)
    return digest.hexdigest()
def _text(node: Any) -> str:
return node.get_text(separator="\n", strip=True) if node else ""
def _abs_url(url: str) -> str:
    """Resolve *url* against the Steam Community base URL.

    Args:
        url: A relative or absolute URL, typically an ``href`` value.

    Returns:
        The result of ``urljoin(STEAM_COMMUNITY, url)``.
    """
    base = STEAM_COMMUNITY
    return urljoin(base, url)
def _topic_id_from_url(url: str) -> str:
match = re.search(r"/discussions/[^/]+/(\d+)", url)
if match:
return match.group(1)
return content_hash(url)
def _reply_id(comment: Any, topic_id: str, author: str, timestamp: str, text: str) -> str:
node_id = comment.get("id", "")
if node_id:
return node_id
data_id = comment.get("data-commentid", "")
if data_id:
return data_id
return f"{topic_id}:{content_hash(author + timestamp + text)}"
def parse_steam_time(text: str | None, now: int | None = None) -> int | None:
    """Convert a Steam-rendered timestamp string into a Unix timestamp.

    Three shapes are recognised:

    * relative: ``"<n> 分钟|小时|天"`` or ``"<n> minute(s)|hour(s)|day(s)"``,
      optionally followed by ``以前`` / ``ago`` (case-insensitive);
    * Chinese absolute without a year: ``"M 月 D 日 上午|下午 H:MM"``; the
      year is taken from the local date of *now*;
    * Chinese absolute with a year: ``"YYYY 年 M 月 D 日 上午|下午 H:MM"``.

    Args:
        text: Timestamp text as scraped; ``None`` or empty yields ``None``.
        now: Reference time in epoch seconds. Defaults to the current time
            when ``None``.

    Returns:
        Epoch seconds, or ``None`` when *text* is empty or matches none of
        the recognised shapes.
    """
    if not text:
        return None
    value = text.strip()
    # ``is None`` rather than ``or`` so that an explicit ``now=0`` is honoured.
    now_ts = int(time.time()) if now is None else now

    relative = re.match(
        r"^(\d+)\s*(分钟|小时|天|minute|minutes|hour|hours|day|days)\s*(以前|ago)?$",
        value,
        re.I,
    )
    if relative:
        amount = int(relative.group(1))
        unit = relative.group(2).lower()
        # Every unit the regex can capture must have an entry here; the
        # day unit "天" was previously missing, which raised KeyError.
        seconds_per_unit = {
            "分钟": 60,
            "minute": 60,
            "minutes": 60,
            "小时": 3600,
            "hour": 3600,
            "hours": 3600,
            "天": 86400,
            "day": 86400,
            "days": 86400,
        }
        return now_ts - amount * seconds_per_unit[unit]

    absolute = re.match(
        r"^(\d{1,2})\s*月\s*(\d{1,2})\s*日\s*(上午|下午)\s*(\d{1,2}):(\d{2})$",
        value,
    )
    if absolute:
        # No year in the text: borrow it from the reference time.
        current = time.localtime(now_ts)
        return _make_ts(
            current.tm_year,
            int(absolute.group(1)),
            int(absolute.group(2)),
            absolute.group(3),
            int(absolute.group(4)),
            int(absolute.group(5)),
        )

    absolute_with_year = re.match(
        r"^(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日\s*(上午|下午)\s*(\d{1,2}):(\d{2})$",
        value,
    )
    if absolute_with_year:
        return _make_ts(
            int(absolute_with_year.group(1)),
            int(absolute_with_year.group(2)),
            int(absolute_with_year.group(3)),
            absolute_with_year.group(4),
            int(absolute_with_year.group(5)),
            int(absolute_with_year.group(6)),
        )
    return None
def _make_ts(year: int, month: int, day: int, ampm: str, hour: int, minute: int) -> int:
if ampm == "下午" and hour != 12:
hour += 12
if ampm == "上午" and hour == 12:
hour = 0
return int(time.mktime((year, month, day, hour, minute, 0, -1, -1, -1)))
class SteamClient:
    """Scrape Steam store reviews and community discussions for one app.

    One ``httpx.Client`` is shared by every request. Call :meth:`close` when
    finished, or use the instance as a context manager so the client is
    closed even when a fetch raises.

    Attributes:
        app_id: Steam application id interpolated into request URLs.
        client: The underlying ``httpx.Client``, configured with ``HEADERS``,
            a 30-second timeout and redirect following.
    """

    def __init__(self, app_id: str) -> None:
        """Create the HTTP client for *app_id*.

        Args:
            app_id: Steam application id to scrape.
        """
        self.app_id = app_id
        self.client = httpx.Client(headers=HEADERS, timeout=30, follow_redirects=True)
        # NOTE(review): presumably pre-answers Steam's age gate on community
        # pages — confirm against current Steam behaviour.
        self.client.cookies.set("birthtime", "568022401", domain="steamcommunity.com")

    def __enter__(self) -> SteamClient:
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the HTTP client on leaving a ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying ``httpx.Client``."""
        self.client.close()

    def fetch_reviews(self, max_pages: int | None = None) -> list[RawItem]:
        """Download reviews for the app by paging through ``appreviews``.

        Requests 100 reviews per page, all languages and purchase types,
        with the ``recent`` filter, following the cursor returned by each
        response. Paging stops when a page has no reviews, when the cursor
        stops changing, or once *max_pages* pages have been read.

        Args:
            max_pages: Maximum number of pages to read; ``None`` or ``0``
                means no limit.

        Returns:
            One ``RawItem`` per review, in the order received.

        Raises:
            httpx.HTTPStatusError: If a response has an error status.
        """
        cursor = "*"
        page = 0
        items: list[RawItem] = []
        while True:
            params = {
                "json": "1",
                "num_per_page": "100",
                "language": "all",
                "filter": "recent",
                "purchase_type": "all",
                "cursor": cursor,
            }
            response = self.client.get(f"{STEAM_STORE}/appreviews/{self.app_id}", params=params)
            response.raise_for_status()
            data = response.json()
            reviews = data.get("reviews") or []
            if not reviews:
                break
            items.extend(self._review_to_item(review) for review in reviews)
            new_cursor = data.get("cursor") or cursor
            page += 1
            # An unchanged cursor means there is nothing further to page to.
            if new_cursor == cursor:
                break
            if max_pages and page >= max_pages:
                break
            cursor = new_cursor
            time.sleep(0.25)  # pause between pages
        return items

    def fetch_discussions(self, full: bool, max_pages: int, time_limit_seconds: int) -> list[RawItem]:
        """Collect topics and replies from the app's discussion listing.

        First walks up to *max_pages* listing pages to gather topic URLs,
        stopping early when a page contributes no unseen URL, then fetches
        each topic. Both phases stop once *time_limit_seconds* has elapsed
        since the call started.

        Args:
            full: See the review note in the body; as written it only
                affects whether a pause follows the last listing page.
            max_pages: Maximum number of listing pages to read.
            time_limit_seconds: Overall time budget for the call, checked
                before each listing page and before each topic.

        Returns:
            The items from every topic that was fetched, in listing order.

        Raises:
            httpx.HTTPStatusError: If a response has an error status.
        """
        started = time.monotonic()
        topic_urls: list[str] = []
        seen_urls: set[str] = set()
        for page in range(1, max_pages + 1):
            if time.monotonic() - started > time_limit_seconds:
                break
            url = f"{STEAM_COMMUNITY}/app/{self.app_id}/discussions/"
            if page > 1:
                url = f"{url}?fp={page}"
            html = self._get_text(url)
            new_urls = [u for u in self._extract_topic_urls(html) if u not in seen_urls]
            if not new_urls:
                break
            topic_urls.extend(new_urls)
            seen_urls.update(new_urls)
            # NOTE(review): the loop already ends at max_pages, so this check
            # only skips the final sleep when ``full`` is false. Confirm what
            # ``full`` is meant to control.
            if not full and page >= max_pages:
                break
            time.sleep(0.25)

        items: list[RawItem] = []
        for url in topic_urls:
            if time.monotonic() - started > time_limit_seconds:
                break
            items.extend(self.fetch_discussion_topic(url))
            time.sleep(0.35)
        return items

    def fetch_discussion_topic(self, url: str) -> list[RawItem]:
        """Fetch one topic page and convert its posts into items.

        Only the single page at *url* is requested.

        Args:
            url: Topic URL.

        Returns:
            The opening post (when present and non-empty) followed by every
            reply on the page that has non-empty text.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        soup = BeautifulSoup(self._get_text(url), "html.parser")
        topic_id = _topic_id_from_url(url)
        title = _text(soup.select_one("div.topic")) or _text(soup.select_one(".forum_topic_name"))

        items: list[RawItem] = []
        op = soup.select_one(".forum_op")
        if op:
            topic_item = self._topic_item(op, topic_id, url, title)
            if topic_item is not None:
                items.append(topic_item)
        for comment in soup.select(".commentthread_comment"):
            reply_item = self._reply_item(comment, topic_id, url, title)
            if reply_item is not None:
                items.append(reply_item)
        return items

    def _topic_item(self, op: Any, topic_id: str, url: str, title: str) -> RawItem | None:
        """Build the item for a topic's opening post, or ``None`` if it has no text."""
        author_el = op.select_one(".authorline a")
        date_text = _text(op.select_one(".date"))
        author = _text(author_el)
        content = _text(op.select_one(".content"))
        if not content:
            return None
        return RawItem(
            source="steam_discussions",
            source_item_id=f"topic:{topic_id}",
            source_url=url,
            content_type="discussion_topic",
            author_id=self._steam_id_from_author(author_el),
            author_name=author,
            title=title,
            published_at=parse_steam_time(date_text),
            published_at_text=date_text,
            updated_at_source=None,
            content=content,
            raw={
                "topic_id": topic_id,
                "topic_url": url,
                "title": title,
                "author": author,
                "date": date_text,
                "content": content,
            },
        )

    def _reply_item(self, comment: Any, topic_id: str, url: str, title: str) -> RawItem | None:
        """Build the item for one reply element, or ``None`` if it has no text."""
        author_el = comment.select_one(".commentthread_author_link")
        text = _text(comment.select_one(".commentthread_comment_text"))
        if not text:
            return None
        author = _text(author_el)
        timestamp = _text(comment.select_one(".commentthread_comment_timestamp"))
        reply_id = _reply_id(comment, topic_id, author, timestamp, text)
        reply_url = f"{url}#{reply_id}" if reply_id else url
        return RawItem(
            source="steam_discussions",
            source_item_id=f"reply:{topic_id}:{reply_id}",
            source_url=reply_url,
            content_type="discussion_reply",
            author_id=self._steam_id_from_author(author_el),
            author_name=author,
            title=title,
            published_at=parse_steam_time(timestamp),
            published_at_text=timestamp,
            updated_at_source=None,
            content=text,
            raw={
                "topic_id": topic_id,
                "topic_url": url,
                "reply_id": reply_id,
                "reply_url": reply_url,
                "title": title,
                "reply_author": author,
                "reply_time_text": timestamp,
                "reply_content": text,
            },
        )

    def _review_to_item(self, review: dict[str, Any]) -> RawItem:
        """Convert one review dict from the ``appreviews`` response into an item.

        Args:
            review: Review mapping; the keys read are ``author``
                (``steamid``, ``personaname``), ``recommendationid``,
                ``timestamp_created``, ``timestamp_updated`` and ``review``.

        Returns:
            A ``RawItem`` whose ``raw`` is a shallow copy of *review* with
            an added ``source_url`` key.
        """
        author = review.get("author") or {}
        steam_id = str(author.get("steamid") or "")
        recommendation_id = str(review.get("recommendationid"))
        source_url = f"{STEAM_COMMUNITY}/profiles/{steam_id}/recommended/{self.app_id}/"
        # Copy so the caller's dict is not mutated by the added key.
        raw = dict(review)
        raw["source_url"] = source_url
        return RawItem(
            source="steam_reviews",
            source_item_id=f"review:{recommendation_id}",
            source_url=source_url,
            content_type="review",
            author_id=steam_id or None,
            author_name=author.get("personaname"),
            title=None,
            published_at=review.get("timestamp_created"),
            published_at_text=None,
            updated_at_source=review.get("timestamp_updated"),
            content=review.get("review") or "",
            raw=raw,
        )

    def _get_text(self, url: str) -> str:
        """GET *url* and return the body decoded as UTF-8.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        response = self.client.get(url)
        response.raise_for_status()
        # Force UTF-8 instead of relying on the detected encoding.
        response.encoding = "utf-8"
        return response.text

    def _extract_topic_urls(self, html: str) -> list[str]:
        """Return the distinct topic URLs linked from a listing page.

        Links are taken from ``a.forum_topic_overlay`` and
        ``a.forum_topic_name`` elements, made absolute, stripped of any
        query string, and kept only if they point into this app's
        discussions. First-seen order is preserved.
        """
        soup = BeautifulSoup(html, "html.parser")
        marker = f"/app/{self.app_id}/discussions/"
        urls: list[str] = []
        seen: set[str] = set()
        for link in soup.select("a.forum_topic_overlay, a.forum_topic_name"):
            href = link.get("href")
            if not href:
                continue
            url = _abs_url(href).split("?")[0]
            if marker in url and url not in seen:
                seen.add(url)
                urls.append(url)
        return urls

    def _steam_id_from_author(self, author_el: Any) -> str | None:
        """Extract an author identifier from a profile link element.

        Args:
            author_el: Element exposing ``get("href")``, or a falsy value.

        Returns:
            The last path segment for ``/profiles/...`` and ``/id/...``
            links, else the first ``steamid`` query parameter, else
            ``None``.
        """
        if not author_el:
            return None
        parsed = urlparse(author_el.get("href") or "")
        path = parsed.path
        if "/profiles/" in path or "/id/" in path:
            return path.rstrip("/").split("/")[-1]
        steam_id = parse_qs(parsed.query).get("steamid")
        return steam_id[0] if steam_id else None
def iter_nonempty(items: Iterable[RawItem]) -> Iterable[RawItem]:
    """Lazily yield the items whose ``content`` is not blank.

    Args:
        items: Items to filter; each must expose a string ``content``.

    Yields:
        Each item, in input order, whose ``content`` still has characters
        after ``strip()``.
    """
    yield from (entry for entry in items if entry.content.strip())