TH1/Tools/PlayerMultilingualReportViewer/player_multilingual_report_viewer.py
2026-05-29 17:57:24 +08:00

503 lines
21 KiB
Python

import base64
import datetime as dt
import hashlib
import hmac
import json
import os
import sys
import threading
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import dataclass
from pathlib import Path
from tkinter import BOTH, END, LEFT, RIGHT, X, Y, BooleanVar, StringVar, Tk, messagebox
from tkinter import ttk
APP_DIR = Path(__file__).resolve().parent
TOOLS_DIR = APP_DIR.parent
if str(TOOLS_DIR) not in sys.path:
sys.path.insert(0, str(TOOLS_DIR))
from oss_viewer_config import merge_default_oss_config, merge_local_oss_config
DATA_DIR = APP_DIR / "Data"
CONFIG_PATH = APP_DIR / "config.local.json"
OSS_PREFIX = "multilingualreport/"
def default_config() -> dict:
return {
"access_key_id": "",
"access_key_secret": "",
"endpoint": "oss-cn-shanghai.aliyuncs.com",
"bucket": "th1-oss",
"skip_existing_downloads": True,
}
def load_config() -> dict:
cfg = merge_default_oss_config(default_config())
if CONFIG_PATH.exists():
try:
cfg = merge_local_oss_config(cfg, json.loads(CONFIG_PATH.read_text(encoding="utf-8")))
except Exception:
pass
return cfg
def save_config(cfg: dict) -> None:
CONFIG_PATH.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8")
def human_bytes(value: int) -> str:
if value >= 1024 * 1024:
return f"{value / 1024 / 1024:.2f} MB"
if value >= 1024:
return f"{value / 1024:.1f} KB"
return f"{value} B"
def parse_oss_time(value: str) -> str:
if not value:
return ""
try:
return dt.datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone().strftime("%Y-%m-%d %H:%M")
except Exception:
return value[:16]
class OssClient:
def __init__(self, access_key_id: str, access_key_secret: str, endpoint: str, bucket: str):
self.access_key_id = access_key_id.strip()
self.access_key_secret = access_key_secret.strip()
self.endpoint = endpoint.strip()
self.bucket = bucket.strip()
def _sign(self, verb: str, date: str, canonicalized_resource: str) -> str:
string_to_sign = f"{verb}\n\n\n{date}\n{canonicalized_resource}"
digest = hmac.new(
self.access_key_secret.encode("utf-8"),
string_to_sign.encode("utf-8"),
hashlib.sha1,
).digest()
return base64.b64encode(digest).decode("ascii")
def _request(self, verb: str, url: str, canonicalized_resource: str) -> bytes:
now = dt.datetime.now(dt.timezone.utc)
date = now.strftime("%a, %d %b %Y %H:%M:%S GMT")
signature = self._sign(verb, date, canonicalized_resource)
request = urllib.request.Request(url, method=verb)
request.add_header("Date", date)
request.add_header("Authorization", f"OSS {self.access_key_id}:{signature}")
with urllib.request.urlopen(request, timeout=60) as response:
return response.read()
def list_objects(self, prefix: str) -> list[str]:
keys: list[str] = []
marker = ""
while True:
query = f"prefix={urllib.parse.quote(prefix)}&max-keys=1000"
canonicalized_resource = f"/{self.bucket}/"
if marker:
query += f"&marker={urllib.parse.quote(marker)}"
url = f"https://{self.bucket}.{self.endpoint}/?{query}"
xml_bytes = self._request("GET", url, canonicalized_resource)
root = ET.fromstring(xml_bytes)
ns = ""
if root.tag.startswith("{"):
ns = root.tag.split("}", 1)[0] + "}"
for content in root.findall(f"{ns}Contents"):
key = content.findtext(f"{ns}Key") or ""
if key and not key.endswith("/"):
keys.append(key)
is_truncated = (root.findtext(f"{ns}IsTruncated") or "").lower() == "true"
marker = root.findtext(f"{ns}NextMarker") or (keys[-1] if keys else "")
if not is_truncated or not marker:
break
return keys
def download_object(self, object_key: str, local_path: Path) -> None:
encoded_key = "/".join(urllib.parse.quote(part) for part in object_key.split("/"))
url = f"https://{self.bucket}.{self.endpoint}/{encoded_key}"
canonicalized_resource = f"/{self.bucket}/{object_key}"
data = self._request("GET", url, canonicalized_resource)
local_path.parent.mkdir(parents=True, exist_ok=True)
local_path.write_bytes(data)
@dataclass
class ReportEntry:
local_path: Path
object_key: str
report_id: str
version: str
steam_id: str
created_at_utc: str
multilingual_id: str
language: str
reported_text: str
description: str
resolved_text: str
size: int
manifest: dict
def local_path_for_key(object_key: str) -> Path:
relative = object_key[len(OSS_PREFIX):] if object_key.startswith(OSS_PREFIX) else object_key
return DATA_DIR / Path(*relative.split("/"))
def read_report(local_path: Path) -> ReportEntry | None:
try:
with zipfile.ZipFile(local_path, "r") as zf:
manifest = json.loads(zf.read("manifest.json").decode("utf-8"))
reported_text = manifest.get("reportedText", "")
if not reported_text and "reported_text.txt" in zf.namelist():
reported_text = zf.read("reported_text.txt").decode("utf-8", errors="replace")
description = manifest.get("description", "")
if not description and "description.txt" in zf.namelist():
description = zf.read("description.txt").decode("utf-8", errors="replace")
parts = local_path.relative_to(DATA_DIR).parts
version = manifest.get("version") or (parts[0] if len(parts) > 0 else "")
steam_id = manifest.get("steamId") or (parts[1] if len(parts) > 1 else "")
object_key = OSS_PREFIX + "/".join(parts).replace("\\", "/")
multilingual_id = str(manifest.get("multilingualIdText") or manifest.get("multilingualId") or "")
return ReportEntry(
local_path=local_path,
object_key=object_key,
report_id=manifest.get("reportId", local_path.stem),
version=version,
steam_id=steam_id,
created_at_utc=manifest.get("createdAtUtc", ""),
multilingual_id=multilingual_id,
language=manifest.get("language", ""),
reported_text=reported_text,
description=description,
resolved_text=manifest.get("resolvedText", ""),
size=local_path.stat().st_size,
manifest=manifest,
)
except Exception:
return None
def load_reports() -> list[ReportEntry]:
DATA_DIR.mkdir(parents=True, exist_ok=True)
reports = [entry for entry in (read_report(path) for path in DATA_DIR.rglob("*.zip")) if entry]
reports.sort(key=lambda item: item.created_at_utc or str(item.local_path.stat().st_mtime), reverse=True)
return reports
def download_reports(cfg: dict) -> tuple[int, int, int]:
client = OssClient(
cfg["access_key_id"],
cfg["access_key_secret"],
cfg["endpoint"],
cfg["bucket"],
)
keys = [key for key in client.list_objects(OSS_PREFIX) if key.endswith(".zip")]
downloaded = 0
skipped = 0
failed = 0
for key in keys:
local_path = local_path_for_key(key)
if cfg.get("skip_existing_downloads", True) and local_path.exists():
skipped += 1
continue
try:
client.download_object(key, local_path)
downloaded += 1
except Exception:
failed += 1
return downloaded, skipped, failed
class PlayerMultilingualReportViewer(Tk):
def __init__(self):
super().__init__()
self.title("TH1 玩家多语言汇报查看器")
self.geometry("1120x720")
self.minsize(900, 560)
self.cfg = load_config()
self.reports: list[ReportEntry] = []
self.filtered_reports: list[ReportEntry] = []
self.selected_report: ReportEntry | None = None
self.access_key_id = StringVar(value=self.cfg.get("access_key_id", ""))
self.access_key_secret = StringVar(value=self.cfg.get("access_key_secret", ""))
self.endpoint = StringVar(value=self.cfg.get("endpoint", "oss-cn-shanghai.aliyuncs.com"))
self.bucket = StringVar(value=self.cfg.get("bucket", "th1-oss"))
self.skip_existing = BooleanVar(value=bool(self.cfg.get("skip_existing_downloads", True)))
self.version_filter = StringVar(value="全部版本")
self.language_filter = StringVar(value="全部语种")
self.steam_filter = StringVar(value="")
self.id_filter = StringVar(value="")
self.status_text = StringVar(value="就绪")
self.reported_text_box = None
self.description_box = None
self.resolved_text_box = None
self._build_ui()
self.refresh_local_reports()
def _build_ui(self) -> None:
config_frame = ttk.LabelFrame(self, text="OSS")
config_frame.pack(fill=X, padx=10, pady=(10, 6))
ttk.Label(config_frame, text="AccessKey ID").grid(row=0, column=0, sticky="w", padx=8, pady=4)
ttk.Entry(config_frame, textvariable=self.access_key_id, width=32).grid(row=0, column=1, sticky="ew", padx=4)
ttk.Label(config_frame, text="AccessKey Secret").grid(row=0, column=2, sticky="w", padx=8)
ttk.Entry(config_frame, textvariable=self.access_key_secret, show="*", width=32).grid(row=0, column=3, sticky="ew", padx=4)
ttk.Label(config_frame, text="Endpoint").grid(row=1, column=0, sticky="w", padx=8, pady=4)
ttk.Entry(config_frame, textvariable=self.endpoint, width=32).grid(row=1, column=1, sticky="ew", padx=4)
ttk.Label(config_frame, text="Bucket").grid(row=1, column=2, sticky="w", padx=8)
ttk.Entry(config_frame, textvariable=self.bucket, width=32).grid(row=1, column=3, sticky="ew", padx=4)
ttk.Checkbutton(config_frame, text="跳过已下载 zip", variable=self.skip_existing).grid(row=2, column=1, sticky="w", padx=4)
ttk.Button(config_frame, text="保存配置", command=self.save_current_config).grid(row=2, column=2, sticky="e", padx=4, pady=4)
ttk.Button(config_frame, text="更新内容", command=self.update_from_oss).grid(row=2, column=3, sticky="w", padx=4, pady=4)
config_frame.columnconfigure(1, weight=1)
config_frame.columnconfigure(3, weight=1)
filter_frame = ttk.Frame(self)
filter_frame.pack(fill=X, padx=10, pady=(0, 6))
ttk.Label(filter_frame, text="版本").pack(side=LEFT)
self.version_combo = ttk.Combobox(filter_frame, textvariable=self.version_filter, state="readonly", width=16)
self.version_combo.pack(side=LEFT, padx=(4, 10))
self.version_combo.bind("<<ComboboxSelected>>", lambda _event: self.apply_filters())
ttk.Label(filter_frame, text="语种").pack(side=LEFT)
self.language_combo = ttk.Combobox(filter_frame, textvariable=self.language_filter, state="readonly", width=12)
self.language_combo.pack(side=LEFT, padx=(4, 10))
self.language_combo.bind("<<ComboboxSelected>>", lambda _event: self.apply_filters())
ttk.Label(filter_frame, text="ID").pack(side=LEFT)
id_entry = ttk.Entry(filter_frame, textvariable=self.id_filter, width=12)
id_entry.pack(side=LEFT, padx=4)
id_entry.bind("<KeyRelease>", lambda _event: self.apply_filters())
ttk.Label(filter_frame, text="SteamID").pack(side=LEFT)
steam_entry = ttk.Entry(filter_frame, textvariable=self.steam_filter, width=22)
steam_entry.pack(side=LEFT, padx=4)
steam_entry.bind("<KeyRelease>", lambda _event: self.apply_filters())
ttk.Button(filter_frame, text="刷新本地", command=self.refresh_local_reports).pack(side=LEFT, padx=8)
ttk.Button(filter_frame, text="打开下载目录", command=self.open_data_dir).pack(side=LEFT)
pane = ttk.PanedWindow(self, orient="horizontal")
pane.pack(fill=BOTH, expand=True, padx=10, pady=(0, 6))
list_frame = ttk.Frame(pane)
self.tree = ttk.Treeview(
list_frame,
columns=("created", "version", "language", "mid", "steam", "size"),
show="headings",
selectmode="browse",
)
for col, text, width in (
("created", "时间", 140),
("version", "版本", 90),
("language", "语种", 80),
("mid", "ID", 80),
("steam", "SteamID", 150),
("size", "大小", 80),
):
self.tree.heading(col, text=text)
self.tree.column(col, width=width, anchor="w")
self.tree.pack(side=LEFT, fill=BOTH, expand=True)
scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.tree.yview)
scrollbar.pack(side=RIGHT, fill=Y)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.bind("<<TreeviewSelect>>", self.on_select_report)
pane.add(list_frame, weight=3)
preview_frame = ttk.Frame(pane)
self.preview = ttk.Treeview(preview_frame, columns=("field", "value"), show="headings", height=11)
self.preview.heading("field", text="字段")
self.preview.heading("value", text="内容")
self.preview.column("field", width=110, anchor="w")
self.preview.column("value", width=440, anchor="w")
self.preview.pack(fill=X)
self._build_text_boxes(preview_frame)
pane.add(preview_frame, weight=4)
status = ttk.Label(self, textvariable=self.status_text, anchor="w")
status.pack(fill=X, padx=10, pady=(0, 8))
def _build_text_boxes(self, parent) -> None:
import tkinter as tk
reported_frame = ttk.LabelFrame(parent, text="玩家选择的字符串")
reported_frame.pack(fill=BOTH, expand=True, pady=(8, 0))
self.reported_text_box = tk.Text(reported_frame, wrap="word", height=8)
self.reported_text_box.pack(fill=BOTH, expand=True)
self.reported_text_box.configure(state="disabled")
description_frame = ttk.LabelFrame(parent, text="玩家自述")
description_frame.pack(fill=BOTH, expand=True, pady=(8, 0))
self.description_box = tk.Text(description_frame, wrap="word", height=8)
self.description_box.pack(fill=BOTH, expand=True)
self.description_box.configure(state="disabled")
resolved_frame = ttk.LabelFrame(parent, text="当前解析文本")
resolved_frame.pack(fill=BOTH, expand=True, pady=(8, 0))
self.resolved_text_box = tk.Text(resolved_frame, wrap="word", height=8)
self.resolved_text_box.pack(fill=BOTH, expand=True)
self.resolved_text_box.configure(state="disabled")
def current_config(self) -> dict:
return {
"access_key_id": self.access_key_id.get(),
"access_key_secret": self.access_key_secret.get(),
"endpoint": self.endpoint.get(),
"bucket": self.bucket.get(),
"skip_existing_downloads": self.skip_existing.get(),
}
def save_current_config(self) -> None:
self.cfg = self.current_config()
save_config(self.cfg)
self.status_text.set(f"配置已保存到 {CONFIG_PATH}")
def update_from_oss(self) -> None:
cfg = self.current_config()
if not cfg["access_key_id"] or not cfg["access_key_secret"]:
messagebox.showerror("缺少配置", "请先填写 AccessKey ID 和 AccessKey Secret。")
return
self.save_current_config()
self.status_text.set("正在从 OSS 更新多语言汇报...")
self._run_worker(lambda: download_reports(cfg), self._after_download)
def _after_download(self, result) -> None:
if isinstance(result, Exception):
messagebox.showerror("更新失败", str(result))
self.status_text.set(f"更新失败: {result}")
return
downloaded, skipped, failed = result
self.refresh_local_reports()
self.status_text.set(f"更新完成: 下载 {downloaded}, 跳过 {skipped}, 失败 {failed};本地缓存 {len(self.reports)}")
def refresh_local_reports(self) -> None:
self.reports = load_reports()
versions = ["全部版本"] + sorted({entry.version for entry in self.reports if entry.version}, reverse=True)
languages = ["全部语种"] + sorted({entry.language for entry in self.reports if entry.language})
self.version_combo.configure(values=versions)
self.language_combo.configure(values=languages)
if self.version_filter.get() not in versions:
self.version_filter.set("全部版本")
if self.language_filter.get() not in languages:
self.language_filter.set("全部语种")
self.apply_filters()
def apply_filters(self) -> None:
version = self.version_filter.get()
language = self.language_filter.get()
steam = self.steam_filter.get().strip()
multilingual_id = self.id_filter.get().strip()
self.filtered_reports = []
for report in self.reports:
if version and version != "全部版本" and report.version != version:
continue
if language and language != "全部语种" and report.language != language:
continue
if steam and steam not in report.steam_id:
continue
if multilingual_id and multilingual_id not in report.multilingual_id:
continue
self.filtered_reports.append(report)
for item in self.tree.get_children():
self.tree.delete(item)
for index, report in enumerate(self.filtered_reports):
self.tree.insert(
"",
END,
iid=str(index),
values=(
parse_oss_time(report.created_at_utc),
report.version,
report.language,
report.multilingual_id,
report.steam_id,
human_bytes(report.size),
),
)
self.status_text.set(f"本地缓存 {len(self.reports)} 条,当前筛选 {len(self.filtered_reports)}")
def on_select_report(self, _event=None) -> None:
selection = self.tree.selection()
if not selection:
self.selected_report = None
return
index = int(selection[0])
if index < 0 or index >= len(self.filtered_reports):
return
self.selected_report = self.filtered_reports[index]
self.render_preview(self.selected_report)
def render_preview(self, report: ReportEntry) -> None:
for item in self.preview.get_children():
self.preview.delete(item)
rows = [
("ReportID", report.report_id),
("多语言ID", report.multilingual_id),
("语种", report.language),
("版本", report.version),
("SteamID", report.steam_id),
("时间(UTC)", parse_oss_time(report.created_at_utc)),
("时间(本地)", report.manifest.get("createdAtLocal", "")),
("时区", report.manifest.get("timezone", "")),
("CrashSight设备ID", report.manifest.get("crashSightDeviceId", "")),
("设备名", report.manifest.get("deviceName", "")),
("设备型号", report.manifest.get("deviceModel", "")),
("系统", report.manifest.get("operatingSystem", "")),
("CPU", report.manifest.get("processorType", "")),
("内存", f"{report.manifest.get('systemMemorySizeMb', '')} MB"),
("显卡", report.manifest.get("graphicsDeviceName", "")),
("显存", f"{report.manifest.get('graphicsMemorySizeMb', '')} MB"),
("OSS Key", report.object_key),
("本地文件", str(report.local_path)),
]
for field, value in rows:
self.preview.insert("", END, values=(field, value))
self._set_text(self.reported_text_box, report.reported_text)
self._set_text(self.description_box, report.description)
self._set_text(self.resolved_text_box, report.resolved_text)
@staticmethod
def _set_text(widget, value: str) -> None:
widget.configure(state="normal")
widget.delete("1.0", END)
widget.insert("1.0", value or "")
widget.configure(state="disabled")
def open_data_dir(self) -> None:
DATA_DIR.mkdir(parents=True, exist_ok=True)
os.startfile(DATA_DIR)
def _run_worker(self, func, callback) -> None:
def runner():
try:
result = func()
except Exception as exc:
result = exc
self.after(0, lambda: callback(result))
threading.Thread(target=runner, daemon=True).start()
if __name__ == "__main__":
PlayerMultilingualReportViewer().mainloop()