TH1/Tools/PlayerQuestionnaireViewer/player_questionnaire_viewer.py

505 lines
21 KiB
Python

import base64
import datetime as dt
import hashlib
import hmac
import json
import os
import sys
import threading
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from tkinter import BOTH, END, LEFT, RIGHT, X, Y, BooleanVar, StringVar, Tk, messagebox
from tkinter import ttk
APP_DIR = Path(__file__).resolve().parent
TOOLS_DIR = APP_DIR.parent
if str(TOOLS_DIR) not in sys.path:
sys.path.insert(0, str(TOOLS_DIR))
from oss_viewer_config import merge_default_oss_config, merge_local_oss_config
DATA_DIR = APP_DIR / "Data"
CONFIG_PATH = APP_DIR / "config.local.json"
OSS_PREFIX = "questionnaire/"
def default_config() -> dict:
return {
"access_key_id": "",
"access_key_secret": "",
"endpoint": "oss-cn-shanghai.aliyuncs.com",
"bucket": "th1-oss",
"skip_existing_downloads": True,
}
def load_config() -> dict:
cfg = merge_default_oss_config(default_config())
if CONFIG_PATH.exists():
try:
cfg = merge_local_oss_config(cfg, json.loads(CONFIG_PATH.read_text(encoding="utf-8")))
except Exception:
pass
return cfg
def save_config(cfg: dict) -> None:
CONFIG_PATH.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8")
def human_bytes(value: int) -> str:
if value >= 1024 * 1024:
return f"{value / 1024 / 1024:.2f} MB"
if value >= 1024:
return f"{value / 1024:.1f} KB"
return f"{value} B"
def parse_oss_time(value: str) -> str:
if not value:
return ""
try:
return dt.datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone().strftime("%Y-%m-%d %H:%M")
except Exception:
return value[:16]
class OssClient:
def __init__(self, access_key_id: str, access_key_secret: str, endpoint: str, bucket: str):
self.access_key_id = access_key_id.strip()
self.access_key_secret = access_key_secret.strip()
self.endpoint = endpoint.strip()
self.bucket = bucket.strip()
def _sign(self, verb: str, date: str, canonicalized_resource: str) -> str:
string_to_sign = f"{verb}\n\n\n{date}\n{canonicalized_resource}"
digest = hmac.new(
self.access_key_secret.encode("utf-8"),
string_to_sign.encode("utf-8"),
hashlib.sha1,
).digest()
return base64.b64encode(digest).decode("ascii")
def _request(self, verb: str, url: str, canonicalized_resource: str) -> bytes:
now = dt.datetime.now(dt.timezone.utc)
date = now.strftime("%a, %d %b %Y %H:%M:%S GMT")
signature = self._sign(verb, date, canonicalized_resource)
request = urllib.request.Request(url, method=verb)
request.add_header("Date", date)
request.add_header("Authorization", f"OSS {self.access_key_id}:{signature}")
with urllib.request.urlopen(request, timeout=60) as response:
return response.read()
def list_objects(self, prefix: str) -> list[str]:
keys: list[str] = []
marker = ""
while True:
query = f"prefix={urllib.parse.quote(prefix)}&max-keys=1000"
canonicalized_resource = f"/{self.bucket}/"
if marker:
query += f"&marker={urllib.parse.quote(marker)}"
url = f"https://{self.bucket}.{self.endpoint}/?{query}"
xml_bytes = self._request("GET", url, canonicalized_resource)
root = ET.fromstring(xml_bytes)
ns = ""
if root.tag.startswith("{"):
ns = root.tag.split("}", 1)[0] + "}"
for content in root.findall(f"{ns}Contents"):
key = content.findtext(f"{ns}Key") or ""
if key and not key.endswith("/"):
keys.append(key)
is_truncated = (root.findtext(f"{ns}IsTruncated") or "").lower() == "true"
marker = root.findtext(f"{ns}NextMarker") or (keys[-1] if keys else "")
if not is_truncated or not marker:
break
return keys
def download_object(self, object_key: str, local_path: Path) -> None:
encoded_key = "/".join(urllib.parse.quote(part) for part in object_key.split("/"))
url = f"https://{self.bucket}.{self.endpoint}/{encoded_key}"
canonicalized_resource = f"/{self.bucket}/{object_key}"
data = self._request("GET", url, canonicalized_resource)
local_path.parent.mkdir(parents=True, exist_ok=True)
local_path.write_bytes(data)
@dataclass
class ReportEntry:
local_path: Path
object_key: str
response_id: str
questionnaire_id: str
version: str
steam_id: str
submitted_at_utc: str
created_at_utc: str
answer_count: int
question_ids: str
size: int
payload: dict
def local_path_for_key(object_key: str) -> Path:
relative = object_key[len(OSS_PREFIX):] if object_key.startswith(OSS_PREFIX) else object_key
return DATA_DIR / Path(*relative.split("/"))
def read_report(local_path: Path) -> ReportEntry | None:
try:
payload = json.loads(local_path.read_text(encoding="utf-8"))
sheet = payload.get("answerSheet") or {}
answers = sheet.get("Answers") or sheet.get("answers") or []
parts = local_path.relative_to(DATA_DIR).parts
version = payload.get("version") or (parts[0] if len(parts) > 0 else "")
steam_id = payload.get("steamId") or (parts[1] if len(parts) > 1 else "")
object_key = OSS_PREFIX + "/".join(parts).replace("\\", "/")
question_ids = ", ".join(
str(answer.get("QuestionId") or answer.get("questionId") or "")
for answer in answers
if isinstance(answer, dict) and (answer.get("QuestionId") or answer.get("questionId"))
)
return ReportEntry(
local_path=local_path,
object_key=object_key,
response_id=payload.get("responseId", local_path.stem),
questionnaire_id=payload.get("questionnaireId") or sheet.get("QuestionnaireId") or "",
version=version,
steam_id=steam_id,
submitted_at_utc=payload.get("submittedAtUtc") or sheet.get("SubmittedAtUtc") or "",
created_at_utc=payload.get("createdAtUtc", ""),
answer_count=len(answers),
question_ids=question_ids,
size=local_path.stat().st_size,
payload=payload,
)
except Exception:
return None
def load_reports() -> list[ReportEntry]:
DATA_DIR.mkdir(parents=True, exist_ok=True)
reports = [entry for entry in (read_report(path) for path in DATA_DIR.rglob("*.json")) if entry]
reports.sort(
key=lambda item: item.submitted_at_utc or item.created_at_utc or str(item.local_path.stat().st_mtime),
reverse=True,
)
return reports
def download_reports(cfg: dict) -> tuple[int, int, int]:
client = OssClient(
cfg["access_key_id"],
cfg["access_key_secret"],
cfg["endpoint"],
cfg["bucket"],
)
keys = [key for key in client.list_objects(OSS_PREFIX) if key.endswith(".json")]
downloaded = 0
skipped = 0
failed = 0
for key in keys:
local_path = local_path_for_key(key)
if cfg.get("skip_existing_downloads", True) and local_path.exists():
skipped += 1
continue
try:
client.download_object(key, local_path)
downloaded += 1
except Exception:
failed += 1
return downloaded, skipped, failed
def format_answer(answer: dict) -> str:
question_id = answer.get("QuestionId") or answer.get("questionId") or ""
question_type = answer.get("QuestionType") or answer.get("questionType") or ""
selected = answer.get("SelectedOptionIds") or answer.get("selectedOptionIds") or []
open_text = answer.get("OpenText") or answer.get("openText") or ""
lines = [f"题目: {question_id}", f"类型: {question_type}"]
if selected:
lines.append("选项: " + ", ".join(str(item) for item in selected))
if open_text:
lines.append("文本:")
lines.append(str(open_text))
return "\n".join(lines)
class PlayerQuestionnaireViewer(Tk):
def __init__(self):
super().__init__()
self.title("TH1 玩家问卷查看器")
self.geometry("1180x760")
self.minsize(940, 580)
self.cfg = load_config()
self.reports: list[ReportEntry] = []
self.filtered_reports: list[ReportEntry] = []
self.selected_report: ReportEntry | None = None
self.access_key_id = StringVar(value=self.cfg.get("access_key_id", ""))
self.access_key_secret = StringVar(value=self.cfg.get("access_key_secret", ""))
self.endpoint = StringVar(value=self.cfg.get("endpoint", "oss-cn-shanghai.aliyuncs.com"))
self.bucket = StringVar(value=self.cfg.get("bucket", "th1-oss"))
self.skip_existing = BooleanVar(value=bool(self.cfg.get("skip_existing_downloads", True)))
self.version_filter = StringVar(value="全部版本")
self.questionnaire_filter = StringVar(value="全部问卷")
self.steam_filter = StringVar(value="")
self.question_filter = StringVar(value="")
self.status_text = StringVar(value="就绪")
self.answer_box = None
self._build_ui()
self.refresh_local_reports()
def _build_ui(self) -> None:
config_frame = ttk.LabelFrame(self, text="OSS")
config_frame.pack(fill=X, padx=10, pady=(10, 6))
ttk.Label(config_frame, text="AccessKey ID").grid(row=0, column=0, sticky="w", padx=8, pady=4)
ttk.Entry(config_frame, textvariable=self.access_key_id, width=32).grid(row=0, column=1, sticky="ew", padx=4)
ttk.Label(config_frame, text="AccessKey Secret").grid(row=0, column=2, sticky="w", padx=8)
ttk.Entry(config_frame, textvariable=self.access_key_secret, show="*", width=32).grid(row=0, column=3, sticky="ew", padx=4)
ttk.Label(config_frame, text="Endpoint").grid(row=1, column=0, sticky="w", padx=8, pady=4)
ttk.Entry(config_frame, textvariable=self.endpoint, width=32).grid(row=1, column=1, sticky="ew", padx=4)
ttk.Label(config_frame, text="Bucket").grid(row=1, column=2, sticky="w", padx=8)
ttk.Entry(config_frame, textvariable=self.bucket, width=32).grid(row=1, column=3, sticky="ew", padx=4)
ttk.Checkbutton(config_frame, text="跳过已下载 json", variable=self.skip_existing).grid(row=2, column=1, sticky="w", padx=4)
ttk.Button(config_frame, text="保存配置", command=self.save_current_config).grid(row=2, column=2, sticky="e", padx=4, pady=4)
ttk.Button(config_frame, text="更新内容", command=self.update_from_oss).grid(row=2, column=3, sticky="w", padx=4, pady=4)
config_frame.columnconfigure(1, weight=1)
config_frame.columnconfigure(3, weight=1)
filter_frame = ttk.Frame(self)
filter_frame.pack(fill=X, padx=10, pady=(0, 6))
ttk.Label(filter_frame, text="版本").pack(side=LEFT)
self.version_combo = ttk.Combobox(filter_frame, textvariable=self.version_filter, state="readonly", width=15)
self.version_combo.pack(side=LEFT, padx=(4, 10))
self.version_combo.bind("<<ComboboxSelected>>", lambda _event: self.apply_filters())
ttk.Label(filter_frame, text="问卷").pack(side=LEFT)
self.questionnaire_combo = ttk.Combobox(filter_frame, textvariable=self.questionnaire_filter, state="readonly", width=22)
self.questionnaire_combo.pack(side=LEFT, padx=(4, 10))
self.questionnaire_combo.bind("<<ComboboxSelected>>", lambda _event: self.apply_filters())
ttk.Label(filter_frame, text="题目ID").pack(side=LEFT)
question_entry = ttk.Entry(filter_frame, textvariable=self.question_filter, width=14)
question_entry.pack(side=LEFT, padx=4)
question_entry.bind("<KeyRelease>", lambda _event: self.apply_filters())
ttk.Label(filter_frame, text="SteamID").pack(side=LEFT)
steam_entry = ttk.Entry(filter_frame, textvariable=self.steam_filter, width=22)
steam_entry.pack(side=LEFT, padx=4)
steam_entry.bind("<KeyRelease>", lambda _event: self.apply_filters())
ttk.Button(filter_frame, text="刷新本地", command=self.refresh_local_reports).pack(side=LEFT, padx=8)
ttk.Button(filter_frame, text="打开下载目录", command=self.open_data_dir).pack(side=LEFT)
pane = ttk.PanedWindow(self, orient="horizontal")
pane.pack(fill=BOTH, expand=True, padx=10, pady=(0, 6))
list_frame = ttk.Frame(pane)
self.tree = ttk.Treeview(
list_frame,
columns=("submitted", "version", "questionnaire", "steam", "answers", "size"),
show="headings",
selectmode="browse",
)
for col, text, width in (
("submitted", "提交时间", 140),
("version", "版本", 86),
("questionnaire", "问卷ID", 170),
("steam", "SteamID", 150),
("answers", "答案", 58),
("size", "大小", 78),
):
self.tree.heading(col, text=text)
self.tree.column(col, width=width, anchor="w")
self.tree.pack(side=LEFT, fill=BOTH, expand=True)
scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.tree.yview)
scrollbar.pack(side=RIGHT, fill=Y)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.bind("<<TreeviewSelect>>", self.on_select_report)
pane.add(list_frame, weight=3)
preview_frame = ttk.Frame(pane)
self.preview = ttk.Treeview(preview_frame, columns=("field", "value"), show="headings", height=13)
self.preview.heading("field", text="字段")
self.preview.heading("value", text="内容")
self.preview.column("field", width=120, anchor="w")
self.preview.column("value", width=500, anchor="w")
self.preview.pack(fill=X)
self._build_answer_box(preview_frame)
pane.add(preview_frame, weight=4)
status = ttk.Label(self, textvariable=self.status_text, anchor="w")
status.pack(fill=X, padx=10, pady=(0, 8))
def _build_answer_box(self, parent) -> None:
import tkinter as tk
answer_frame = ttk.LabelFrame(parent, text="答案明细")
answer_frame.pack(fill=BOTH, expand=True, pady=(8, 0))
self.answer_box = tk.Text(answer_frame, wrap="word", height=20)
self.answer_box.pack(fill=BOTH, expand=True)
self.answer_box.configure(state="disabled")
def current_config(self) -> dict:
return {
"access_key_id": self.access_key_id.get(),
"access_key_secret": self.access_key_secret.get(),
"endpoint": self.endpoint.get(),
"bucket": self.bucket.get(),
"skip_existing_downloads": self.skip_existing.get(),
}
def save_current_config(self) -> None:
self.cfg = self.current_config()
save_config(self.cfg)
self.status_text.set(f"配置已保存到 {CONFIG_PATH}")
def update_from_oss(self) -> None:
cfg = self.current_config()
if not cfg["access_key_id"] or not cfg["access_key_secret"]:
messagebox.showerror("缺少配置", "请先填写 AccessKey ID 和 AccessKey Secret。")
return
self.save_current_config()
self.status_text.set("正在从 OSS 更新问卷答卷...")
self._run_worker(lambda: download_reports(cfg), self._after_download)
def _after_download(self, result) -> None:
if isinstance(result, Exception):
messagebox.showerror("更新失败", str(result))
self.status_text.set(f"更新失败: {result}")
return
downloaded, skipped, failed = result
self.refresh_local_reports()
self.status_text.set(f"更新完成: 下载 {downloaded}, 跳过 {skipped}, 失败 {failed};本地缓存 {len(self.reports)}")
def refresh_local_reports(self) -> None:
self.reports = load_reports()
versions = ["全部版本"] + sorted({entry.version for entry in self.reports if entry.version}, reverse=True)
questionnaires = ["全部问卷"] + sorted({entry.questionnaire_id for entry in self.reports if entry.questionnaire_id})
self.version_combo.configure(values=versions)
self.questionnaire_combo.configure(values=questionnaires)
if self.version_filter.get() not in versions:
self.version_filter.set("全部版本")
if self.questionnaire_filter.get() not in questionnaires:
self.questionnaire_filter.set("全部问卷")
self.apply_filters()
def apply_filters(self) -> None:
version = self.version_filter.get()
questionnaire_id = self.questionnaire_filter.get()
steam = self.steam_filter.get().strip()
question_id = self.question_filter.get().strip()
self.filtered_reports = []
for report in self.reports:
if version and version != "全部版本" and report.version != version:
continue
if questionnaire_id and questionnaire_id != "全部问卷" and report.questionnaire_id != questionnaire_id:
continue
if steam and steam not in report.steam_id:
continue
if question_id and question_id not in report.question_ids:
continue
self.filtered_reports.append(report)
for item in self.tree.get_children():
self.tree.delete(item)
for index, report in enumerate(self.filtered_reports):
self.tree.insert(
"",
END,
iid=str(index),
values=(
parse_oss_time(report.submitted_at_utc or report.created_at_utc),
report.version,
report.questionnaire_id,
report.steam_id,
report.answer_count,
human_bytes(report.size),
),
)
self.status_text.set(f"本地缓存 {len(self.reports)} 条,当前筛选 {len(self.filtered_reports)}")
def on_select_report(self, _event=None) -> None:
selection = self.tree.selection()
if not selection:
self.selected_report = None
return
index = int(selection[0])
if index < 0 or index >= len(self.filtered_reports):
return
self.selected_report = self.filtered_reports[index]
self.render_preview(self.selected_report)
def render_preview(self, report: ReportEntry) -> None:
for item in self.preview.get_children():
self.preview.delete(item)
rows = [
("ResponseID", report.response_id),
("问卷ID", report.questionnaire_id),
("版本", report.version),
("SteamID", report.steam_id),
("提交时间", parse_oss_time(report.submitted_at_utc)),
("创建时间", parse_oss_time(report.created_at_utc)),
("本地时间", report.payload.get("createdAtLocal", "")),
("时区", report.payload.get("timezone", "")),
("CrashSight设备ID", report.payload.get("crashSightDeviceId", "")),
("设备名", report.payload.get("deviceName", "")),
("设备型号", report.payload.get("deviceModel", "")),
("系统", report.payload.get("operatingSystem", "")),
("CPU", report.payload.get("processorType", "")),
("内存", f"{report.payload.get('systemMemorySizeMb', '')} MB"),
("显卡", report.payload.get("graphicsDeviceName", "")),
("显存", f"{report.payload.get('graphicsMemorySizeMb', '')} MB"),
("OSS Key", report.object_key),
("本地文件", str(report.local_path)),
]
for field, value in rows:
self.preview.insert("", END, values=(field, value))
sheet = report.payload.get("answerSheet") or {}
answers = sheet.get("Answers") or sheet.get("answers") or []
details = []
for index, answer in enumerate(answers, start=1):
if isinstance(answer, dict):
details.append(f"[{index}]\n{format_answer(answer)}")
self._set_text("\n\n".join(details))
def _set_text(self, value: str) -> None:
self.answer_box.configure(state="normal")
self.answer_box.delete("1.0", END)
self.answer_box.insert("1.0", value or "")
self.answer_box.configure(state="disabled")
def open_data_dir(self) -> None:
DATA_DIR.mkdir(parents=True, exist_ok=True)
os.startfile(DATA_DIR)
def _run_worker(self, func, callback) -> None:
def runner():
try:
result = func()
except Exception as exc:
result = exc
self.after(0, lambda: callback(result))
threading.Thread(target=runner, daemon=True).start()
if __name__ == "__main__":
PlayerQuestionnaireViewer().mainloop()