TH1/Tools/Dashboard/serve.py
2026-05-27 14:19:32 +08:00

1970 lines
75 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
TH1 Dashboard Server with data refresh API.
Endpoints:
GET /* - Static file serving
GET /api/bugs/list - Load all bugs from DOC/bugs.json
GET /api/suggestions/list - Load all suggestions from DOC/suggestions.json
GET /api/marketing/events - Load marketing events from DOC/marketing/events.json
GET /api/marketing/notes - Load marketing notes from DOC/marketing/notes.json
GET /api/marketing/todos - Load marketing todos from DOC/marketing/todos.json
GET /api/devplan/list - Load devplan tasks from DOC/devplan.json
GET /api/oss/versions - List OSS data version folders
GET /api/oss/matches - List match summaries (paginated, filterable)
GET /api/oss/match?file=... - Load full match JSON data
GET /api/oss/hero-stats - Aggregate hero statistics
POST /api/refresh - Run export_data.py and return result
POST /api/sentiment/create - Create new sentiment record
POST /api/sentiment/delete - Delete a sentiment record
POST /api/bugs/create - Create new bug
POST /api/bugs/update - Update bug fields
POST /api/bugs/delete - Delete bug by id
POST /api/suggestions/create - Create new suggestion
POST /api/suggestions/update - Update suggestion fields
POST /api/suggestions/delete - Delete suggestion by id
POST /api/devplan/update - Update task fields (description, etc.)
POST /api/devplan/subtask/create - Create subtask under a task
POST /api/devplan/subtask/toggle - Toggle subtask done state
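POST /api/devplan/subtask/delete - Delete a subtask
POST /api/marketing/todos/create - Create new marketing todo
POST /api/marketing/todos/toggle - Toggle marketing todo done state
POST /api/marketing/todos/delete - Delete a marketing todo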
Usage:
python serve.py # default port 8080
python serve.py 9090 # custom port
"""
import http.server
import json
import os
import shutil
import subprocess
import sys
import time
import uuid
import webbrowser
import urllib.request
import urllib.error
from urllib.parse import parse_qs, urlparse
from html.parser import HTMLParser
def _parse_port(argv, default=8080):
    """Return the port given as the first command-line argument.

    Falls back to *default* when no argument is present or when it is not an
    integer, so importing this module from a process that has unrelated
    arguments (test runners, tooling) does not crash at import time.
    """
    if len(argv) <= 1:
        return default
    try:
        return int(argv[1])
    except ValueError:
        print(f'[serve] Ignoring invalid port {argv[1]!r}; using {default}', file=sys.stderr)
        return default


# Listening port: first CLI argument, 8080 when absent or invalid.
PORT = _parse_port(sys.argv)

# Directory containing this script; every other path is derived from it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
EXPORT_SCRIPT = os.path.join(SCRIPT_DIR, 'export_data.py')

# Sentiment records live next to the script under data/sentiment/.
SENTIMENT_DIR = os.path.join(SCRIPT_DIR, 'data', 'sentiment')
SENTIMENT_INDEX = os.path.join(SENTIMENT_DIR, 'index.json')

MARKETING_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'marketing'))
MARKETING_NOTES = os.path.join(MARKETING_DIR, 'notes.json')
MARKETING_EVENTS = os.path.join(MARKETING_DIR, 'events.json')
DEVPLAN_FILE = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'devplan.json'))

# Project-level data lives in DOC/ (version controlled).
# Normalised like the other DOC paths so the constants never contain '..' segments.
BUGS_FILE = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'bugs.json'))
SUGGESTIONS_FILE = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'suggestions.json'))
MKT_TODOS_FILE = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'marketing', 'todos.json'))

OSS_DATA_DIR = os.path.join(SCRIPT_DIR, 'data', 'oss')
SNS_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'sns'))
QUICK_REPLIES_FILE = os.path.join(SNS_DIR, 'quick_replies.json')

# Design documents served through the /api/design/* endpoints.
DESIGN_MECHANICS_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'Design', 'final', 'mechanics'))
DESIGN_NARRATIVE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'Design', 'final', 'narrative'))
DESIGN_SHARED_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'Design', 'shared'))
# Catalogue of design documents exposed by /api/design/mechanics/list.
# Each row is (id, title, file, status, summary); 'file' is a name inside
# DESIGN_MECHANICS_DIR. Rows are expanded into dicts with exactly those keys,
# in that order.
DESIGN_DOCS = [
    dict(zip(('id', 'title', 'file', 'status', 'summary'), row))
    for row in (
        ('overview', '设计树', 'index.html', '首版',
         '设计原则、游戏机制、文案剧情的树形入口。'),
        ('design-principles', '设计原则', 'design-principles.html', '草案',
         '设计树根节点,后续承载基础原则正文。'),
        ('hero-foundation', '基础 / 英雄', 'hero-foundation.html', '草案',
         '英雄基础定位与五职阶设计。'),
        ('core-loop', '核心循环', 'core-loop.html', '首版',
         '回合、行动、表现和结算的设计骨架。'),
        ('hero-system', '东方英雄系统', 'hero-system.html', '首版',
         '英雄作为战术核心、阵营识别和成长目标的设计说明。'),
        ('faction-system', '阵营与文明系统', 'faction-system.html', '首版',
         '文明基底与东方势力层的边界。'),
        ('unit-skill-system', '单位与技能系统', 'unit-skill-system.html', '首版',
         '基础单位、特色单位、英雄单位和技能分类。'),
        ('map-city-tech', '地图、城市与科技', 'map-city-tech.html', '首版',
         '经营层中地块、资源、城市、科技和文化卡的关系。'),
    )
]
def _load_bugs():
    """Read the bug store from DOC/bugs.json.

    Returns the parsed JSON content, or a fresh empty store
    (``{'nextId': 1, 'bugs': []}``) when the file does not exist yet.
    """
    bugs_path = os.path.normpath(BUGS_FILE)
    if not os.path.exists(bugs_path):
        return {'nextId': 1, 'bugs': []}
    with open(bugs_path, 'r', encoding='utf-8') as handle:
        store = json.load(handle)
    return store
def _save_bugs(data):
    """Write *data* to DOC/bugs.json as indented UTF-8 JSON.

    The parent directory is created first if it is missing.
    """
    bugs_path = os.path.normpath(BUGS_FILE)
    parent_dir = os.path.dirname(bugs_path)
    os.makedirs(parent_dir, exist_ok=True)
    with open(bugs_path, 'w', encoding='utf-8') as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
def _load_suggestions():
    """Read the suggestion store from DOC/suggestions.json.

    Returns the parsed JSON content, or a fresh empty store
    (``{'nextId': 1, 'suggestions': []}``) when the file does not exist yet.
    """
    suggestions_path = os.path.normpath(SUGGESTIONS_FILE)
    if not os.path.exists(suggestions_path):
        return {'nextId': 1, 'suggestions': []}
    with open(suggestions_path, 'r', encoding='utf-8') as handle:
        store = json.load(handle)
    return store
def _save_suggestions(data):
    """Write *data* to DOC/suggestions.json as indented UTF-8 JSON.

    The parent directory is created first if it is missing.
    """
    suggestions_path = os.path.normpath(SUGGESTIONS_FILE)
    parent_dir = os.path.dirname(suggestions_path)
    os.makedirs(parent_dir, exist_ok=True)
    with open(suggestions_path, 'w', encoding='utf-8') as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
def _load_devplan():
    """Read the development plan from DOC/devplan.json.

    Returns the parsed JSON content, or an empty list when the file is absent.
    """
    if not os.path.exists(DEVPLAN_FILE):
        return []
    with open(DEVPLAN_FILE, 'r', encoding='utf-8') as handle:
        plan = json.load(handle)
    return plan
def _save_devplan(data):
    """Write *data* to DOC/devplan.json as indented UTF-8 JSON.

    The parent directory is created first if it is missing.
    """
    parent_dir = os.path.dirname(DEVPLAN_FILE)
    os.makedirs(parent_dir, exist_ok=True)
    with open(DEVPLAN_FILE, 'w', encoding='utf-8') as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
def _load_mkt_todos():
    """Read the marketing TODO store from DOC/marketing/todos.json.

    Returns the parsed JSON content, or a fresh empty store
    (``{'nextId': 1, 'todos': []}``) when the file does not exist yet.
    """
    todos_path = os.path.normpath(MKT_TODOS_FILE)
    if not os.path.exists(todos_path):
        return {'nextId': 1, 'todos': []}
    with open(todos_path, 'r', encoding='utf-8') as handle:
        store = json.load(handle)
    return store
def _save_mkt_todos(data):
    """Write *data* to DOC/marketing/todos.json as indented UTF-8 JSON.

    The parent directory is created first if it is missing.
    """
    todos_path = os.path.normpath(MKT_TODOS_FILE)
    parent_dir = os.path.dirname(todos_path)
    os.makedirs(parent_dir, exist_ok=True)
    with open(todos_path, 'w', encoding='utf-8') as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
def _load_sentiment_index():
    """Read the sentiment index (data/sentiment/index.json).

    Returns the parsed JSON content, or an empty list when the index file
    does not exist yet.
    """
    if not os.path.exists(SENTIMENT_INDEX):
        return []
    with open(SENTIMENT_INDEX, 'r', encoding='utf-8') as handle:
        records = json.load(handle)
    return records
def _save_sentiment_index(records):
    """Write *records* to the sentiment index as indented UTF-8 JSON.

    The sentiment directory is created first if it is missing.
    """
    os.makedirs(SENTIMENT_DIR, exist_ok=True)
    with open(SENTIMENT_INDEX, 'w', encoding='utf-8') as handle:
        json.dump(records, handle, ensure_ascii=False, indent=2)
# ========== Multipart Parser (no cgi dependency) ==========
def parse_multipart(fp, content_type, content_length):
    """Parse a multipart/form-data request body without the ``cgi`` module.

    Args:
        fp: Binary file-like object positioned at the start of the body.
        content_type: Value of the Content-Type header (must carry ``boundary=``).
        content_length: Number of body bytes to read from *fp*.

    Returns:
        ``(fields, files)`` where *fields* maps form names to decoded text
        values and *files* is a list of ``{'name', 'filename', 'data'}`` dicts
        whose ``data`` is the raw uploaded bytes.

    Raises:
        ValueError: if *content_type* carries no boundary parameter.
    """
    # Extract the boundary parameter (optionally quoted).
    boundary = None
    for param in content_type.split(';'):
        param = param.strip()
        if param.lower().startswith('boundary='):
            boundary = param[len('boundary='):]
            if boundary.startswith('"') and boundary.endswith('"'):
                boundary = boundary[1:-1]
            break
    if not boundary:
        raise ValueError('No boundary in Content-Type')

    raw = fp.read(content_length)

    # A delimiter is CRLF + "--" + boundary. Making the CRLF part of the
    # delimiter means every byte of a part's body is kept verbatim: trailing
    # newlines that belong to an uploaded file are no longer stripped, and
    # "--boundary" occurring mid-line inside payload bytes does not split it.
    # A CRLF is prepended so the very first delimiter matches as well.
    delimiter = b'\r\n--' + boundary.encode('utf-8')
    sections = (b'\r\n' + raw).split(delimiter)

    fields = {}
    files = []
    # sections[0] is the preamble before the first delimiter; ignore it.
    for section in sections[1:]:
        if section.startswith(b'--'):
            # Closing delimiter ("--boundary--"); anything after it is epilogue.
            break

        # Headers and body are separated by the first blank line.
        header_bytes, blank_line, body = section.partition(b'\r\n\r\n')
        if not blank_line:
            continue

        # Pull name / filename out of the Content-Disposition header.
        name = None
        filename = None
        for line in header_bytes.decode('utf-8', errors='replace').split('\r\n'):
            if not line.lower().startswith('content-disposition:'):
                continue
            for param in line.split(';'):
                param = param.strip()
                if param.startswith('name='):
                    name = param[len('name='):].strip('"')
                elif param.startswith('filename='):
                    filename = param[len('filename='):].strip('"')

        if name is None:
            continue
        if filename:
            files.append({'name': name, 'filename': filename, 'data': body})
        else:
            # Empty values are kept as '' instead of being dropped.
            fields[name] = body.decode('utf-8', errors='replace')
    return fields, files
class DashboardHandler(http.server.SimpleHTTPRequestHandler):
"""Handles static files and API endpoints."""
# Extension -> Content-Type table used for static files: starts from the stock
# SimpleHTTPRequestHandler table and sets explicit types for scripts, JSON and
# stylesheets.
extensions_map = {
    **http.server.SimpleHTTPRequestHandler.extensions_map,
    '.js': 'application/javascript',
    '.json': 'application/json',
    '.css': 'text/css',
}
def end_headers(self):
if self.path.endswith(('.html', '.js', '.css')) or self.path == '/' or self.path.startswith('/api/design/'):
self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
self.send_header('Pragma', 'no-cache')
self.send_header('Expires', '0')
super().end_headers()
def do_GET(self):
if self.path.startswith('/api/design/mechanics/list'):
self._send_json(self._get_design_mechanics_docs())
return
if self.path.startswith('/api/design/mechanics/doc'):
self._handle_design_mechanics_doc()
return
if self.path.startswith('/api/design/mechanics/'):
self._handle_design_mechanics_static()
return
if self.path.startswith('/api/design/narrative/'):
self._handle_design_narrative_static()
return
if self.path.startswith('/api/design/shared/'):
self._handle_design_shared_asset()
return
if self.path.startswith('/api/bugs/list'):
self._send_json(_load_bugs())
return
if self.path.startswith('/api/suggestions/list'):
self._send_json(_load_suggestions())
return
if self.path.startswith('/api/marketing/todos'):
self._send_json(_load_mkt_todos())
return
if self.path.startswith('/api/marketing/events'):
events = []
if os.path.exists(MARKETING_EVENTS):
with open(MARKETING_EVENTS, 'r', encoding='utf-8') as f:
events = json.load(f)
self._send_json(events)
return
if self.path.startswith('/api/marketing/notes'):
notes = {}
if os.path.exists(MARKETING_NOTES):
with open(MARKETING_NOTES, 'r', encoding='utf-8') as f:
notes = json.load(f)
self._send_json(notes)
return
if self.path.startswith('/api/devplan/list'):
self._send_json(_load_devplan())
return
if self.path.startswith('/api/oss/versions'):
self._send_json(self._get_oss_versions())
return
if self.path.startswith('/api/oss/matches'):
self._handle_oss_matches()
return
if self.path.startswith('/api/oss/match?'):
self._handle_oss_match_detail()
return
if self.path.startswith('/api/oss/hero-stats'):
self._handle_oss_hero_stats()
return
# SNS APIs
if self.path.startswith('/api/sns/'):
self._handle_sns_get()
return
# Quick replies (SNS助手 / 常用工具)
if self.path.startswith('/api/quick-replies'):
self._handle_qr_get()
return
super().do_GET()
def _get_design_mechanics_docs(self):
    """Describe every entry of DESIGN_DOCS together with its on-disk state.

    Each returned doc is a copy of the catalogue entry plus ``exists``
    (whether the file is present in DESIGN_MECHANICS_DIR) and ``updatedAt``
    (its modification time, or None when missing).
    """
    def describe(entry):
        doc_path = os.path.join(DESIGN_MECHANICS_DIR, entry['file'])
        present = os.path.exists(doc_path)
        info = dict(entry)
        info['exists'] = present
        info['updatedAt'] = os.path.getmtime(doc_path) if present else None
        return info

    return {
        'base': 'Design/final/mechanics',
        'docs': [describe(entry) for entry in DESIGN_DOCS],
    }
def _handle_design_mechanics_doc(self):
parsed = urlparse(self.path)
query = parse_qs(parsed.query)
file_name = query.get('file', ['index.html'])[0]
if os.path.basename(file_name) != file_name or not file_name.endswith('.html'):
self.send_error(400, 'Invalid design document path')
return
path = os.path.normpath(os.path.join(DESIGN_MECHANICS_DIR, file_name))
if not path.startswith(DESIGN_MECHANICS_DIR) or not os.path.exists(path):
self.send_error(404, 'Design document not found')
return
with open(path, 'rb') as f:
data = f.read()
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.send_header('Content-Length', str(len(data)))
self.end_headers()
self.wfile.write(data)
def _handle_design_mechanics_static(self):
parsed = urlparse(self.path)
file_name = parsed.path[len('/api/design/mechanics/'):]
if os.path.basename(file_name) != file_name or not file_name.endswith(('.html', '.md')):
self.send_error(400, 'Invalid design document path')
return
path = os.path.normpath(os.path.join(DESIGN_MECHANICS_DIR, file_name))
if not path.startswith(DESIGN_MECHANICS_DIR) or not os.path.exists(path):
self.send_error(404, 'Design document not found')
return
with open(path, 'rb') as f:
data = f.read()
content_type = 'text/html; charset=utf-8' if path.endswith('.html') else 'text/plain; charset=utf-8'
self.send_response(200)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', str(len(data)))
self.end_headers()
self.wfile.write(data)
def _handle_design_narrative_static(self):
parsed = urlparse(self.path)
file_name = parsed.path[len('/api/design/narrative/'):]
if os.path.basename(file_name) != file_name or not file_name.endswith('.html'):
self.send_error(400, 'Invalid narrative document path')
return
path = os.path.normpath(os.path.join(DESIGN_NARRATIVE_DIR, file_name))
if not path.startswith(DESIGN_NARRATIVE_DIR) or not os.path.exists(path):
self.send_error(404, 'Narrative document not found')
return
with open(path, 'rb') as f:
data = f.read()
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.send_header('Content-Length', str(len(data)))
self.end_headers()
self.wfile.write(data)
def _handle_design_shared_asset(self):
parsed = urlparse(self.path)
rel_path = parsed.path[len('/api/design/shared/'):]
if not rel_path or '..' in rel_path.replace('\\', '/').split('/'):
self.send_error(400, 'Invalid shared design asset path')
return
path = os.path.normpath(os.path.join(DESIGN_SHARED_DIR, rel_path))
if not path.startswith(DESIGN_SHARED_DIR) or not os.path.exists(path):
self.send_error(404, 'Shared design asset not found')
return
with open(path, 'rb') as f:
data = f.read()
content_type = 'text/css; charset=utf-8' if path.endswith('.css') else 'application/octet-stream'
self.send_response(200)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', str(len(data)))
self.end_headers()
self.wfile.write(data)
def do_POST(self):
if self.path == '/api/refresh':
self.handle_refresh()
elif self.path == '/api/sentiment/create':
self.handle_sentiment_create()
elif self.path == '/api/sentiment/delete':
self.handle_sentiment_delete()
elif self.path == '/api/marketing/save-note':
self.handle_marketing_save_note()
elif self.path == '/api/marketing/create-event':
self.handle_marketing_create_event()
elif self.path == '/api/bugs/create':
self.handle_bugs_create()
elif self.path == '/api/bugs/update':
self.handle_bugs_update()
elif self.path == '/api/bugs/delete':
self.handle_bugs_delete()
elif self.path == '/api/suggestions/create':
self.handle_suggestions_create()
elif self.path == '/api/suggestions/update':
self.handle_suggestions_update()
elif self.path == '/api/suggestions/delete':
self.handle_suggestions_delete()
elif self.path == '/api/marketing/todos/create':
self.handle_mkt_todo_create()
elif self.path == '/api/marketing/todos/toggle':
self.handle_mkt_todo_toggle()
elif self.path == '/api/marketing/todos/delete':
self.handle_mkt_todo_delete()
elif self.path == '/api/devplan/update':
self.handle_devplan_update()
elif self.path == '/api/devplan/subtask/create':
self.handle_devplan_subtask_create()
elif self.path == '/api/devplan/subtask/toggle':
self.handle_devplan_subtask_toggle()
elif self.path == '/api/devplan/subtask/delete':
self.handle_devplan_subtask_delete()
# SNS APIs
elif self.path.startswith('/api/sns/'):
self._handle_sns_post()
# Quick replies (SNS助手 / 常用工具)
elif self.path.startswith('/api/quick-replies/'):
self._handle_qr_post()
else:
self.send_error(404, 'Not Found')
def _send_json(self, data, status=200):
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
self.send_response(status)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Content-Length', str(len(body)))
self.end_headers()
self.wfile.write(body)
# ── OSS data helpers ──
def _get_oss_versions(self):
    """List the version folders found in data/oss/.

    Returns one ``{'version', 'matchCount'}`` dict per sub-directory, sorted
    by name; ``matchCount`` is the number of ``.json`` files found recursively
    below it. An empty list is returned when the data directory is missing.
    """
    if not os.path.isdir(OSS_DATA_DIR):
        return []

    def count_json_files(folder):
        return sum(
            1
            for _root, _dirs, names in os.walk(folder)
            for file_name in names
            if file_name.endswith('.json')
        )

    return [
        {'version': entry, 'matchCount': count_json_files(os.path.join(OSS_DATA_DIR, entry))}
        for entry in sorted(os.listdir(OSS_DATA_DIR))
        if os.path.isdir(os.path.join(OSS_DATA_DIR, entry))
    ]
def _handle_oss_matches(self):
"""List match summaries for selected versions.
Query params: versions=0.7.0,0.6.10&page=1&pageSize=20
Optional filters: unitType=1&giantType=0&level=0
"""
from urllib.parse import urlparse, parse_qs
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
versions = params.get('versions', [''])[0].split(',')
versions = [v.strip() for v in versions if v.strip()]
page = int(params.get('page', ['1'])[0])
page_size = int(params.get('pageSize', ['20'])[0])
# Filters
filter_unit_type = params.get('unitType', [None])[0]
filter_giant_type = params.get('giantType', [None])[0]
filter_level = params.get('level', [None])[0]
if filter_unit_type is not None:
filter_unit_type = int(filter_unit_type)
if filter_giant_type is not None:
filter_giant_type = int(filter_giant_type)
if filter_level is not None:
filter_level = int(filter_level)
if not os.path.isdir(OSS_DATA_DIR):
self._send_json({'matches': [], 'total': 0, 'page': page, 'pageSize': page_size})
return
# Collect all match file paths
match_files = []
for ver in versions:
ver_dir = os.path.join(OSS_DATA_DIR, ver)
if not os.path.isdir(ver_dir):
continue
for root, dirs, files in os.walk(ver_dir):
for f in files:
if f.endswith('.json'):
match_files.append(os.path.join(root, f))
# Sort by filename (timestamp) descending
match_files.sort(key=lambda p: os.path.basename(p), reverse=True)
# Load summaries and apply filters
matches = []
for fpath in match_files:
try:
with open(fpath, 'r', encoding='utf-8-sig') as fp:
data = json.load(fp)
except Exception:
continue
# Apply unit filter: search addUnits AND damages (heroes are in damages, not addUnits)
if filter_unit_type is not None or filter_giant_type is not None or filter_level is not None:
found = False
# Check addUnits
for au in data.get('addUnits', []):
if filter_unit_type is not None and au.get('unitType') != filter_unit_type:
continue
if filter_giant_type is not None and au.get('giantType') != filter_giant_type:
continue
if filter_level is not None and au.get('level') != filter_level:
continue
found = True
break
# Check damages (origin and target) - heroes appear here even if not in addUnits
if not found:
for d in data.get('damages', []):
for prefix in ('origin', 'target'):
ut = d.get(prefix + 'UnitType')
gt = d.get(prefix + 'GiantType')
lv = d.get(prefix + 'Level')
if filter_unit_type is not None and ut != filter_unit_type:
continue
if filter_giant_type is not None and gt != filter_giant_type:
continue
if filter_level is not None and lv != filter_level:
continue
found = True
break
if found:
break
if not found:
continue
# Build summary
rel = os.path.relpath(fpath, OSS_DATA_DIR).replace('\\', '/')
parts = rel.split('/')
version = parts[0] if len(parts) > 0 else ''
steam_id = parts[1] if len(parts) > 1 else ''
timestamp = os.path.splitext(parts[-1])[0] if parts else ''
# Extract match stats
damages = data.get('damages', [])
add_units = data.get('addUnits', [])
game_ends = data.get('matchGameEnds', [])
player_ends = data.get('playerGameEnds', [])
max_turn = 0
if game_ends:
max_turn = max(e.get('turn', 0) for e in game_ends)
elif damages:
max_turn = max(d.get('turn', 0) for d in damages)
matches.append({
'file': rel,
'version': version,
'steamId': steam_id,
'timestamp': timestamp,
'memberId': data.get('memberId', 0),
'totalTurns': max_turn,
'damageCount': len(damages),
'unitCount': len(add_units),
'playerCount': len(player_ends),
})
total = len(matches)
start = (page - 1) * page_size
end = start + page_size
page_matches = matches[start:end]
self._send_json({
'matches': page_matches,
'total': total,
'page': page,
'pageSize': page_size,
})
def _handle_oss_match_detail(self):
"""Return full match data for a specific file.
Query: ?file=0.7.0/steamid/timestamp.json
"""
from urllib.parse import urlparse, parse_qs
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
file_rel = params.get('file', [''])[0]
if not file_rel:
self._send_json({'error': 'file parameter required'}, 400)
return
fpath = os.path.normpath(os.path.join(OSS_DATA_DIR, file_rel))
# Security: ensure path is under OSS_DATA_DIR
if not fpath.startswith(os.path.normpath(OSS_DATA_DIR)):
self._send_json({'error': 'Invalid path'}, 400)
return
if not os.path.isfile(fpath):
self._send_json({'error': 'File not found'}, 404)
return
try:
with open(fpath, 'r', encoding='utf-8-sig') as fp:
data = json.load(fp)
self._send_json(data)
except Exception as e:
self._send_json({'error': str(e)}, 500)
def _handle_oss_hero_stats(self):
    """Aggregate per-hero stats across the matches of the selected versions.

    Query: ?versions=0.7.0,0.6.10

    Responds with ``{'totalMatches', 'heroes'}`` where each hero entry holds
    ``giantType``, ``matchCount`` (matches the hero appeared in, via addUnits
    or damages), ``appearRate`` (percent of all matches), ``avgDamage``,
    ``avgDeaths`` and ``avgKills`` (per match the hero appeared in) and
    ``totalAppear`` (number of addUnits entries).
    """
    params = parse_qs(urlparse(self.path).query)

    # Version names are single folder names. Entries that could escape
    # OSS_DATA_DIR ('..', path separators) are ignored; duplicates are
    # collapsed so no match is counted twice.
    versions = []
    for name in params.get('versions', [''])[0].split(','):
        name = name.strip()
        if not name or name in ('.', '..') or '/' in name or '\\' in name:
            continue
        if name not in versions:
            versions.append(name)

    if not os.path.isdir(OSS_DATA_DIR):
        self._send_json({'totalMatches': 0, 'heroes': []})
        return

    # Collect match files.
    match_files = []
    for ver in versions:
        ver_dir = os.path.join(OSS_DATA_DIR, ver)
        if not os.path.isdir(ver_dir):
            continue
        for root, _dirs, files in os.walk(ver_dir):
            match_files.extend(
                os.path.join(root, file_name)
                for file_name in files
                if file_name.endswith('.json')
            )

    # All known hero giantTypes.
    HERO_IDS = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,26,31,36]

    # Per-hero accumulators.
    hero_data = {
        gid: {
            'matchCount': 0,        # matches where this hero appeared
            'totalDamageDealt': 0,  # sum of originDamage across all matches
            'totalKills': 0,        # times this hero killed someone
            'totalDeaths': 0,       # times this hero was killed
            'totalAppear': 0,       # addUnit entries for this hero
        }
        for gid in HERO_IDS
    }

    total_matches = 0
    for fpath in match_files:
        try:
            with open(fpath, 'r', encoding='utf-8-sig') as fp:
                data = json.load(fp)
        except (OSError, ValueError):
            # Unreadable or malformed file: not counted as a match.
            continue
        if not isinstance(data, dict):
            continue
        total_matches += 1

        # Heroes seen in this match (via addUnits or damages).
        heroes_in_match = set()
        for unit in data.get('addUnits') or []:
            giant = unit.get('giantType', 0)
            if giant in hero_data:
                hero_data[giant]['totalAppear'] += 1
                heroes_in_match.add(giant)

        for damage in data.get('damages') or []:
            attacker = damage.get('originGiantType', 0)
            victim = damage.get('targetGiantType', 0)
            is_kill = bool(damage.get('isKill'))
            if attacker in hero_data:
                heroes_in_match.add(attacker)
                # "or 0" guards against an explicit null in the log.
                hero_data[attacker]['totalDamageDealt'] += damage.get('originDamage') or 0
                if is_kill:
                    hero_data[attacker]['totalKills'] += 1
            if victim in hero_data:
                heroes_in_match.add(victim)
                if is_kill:
                    hero_data[victim]['totalDeaths'] += 1

        for gid in heroes_in_match:
            hero_data[gid]['matchCount'] += 1

    # Build the result with computed averages.
    heroes = []
    for gid in HERO_IDS:
        stats = hero_data[gid]
        # Avoid dividing by zero for heroes that never appeared.
        divisor = stats['matchCount'] or 1
        heroes.append({
            'giantType': gid,
            'matchCount': stats['matchCount'],
            'appearRate': round(stats['matchCount'] / max(total_matches, 1) * 100, 1),
            'avgDamage': round(stats['totalDamageDealt'] / divisor, 1),
            'avgDeaths': round(stats['totalDeaths'] / divisor, 2),
            'avgKills': round(stats['totalKills'] / divisor, 2),
            'totalAppear': stats['totalAppear'],
        })

    self._send_json({
        'totalMatches': total_matches,
        'heroes': heroes,
    })
def handle_refresh(self):
    """Run export_data.py with the current interpreter and report the outcome.

    Responds 200 when the script exits with code 0, 500 when it exits with
    another code or cannot be started, and 504 when it exceeds 60 seconds.
    """
    try:
        completed = subprocess.run(
            [sys.executable, EXPORT_SCRIPT],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=SCRIPT_DIR,
        )
    except subprocess.TimeoutExpired:
        status = 504
        response = {'success': False, 'error': 'Script timed out after 60s'}
    except Exception as exc:
        status = 500
        response = {'success': False, 'error': str(exc)}
    else:
        succeeded = completed.returncode == 0
        status = 200 if succeeded else 500
        response = {
            'success': succeeded,
            'stdout': completed.stdout,
            'stderr': completed.stderr,
            'returncode': completed.returncode,
        }
    self._send_json(response, status)
def handle_sentiment_create(self):
"""Create a new sentiment record from multipart form data."""
try:
content_type = self.headers.get('Content-Type', '')
content_length = int(self.headers.get('Content-Length', 0))
if 'multipart/form-data' in content_type:
fields, uploaded_files = parse_multipart(
self.rfile, content_type, content_length
)
else:
# JSON body fallback
raw = self.rfile.read(content_length)
fields = json.loads(raw.decode('utf-8'))
uploaded_files = []
title = fields.get('title', '').strip()
source_type = fields.get('sourceType', 'file').strip()
summary = fields.get('summary', '').strip()
source = fields.get('source', '').strip()
content_text = fields.get('content', '').strip()
if not title:
self._send_json({'success': False, 'error': 'Title is required'}, 400)
return
# Create record
record_id = time.strftime('%Y%m%d_%H%M%S') + '_' + uuid.uuid4().hex[:6]
record_dir = os.path.join(SENTIMENT_DIR, record_id)
os.makedirs(record_dir, exist_ok=True)
record = {
'id': record_id,
'title': title,
'timestamp': int(time.time() * 1000),
'sourceType': source_type,
'source': source,
'summary': summary,
'analyzed': False,
'files': [],
}
# Handle file uploads
for f in uploaded_files:
if f['filename']:
safe_name = os.path.basename(f['filename'])
filepath = os.path.join(record_dir, safe_name)
with open(filepath, 'wb') as out:
out.write(f['data'])
record['files'].append(safe_name)
# Handle text content
if content_text:
with open(os.path.join(record_dir, 'content.txt'), 'w', encoding='utf-8') as f:
f.write(content_text)
# Handle URL source - save URL to content.txt as well
if source_type == 'url' and source:
with open(os.path.join(record_dir, 'content.txt'), 'w', encoding='utf-8') as f:
f.write(f'来源链接: {source}\n')
# Save per-record meta
with open(os.path.join(record_dir, 'meta.json'), 'w', encoding='utf-8') as f:
json.dump(record, f, ensure_ascii=False, indent=2)
# Update index
records = _load_sentiment_index()
records.append(record)
_save_sentiment_index(records)
self._send_json({'success': True, 'id': record_id})
except Exception as e:
import traceback
traceback.print_exc()
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_sentiment_delete(self):
"""Delete a sentiment record by id."""
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw.decode('utf-8'))
record_id = data.get('id', '')
if not record_id:
self._send_json({'success': False, 'error': 'ID is required'}, 400)
return
records = _load_sentiment_index()
new_records = [r for r in records if r['id'] != record_id]
if len(new_records) == len(records):
self._send_json({'success': False, 'error': 'Record not found'}, 404)
return
# Remove directory
record_dir = os.path.join(SENTIMENT_DIR, record_id)
if os.path.isdir(record_dir):
shutil.rmtree(record_dir)
_save_sentiment_index(new_records)
self._send_json({'success': True})
except Exception as e:
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_bugs_create(self):
"""Create a new bug record."""
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw.decode('utf-8'))
title = data.get('title', '').strip()
if not title:
self._send_json({'success': False, 'error': 'Title is required'}, 400)
return
bugs_data = _load_bugs()
bug_id = bugs_data['nextId']
now = int(time.time() * 1000)
bug = {
'id': bug_id,
'title': title,
'description': data.get('description', '').strip(),
'status': 'open',
'priority': data.get('priority', 'medium'),
'module': data.get('module', ''),
'longTerm': False,
'createdAt': now,
'updatedAt': now,
}
bugs_data['bugs'].append(bug)
bugs_data['nextId'] = bug_id + 1
_save_bugs(bugs_data)
self._send_json({'success': True, 'id': bug_id})
except Exception as e:
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_bugs_update(self):
"""Update a bug's fields (status, priority, etc.)."""
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw.decode('utf-8'))
bug_id = data.get('id')
if bug_id is None:
self._send_json({'success': False, 'error': 'ID is required'}, 400)
return
bugs_data = _load_bugs()
bug = next((b for b in bugs_data['bugs'] if b['id'] == bug_id), None)
if not bug:
self._send_json({'success': False, 'error': 'Bug not found'}, 404)
return
# Update allowed fields
for field in ('status', 'priority', 'title', 'description', 'module', 'longTerm'):
if field in data:
bug[field] = data[field]
bug['updatedAt'] = int(time.time() * 1000)
_save_bugs(bugs_data)
self._send_json({'success': True})
except Exception as e:
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_bugs_delete(self):
"""Delete a bug by id."""
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw.decode('utf-8'))
bug_id = data.get('id')
if bug_id is None:
self._send_json({'success': False, 'error': 'ID is required'}, 400)
return
bugs_data = _load_bugs()
original_len = len(bugs_data['bugs'])
bugs_data['bugs'] = [b for b in bugs_data['bugs'] if b['id'] != bug_id]
if len(bugs_data['bugs']) == original_len:
self._send_json({'success': False, 'error': 'Bug not found'}, 404)
return
_save_bugs(bugs_data)
self._send_json({'success': True})
except Exception as e:
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_suggestions_create(self):
"""Create a new suggestion record."""
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw.decode('utf-8'))
title = data.get('title', '').strip()
if not title:
self._send_json({'success': False, 'error': 'Title is required'}, 400)
return
suggestions_data = _load_suggestions()
suggestion_id = suggestions_data['nextId']
now = int(time.time() * 1000)
suggestion = {
'id': suggestion_id,
'title': title,
'description': data.get('description', '').strip(),
'status': 'open',
'module': data.get('module', ''),
'createdAt': now,
'updatedAt': now,
}
suggestions_data['suggestions'].append(suggestion)
suggestions_data['nextId'] = suggestion_id + 1
_save_suggestions(suggestions_data)
self._send_json({'success': True, 'id': suggestion_id})
except Exception as e:
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_suggestions_update(self):
"""Update a suggestion's fields."""
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw.decode('utf-8'))
suggestion_id = data.get('id')
if suggestion_id is None:
self._send_json({'success': False, 'error': 'ID is required'}, 400)
return
suggestions_data = _load_suggestions()
suggestion = next((s for s in suggestions_data['suggestions'] if s['id'] == suggestion_id), None)
if not suggestion:
self._send_json({'success': False, 'error': 'Suggestion not found'}, 404)
return
for field in ('status', 'title', 'description', 'module'):
if field in data:
suggestion[field] = data[field]
suggestion['updatedAt'] = int(time.time() * 1000)
_save_suggestions(suggestions_data)
self._send_json({'success': True})
except Exception as e:
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_suggestions_delete(self):
"""Delete a suggestion by id."""
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw.decode('utf-8'))
suggestion_id = data.get('id')
if suggestion_id is None:
self._send_json({'success': False, 'error': 'ID is required'}, 400)
return
suggestions_data = _load_suggestions()
original_len = len(suggestions_data['suggestions'])
suggestions_data['suggestions'] = [s for s in suggestions_data['suggestions'] if s['id'] != suggestion_id]
if len(suggestions_data['suggestions']) == original_len:
self._send_json({'success': False, 'error': 'Suggestion not found'}, 404)
return
_save_suggestions(suggestions_data)
self._send_json({'success': True})
except Exception as e:
self._send_json({'success': False, 'error': str(e)}, 500)
def handle_mkt_todo_create(self):
    """Append a new, not-yet-done marketing TODO.

    Reads ``title`` from the JSON POST body (whitespace-trimmed), assigns
    the store's ``nextId`` to the new item and advances the counter.
    Responds with the new id, or with code 400 when the title is empty and
    500 (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        title = payload.get('title', '').strip()
        if not title:
            self._send_json({'success': False, 'error': 'Title is required'}, 400)
            return

        store = _load_mkt_todos()
        new_id = store['nextId']
        store['todos'].append({
            'id': new_id,
            'title': title,
            'done': False,
            'createdAt': int(time.time() * 1000),
        })
        store['nextId'] = new_id + 1
        _save_mkt_todos(store)
        self._send_json({'success': True, 'id': new_id})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def handle_mkt_todo_toggle(self):
    """Flip the ``done`` flag of the marketing TODO named by ``id``.

    Marking an item done stamps ``doneAt`` (epoch milliseconds); un-marking
    it removes that key. The response carries the resulting ``done`` value.
    Responds with code 400 without an id, 404 for an unknown id and 500
    (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        wanted_id = payload.get('id')
        if wanted_id is None:
            self._send_json({'success': False, 'error': 'ID is required'}, 400)
            return

        store = _load_mkt_todos()
        item = None
        for candidate in store['todos']:
            if candidate['id'] == wanted_id:
                item = candidate
                break
        if not item:
            self._send_json({'success': False, 'error': 'TODO not found'}, 404)
            return

        now_done = not item['done']
        item['done'] = now_done
        if now_done:
            item['doneAt'] = int(time.time() * 1000)
        else:
            item.pop('doneAt', None)
        _save_mkt_todos(store)
        self._send_json({'success': True, 'done': now_done})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def handle_mkt_todo_delete(self):
    """Remove the marketing TODO whose ``id`` is given in the JSON POST body.

    Responds with code 400 when no id is supplied, 404 when nothing was
    removed, and 500 (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        doomed_id = payload.get('id')
        if doomed_id is None:
            self._send_json({'success': False, 'error': 'ID is required'}, 400)
            return

        store = _load_mkt_todos()
        count_before = len(store['todos'])
        survivors = [item for item in store['todos'] if item['id'] != doomed_id]
        store['todos'] = survivors
        # An unchanged length means no item carried the requested id.
        if len(survivors) == count_before:
            self._send_json({'success': False, 'error': 'TODO not found'}, 404)
            return

        _save_mkt_todos(store)
        self._send_json({'success': True})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def handle_devplan_update(self):
    """Overwrite selected fields of one devplan task.

    The JSON POST body must carry ``id``; any of ``description``,
    ``title``, ``status`` and ``priority`` present in it replace the
    task's stored values. Responds with code 400 without an id, 404 for an
    unknown id and 500 (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        wanted_id = payload.get('id')
        if wanted_id is None:
            self._send_json({'success': False, 'error': 'ID is required'}, 400)
            return

        plan = _load_devplan()
        target = None
        for candidate in plan:
            if candidate['id'] == wanted_id:
                target = candidate
                break
        if not target:
            self._send_json({'success': False, 'error': 'Task not found'}, 404)
            return

        # Only these keys may be changed by the client.
        editable = ('description', 'title', 'status', 'priority')
        target.update({key: payload[key] for key in editable if key in payload})
        _save_devplan(plan)
        self._send_json({'success': True})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def handle_devplan_subtask_create(self):
    """Create a subtask under a devplan task.

    The JSON POST body supplies ``taskId`` and ``title``. The new subtask
    gets the task's ``subtaskNextId`` as its id and the counter is
    advanced. Responds with the new id, or with code 400 when ``taskId``
    is missing or the title is empty / not a string, 404 when the task
    does not exist, and 500 (with the exception text) on any other
    failure.
    """
    try:
        length = int(self.headers.get('Content-Length', 0))
        raw = self.rfile.read(length)
        data = json.loads(raw.decode('utf-8'))
        task_id = data.get('taskId')
        raw_title = data.get('title', '')
        # A non-string title (null, number, ...) is a client error, not a
        # server error, so treat it the same as an empty title.
        title = raw_title.strip() if isinstance(raw_title, str) else ''
        if task_id is None or not title:
            self._send_json({'success': False, 'error': 'taskId and title required'}, 400)
            return
        tasks = _load_devplan()
        task = next((t for t in tasks if t['id'] == task_id), None)
        if not task:
            self._send_json({'success': False, 'error': 'Task not found'}, 404)
            return
        subtasks = task.setdefault('subtasks', [])
        st_id = task.get('subtaskNextId')
        if not isinstance(st_id, int):
            # No usable counter on the task: continue after the highest id
            # already present so the new subtask cannot duplicate one.
            used_ids = [s['id'] for s in subtasks if isinstance(s.get('id'), int)]
            st_id = max(used_ids, default=0) + 1
        subtask = {
            'id': st_id,
            'title': title,
            'done': False,
            'createdAt': int(time.time() * 1000),
        }
        subtasks.append(subtask)
        task['subtaskNextId'] = st_id + 1
        _save_devplan(tasks)
        self._send_json({'success': True, 'id': st_id})
    except Exception as e:
        self._send_json({'success': False, 'error': str(e)}, 500)
def handle_devplan_subtask_toggle(self):
    """Flip the ``done`` flag of one subtask.

    The JSON POST body names the parent via ``taskId`` and the subtask via
    ``subtaskId``. Marking it done stamps ``doneAt`` (epoch milliseconds);
    un-marking removes that key. The response carries the resulting
    ``done`` value. Responds with code 400 when either id is missing, 404
    when the task or subtask is unknown, and 500 (with the exception text)
    on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        parent_id = payload.get('taskId')
        child_id = payload.get('subtaskId')
        if parent_id is None or child_id is None:
            self._send_json({'success': False, 'error': 'taskId and subtaskId required'}, 400)
            return

        plan = _load_devplan()
        parent = None
        for candidate in plan:
            if candidate['id'] == parent_id:
                parent = candidate
                break
        if not parent:
            self._send_json({'success': False, 'error': 'Task not found'}, 404)
            return

        child = None
        for candidate in parent.get('subtasks', []):
            if candidate['id'] == child_id:
                child = candidate
                break
        if not child:
            self._send_json({'success': False, 'error': 'Subtask not found'}, 404)
            return

        now_done = not child['done']
        child['done'] = now_done
        if now_done:
            child['doneAt'] = int(time.time() * 1000)
        else:
            child.pop('doneAt', None)
        _save_devplan(plan)
        self._send_json({'success': True, 'done': now_done})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def handle_devplan_subtask_delete(self):
    """Remove one subtask from a devplan task.

    The JSON POST body names the parent via ``taskId`` and the subtask via
    ``subtaskId``. Responds with code 400 when either id is missing, 404
    when the task is unknown or no subtask was removed, and 500 (with the
    exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        parent_id = payload.get('taskId')
        child_id = payload.get('subtaskId')
        if parent_id is None or child_id is None:
            self._send_json({'success': False, 'error': 'taskId and subtaskId required'}, 400)
            return

        plan = _load_devplan()
        parent = None
        for candidate in plan:
            if candidate['id'] == parent_id:
                parent = candidate
                break
        if not parent:
            self._send_json({'success': False, 'error': 'Task not found'}, 404)
            return

        current = parent.get('subtasks', [])
        remaining = [entry for entry in current if entry['id'] != child_id]
        parent['subtasks'] = remaining
        # An unchanged length means no subtask carried the requested id.
        if len(remaining) == len(current):
            self._send_json({'success': False, 'error': 'Subtask not found'}, 404)
            return

        _save_devplan(plan)
        self._send_json({'success': True})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def handle_marketing_save_note(self):
    """Store the note text for one marketing event.

    The JSON POST body supplies ``id`` (the event id) and ``note``. The
    notes file is read if it exists, the entry for that id is set, and the
    whole mapping is written back. Responds with code 400 when the id is
    empty and 500 (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        event_id = payload.get('id', '')
        note_text = payload.get('note', '')
        if not event_id:
            self._send_json({'success': False, 'error': 'ID is required'}, 400)
            return

        os.makedirs(MARKETING_DIR, exist_ok=True)
        if os.path.exists(MARKETING_NOTES):
            with open(MARKETING_NOTES, 'r', encoding='utf-8') as src:
                all_notes = json.load(src)
        else:
            all_notes = {}
        all_notes[event_id] = note_text
        with open(MARKETING_NOTES, 'w', encoding='utf-8') as dst:
            json.dump(all_notes, dst, ensure_ascii=False, indent=2)
        self._send_json({'success': True})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def handle_marketing_create_event(self):
    """Append a new marketing event to the events file.

    ``title`` is required in the JSON POST body; ``date``, ``endDate``,
    ``category`` (default ``'video'``), ``platform``, ``tags``, ``status``
    (default ``'pending'``), ``folder`` and ``summary`` are optional. The
    id is ``'ev'`` plus the next number after the largest digit run found
    in existing ids, zero-padded to two digits. An empty ``endDate`` is
    left out of the stored record. Responds with the created event, or
    with code 400 when the title is empty and 500 (with the exception
    text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        title = payload.get('title', '').strip()
        if not title:
            self._send_json({'success': False, 'error': 'Title is required'}, 400)
            return

        os.makedirs(MARKETING_DIR, exist_ok=True)
        if os.path.exists(MARKETING_EVENTS):
            with open(MARKETING_EVENTS, 'r', encoding='utf-8') as src:
                events = json.load(src)
        else:
            events = []

        # Highest number embedded in any existing event id.
        highest = 0
        for known in events:
            numeric = ''.join(ch for ch in known.get('id', '') if ch.isdigit())
            if numeric:
                highest = max(highest, int(numeric))

        record = {
            'id': 'ev' + str(highest + 1).zfill(2),
            'title': title,
            'date': payload.get('date', ''),
        }
        end_date = payload.get('endDate', '')
        if end_date:
            # Only stored when non-empty; inserted here to keep key order.
            record['endDate'] = end_date
        record.update({
            'category': payload.get('category', 'video'),
            'platform': payload.get('platform', []),
            'tags': payload.get('tags', []),
            'status': payload.get('status', 'pending'),
            'folder': payload.get('folder', ''),
            'summary': payload.get('summary', ''),
            'assets': [],
            'notes': '',
        })

        events.append(record)
        with open(MARKETING_EVENTS, 'w', encoding='utf-8') as dst:
            json.dump(events, dst, ensure_ascii=False, indent=2)
        self._send_json({'success': True, 'event': record})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
# ── SNS API handlers ──
def _get_sns_platform(self, path):
"""Extract platform name from /api/sns/{platform}..."""
parts = path.split('/')
if len(parts) >= 4:
return parts[3]
return None
def _get_sns_file(self, platform):
    """Return the path of ``<platform>.json`` inside ``SNS_DIR``."""
    filename = '{}.json'.format(platform)
    return os.path.join(SNS_DIR, filename)
def _load_sns_data(self, platform):
    """Return the stored SNS data for ``platform``.

    When the platform's JSON file exists its parsed content is returned
    as-is. Otherwise a fresh structure is returned with ``nextId`` 1, an
    empty ``comments`` list and a display ``name`` (the platform key itself
    when no display name is known).
    """
    source = self._get_sns_file(platform)
    if not os.path.exists(source):
        display_names = {
            'xiaoheihe': '小黑盒',
            'steam': 'Steam商店',
            'bilibili': 'Bilibili',
            'twitter': 'Twitter',
            'qq': 'Q群',
            'discord': 'Discord'
        }
        return {
            'nextId': 1,
            'platform': platform,
            'name': display_names.get(platform, platform),
            'comments': []
        }
    with open(source, 'r', encoding='utf-8') as handle:
        return json.load(handle)
def _save_sns_data(self, platform, data):
    """Write ``data`` as indented UTF-8 JSON to the platform's file.

    ``SNS_DIR`` is created first if it does not exist yet.
    """
    os.makedirs(SNS_DIR, exist_ok=True)
    destination = self._get_sns_file(platform)
    with open(destination, 'w', encoding='utf-8') as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
def _handle_sns_get(self):
    """Handle GET /api/sns/{platform}: reply with that platform's data.

    Responds with code 400 when no platform can be read from the path.
    """
    platform = self._get_sns_platform(self.path)
    if platform:
        self._send_json(self._load_sns_data(platform))
        return
    self._send_json({'error': 'Platform required'}, 400)
def _handle_sns_post(self):
    """Dispatch POST /api/sns/{platform}/{action} to its handler.

    Recognised actions are ``update``, ``delete``, ``fetch`` and
    ``bulk-create``, chosen by the path's suffix. Responds with code 400
    when no platform can be read from the path and 404 for any other
    action.
    """
    route = self.path
    platform = self._get_sns_platform(route)
    if not platform:
        self._send_json({'error': 'Platform required'}, 400)
        return

    actions = (
        ('/update', self._handle_sns_update),
        ('/delete', self._handle_sns_delete),
        ('/fetch', self._handle_sns_fetch),
        ('/bulk-create', self._handle_sns_bulk_create),
    )
    for suffix, handler in actions:
        if route.endswith(suffix):
            handler(platform)
            return
    self._send_json({'error': 'Unknown action'}, 404)
def _handle_sns_update(self, platform):
    """Create a comment or merge changes into an existing one.

    The JSON POST body is the comment itself. When its ``id`` matches a
    stored comment, the body's fields are merged into that record and
    ``updatedAt`` is refreshed. Otherwise the body is appended as a new
    comment: it receives the store's ``nextId`` when it has no id,
    ``createdAt``/``updatedAt`` are stamped (epoch milliseconds) and
    ``status`` defaults to ``'pending'``.

    The response's ``comment`` is the record as stored. Any failure is
    reported with code 500 and the exception text.
    """
    try:
        length = int(self.headers.get('Content-Length', 0))
        raw = self.rfile.read(length)
        comment = json.loads(raw.decode('utf-8'))
        data = self._load_sns_data(platform)
        comments = data.get('comments', [])
        now = int(time.time() * 1000)

        comment_id = comment.get('id')
        existing = None
        if comment_id is not None:
            # Only look up by id when one was supplied; otherwise a stored
            # record that itself lacks an id would be matched by accident.
            existing = next((c for c in comments if c.get('id') == comment_id), None)

        if existing is not None:
            existing.update(comment)
            existing['updatedAt'] = now
            stored = existing
        else:
            if comment_id is None:
                comment_id = data.get('nextId', 1)
                comment['id'] = comment_id
            if isinstance(comment_id, int):
                # Keep the counter ahead of every id in use, including ids
                # chosen by the client, so auto-assigned ids never collide.
                data['nextId'] = max(data.get('nextId', 1), comment_id + 1)
            comment['createdAt'] = now
            comment['updatedAt'] = now
            comment.setdefault('status', 'pending')
            comments.append(comment)
            stored = comment

        data['comments'] = comments
        self._save_sns_data(platform, data)
        self._send_json({'success': True, 'comment': stored})
    except Exception as e:
        self._send_json({'success': False, 'error': str(e)}, 500)
def _handle_sns_delete(self, platform):
    """Remove the comment whose ``id`` is given in the JSON POST body.

    Responds with code 400 when no id is supplied, 404 when nothing was
    removed, and 500 (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        doomed_id = payload.get('id')
        if doomed_id is None:
            self._send_json({'success': False, 'error': 'ID required'}, 400)
            return

        store = self._load_sns_data(platform)
        current = store.get('comments', [])
        remaining = [entry for entry in current if entry.get('id') != doomed_id]
        store['comments'] = remaining
        # An unchanged length means no comment carried the requested id.
        if len(remaining) == len(current):
            self._send_json({'success': False, 'error': 'Comment not found'}, 404)
            return

        self._save_sns_data(platform, store)
        self._send_json({'success': True})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def _handle_sns_fetch(self, platform):
    """Fetch the Steam Community discussion list and reply with new topics.

    Only ``platform == 'steam'`` is supported; anything else is answered
    with code 400. The discussions page is downloaded (up to three
    attempts, two seconds apart, on ``URLError``), parsed with
    ``_parse_steam_discussions`` and capped at 50 entries. Entries whose
    title is already stored for steam are left out of the response.

    On any failure the traceback is printed and the demo entries from
    ``_get_steam_fallback_data`` are returned together with a ``warning``
    text; the response still carries ``success: True``.
    """
    if platform != 'steam':
        self._send_json({'error': 'Only steam platform supported'}, 400)
        return
    import ssl
    import traceback
    try:
        # Steam Community URL for the app
        url = 'https://steamcommunity.com/app/3774440/discussions/'
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Cookie': 'Steam_Language=schinese',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        req = urllib.request.Request(url, headers=headers)

        # NOTE(security): certificate and hostname verification are turned
        # off here, as in the original code ("for better compatibility").
        # The downloaded HTML is therefore not authenticated; re-enable
        # verification if the deployment environment allows it.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        max_retries = 3
        html = None
        for attempt in range(max_retries):
            try:
                with urllib.request.urlopen(req, timeout=30, context=ctx) as response:
                    html = response.read().decode('utf-8')
                break
            except urllib.error.URLError:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2)  # Wait before retry

        # Parse discussions from HTML and limit to a reasonable amount
        discussions = self._parse_steam_discussions(html)[:50]

        # Filter out posts that are already stored (matched by title)
        existing_data = self._load_sns_data('steam')
        existing_titles = {c.get('title', '') for c in existing_data.get('comments', [])}
        fetched_at = int(time.time() * 1000)
        new_discussions = [
            {
                'id': d.get('id', str(fetched_at)),
                'title': d.get('title', ''),
                'author': d.get('author', ''),
                'url': d.get('url', ''),
                'replies': d.get('replies', 0),
                'timestamp': fetched_at
            }
            for d in discussions
            if d.get('title', '') not in existing_titles
        ]
        self._send_json({
            'success': True,
            'discussions': new_discussions,
            'count': len(new_discussions)
        })
    except urllib.error.URLError:
        # Provide fallback demo data if Steam is unreachable
        traceback.print_exc()
        fallback = self._get_steam_fallback_data()
        self._send_json({
            'success': True,
            'discussions': fallback,
            'count': len(fallback),
            'warning': '无法连接到Steam社区已返回示例数据。请检查网络连接。'
        })
    except Exception as e:
        traceback.print_exc()
        fallback = self._get_steam_fallback_data()
        self._send_json({
            'success': True,
            'discussions': fallback,
            'count': len(fallback),
            'warning': f'获取Steam数据失败: {str(e)[:50]},已返回示例数据'
        })
def _get_steam_fallback_data(self):
"""Return demo data when Steam is unreachable."""
return [
{
'id': 'demo1',
'title': '[示例] 游戏新手攻略讨论',
'author': '玩家A',
'url': 'https://steamcommunity.com/app/3774440/discussions/',
'replies': 15,
'timestamp': int(time.time() * 1000)
},
{
'id': 'demo2',
'title': '[示例] 联机对战心得分享',
'author': '玩家B',
'url': 'https://steamcommunity.com/app/3774440/discussions/',
'replies': 8,
'timestamp': int(time.time() * 1000)
},
{
'id': 'demo3',
'title': '[示例] BUG反馈闪退问题',
'author': '玩家C',
'url': 'https://steamcommunity.com/app/3774440/discussions/',
'replies': 3,
'timestamp': int(time.time() * 1000)
}
]
def _parse_steam_discussions(self, html):
"""Parse Steam Community discussion list from HTML."""
discussions = []
import re
# Steam discussion page uses specific HTML structure
# Look for topic entries with class "forum_topic"
# Pattern 1: Modern Steam discussion format
# Find all rows with class containing "forum_topic"
topic_blocks = re.findall(
r'<div[^>]*class="forum_topic[^"]*"[^>]*>(.*?)</div>\s*</div>(?:\s*</div>)?',
html, re.DOTALL
)
for block in topic_blocks:
# Extract topic ID
topic_id_match = re.search(r'data-topic="([^"]+)"', block)
topic_id = topic_id_match.group(1) if topic_id_match else ''
# Extract title
title_match = re.search(r'class="forum_topic_name[^"]*"[^>]*>([^<]+)', block)
title = title_match.group(1).strip() if title_match else ''
# Extract author
author_match = re.search(r'class="[^"]*author[^"]*"[^>]*>([^<]+)', block)
author = author_match.group(1).strip() if author_match else ''
# Extract replies count
replies_match = re.search(r'class="[^"]*replies[^"]*"[^>]*>(\d+)', block)
replies = int(replies_match.group(1)) if replies_match else 0
if title:
discussions.append({
'id': topic_id,
'title': title,
'author': author,
'url': f'https://steamcommunity.com/app/3774440/discussions/?t={topic_id}',
'replies': replies
})
# Pattern 2: Alternative format - look for discussion links
if not discussions:
# Find all discussion links
links = re.findall(
r'<a[^>]*href="(/app/3774440/discussions/\d+/[^"]+)"[^>]*>(.*?)</a>',
html, re.DOTALL
)
for url_path, content in links:
# Extract title from the content
title_match = re.search(r'>([^<]+)<', content)
if title_match:
title = title_match.group(1).strip()
if title and len(title) > 3: # Filter out short/empty titles
discussions.append({
'id': url_path.split('/')[-2] if '/' in url_path else 'unknown',
'title': title,
'author': '',
'url': f'https://steamcommunity.com{url_path}',
'replies': 0
})
# Pattern 3: Last resort - look for any link containing discussion
if not discussions:
all_links = re.findall(
r'href="(/app/3774440/discussions[^"]+)"[^>]*>([^<]+)</a>',
html
)
seen = set()
for url_path, title in all_links:
title = title.strip()
if title and title not in seen and len(title) > 5:
seen.add(title)
discussions.append({
'id': url_path.split('/')[-1] or url_path,
'title': title,
'author': '',
'url': f'https://steamcommunity.com{url_path}',
'replies': 0
})
return discussions
def _handle_sns_bulk_create(self, platform):
    """Bulk create comments from selected discussions.

    The JSON POST body carries ``discussions``, a list of objects with
    ``title``, ``author`` and ``url``. Each discussion whose title is not
    already stored (or already added earlier in the same request) becomes a
    new ``pending`` comment with source ``'steam'`` and consecutive ids
    starting at the store's ``nextId``. The response's ``created`` is the
    number of comments added. Any failure is reported with code 500 and
    the exception text.
    """
    try:
        length = int(self.headers.get('Content-Length', 0))
        raw = self.rfile.read(length)
        data = json.loads(raw.decode('utf-8'))
        discussions = data.get('discussions', [])
        if not discussions:
            self._send_json({'success': True, 'created': 0})
            return
        # Load existing data
        sns_data = self._load_sns_data(platform)
        comments = sns_data.get('comments', [])
        next_id = sns_data.get('nextId', 1)
        # Titles are collected once so each duplicate check is a set lookup
        # instead of a scan over every stored comment.
        known_titles = {c.get('title') for c in comments}
        # One timestamp for the whole batch keeps createdAt == updatedAt.
        now = int(time.time() * 1000)
        created_count = 0
        for d in discussions:
            if d.get('title') in known_titles:
                continue
            stored_title = d.get('title', 'Untitled')
            comments.append({
                'id': next_id,
                'title': stored_title,
                'status': 'pending',
                'author': d.get('author', ''),
                'url': d.get('url', ''),
                'source': 'steam',
                'createdAt': now,
                'updatedAt': now
            })
            known_titles.add(stored_title)
            next_id += 1
            created_count += 1
        sns_data['comments'] = comments
        sns_data['nextId'] = next_id
        self._save_sns_data(platform, sns_data)
        self._send_json({'success': True, 'created': created_count})
    except Exception as e:
        self._send_json({'success': False, 'error': str(e)}, 500)
# SNS API handlers end
# ── Quick replies (常用回复) ──
def _load_quick_replies(self):
    """Load quick replies data.

    Returns the parsed content of ``QUICK_REPLIES_FILE`` with ``nextId``
    and ``replies`` guaranteed to be present (``replies`` always a list).
    When the file is missing, is not a JSON object, or cannot be read, an
    empty structure ``{'nextId': 1, 'replies': []}`` is returned instead;
    read failures are reported on stderr.
    """
    if os.path.exists(QUICK_REPLIES_FILE):
        try:
            with open(QUICK_REPLIES_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict):
                data = {}
            data.setdefault('nextId', 1)
            # Callers append to and iterate over 'replies', so normalise
            # anything that is not a list (e.g. null) to an empty one.
            if not isinstance(data.get('replies'), list):
                data['replies'] = []
            return data
        except Exception as e:
            # Still best effort, but say so: the next save will overwrite
            # the unreadable file with the empty structure returned below.
            sys.stderr.write(f'[API] Warning: could not read {QUICK_REPLIES_FILE}: {e}\n')
    return {'nextId': 1, 'replies': []}
def _save_quick_replies(self, data):
    """Write ``data`` to ``QUICK_REPLIES_FILE`` as indented UTF-8 JSON.

    The file's parent directory is created first if it does not exist.
    """
    parent_dir = os.path.dirname(QUICK_REPLIES_FILE)
    os.makedirs(parent_dir, exist_ok=True)
    with open(QUICK_REPLIES_FILE, 'w', encoding='utf-8') as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
def _handle_qr_get(self):
    """GET /api/quick-replies: reply with the stored quick replies."""
    stored = self._load_quick_replies()
    self._send_json(stored)
def _handle_qr_post(self):
    """POST /api/quick-replies/{create|update|delete}

    The action is chosen by the path's suffix; any other suffix is
    answered with code 404.
    """
    route = self.path
    actions = (
        ('/create', self._handle_qr_create),
        ('/update', self._handle_qr_update),
        ('/delete', self._handle_qr_delete),
    )
    for suffix, handler in actions:
        if route.endswith(suffix):
            handler()
            return
    self._send_json({'error': 'Unknown action'}, 404)
def _handle_qr_create(self):
    """Create a quick reply from the JSON POST body.

    ``title`` (whitespace-trimmed) and ``content`` must both be non-blank.
    The new reply takes the store's ``nextId`` and is stamped with
    ``createdAt``/``updatedAt`` in epoch milliseconds. Responds with the
    created reply, or with code 400 on a blank title or content and 500
    (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        title = (payload.get('title') or '').strip()
        content = payload.get('content') or ''
        if not title:
            self._send_json({'success': False, 'error': '标题不能为空'}, 400)
            return
        if not content.strip():
            self._send_json({'success': False, 'error': '内容不能为空'}, 400)
            return

        store = self._load_quick_replies()
        stamp = int(time.time() * 1000)
        new_id = store.get('nextId', 1)
        reply = {
            'id': new_id,
            'title': title,
            'content': content,
            'createdAt': stamp,
            'updatedAt': stamp
        }
        store['nextId'] = new_id + 1
        store['replies'].append(reply)
        self._save_quick_replies(store)
        self._send_json({'success': True, 'reply': reply})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def _handle_qr_update(self):
    """Edit the title and/or content of one quick reply.

    The JSON POST body must carry ``id``. A supplied ``title`` must be
    non-blank after trimming; a supplied ``content`` replaces the stored
    text (a falsy value is stored as ``''``). ``updatedAt`` is refreshed.
    Responds with the updated reply, or with code 400 for a missing id or
    blank title, 404 for an unknown id and 500 (with the exception text)
    on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        wanted_id = payload.get('id')
        if wanted_id is None:
            self._send_json({'success': False, 'error': 'id required'}, 400)
            return

        store = self._load_quick_replies()
        target = None
        for candidate in store['replies']:
            if candidate.get('id') == wanted_id:
                target = candidate
                break
        if not target:
            self._send_json({'success': False, 'error': 'not found'}, 404)
            return

        if 'title' in payload:
            new_title = (payload.get('title') or '').strip()
            if not new_title:
                self._send_json({'success': False, 'error': '标题不能为空'}, 400)
                return
            target['title'] = new_title
        if 'content' in payload:
            target['content'] = payload.get('content') or ''
        target['updatedAt'] = int(time.time() * 1000)
        self._save_quick_replies(store)
        self._send_json({'success': True, 'reply': target})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def _handle_qr_delete(self):
    """Remove the quick reply whose ``id`` is given in the JSON POST body.

    Responds with code 400 when no id is supplied, 404 when nothing was
    removed, and 500 (with the exception text) on any other failure.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(body_size).decode('utf-8'))
        doomed_id = payload.get('id')
        if doomed_id is None:
            self._send_json({'success': False, 'error': 'id required'}, 400)
            return

        store = self._load_quick_replies()
        count_before = len(store['replies'])
        remaining = [entry for entry in store['replies'] if entry.get('id') != doomed_id]
        store['replies'] = remaining
        # An unchanged length means no reply carried the requested id.
        if len(remaining) == count_before:
            self._send_json({'success': False, 'error': 'not found'}, 404)
            return

        self._save_quick_replies(store)
        self._send_json({'success': True})
    except Exception as exc:
        self._send_json({'success': False, 'error': str(exc)}, 500)
def log_message(self, format, *args):
    """Quieter logging: only API and POST traffic reaches stderr.

    The message is built as ``format % args`` and written with an
    ``[API]`` prefix when it contains ``/api/`` or ``POST``; every other
    message is dropped.
    """
    line = format % args
    if '/api/' not in line and 'POST' not in line:
        return
    sys.stderr.write(f'[API] {line}\n')
def kill_stale_servers(port):
    """Kill other processes that are listening on ``port``.

    On Windows the listeners are found by parsing ``netstat -ano`` and
    terminated with ``taskkill /F``; elsewhere ``lsof`` lists the PIDs and
    they are sent SIGKILL. The current process is never touched. This is
    best effort: failures are printed as warnings and never raised. After
    at least one successful kill the function sleeps 0.3 s to give the
    port time to be released.

    Args:
        port: TCP port number to free.
    """
    my_pid = os.getpid()
    if sys.platform == 'win32':
        try:
            # netstat to find PIDs on the port
            result = subprocess.run(
                ['netstat', '-ano'],
                capture_output=True, text=True, timeout=5
            )
            attempted = set()
            killed = set()
            for line in result.stdout.splitlines():
                # Match lines like: TCP 0.0.0.0:8080 ... LISTENING 12345
                parts = line.split()
                if len(parts) < 5 or 'LISTENING' not in parts[3]:
                    continue
                # Compare the port exactly; a substring test would also
                # match e.g. ":80801" when the port is 8080.
                if parts[1].rpartition(':')[2] != str(port):
                    continue
                try:
                    pid = int(parts[4])
                except ValueError:
                    continue
                if pid == my_pid or pid in attempted:
                    continue
                attempted.add(pid)
                try:
                    outcome = subprocess.run(
                        ['taskkill', '/PID', str(pid), '/F'],
                        capture_output=True, timeout=5
                    )
                except (OSError, subprocess.SubprocessError) as e:
                    print(f' Warning: could not kill PID {pid}: {e}')
                    continue
                if outcome.returncode == 0:
                    killed.add(pid)
                    print(f' Killed stale process PID {pid}')
                else:
                    print(f' Warning: taskkill failed for PID {pid}')
            if killed:
                time.sleep(0.3)  # brief wait for port release
                print(f' Cleaned up {len(killed)} old process(es)')
        except (OSError, subprocess.SubprocessError) as e:
            print(f' Warning: cleanup failed: {e}')
    else:
        # Unix: use lsof
        import signal
        try:
            # -sTCP:LISTEN restricts the result to listening sockets, so
            # clients merely connected to the port (e.g. a browser tab)
            # are not killed.
            result = subprocess.run(
                ['lsof', '-nP', '-t', f'-iTCP:{port}', '-sTCP:LISTEN'],
                capture_output=True, text=True, timeout=5
            )
        except (OSError, subprocess.SubprocessError) as e:
            print(f' Warning: cleanup failed: {e}')
            return
        killed_count = 0
        for pid_str in result.stdout.split():
            try:
                pid = int(pid_str)
            except ValueError:
                continue
            if pid == my_pid:
                continue
            try:
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                continue  # already gone
            except OSError as e:
                # One failure must not stop the remaining PIDs.
                print(f' Warning: could not kill PID {pid}: {e}')
                continue
            killed_count += 1
            print(f' Killed stale process PID {pid}')
        if killed_count:
            time.sleep(0.3)
# Startup sequence: switch to SCRIPT_DIR, free the port, then serve until
# interrupted.
# NOTE(review): presumably SCRIPT_DIR is this script's folder, so static
# files resolve relative to it; confirm against its definition.
os.chdir(SCRIPT_DIR)
print(f'Checking port {PORT} for stale processes...')
# Remove leftover listeners first so binding PORT below can succeed.
kill_stale_servers(PORT)
# The empty host string binds the server on all interfaces.
with http.server.ThreadingHTTPServer(('', PORT), DashboardHandler) as httpd:
    url = f'http://localhost:{PORT}'
    print(f'TH1 Dashboard serving at {url}')
    print('Press Ctrl+C to stop.')
    # Open the dashboard in the default browser before blocking below.
    webbrowser.open(url)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop; exit without a traceback.
        print('\nStopped.')