322 lines
11 KiB
Python
322 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
from hashlib import sha1
|
|
import re
|
|
import time
|
|
from typing import Any, Iterable
|
|
from urllib.parse import parse_qs, quote, urljoin, urlparse
|
|
|
|
from bs4 import BeautifulSoup
|
|
import httpx
|
|
|
|
from .models import RawItem
|
|
|
|
|
|
# Base URLs of the two Steam hosts this module sends requests to.
STEAM_STORE = "https://store.steampowered.com"
STEAM_COMMUNITY = "https://steamcommunity.com"

# Browser-like User-Agent, written as two adjacent literals that Python
# concatenates into a single string.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/125.0 Safari/537.36"
)

# Default headers for every request; the language preference lists
# Simplified Chinese first, then English and Japanese.
HEADERS = {
    "User-Agent": _USER_AGENT,
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,ja;q=0.7",
}
|
|
|
|
|
|
def content_hash(text: str) -> str:
    """Return the hexadecimal SHA-1 digest of *text*.

    The text is encoded as UTF-8 first; characters that cannot be encoded
    (for example lone surrogates) are dropped instead of raising.
    """
    encoded = text.encode("utf-8", errors="ignore")
    digest = sha1(encoded)
    return digest.hexdigest()
|
|
|
|
|
|
def _text(node: Any) -> str:
    """Return the stripped text of *node*, with fragments joined by newlines.

    A falsy *node* (such as ``None`` from a selector that matched nothing)
    yields an empty string.
    """
    if not node:
        return ""
    return node.get_text(separator="\n", strip=True)
|
|
|
|
|
|
def _abs_url(url: str) -> str:
    """Resolve *url* against the Steam Community base URL.

    Relative references are joined onto ``STEAM_COMMUNITY``; the result is
    whatever ``urllib.parse.urljoin`` produces for that pair.
    """
    resolved = urljoin(STEAM_COMMUNITY, url)
    return resolved
|
|
|
|
|
|
def _topic_id_from_url(url: str) -> str:
    """Derive a topic identifier from a discussion URL.

    The identifier is the run of digits that follows
    ``/discussions/<segment>/`` in the URL. When the URL has no such part,
    the content hash of the whole URL is returned instead.
    """
    found = re.search(r"/discussions/[^/]+/(\d+)", url)
    return found.group(1) if found else content_hash(url)
|
|
|
|
|
|
def _reply_id(comment: Any, topic_id: str, author: str, timestamp: str, text: str) -> str:
    """Choose an identifier for a reply node.

    Preference order: the node's ``id`` attribute, then its
    ``data-commentid`` attribute. If both are missing or empty, a synthetic
    ``"<topic_id>:<hash>"`` is built from the author, timestamp and text.
    """
    for attribute in ("id", "data-commentid"):
        value = comment.get(attribute, "")
        if value:
            return value
    fingerprint = content_hash(author + timestamp + text)
    return f"{topic_id}:{fingerprint}"
|
|
|
|
|
|
def parse_steam_time(text: str | None, now: int | None = None) -> int | None:
    """Convert a Steam timestamp label into Unix epoch seconds.

    Recognised forms:

    * Relative ages such as ``"5 minutes ago"`` or ``"3 小时以前"``
      (minutes, hours and days, in English or Chinese; the trailing
      "ago" marker is optional and English units are case-insensitive).
    * Chinese absolute dates ``"M月D日 上午/下午 H:MM"``, optionally prefixed
      with ``"YYYY年"``. Without a year, the local-time year of *now* is used.

    Args:
        text: Label to parse. ``None`` or an empty string yields ``None``.
        now: Reference time in epoch seconds. Defaults to the current time.
            An explicit ``0`` is honoured as a real reference time.

    Returns:
        Epoch seconds, or ``None`` when *text* is empty or not recognised.
    """
    if not text:
        return None
    value = text.strip()
    # Compare against None explicitly: `now or ...` would throw away now=0
    # and silently substitute the wall clock.
    now_ts = int(time.time()) if now is None else now

    relative = re.match(
        r"^(\d+)\s*(分钟|小时|天|minute|minutes|hour|hours|day|days)\s*(以前|ago)?$",
        value,
        re.I,
    )
    if relative:
        unit_seconds = {
            "分钟": 60,
            "minute": 60,
            "minutes": 60,
            "小时": 3600,
            "hour": 3600,
            "hours": 3600,
            "天": 86400,
            "day": 86400,
            "days": 86400,
        }
        amount = int(relative.group(1))
        # The match is case-insensitive, so normalise before the lookup.
        unit = relative.group(2).lower()
        return now_ts - amount * unit_seconds[unit]

    # One pattern covers both absolute forms; the year group is optional.
    absolute = re.match(
        r"^(?:(\d{4})\s*年\s*)?(\d{1,2})\s*月\s*(\d{1,2})\s*日\s*(上午|下午)\s*(\d{1,2}):(\d{2})$",
        value,
    )
    if absolute is None:
        return None
    year_text, month, day, ampm, hour, minute = absolute.groups()
    # Labels without a year take the reference time's local year.
    year = int(year_text) if year_text else time.localtime(now_ts).tm_year
    return _make_ts(year, int(month), int(day), ampm, int(hour), int(minute))
|
|
|
|
|
|
def _make_ts(year: int, month: int, day: int, ampm: str, hour: int, minute: int) -> int:
    """Build a local-time Unix timestamp from a 12-hour clock reading.

    *ampm* is ``"上午"`` (AM) or ``"下午"`` (PM). 12 AM maps to hour 0 and
    12 PM stays at 12; any other marker leaves *hour* untouched. Seconds are
    zero and the DST flag is passed as -1 so ``time.mktime`` decides.
    """
    at_twelve = hour == 12
    if ampm == "下午":
        hour_24 = hour if at_twelve else hour + 12
    elif ampm == "上午":
        hour_24 = 0 if at_twelve else hour
    else:
        hour_24 = hour
    fields = (year, month, day, hour_24, minute, 0, -1, -1, -1)
    return int(time.mktime(fields))
|
|
|
|
|
|
class SteamClient:
    """HTTP client that collects Steam reviews and discussion posts for one app.

    The instance owns an ``httpx.Client``; release it with :meth:`close` or
    by using the instance as a context manager (``with SteamClient(...) as c``).

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, app_id: str) -> None:
        """Create a client bound to the Steam application *app_id*."""
        # Application id interpolated into every request URL.
        self.app_id = app_id
        # Shared HTTP session: default headers, 30 s timeout, redirects followed.
        self.client = httpx.Client(headers=HEADERS, timeout=30, follow_redirects=True)
        # NOTE(review): fixed "birthtime" cookie on the community domain,
        # presumably to get past the age gate — confirm.
        self.client.cookies.set("birthtime", "568022401", domain="steamcommunity.com")

    def __enter__(self) -> SteamClient:
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the HTTP session when the ``with`` block exits."""
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.client.close()

    def fetch_reviews(self, max_pages: int | None = None) -> list[RawItem]:
        """Fetch reviews from the store's ``appreviews`` JSON endpoint.

        Pages are requested 100 reviews at a time, following the cursor that
        each response returns.

        Args:
            max_pages: Stop after this many pages. ``None`` or ``0`` means
                no page limit.

        Returns:
            One ``RawItem`` per review, in the order received.

        Raises:
            httpx.HTTPStatusError: If a response has an error status.
        """
        cursor = "*"
        page = 0
        items: list[RawItem] = []
        endpoint = f"{STEAM_STORE}/appreviews/{self.app_id}"
        while True:
            params = {
                "json": "1",
                "num_per_page": "100",
                "language": "all",
                "filter": "recent",
                "purchase_type": "all",
                "cursor": cursor,
            }
            response = self.client.get(endpoint, params=params)
            response.raise_for_status()
            data = response.json()
            reviews = data.get("reviews") or []
            if not reviews:
                break
            items.extend(self._review_to_item(review) for review in reviews)
            new_cursor = data.get("cursor") or cursor
            page += 1
            # An unchanged (or missing) cursor means there is nothing further.
            if new_cursor == cursor:
                break
            if max_pages and page >= max_pages:
                break
            cursor = new_cursor
            time.sleep(0.25)  # pause between successive page requests
        return items

    def fetch_discussions(self, full: bool, max_pages: int, time_limit_seconds: int) -> list[RawItem]:
        """Crawl the app's discussion listing and fetch every topic found.

        Listing pages 1..*max_pages* are scanned for topic URLs, stopping
        early when a page contributes no unseen URL or the time budget is
        spent. Each collected topic is then fetched while budget remains.

        Args:
            full: Accepted for interface compatibility. NOTE(review): it has
                no effect on which pages are crawled, because the page loop
                is already capped at *max_pages* — confirm intended meaning.
            max_pages: Maximum number of listing pages to scan.
            time_limit_seconds: Overall budget, measured with a monotonic
                clock from the start of the call and checked before each
                request.

        Returns:
            Items from all fetched topics, in crawl order.

        Raises:
            httpx.HTTPStatusError: If a response has an error status.
        """
        started = time.monotonic()
        listing_url = f"{STEAM_COMMUNITY}/app/{self.app_id}/discussions/"
        topic_urls: list[str] = []
        seen_urls: set[str] = set()
        for page in range(1, max_pages + 1):
            if time.monotonic() - started > time_limit_seconds:
                break
            url = listing_url if page == 1 else f"{listing_url}?fp={page}"
            html = self._get_text(url)
            new_urls = [u for u in self._extract_topic_urls(html) if u not in seen_urls]
            if not new_urls:
                break
            topic_urls.extend(new_urls)
            seen_urls.update(new_urls)
            # No point pausing after the final listing page.
            if page < max_pages:
                time.sleep(0.25)

        items: list[RawItem] = []
        for url in topic_urls:
            if time.monotonic() - started > time_limit_seconds:
                break
            items.extend(self.fetch_discussion_topic(url))
            time.sleep(0.35)
        return items

    def fetch_discussion_topic(self, url: str) -> list[RawItem]:
        """Fetch one topic page and return its opening post and replies.

        Only the single page at *url* is parsed. Posts with empty text are
        skipped.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        html = self._get_text(url)
        soup = BeautifulSoup(html, "html.parser")
        topic_id = _topic_id_from_url(url)
        title = _text(soup.select_one("div.topic")) or _text(soup.select_one(".forum_topic_name"))
        items: list[RawItem] = []

        op = soup.select_one(".forum_op")
        if op:
            topic_item = self._topic_item(op, url, topic_id, title)
            if topic_item is not None:
                items.append(topic_item)

        for comment in soup.select(".commentthread_comment"):
            reply_item = self._reply_item(comment, url, topic_id, title)
            if reply_item is not None:
                items.append(reply_item)
        return items

    def _topic_item(self, op: Any, url: str, topic_id: str, title: str) -> RawItem | None:
        """Build the item for a topic's opening post, or ``None`` if it has no text."""
        author_el = op.select_one(".authorline a")
        date_text = _text(op.select_one(".date"))
        author = _text(author_el)
        content = _text(op.select_one(".content"))
        if not content:
            return None
        return RawItem(
            source="steam_discussions",
            source_item_id=f"topic:{topic_id}",
            source_url=url,
            content_type="discussion_topic",
            author_id=self._steam_id_from_author(author_el),
            author_name=author,
            title=title,
            published_at=parse_steam_time(date_text),
            published_at_text=date_text,
            updated_at_source=None,
            content=content,
            raw={
                "topic_id": topic_id,
                "topic_url": url,
                "title": title,
                "author": author,
                "date": date_text,
                "content": content,
            },
        )

    def _reply_item(self, comment: Any, url: str, topic_id: str, title: str) -> RawItem | None:
        """Build the item for one reply node, or ``None`` if it has no text."""
        author_el = comment.select_one(".commentthread_author_link")
        text = _text(comment.select_one(".commentthread_comment_text"))
        if not text:
            return None
        author = _text(author_el)
        timestamp = _text(comment.select_one(".commentthread_comment_timestamp"))
        reply_id = _reply_id(comment, topic_id, author, timestamp, text)
        reply_url = f"{url}#{reply_id}" if reply_id else url
        return RawItem(
            source="steam_discussions",
            source_item_id=f"reply:{topic_id}:{reply_id}",
            source_url=reply_url,
            content_type="discussion_reply",
            author_id=self._steam_id_from_author(author_el),
            author_name=author,
            title=title,
            published_at=parse_steam_time(timestamp),
            published_at_text=timestamp,
            updated_at_source=None,
            content=text,
            raw={
                "topic_id": topic_id,
                "topic_url": url,
                "reply_id": reply_id,
                "reply_url": reply_url,
                "title": title,
                "reply_author": author,
                "reply_time_text": timestamp,
                "reply_content": text,
            },
        )

    def _review_to_item(self, review: dict[str, Any]) -> RawItem:
        """Convert one review dict from the reviews endpoint into a ``RawItem``.

        The stored ``raw`` payload is a shallow copy of *review* with the
        derived ``source_url`` added.
        """
        author = review.get("author") or {}
        steam_id = str(author.get("steamid") or "")
        recommendation_id = str(review.get("recommendationid"))
        source_url = f"{STEAM_COMMUNITY}/profiles/{steam_id}/recommended/{self.app_id}/"
        raw = dict(review)
        raw["source_url"] = source_url
        return RawItem(
            source="steam_reviews",
            source_item_id=f"review:{recommendation_id}",
            source_url=source_url,
            content_type="review",
            author_id=steam_id or None,
            author_name=author.get("personaname"),
            title=None,
            published_at=review.get("timestamp_created"),
            published_at_text=None,
            updated_at_source=review.get("timestamp_updated"),
            content=review.get("review") or "",
            raw=raw,
        )

    def _get_text(self, url: str) -> str:
        """GET *url* and return the body decoded as UTF-8.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        response = self.client.get(url)
        response.raise_for_status()
        # Force UTF-8 instead of relying on the detected encoding.
        response.encoding = "utf-8"
        return response.text

    def _extract_topic_urls(self, html: str) -> list[str]:
        """Return the distinct topic URLs of this app found in a listing page.

        URLs are made absolute, stripped of their query string and kept in
        first-seen order.
        """
        soup = BeautifulSoup(html, "html.parser")
        marker = f"/app/{self.app_id}/discussions/"
        seen: set[str] = set()
        urls: list[str] = []
        for link in soup.select("a.forum_topic_overlay, a.forum_topic_name"):
            href = link.get("href")
            if not href:
                continue
            url = _abs_url(href).split("?")[0]
            if marker in url and url not in seen:
                seen.add(url)
                urls.append(url)
        return urls

    def _steam_id_from_author(self, author_el: Any) -> str | None:
        """Extract an author identifier from a profile link element.

        For ``/profiles/...`` and ``/id/...`` links the last path segment is
        returned; otherwise the ``steamid`` query parameter, if present.
        Returns ``None`` when the element is missing or nothing matches.
        """
        if not author_el:
            return None
        parsed = urlparse(author_el.get("href") or "")
        path = parsed.path
        if "/profiles/" in path or "/id/" in path:
            return path.rstrip("/").split("/")[-1]
        steam_id = parse_qs(parsed.query).get("steamid")
        return steam_id[0] if steam_id else None
|
|
|
|
|
|
def iter_nonempty(items: Iterable[RawItem]) -> Iterable[RawItem]:
    """Lazily yield the items whose ``content`` has non-whitespace text."""
    yield from (entry for entry in items if entry.content.strip())
|