2809 lines
110 KiB
Python
2809 lines
110 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
TH1 Dashboard Server with data refresh API.
|
||
|
||
Endpoints:
|
||
GET /* - Static file serving
|
||
GET /api/bugs/list - Load all bugs from DOC/bugs.json
|
||
GET /api/todos/list - Load all todos from DOC/todos.json
|
||
GET /api/suggestions/list - Load all suggestions from DOC/suggestions.json
|
||
GET /api/marketing/events - Load marketing events from DOC/marketing/events.json
|
||
GET /api/marketing/notes - Load marketing notes from DOC/marketing/notes.json
|
||
GET /api/marketing/todos - Load marketing todos from DOC/marketing/todos.json
|
||
GET /api/devplan/list - Load devplan tasks from DOC/devplan.json
|
||
GET /api/oss/versions - List OSS data version folders
|
||
GET /api/oss/matches - List match summaries (paginated, filterable)
|
||
GET /api/oss/match?file=... - Load full match JSON data
|
||
GET /api/oss/hero-stats - Aggregate hero statistics
|
||
GET /api/gamebalance/hero-data - Load current hero unit/skill data from Unity DataAssets
|
||
GET /api/balance-modeling/hero-pricing/latest - Load latest hero pricing model output
|
||
POST /api/refresh - Run export_data.py and return result
|
||
POST /api/sentiment/create - Create new sentiment record
|
||
POST /api/sentiment/delete - Delete a sentiment record
|
||
POST /api/bugs/create - Create new bug
|
||
POST /api/bugs/update - Update bug fields
|
||
POST /api/bugs/delete - Delete bug by id
|
||
POST /api/todos/create - Create new todo
|
||
POST /api/todos/update - Update todo fields
|
||
POST /api/todos/delete - Delete todo by id
|
||
POST /api/suggestions/create - Create new suggestion
|
||
POST /api/suggestions/update - Update suggestion fields
|
||
POST /api/suggestions/delete - Delete suggestion by id
|
||
GET /api/core-feedback/list - Load core player feedback collections
|
||
POST /api/core-feedback/update-feedback - Update a feedback item
|
||
GET /api/official-replies/list - Load official reply collections
|
||
POST /api/official-replies/update-reply - Update an official reply item
|
||
GET /api/raw-feedback/list - Load raw feedback sets
|
||
POST /api/raw-feedback/create-set - Create a raw feedback set
|
||
POST /api/raw-feedback/create-item - Create a raw feedback item
|
||
POST /api/devplan/update - Update task fields (description, etc.)
|
||
POST /api/devplan/subtask/create - Create subtask under a task
|
||
POST /api/devplan/subtask/toggle - Toggle subtask done state
|
||
POST /api/devplan/subtask/delete - Delete a subtask
|
||
|
||
Usage:
|
||
python serve.py # default port 8080
|
||
python serve.py 9090 # custom port
|
||
"""
|
||
|
||
import http.server
|
||
import csv
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import uuid
|
||
import webbrowser
|
||
import urllib.request
|
||
import urllib.error
|
||
import importlib.util
|
||
from urllib.parse import parse_qs, urlparse
|
||
from html.parser import HTMLParser
|
||
|
||
PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8080
|
||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
PROJECT_ROOT = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..'))
|
||
EXPORT_SCRIPT = os.path.join(SCRIPT_DIR, 'export_data.py')
|
||
SENTIMENT_DIR = os.path.join(SCRIPT_DIR, 'data', 'sentiment')
|
||
SENTIMENT_INDEX = os.path.join(SENTIMENT_DIR, 'index.json')
|
||
MARKETING_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'marketing'))
|
||
MARKETING_NOTES = os.path.join(MARKETING_DIR, 'notes.json')
|
||
MARKETING_EVENTS = os.path.join(MARKETING_DIR, 'events.json')
|
||
DEVPLAN_FILE = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'devplan.json'))
|
||
|
||
# Project-level data lives in DOC/ (version controlled)
|
||
BUGS_FILE = os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'bugs.json')
|
||
TODOS_FILE = os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'todos.json')
|
||
SUGGESTIONS_FILE = os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'suggestions.json')
|
||
MKT_TODOS_FILE = os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'marketing', 'todos.json')
|
||
OSS_DATA_DIR = os.path.join(SCRIPT_DIR, 'data', 'oss')
|
||
SNS_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'DOC', 'sns'))
|
||
QUICK_REPLIES_FILE = os.path.join(SNS_DIR, 'quick_replies.json')
|
||
CORE_FEEDBACK_FILE = os.path.join(SCRIPT_DIR, 'data', 'core_player_feedback.json')
|
||
OFFICIAL_REPLIES_FILE = os.path.join(SCRIPT_DIR, 'data', 'official_reply_collections.json')
|
||
RAW_FEEDBACK_FILE = os.path.join(SCRIPT_DIR, 'data', 'raw_feedback_sets.json')
|
||
BALANCE_MODELING_DIR = os.path.join(PROJECT_ROOT, 'Design', 'drafts', 'planning', 'balance_modeling')
|
||
BALANCE_MODELING_DATA_DIR = os.path.join(BALANCE_MODELING_DIR, 'data')
|
||
DESIGN_MECHANICS_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'Design', 'final', 'mechanics'))
|
||
DESIGN_NARRATIVE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'Design', 'final', 'narrative'))
|
||
DESIGN_SHARED_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, '..', '..', 'Design', 'shared'))
|
||
DESIGN_DOCS = [
|
||
{
|
||
'id': 'overview',
|
||
'title': '设计树',
|
||
'file': 'index.html',
|
||
'status': '首版',
|
||
'summary': '设计原则、游戏机制、文案剧情的树形入口。',
|
||
},
|
||
{
|
||
'id': 'design-principles',
|
||
'title': '设计原则',
|
||
'file': 'design-principles.html',
|
||
'status': '草案',
|
||
'summary': '设计树根节点,后续承载基础原则正文。',
|
||
},
|
||
{
|
||
'id': 'hero-foundation',
|
||
'title': '基础 / 英雄',
|
||
'file': 'hero-foundation.html',
|
||
'status': '草案',
|
||
'summary': '英雄基础定位与五职阶设计。',
|
||
},
|
||
{
|
||
'id': 'core-loop',
|
||
'title': '核心循环',
|
||
'file': 'core-loop.html',
|
||
'status': '首版',
|
||
'summary': '回合、行动、表现和结算的设计骨架。',
|
||
},
|
||
{
|
||
'id': 'hero-system',
|
||
'title': '东方英雄系统',
|
||
'file': 'hero-system.html',
|
||
'status': '首版',
|
||
'summary': '英雄作为战术核心、阵营识别和成长目标的设计说明。',
|
||
},
|
||
{
|
||
'id': 'faction-system',
|
||
'title': '阵营与文明系统',
|
||
'file': 'faction-system.html',
|
||
'status': '首版',
|
||
'summary': '文明基底与东方势力层的边界。',
|
||
},
|
||
{
|
||
'id': 'unit-skill-system',
|
||
'title': '单位与技能系统',
|
||
'file': 'unit-skill-system.html',
|
||
'status': '首版',
|
||
'summary': '基础单位、特色单位、英雄单位和技能分类。',
|
||
},
|
||
{
|
||
'id': 'map-city-tech',
|
||
'title': '地图、城市与科技',
|
||
'file': 'map-city-tech.html',
|
||
'status': '首版',
|
||
'summary': '经营层中地块、资源、城市、科技和文化卡的关系。',
|
||
},
|
||
]
|
||
|
||
|
||
def _load_bugs():
|
||
"""Load bugs.json from DOC/, return dict with nextId and bugs list."""
|
||
path = os.path.normpath(BUGS_FILE)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return {'nextId': 1, 'bugs': []}
|
||
|
||
|
||
def _save_bugs(data):
|
||
"""Save bugs.json to DOC/."""
|
||
path = os.path.normpath(BUGS_FILE)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_todos():
|
||
"""Load todos.json from DOC/, return dict with nextId and todos list."""
|
||
path = os.path.normpath(TODOS_FILE)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return {'nextId': 1, 'todos': []}
|
||
|
||
|
||
def _save_todos(data):
|
||
"""Save todos.json to DOC/."""
|
||
path = os.path.normpath(TODOS_FILE)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_suggestions():
|
||
"""Load suggestions.json from DOC/, return dict with nextId and suggestions list."""
|
||
path = os.path.normpath(SUGGESTIONS_FILE)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return {'nextId': 1, 'suggestions': []}
|
||
|
||
|
||
def _save_suggestions(data):
|
||
"""Save suggestions.json to DOC/."""
|
||
path = os.path.normpath(SUGGESTIONS_FILE)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_devplan():
|
||
"""Load devplan.json from DOC/."""
|
||
if os.path.exists(DEVPLAN_FILE):
|
||
with open(DEVPLAN_FILE, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return []
|
||
|
||
|
||
def _save_devplan(data):
|
||
"""Save devplan.json to DOC/."""
|
||
os.makedirs(os.path.dirname(DEVPLAN_FILE), exist_ok=True)
|
||
with open(DEVPLAN_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_mkt_todos():
|
||
"""Load marketing todos from DOC/marketing/todos.json."""
|
||
path = os.path.normpath(MKT_TODOS_FILE)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return {'nextId': 1, 'todos': []}
|
||
|
||
|
||
def _save_mkt_todos(data):
|
||
"""Save marketing todos to DOC/marketing/todos.json."""
|
||
path = os.path.normpath(MKT_TODOS_FILE)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_sentiment_index():
|
||
"""Load the sentiment index.json, return list."""
|
||
if os.path.exists(SENTIMENT_INDEX):
|
||
with open(SENTIMENT_INDEX, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return []
|
||
|
||
|
||
def _save_sentiment_index(records):
|
||
"""Save the sentiment index.json."""
|
||
os.makedirs(SENTIMENT_DIR, exist_ok=True)
|
||
with open(SENTIMENT_INDEX, 'w', encoding='utf-8') as f:
|
||
json.dump(records, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_core_feedback():
|
||
"""Load core player feedback collections from Dashboard data."""
|
||
path = os.path.normpath(CORE_FEEDBACK_FILE)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
now = time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime())
|
||
return {'schema_version': 1, 'updated_at': now, 'collections': []}
|
||
|
||
|
||
def _save_core_feedback(data):
|
||
"""Save core player feedback collections to Dashboard data."""
|
||
path = os.path.normpath(CORE_FEEDBACK_FILE)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
data['updated_at'] = time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime())
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_official_replies():
|
||
"""Load official reply collections from Dashboard data."""
|
||
path = os.path.normpath(OFFICIAL_REPLIES_FILE)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8-sig') as f:
|
||
data = json.load(f)
|
||
if not isinstance(data, dict):
|
||
data = {}
|
||
data.setdefault('schema_version', 1)
|
||
data.setdefault('updated_at', time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime()))
|
||
data.setdefault('collections', [])
|
||
return data
|
||
now = time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime())
|
||
return {'schema_version': 1, 'updated_at': now, 'collections': []}
|
||
|
||
|
||
def _save_official_replies(data):
|
||
"""Save official reply collections to Dashboard data."""
|
||
path = os.path.normpath(OFFICIAL_REPLIES_FILE)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
data['updated_at'] = time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime())
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_raw_feedback():
|
||
"""Load raw feedback sets from Dashboard data."""
|
||
path = os.path.normpath(RAW_FEEDBACK_FILE)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8-sig') as f:
|
||
data = json.load(f)
|
||
if not isinstance(data, dict):
|
||
data = {}
|
||
data.setdefault('schema_version', 1)
|
||
data.setdefault('updated_at', time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime()))
|
||
data.setdefault('sets', [])
|
||
return data
|
||
now = time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime())
|
||
return {'schema_version': 1, 'updated_at': now, 'sets': []}
|
||
|
||
|
||
def _save_raw_feedback(data):
|
||
"""Save raw feedback sets to Dashboard data."""
|
||
path = os.path.normpath(RAW_FEEDBACK_FILE)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
data['updated_at'] = time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime())
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _slugify_feedback_id(text, fallback):
|
||
"""Build a stable-ish ASCII id for local Dashboard records."""
|
||
raw = ''.join(ch.lower() if ch.isalnum() else '-' for ch in (text or ''))
|
||
raw = '-'.join(part for part in raw.split('-') if part)
|
||
if not raw:
|
||
raw = fallback
|
||
return raw[:48]
|
||
|
||
|
||
# ========== Multipart Parser (no cgi dependency) ==========
|
||
|
||
def parse_multipart(fp, content_type, content_length):
|
||
"""
|
||
Parse multipart/form-data manually.
|
||
Returns: (fields: dict[str, str], files: list[{name, filename, data}])
|
||
"""
|
||
# Extract boundary
|
||
boundary = None
|
||
for part in content_type.split(';'):
|
||
part = part.strip()
|
||
if part.startswith('boundary='):
|
||
boundary = part[len('boundary='):]
|
||
# Strip quotes if present
|
||
if boundary.startswith('"') and boundary.endswith('"'):
|
||
boundary = boundary[1:-1]
|
||
break
|
||
|
||
if not boundary:
|
||
raise ValueError('No boundary in Content-Type')
|
||
|
||
raw = fp.read(content_length)
|
||
boundary_bytes = ('--' + boundary).encode('utf-8')
|
||
end_boundary = boundary_bytes + b'--'
|
||
|
||
# Split into parts
|
||
parts = raw.split(boundary_bytes)
|
||
|
||
fields = {}
|
||
files = []
|
||
|
||
for part in parts:
|
||
# Skip preamble and epilogue
|
||
if not part or part == b'--\r\n' or part == b'--':
|
||
continue
|
||
part = part.strip(b'\r\n')
|
||
if not part or part == b'--':
|
||
continue
|
||
|
||
# Split headers from body
|
||
header_end = part.find(b'\r\n\r\n')
|
||
if header_end < 0:
|
||
continue
|
||
header_raw = part[:header_end].decode('utf-8', errors='replace')
|
||
body = part[header_end + 4:]
|
||
# Remove trailing \r\n
|
||
if body.endswith(b'\r\n'):
|
||
body = body[:-2]
|
||
|
||
# Parse Content-Disposition
|
||
name = None
|
||
filename = None
|
||
for line in header_raw.split('\r\n'):
|
||
if line.lower().startswith('content-disposition:'):
|
||
for param in line.split(';'):
|
||
param = param.strip()
|
||
if param.startswith('name='):
|
||
name = param[5:].strip('"')
|
||
elif param.startswith('filename='):
|
||
filename = param[9:].strip('"')
|
||
|
||
if name is None:
|
||
continue
|
||
|
||
if filename:
|
||
files.append({'name': name, 'filename': filename, 'data': body})
|
||
else:
|
||
fields[name] = body.decode('utf-8', errors='replace')
|
||
|
||
return fields, files
|
||
|
||
|
||
class DashboardHandler(http.server.SimpleHTTPRequestHandler):
|
||
"""Handles static files and API endpoints."""
|
||
|
||
extensions_map = {
|
||
**http.server.SimpleHTTPRequestHandler.extensions_map,
|
||
'.js': 'application/javascript',
|
||
'.json': 'application/json',
|
||
'.css': 'text/css',
|
||
}
|
||
|
||
def end_headers(self):
|
||
if self.path.endswith(('.html', '.js', '.css')) or self.path == '/' or self.path.startswith('/api/design/'):
|
||
self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
|
||
self.send_header('Pragma', 'no-cache')
|
||
self.send_header('Expires', '0')
|
||
super().end_headers()
|
||
|
||
def do_GET(self):
|
||
if self.path.startswith('/api/design/mechanics/list'):
|
||
self._send_json(self._get_design_mechanics_docs())
|
||
return
|
||
if self.path.startswith('/api/design/mechanics/doc'):
|
||
self._handle_design_mechanics_doc()
|
||
return
|
||
if self.path.startswith('/api/design/mechanics/'):
|
||
self._handle_design_mechanics_static()
|
||
return
|
||
if self.path.startswith('/api/design/narrative/'):
|
||
self._handle_design_narrative_static()
|
||
return
|
||
if self.path.startswith('/api/design/shared/'):
|
||
self._handle_design_shared_asset()
|
||
return
|
||
if self.path.startswith('/api/bugs/list'):
|
||
self._send_json(_load_bugs())
|
||
return
|
||
if self.path.startswith('/api/todos/list'):
|
||
self._send_json(_load_todos())
|
||
return
|
||
if self.path.startswith('/api/suggestions/list'):
|
||
self._send_json(_load_suggestions())
|
||
return
|
||
if self.path.startswith('/api/core-feedback/list'):
|
||
self._send_json(_load_core_feedback())
|
||
return
|
||
if self.path.startswith('/api/official-replies/list'):
|
||
self._send_json(_load_official_replies())
|
||
return
|
||
if self.path.startswith('/api/raw-feedback/list'):
|
||
self._send_json(_load_raw_feedback())
|
||
return
|
||
if self.path.startswith('/api/marketing/todos'):
|
||
self._send_json(_load_mkt_todos())
|
||
return
|
||
if self.path.startswith('/api/marketing/events'):
|
||
events = []
|
||
if os.path.exists(MARKETING_EVENTS):
|
||
with open(MARKETING_EVENTS, 'r', encoding='utf-8') as f:
|
||
events = json.load(f)
|
||
self._send_json(events)
|
||
return
|
||
if self.path.startswith('/api/marketing/notes'):
|
||
notes = {}
|
||
if os.path.exists(MARKETING_NOTES):
|
||
with open(MARKETING_NOTES, 'r', encoding='utf-8') as f:
|
||
notes = json.load(f)
|
||
self._send_json(notes)
|
||
return
|
||
if self.path.startswith('/api/devplan/list'):
|
||
self._send_json(_load_devplan())
|
||
return
|
||
if self.path.startswith('/api/oss/versions'):
|
||
self._send_json(self._get_oss_versions())
|
||
return
|
||
if self.path.startswith('/api/oss/matches'):
|
||
self._handle_oss_matches()
|
||
return
|
||
if self.path.startswith('/api/oss/match?'):
|
||
self._handle_oss_match_detail()
|
||
return
|
||
if self.path.startswith('/api/oss/hero-stats'):
|
||
self._handle_oss_hero_stats()
|
||
return
|
||
if self.path.startswith('/api/gamebalance/hero-data'):
|
||
self._handle_gamebalance_hero_data()
|
||
return
|
||
if self.path.startswith('/api/balance-modeling/hero-pricing/latest'):
|
||
self._handle_balance_modeling_hero_pricing_latest()
|
||
return
|
||
# SNS APIs
|
||
if self.path.startswith('/api/sns/'):
|
||
self._handle_sns_get()
|
||
return
|
||
# Quick replies (SNS助手 / 常用工具)
|
||
if self.path.startswith('/api/quick-replies'):
|
||
self._handle_qr_get()
|
||
return
|
||
super().do_GET()
|
||
|
||
def _get_design_mechanics_docs(self):
|
||
docs = []
|
||
for doc in DESIGN_DOCS:
|
||
path = os.path.join(DESIGN_MECHANICS_DIR, doc['file'])
|
||
item = dict(doc)
|
||
item['exists'] = os.path.exists(path)
|
||
item['updatedAt'] = os.path.getmtime(path) if item['exists'] else None
|
||
docs.append(item)
|
||
return {'base': 'Design/final/mechanics', 'docs': docs}
|
||
|
||
def _handle_design_mechanics_doc(self):
|
||
parsed = urlparse(self.path)
|
||
query = parse_qs(parsed.query)
|
||
file_name = query.get('file', ['index.html'])[0]
|
||
if os.path.basename(file_name) != file_name or not file_name.endswith('.html'):
|
||
self.send_error(400, 'Invalid design document path')
|
||
return
|
||
path = os.path.normpath(os.path.join(DESIGN_MECHANICS_DIR, file_name))
|
||
if not path.startswith(DESIGN_MECHANICS_DIR) or not os.path.exists(path):
|
||
self.send_error(404, 'Design document not found')
|
||
return
|
||
with open(path, 'rb') as f:
|
||
data = f.read()
|
||
self.send_response(200)
|
||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||
self.send_header('Content-Length', str(len(data)))
|
||
self.end_headers()
|
||
self.wfile.write(data)
|
||
|
||
def _handle_design_mechanics_static(self):
|
||
parsed = urlparse(self.path)
|
||
file_name = parsed.path[len('/api/design/mechanics/'):]
|
||
if os.path.basename(file_name) != file_name or not file_name.endswith(('.html', '.md')):
|
||
self.send_error(400, 'Invalid design document path')
|
||
return
|
||
path = os.path.normpath(os.path.join(DESIGN_MECHANICS_DIR, file_name))
|
||
if not path.startswith(DESIGN_MECHANICS_DIR) or not os.path.exists(path):
|
||
self.send_error(404, 'Design document not found')
|
||
return
|
||
with open(path, 'rb') as f:
|
||
data = f.read()
|
||
content_type = 'text/html; charset=utf-8' if path.endswith('.html') else 'text/plain; charset=utf-8'
|
||
self.send_response(200)
|
||
self.send_header('Content-Type', content_type)
|
||
self.send_header('Content-Length', str(len(data)))
|
||
self.end_headers()
|
||
self.wfile.write(data)
|
||
|
||
def _handle_design_narrative_static(self):
|
||
parsed = urlparse(self.path)
|
||
file_name = parsed.path[len('/api/design/narrative/'):]
|
||
if os.path.basename(file_name) != file_name or not file_name.endswith('.html'):
|
||
self.send_error(400, 'Invalid narrative document path')
|
||
return
|
||
path = os.path.normpath(os.path.join(DESIGN_NARRATIVE_DIR, file_name))
|
||
if not path.startswith(DESIGN_NARRATIVE_DIR) or not os.path.exists(path):
|
||
self.send_error(404, 'Narrative document not found')
|
||
return
|
||
with open(path, 'rb') as f:
|
||
data = f.read()
|
||
self.send_response(200)
|
||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||
self.send_header('Content-Length', str(len(data)))
|
||
self.end_headers()
|
||
self.wfile.write(data)
|
||
|
||
def _handle_design_shared_asset(self):
|
||
parsed = urlparse(self.path)
|
||
rel_path = parsed.path[len('/api/design/shared/'):]
|
||
if not rel_path or '..' in rel_path.replace('\\', '/').split('/'):
|
||
self.send_error(400, 'Invalid shared design asset path')
|
||
return
|
||
path = os.path.normpath(os.path.join(DESIGN_SHARED_DIR, rel_path))
|
||
if not path.startswith(DESIGN_SHARED_DIR) or not os.path.exists(path):
|
||
self.send_error(404, 'Shared design asset not found')
|
||
return
|
||
with open(path, 'rb') as f:
|
||
data = f.read()
|
||
content_type = 'text/css; charset=utf-8' if path.endswith('.css') else 'application/octet-stream'
|
||
self.send_response(200)
|
||
self.send_header('Content-Type', content_type)
|
||
self.send_header('Content-Length', str(len(data)))
|
||
self.end_headers()
|
||
self.wfile.write(data)
|
||
|
||
def do_POST(self):
|
||
if self.path == '/api/refresh':
|
||
self.handle_refresh()
|
||
elif self.path == '/api/sentiment/create':
|
||
self.handle_sentiment_create()
|
||
elif self.path == '/api/sentiment/delete':
|
||
self.handle_sentiment_delete()
|
||
elif self.path == '/api/marketing/save-note':
|
||
self.handle_marketing_save_note()
|
||
elif self.path == '/api/marketing/create-event':
|
||
self.handle_marketing_create_event()
|
||
elif self.path == '/api/bugs/create':
|
||
self.handle_bugs_create()
|
||
elif self.path == '/api/bugs/update':
|
||
self.handle_bugs_update()
|
||
elif self.path == '/api/bugs/delete':
|
||
self.handle_bugs_delete()
|
||
elif self.path == '/api/todos/create':
|
||
self.handle_todos_create()
|
||
elif self.path == '/api/todos/update':
|
||
self.handle_todos_update()
|
||
elif self.path == '/api/todos/delete':
|
||
self.handle_todos_delete()
|
||
elif self.path == '/api/suggestions/create':
|
||
self.handle_suggestions_create()
|
||
elif self.path == '/api/suggestions/update':
|
||
self.handle_suggestions_update()
|
||
elif self.path == '/api/suggestions/delete':
|
||
self.handle_suggestions_delete()
|
||
elif self.path == '/api/core-feedback/update-feedback':
|
||
self.handle_core_feedback_update()
|
||
elif self.path == '/api/official-replies/update-reply':
|
||
self.handle_official_replies_update()
|
||
elif self.path == '/api/raw-feedback/create-set':
|
||
self.handle_raw_feedback_create_set()
|
||
elif self.path == '/api/raw-feedback/create-item':
|
||
self.handle_raw_feedback_create_item()
|
||
elif self.path == '/api/marketing/todos/create':
|
||
self.handle_mkt_todo_create()
|
||
elif self.path == '/api/marketing/todos/toggle':
|
||
self.handle_mkt_todo_toggle()
|
||
elif self.path == '/api/marketing/todos/delete':
|
||
self.handle_mkt_todo_delete()
|
||
elif self.path == '/api/devplan/update':
|
||
self.handle_devplan_update()
|
||
elif self.path == '/api/devplan/subtask/create':
|
||
self.handle_devplan_subtask_create()
|
||
elif self.path == '/api/devplan/subtask/toggle':
|
||
self.handle_devplan_subtask_toggle()
|
||
elif self.path == '/api/devplan/subtask/delete':
|
||
self.handle_devplan_subtask_delete()
|
||
# SNS APIs
|
||
elif self.path.startswith('/api/sns/'):
|
||
self._handle_sns_post()
|
||
# Quick replies (SNS助手 / 常用工具)
|
||
elif self.path.startswith('/api/quick-replies/'):
|
||
self._handle_qr_post()
|
||
else:
|
||
self.send_error(404, 'Not Found')
|
||
|
||
def _send_json(self, data, status=200):
|
||
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
|
||
self.send_response(status)
|
||
self.send_header('Content-Type', 'application/json; charset=utf-8')
|
||
self.send_header('Content-Length', str(len(body)))
|
||
self.end_headers()
|
||
self.wfile.write(body)
|
||
|
||
# ── Game balance helpers ──
|
||
|
||
def _handle_gamebalance_hero_data(self):
|
||
"""Return current hero unit/skill/task data without rewriting dashboard JSON files."""
|
||
try:
|
||
exporter = self._load_dashboard_exporter()
|
||
units = exporter.export_units()
|
||
skills = exporter.export_skills()
|
||
heroes = exporter.export_heroes()
|
||
players = exporter.export_players()
|
||
player_map = {
|
||
(player.get('forceId'), player.get('civId')): player
|
||
for player in players
|
||
}
|
||
hero_rows = [
|
||
unit for unit in units
|
||
if (unit.get('unitType') == 14 or unit.get('giantType', 0) > 0)
|
||
and unit.get('giantType', 0) > 0
|
||
and 1 <= unit.get('unitLevel', 0) <= 4
|
||
]
|
||
for unit in hero_rows:
|
||
empire = unit.get('empire') or {}
|
||
player = (
|
||
player_map.get((empire.get('force', 0) - 1, empire.get('civ', 0) - 1))
|
||
or player_map.get((empire.get('force'), empire.get('civ')))
|
||
)
|
||
if player:
|
||
empire['forceNameLocal'] = player.get('forceNameLocal') or ''
|
||
empire['civNameLocal'] = player.get('civNameLocal') or ''
|
||
empire['leaderName'] = player.get('leaderName') or ''
|
||
self._send_json({
|
||
'source': 'BundleResources/DataAssets',
|
||
'generatedAt': time.strftime('%Y-%m-%dT%H:%M:%S+08:00', time.localtime()),
|
||
'units': hero_rows,
|
||
'skills': skills,
|
||
'heroes': heroes,
|
||
'players': players,
|
||
})
|
||
except Exception as e:
|
||
self._send_json({'error': str(e)}, 500)
|
||
|
||
def _load_dashboard_exporter(self):
|
||
spec = importlib.util.spec_from_file_location('dashboard_export_data', EXPORT_SCRIPT)
|
||
if spec is None or spec.loader is None:
|
||
raise RuntimeError('Unable to load export_data.py')
|
||
module = importlib.util.module_from_spec(spec)
|
||
spec.loader.exec_module(module)
|
||
return module
|
||
|
||
# ── Balance modeling helpers ──
|
||
|
||
def _handle_balance_modeling_hero_pricing_latest(self):
|
||
"""Return the latest hero pricing model output from Design/drafts."""
|
||
latest = self._find_latest_hero_pricing_output()
|
||
if latest is None:
|
||
self._send_json({
|
||
'error': 'No hero pricing output found',
|
||
'expectedPattern': 'Design/drafts/planning/balance_modeling/data/hero_pricing_v*.csv',
|
||
}, 404)
|
||
return
|
||
|
||
version_key, pricing_path = latest
|
||
try:
|
||
self._send_json(self._load_hero_pricing_payload(version_key, pricing_path))
|
||
except Exception as e:
|
||
self._send_json({'error': str(e)}, 500)
|
||
|
||
def _find_latest_hero_pricing_output(self):
|
||
if not os.path.isdir(BALANCE_MODELING_DATA_DIR):
|
||
return None
|
||
|
||
pattern = re.compile(r'^hero_pricing_(v\d+(?:_\d+)*)\.csv$', re.IGNORECASE)
|
||
candidates = []
|
||
for name in os.listdir(BALANCE_MODELING_DATA_DIR):
|
||
match = pattern.match(name)
|
||
if not match:
|
||
continue
|
||
version_key = match.group(1).lower()
|
||
candidates.append((
|
||
self._balance_version_tuple(version_key),
|
||
version_key,
|
||
os.path.join(BALANCE_MODELING_DATA_DIR, name),
|
||
))
|
||
|
||
if not candidates:
|
||
return None
|
||
|
||
candidates.sort(key=lambda item: item[0])
|
||
_version_tuple, version_key, pricing_path = candidates[-1]
|
||
return version_key, pricing_path
|
||
|
||
def _load_hero_pricing_payload(self, version_key, pricing_path):
|
||
meta = self._load_balance_meta(version_key)
|
||
baselines = self._load_balance_baselines(version_key)
|
||
rows = self._load_hero_pricing_rows(pricing_path)
|
||
heroes = self._group_hero_pricing_rows(rows)
|
||
total_prices = [r['totalPrice'] for r in rows if isinstance(r.get('totalPrice'), (int, float))]
|
||
|
||
version = meta.get('version') or self._format_balance_version(version_key)
|
||
model_doc = os.path.join(BALANCE_MODELING_DIR, f'01-model-{version_key}.md')
|
||
pricing_doc = os.path.join(BALANCE_MODELING_DIR, f'02-hero-pricing-{version_key}.md')
|
||
|
||
return {
|
||
'version': version,
|
||
'versionKey': version_key,
|
||
'generatedOn': meta.get('generated_on'),
|
||
'formula': meta.get('formula'),
|
||
'baseUnitPrice': meta.get('base_unit_price'),
|
||
'source': self._project_relpath(pricing_path),
|
||
'modelDoc': self._project_relpath(model_doc) if os.path.exists(model_doc) else None,
|
||
'pricingDoc': self._project_relpath(pricing_doc) if os.path.exists(pricing_doc) else None,
|
||
'baselineSource': self._project_relpath(self._baseline_path(version_key)) if baselines else None,
|
||
'summary': {
|
||
'heroCount': len(heroes),
|
||
'rowCount': len(rows),
|
||
'minTotal': round(min(total_prices), 2) if total_prices else None,
|
||
'maxTotal': round(max(total_prices), 2) if total_prices else None,
|
||
'avgTotal': round(sum(total_prices) / len(total_prices), 2) if total_prices else None,
|
||
},
|
||
'baselines': baselines,
|
||
'heroes': heroes,
|
||
'rows': rows,
|
||
}
|
||
|
||
def _load_hero_pricing_rows(self, pricing_path):
|
||
rows = []
|
||
with open(pricing_path, 'r', encoding='utf-8-sig', newline='') as f:
|
||
reader = csv.DictReader(f)
|
||
fieldnames = reader.fieldnames or []
|
||
total_field = self._field_by_prefix(fieldnames, 'TotalPrice')
|
||
|
||
for raw in reader:
|
||
priced_skills_text = self._row_value(raw, 'PricedSkills') or ''
|
||
row = {
|
||
'giantTypeKey': self._row_value(raw, 'GiantType') or '',
|
||
'hero': self._row_value(raw, 'Hero') or '',
|
||
'chessType': self._row_value(raw, 'ChessType') or '',
|
||
'level': self._to_int(self._row_value(raw, 'Level')),
|
||
'stats': {
|
||
'hp': self._to_float(self._row_value(raw, 'HP')),
|
||
'attack': self._to_float(self._row_value(raw, 'Attack')),
|
||
'defense': self._to_float(self._row_value(raw, 'Defense')),
|
||
'move': self._to_float(self._row_value(raw, 'Move')),
|
||
'range': self._to_float(self._row_value(raw, 'Range')),
|
||
},
|
||
'baselines': {
|
||
'hp': self._to_float(self._row_value(raw, 'BaselineHP')),
|
||
'attack': self._to_float(self._row_value(raw, 'BaselineAttack')),
|
||
'defense': self._to_float(self._row_value(raw, 'BaselineDefense')),
|
||
'move': self._to_float(self._row_value(raw, 'BaselineMove')),
|
||
'range': self._to_float(self._row_value(raw, 'BaselineRange')),
|
||
},
|
||
'baseUnitPrice': self._to_float(self._row_value(raw, 'BaseUnitPrice')),
|
||
'attributePremium': self._to_float(self._row_value(raw, 'AttributePremium', 'AttributePrice')),
|
||
'rawSkillPrice': self._to_float(self._row_value(raw, 'RawSkillPrice', 'SkillPrice')),
|
||
'skillPackageBaseline': self._to_float(self._row_value(raw, 'SkillPackageBaseline')),
|
||
'skillPremium': self._to_float(self._row_value(raw, 'SkillPremium', 'SkillPrice')),
|
||
'totalPrice': self._to_float(raw.get(total_field) if total_field else self._row_value(raw, 'TotalPrice')),
|
||
'pricedSkillsText': priced_skills_text,
|
||
'pricedSkills': self._parse_priced_skills(priced_skills_text),
|
||
}
|
||
rows.append(row)
|
||
return rows
|
||
|
||
def _group_hero_pricing_rows(self, rows):
|
||
grouped = {}
|
||
for row in rows:
|
||
key = row.get('giantTypeKey') or row.get('hero') or 'Unknown'
|
||
hero = grouped.setdefault(key, {
|
||
'giantTypeKey': key,
|
||
'hero': row.get('hero') or key,
|
||
'chessType': row.get('chessType') or '',
|
||
'levels': [],
|
||
'levelTotals': {},
|
||
})
|
||
if row.get('chessType'):
|
||
hero['chessType'] = row['chessType']
|
||
hero['levels'].append(row)
|
||
if row.get('level') is not None:
|
||
hero['levelTotals'][str(row['level'])] = row.get('totalPrice')
|
||
|
||
heroes = list(grouped.values())
|
||
for hero in heroes:
|
||
hero['levels'].sort(key=lambda r: r.get('level') if r.get('level') is not None else 999)
|
||
totals = [r.get('totalPrice') for r in hero['levels'] if isinstance(r.get('totalPrice'), (int, float))]
|
||
hero['levelCount'] = len(hero['levels'])
|
||
hero['maxTotal'] = round(max(totals), 2) if totals else None
|
||
if hero['levels']:
|
||
final_level = hero['levels'][-1]
|
||
hero['finalLevel'] = final_level.get('level')
|
||
hero['finalTotal'] = final_level.get('totalPrice')
|
||
else:
|
||
hero['finalLevel'] = None
|
||
hero['finalTotal'] = None
|
||
heroes.sort(key=lambda h: (h.get('chessType') or '', h.get('hero') or ''))
|
||
return heroes
|
||
|
||
def _load_balance_meta(self, version_key):
|
||
path = os.path.join(BALANCE_MODELING_DATA_DIR, f'model_{version_key}_meta.json')
|
||
if not os.path.exists(path):
|
||
return {}
|
||
with open(path, 'r', encoding='utf-8-sig') as f:
|
||
return json.load(f)
|
||
|
||
def _load_balance_baselines(self, version_key):
|
||
path = self._baseline_path(version_key)
|
||
if not os.path.exists(path):
|
||
return []
|
||
|
||
baselines = []
|
||
with open(path, 'r', encoding='utf-8-sig', newline='') as f:
|
||
reader = csv.DictReader(f)
|
||
for raw in reader:
|
||
baselines.append({
|
||
'chessType': self._row_value(raw, 'ChessType') or '',
|
||
'level': self._to_int(self._row_value(raw, 'Level')),
|
||
'hp': self._to_float(self._row_value(raw, 'BaselineHP', 'HP')),
|
||
'attack': self._to_float(self._row_value(raw, 'BaselineAttack', 'Attack')),
|
||
'defense': self._to_float(self._row_value(raw, 'BaselineDefense', 'Defense')),
|
||
'move': self._to_float(self._row_value(raw, 'BaselineMove', 'Move')),
|
||
'range': self._to_float(self._row_value(raw, 'BaselineRange', 'Range')),
|
||
'skillBaseline': self._to_float(self._row_value(raw, 'SkillPackageBaseline')),
|
||
})
|
||
return baselines
|
||
|
||
def _baseline_path(self, version_key):
|
||
return os.path.join(BALANCE_MODELING_DATA_DIR, f'class_baseline_{version_key}.csv')
|
||
|
||
def _parse_priced_skills(self, text):
|
||
skills = []
|
||
for part in (text or '').split(';'):
|
||
item = part.strip()
|
||
if not item:
|
||
continue
|
||
tags = re.findall(r'\s+\[[^\]]+\]\s*$', item)
|
||
item_for_price = re.sub(r'(?:\s+\[[^\]]+\]\s*)+$', '', item).strip()
|
||
match = re.match(r'^(.*)\s+(-?\d+(?:\.\d+)?)$', item_for_price)
|
||
if match:
|
||
label = match.group(1).strip()
|
||
if tags:
|
||
label = f'{label} {" ".join(tag.strip() for tag in tags)}'
|
||
skills.append({'label': label, 'price': self._to_float(match.group(2))})
|
||
else:
|
||
skills.append({'label': item, 'price': None})
|
||
return skills
|
||
|
||
def _balance_version_tuple(self, version_key):
|
||
version_body = version_key[1:] if version_key.startswith('v') else version_key
|
||
parts = []
|
||
for item in version_body.split('_'):
|
||
try:
|
||
parts.append(int(item))
|
||
except ValueError:
|
||
parts.append(0)
|
||
return tuple(parts)
|
||
|
||
def _format_balance_version(self, version_key):
|
||
version_body = version_key[1:] if version_key.startswith('v') else version_key
|
||
return 'V' + version_body.replace('_', '.')
|
||
|
||
def _project_relpath(self, path):
|
||
try:
|
||
return os.path.relpath(path, PROJECT_ROOT).replace('\\', '/')
|
||
except ValueError:
|
||
return path.replace('\\', '/')
|
||
|
||
def _field_by_prefix(self, fieldnames, prefix):
|
||
prefix_lower = prefix.lower()
|
||
for field in fieldnames:
|
||
if field.lower().startswith(prefix_lower):
|
||
return field
|
||
return None
|
||
|
||
def _row_value(self, row, *keys):
|
||
for key in keys:
|
||
value = row.get(key)
|
||
if value is not None and value != '':
|
||
return value
|
||
return None
|
||
|
||
def _to_float(self, value):
|
||
if value is None or value == '':
|
||
return None
|
||
try:
|
||
return round(float(value), 4)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
def _to_int(self, value):
|
||
if value is None or value == '':
|
||
return None
|
||
try:
|
||
return int(float(value))
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
# ── OSS data helpers ──
|
||
|
||
def _get_oss_versions(self):
|
||
"""List available version folders in data/oss/."""
|
||
if not os.path.isdir(OSS_DATA_DIR):
|
||
return []
|
||
versions = []
|
||
for name in sorted(os.listdir(OSS_DATA_DIR)):
|
||
vdir = os.path.join(OSS_DATA_DIR, name)
|
||
if os.path.isdir(vdir):
|
||
# Count json files recursively
|
||
count = 0
|
||
for _root, _dirs, files in os.walk(vdir):
|
||
count += sum(1 for f in files if f.endswith('.json'))
|
||
versions.append({'version': name, 'matchCount': count})
|
||
return versions
|
||
|
||
def _handle_oss_matches(self):
|
||
"""List match summaries for selected versions.
|
||
Query params: versions=0.7.0,0.6.10&page=1&pageSize=20
|
||
Optional filters: unitType=1&giantType=0&level=0
|
||
"""
|
||
from urllib.parse import urlparse, parse_qs
|
||
parsed = urlparse(self.path)
|
||
params = parse_qs(parsed.query)
|
||
versions = params.get('versions', [''])[0].split(',')
|
||
versions = [v.strip() for v in versions if v.strip()]
|
||
page = int(params.get('page', ['1'])[0])
|
||
page_size = int(params.get('pageSize', ['20'])[0])
|
||
|
||
# Filters
|
||
filter_unit_type = params.get('unitType', [None])[0]
|
||
filter_giant_type = params.get('giantType', [None])[0]
|
||
filter_level = params.get('level', [None])[0]
|
||
|
||
if filter_unit_type is not None:
|
||
filter_unit_type = int(filter_unit_type)
|
||
if filter_giant_type is not None:
|
||
filter_giant_type = int(filter_giant_type)
|
||
if filter_level is not None:
|
||
filter_level = int(filter_level)
|
||
|
||
if not os.path.isdir(OSS_DATA_DIR):
|
||
self._send_json({'matches': [], 'total': 0, 'page': page, 'pageSize': page_size})
|
||
return
|
||
|
||
# Collect all match file paths
|
||
match_files = []
|
||
for ver in versions:
|
||
ver_dir = os.path.join(OSS_DATA_DIR, ver)
|
||
if not os.path.isdir(ver_dir):
|
||
continue
|
||
for root, dirs, files in os.walk(ver_dir):
|
||
for f in files:
|
||
if f.endswith('.json'):
|
||
match_files.append(os.path.join(root, f))
|
||
|
||
# Sort by filename (timestamp) descending
|
||
match_files.sort(key=lambda p: os.path.basename(p), reverse=True)
|
||
|
||
# Load summaries and apply filters
|
||
matches = []
|
||
for fpath in match_files:
|
||
try:
|
||
with open(fpath, 'r', encoding='utf-8-sig') as fp:
|
||
data = json.load(fp)
|
||
except Exception:
|
||
continue
|
||
|
||
# Apply unit filter: search addUnits AND damages (heroes are in damages, not addUnits)
|
||
if filter_unit_type is not None or filter_giant_type is not None or filter_level is not None:
|
||
found = False
|
||
# Check addUnits
|
||
for au in data.get('addUnits', []):
|
||
if filter_unit_type is not None and au.get('unitType') != filter_unit_type:
|
||
continue
|
||
if filter_giant_type is not None and au.get('giantType') != filter_giant_type:
|
||
continue
|
||
if filter_level is not None and au.get('level') != filter_level:
|
||
continue
|
||
found = True
|
||
break
|
||
# Check damages (origin and target) - heroes appear here even if not in addUnits
|
||
if not found:
|
||
for d in data.get('damages', []):
|
||
for prefix in ('origin', 'target'):
|
||
ut = d.get(prefix + 'UnitType')
|
||
gt = d.get(prefix + 'GiantType')
|
||
lv = d.get(prefix + 'Level')
|
||
if filter_unit_type is not None and ut != filter_unit_type:
|
||
continue
|
||
if filter_giant_type is not None and gt != filter_giant_type:
|
||
continue
|
||
if filter_level is not None and lv != filter_level:
|
||
continue
|
||
found = True
|
||
break
|
||
if found:
|
||
break
|
||
if not found:
|
||
continue
|
||
|
||
# Build summary
|
||
rel = os.path.relpath(fpath, OSS_DATA_DIR).replace('\\', '/')
|
||
parts = rel.split('/')
|
||
version = parts[0] if len(parts) > 0 else ''
|
||
steam_id = parts[1] if len(parts) > 1 else ''
|
||
timestamp = os.path.splitext(parts[-1])[0] if parts else ''
|
||
|
||
# Extract match stats
|
||
damages = data.get('damages', [])
|
||
add_units = data.get('addUnits', [])
|
||
game_ends = data.get('matchGameEnds', [])
|
||
player_ends = data.get('playerGameEnds', [])
|
||
max_turn = 0
|
||
if game_ends:
|
||
max_turn = max(e.get('turn', 0) for e in game_ends)
|
||
elif damages:
|
||
max_turn = max(d.get('turn', 0) for d in damages)
|
||
|
||
matches.append({
|
||
'file': rel,
|
||
'version': version,
|
||
'steamId': steam_id,
|
||
'timestamp': timestamp,
|
||
'memberId': data.get('memberId', 0),
|
||
'totalTurns': max_turn,
|
||
'damageCount': len(damages),
|
||
'unitCount': len(add_units),
|
||
'playerCount': len(player_ends),
|
||
})
|
||
|
||
total = len(matches)
|
||
start = (page - 1) * page_size
|
||
end = start + page_size
|
||
page_matches = matches[start:end]
|
||
|
||
self._send_json({
|
||
'matches': page_matches,
|
||
'total': total,
|
||
'page': page,
|
||
'pageSize': page_size,
|
||
})
|
||
|
||
def _handle_oss_match_detail(self):
|
||
"""Return full match data for a specific file.
|
||
Query: ?file=0.7.0/steamid/timestamp.json
|
||
"""
|
||
from urllib.parse import urlparse, parse_qs
|
||
parsed = urlparse(self.path)
|
||
params = parse_qs(parsed.query)
|
||
file_rel = params.get('file', [''])[0]
|
||
|
||
if not file_rel:
|
||
self._send_json({'error': 'file parameter required'}, 400)
|
||
return
|
||
|
||
fpath = os.path.normpath(os.path.join(OSS_DATA_DIR, file_rel))
|
||
# Security: ensure path is under OSS_DATA_DIR
|
||
if not fpath.startswith(os.path.normpath(OSS_DATA_DIR)):
|
||
self._send_json({'error': 'Invalid path'}, 400)
|
||
return
|
||
|
||
if not os.path.isfile(fpath):
|
||
self._send_json({'error': 'File not found'}, 404)
|
||
return
|
||
|
||
try:
|
||
with open(fpath, 'r', encoding='utf-8-sig') as fp:
|
||
data = json.load(fp)
|
||
self._send_json(data)
|
||
except Exception as e:
|
||
self._send_json({'error': str(e)}, 500)
|
||
|
||
def _handle_oss_hero_stats(self):
|
||
"""Aggregate per-hero stats across selected version matches.
|
||
Query: ?versions=0.7.0,0.6.10
|
||
Returns: { totalMatches, heroes: [{giantType, matchCount, totalDamage, totalKills, totalDeaths, ...}] }
|
||
"""
|
||
from urllib.parse import urlparse, parse_qs
|
||
parsed = urlparse(self.path)
|
||
params = parse_qs(parsed.query)
|
||
versions = params.get('versions', [''])[0].split(',')
|
||
versions = [v.strip() for v in versions if v.strip()]
|
||
|
||
if not os.path.isdir(OSS_DATA_DIR):
|
||
self._send_json({'totalMatches': 0, 'heroes': []})
|
||
return
|
||
|
||
# Collect match files
|
||
match_files = []
|
||
for ver in versions:
|
||
ver_dir = os.path.join(OSS_DATA_DIR, ver)
|
||
if not os.path.isdir(ver_dir):
|
||
continue
|
||
for root, dirs, files in os.walk(ver_dir):
|
||
for f in files:
|
||
if f.endswith('.json'):
|
||
match_files.append(os.path.join(root, f))
|
||
|
||
# All known hero giantTypes
|
||
HERO_IDS = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,31,36]
|
||
|
||
# Per-hero accumulators
|
||
hero_data = {}
|
||
for gid in HERO_IDS:
|
||
hero_data[gid] = {
|
||
'giantType': gid,
|
||
'matchCount': 0, # matches where this hero appeared
|
||
'totalDamageDealt': 0, # sum of originDamage across all matches
|
||
'totalDamageTaken': 0, # sum of targetDamage
|
||
'totalKills': 0, # times this hero killed someone
|
||
'totalDeaths': 0, # times this hero was killed
|
||
'totalAppear': 0, # addUnit entries for this hero
|
||
}
|
||
|
||
total_matches = 0
|
||
|
||
for fpath in match_files:
|
||
try:
|
||
with open(fpath, 'r', encoding='utf-8-sig') as fp:
|
||
data = json.load(fp)
|
||
except Exception:
|
||
continue
|
||
|
||
total_matches += 1
|
||
|
||
# Track which heroes appeared in this match (via addUnits or damages)
|
||
heroes_in_match = set()
|
||
|
||
# Scan addUnits
|
||
for au in data.get('addUnits', []):
|
||
gt = au.get('giantType', 0)
|
||
if gt in hero_data:
|
||
hero_data[gt]['totalAppear'] += 1
|
||
heroes_in_match.add(gt)
|
||
|
||
# Scan damages
|
||
for d in data.get('damages', []):
|
||
ogt = d.get('originGiantType', 0)
|
||
tgt = d.get('targetGiantType', 0)
|
||
|
||
if ogt in hero_data:
|
||
heroes_in_match.add(ogt)
|
||
hero_data[ogt]['totalDamageDealt'] += d.get('originDamage', 0)
|
||
if d.get('isKill'):
|
||
hero_data[ogt]['totalKills'] += 1
|
||
|
||
if tgt in hero_data:
|
||
heroes_in_match.add(tgt)
|
||
hero_data[tgt]['totalDamageTaken'] += d.get('targetDamage', 0)
|
||
if d.get('isKill'):
|
||
hero_data[tgt]['totalDeaths'] += 1
|
||
|
||
# Update matchCount for all heroes that appeared
|
||
for gid in heroes_in_match:
|
||
hero_data[gid]['matchCount'] += 1
|
||
|
||
# Build result with computed averages
|
||
heroes = []
|
||
for gid in HERO_IDS:
|
||
h = hero_data[gid]
|
||
mc = h['matchCount'] or 1
|
||
heroes.append({
|
||
'giantType': gid,
|
||
'matchCount': h['matchCount'],
|
||
'appearRate': round(h['matchCount'] / max(total_matches, 1) * 100, 1),
|
||
'avgDamage': round(h['totalDamageDealt'] / mc, 1),
|
||
'avgDeaths': round(h['totalDeaths'] / mc, 2),
|
||
'avgKills': round(h['totalKills'] / mc, 2),
|
||
'totalAppear': h['totalAppear'],
|
||
})
|
||
|
||
self._send_json({
|
||
'totalMatches': total_matches,
|
||
'heroes': heroes,
|
||
})
|
||
|
||
def handle_refresh(self):
|
||
"""Run export_data.py and return the result as JSON."""
|
||
try:
|
||
result = subprocess.run(
|
||
[sys.executable, EXPORT_SCRIPT],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=60,
|
||
cwd=SCRIPT_DIR,
|
||
)
|
||
|
||
response = {
|
||
'success': result.returncode == 0,
|
||
'stdout': result.stdout,
|
||
'stderr': result.stderr,
|
||
'returncode': result.returncode,
|
||
}
|
||
status = 200 if result.returncode == 0 else 500
|
||
|
||
except subprocess.TimeoutExpired:
|
||
response = {'success': False, 'error': 'Script timed out after 60s'}
|
||
status = 504
|
||
except Exception as e:
|
||
response = {'success': False, 'error': str(e)}
|
||
status = 500
|
||
|
||
self._send_json(response, status)
|
||
|
||
def handle_sentiment_create(self):
|
||
"""Create a new sentiment record from multipart form data."""
|
||
try:
|
||
content_type = self.headers.get('Content-Type', '')
|
||
content_length = int(self.headers.get('Content-Length', 0))
|
||
|
||
if 'multipart/form-data' in content_type:
|
||
fields, uploaded_files = parse_multipart(
|
||
self.rfile, content_type, content_length
|
||
)
|
||
else:
|
||
# JSON body fallback
|
||
raw = self.rfile.read(content_length)
|
||
fields = json.loads(raw.decode('utf-8'))
|
||
uploaded_files = []
|
||
|
||
title = fields.get('title', '').strip()
|
||
source_type = fields.get('sourceType', 'file').strip()
|
||
summary = fields.get('summary', '').strip()
|
||
source = fields.get('source', '').strip()
|
||
content_text = fields.get('content', '').strip()
|
||
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'Title is required'}, 400)
|
||
return
|
||
|
||
# Create record
|
||
record_id = time.strftime('%Y%m%d_%H%M%S') + '_' + uuid.uuid4().hex[:6]
|
||
record_dir = os.path.join(SENTIMENT_DIR, record_id)
|
||
os.makedirs(record_dir, exist_ok=True)
|
||
|
||
record = {
|
||
'id': record_id,
|
||
'title': title,
|
||
'timestamp': int(time.time() * 1000),
|
||
'sourceType': source_type,
|
||
'source': source,
|
||
'summary': summary,
|
||
'analyzed': False,
|
||
'files': [],
|
||
}
|
||
|
||
# Handle file uploads
|
||
for f in uploaded_files:
|
||
if f['filename']:
|
||
safe_name = os.path.basename(f['filename'])
|
||
filepath = os.path.join(record_dir, safe_name)
|
||
with open(filepath, 'wb') as out:
|
||
out.write(f['data'])
|
||
record['files'].append(safe_name)
|
||
|
||
# Handle text content
|
||
if content_text:
|
||
with open(os.path.join(record_dir, 'content.txt'), 'w', encoding='utf-8') as f:
|
||
f.write(content_text)
|
||
|
||
# Handle URL source - save URL to content.txt as well
|
||
if source_type == 'url' and source:
|
||
with open(os.path.join(record_dir, 'content.txt'), 'w', encoding='utf-8') as f:
|
||
f.write(f'来源链接: {source}\n')
|
||
|
||
# Save per-record meta
|
||
with open(os.path.join(record_dir, 'meta.json'), 'w', encoding='utf-8') as f:
|
||
json.dump(record, f, ensure_ascii=False, indent=2)
|
||
|
||
# Update index
|
||
records = _load_sentiment_index()
|
||
records.append(record)
|
||
_save_sentiment_index(records)
|
||
|
||
self._send_json({'success': True, 'id': record_id})
|
||
|
||
except Exception as e:
|
||
import traceback
|
||
traceback.print_exc()
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_sentiment_delete(self):
|
||
"""Delete a sentiment record by id."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
record_id = data.get('id', '')
|
||
|
||
if not record_id:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
records = _load_sentiment_index()
|
||
new_records = [r for r in records if r['id'] != record_id]
|
||
|
||
if len(new_records) == len(records):
|
||
self._send_json({'success': False, 'error': 'Record not found'}, 404)
|
||
return
|
||
|
||
# Remove directory
|
||
record_dir = os.path.join(SENTIMENT_DIR, record_id)
|
||
if os.path.isdir(record_dir):
|
||
shutil.rmtree(record_dir)
|
||
|
||
_save_sentiment_index(new_records)
|
||
self._send_json({'success': True})
|
||
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_bugs_create(self):
|
||
"""Create a new bug record."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
title = data.get('title', '').strip()
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'Title is required'}, 400)
|
||
return
|
||
|
||
bugs_data = _load_bugs()
|
||
bug_id = bugs_data['nextId']
|
||
now = int(time.time() * 1000)
|
||
|
||
bug = {
|
||
'id': bug_id,
|
||
'title': title,
|
||
'description': data.get('description', '').strip(),
|
||
'status': 'open',
|
||
'priority': data.get('priority', 'medium'),
|
||
'module': data.get('module', ''),
|
||
'longTerm': False,
|
||
'createdAt': now,
|
||
'updatedAt': now,
|
||
}
|
||
|
||
bugs_data['bugs'].append(bug)
|
||
bugs_data['nextId'] = bug_id + 1
|
||
_save_bugs(bugs_data)
|
||
|
||
self._send_json({'success': True, 'id': bug_id})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_bugs_update(self):
|
||
"""Update a bug's fields (status, priority, etc.)."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
bug_id = data.get('id')
|
||
if bug_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
bugs_data = _load_bugs()
|
||
bug = next((b for b in bugs_data['bugs'] if b['id'] == bug_id), None)
|
||
if not bug:
|
||
self._send_json({'success': False, 'error': 'Bug not found'}, 404)
|
||
return
|
||
|
||
# Update allowed fields
|
||
for field in ('status', 'priority', 'title', 'description', 'module', 'longTerm'):
|
||
if field in data:
|
||
bug[field] = data[field]
|
||
|
||
bug['updatedAt'] = int(time.time() * 1000)
|
||
_save_bugs(bugs_data)
|
||
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_bugs_delete(self):
|
||
"""Delete a bug by id."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
bug_id = data.get('id')
|
||
if bug_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
bugs_data = _load_bugs()
|
||
original_len = len(bugs_data['bugs'])
|
||
bugs_data['bugs'] = [b for b in bugs_data['bugs'] if b['id'] != bug_id]
|
||
|
||
if len(bugs_data['bugs']) == original_len:
|
||
self._send_json({'success': False, 'error': 'Bug not found'}, 404)
|
||
return
|
||
|
||
_save_bugs(bugs_data)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_todos_create(self):
|
||
"""Create a new todo record."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
title = data.get('title', '').strip()
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'Title is required'}, 400)
|
||
return
|
||
|
||
todos_data = _load_todos()
|
||
todo_id = todos_data['nextId']
|
||
now = int(time.time() * 1000)
|
||
|
||
todo = {
|
||
'id': todo_id,
|
||
'title': title,
|
||
'description': data.get('description', '').strip(),
|
||
'status': 'open',
|
||
'priority': data.get('priority', 'medium'),
|
||
'module': data.get('module', ''),
|
||
'longTerm': False,
|
||
'createdAt': now,
|
||
'updatedAt': now,
|
||
}
|
||
|
||
todos_data['todos'].append(todo)
|
||
todos_data['nextId'] = todo_id + 1
|
||
_save_todos(todos_data)
|
||
|
||
self._send_json({'success': True, 'id': todo_id})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_todos_update(self):
|
||
"""Update a todo's fields."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
todo_id = data.get('id')
|
||
if todo_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
todos_data = _load_todos()
|
||
todo = next((t for t in todos_data['todos'] if t['id'] == todo_id), None)
|
||
if not todo:
|
||
self._send_json({'success': False, 'error': 'Todo not found'}, 404)
|
||
return
|
||
|
||
for field in ('status', 'priority', 'title', 'description', 'module', 'longTerm'):
|
||
if field in data:
|
||
todo[field] = data[field]
|
||
|
||
todo['updatedAt'] = int(time.time() * 1000)
|
||
_save_todos(todos_data)
|
||
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_todos_delete(self):
|
||
"""Delete a todo by id."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
todo_id = data.get('id')
|
||
if todo_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
todos_data = _load_todos()
|
||
original_len = len(todos_data['todos'])
|
||
todos_data['todos'] = [t for t in todos_data['todos'] if t['id'] != todo_id]
|
||
|
||
if len(todos_data['todos']) == original_len:
|
||
self._send_json({'success': False, 'error': 'Todo not found'}, 404)
|
||
return
|
||
|
||
_save_todos(todos_data)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_suggestions_create(self):
|
||
"""Create a new suggestion record."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
title = data.get('title', '').strip()
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'Title is required'}, 400)
|
||
return
|
||
|
||
suggestions_data = _load_suggestions()
|
||
suggestion_id = suggestions_data['nextId']
|
||
now = int(time.time() * 1000)
|
||
|
||
suggestion = {
|
||
'id': suggestion_id,
|
||
'title': title,
|
||
'description': data.get('description', '').strip(),
|
||
'status': 'open',
|
||
'module': data.get('module', ''),
|
||
'createdAt': now,
|
||
'updatedAt': now,
|
||
}
|
||
|
||
suggestions_data['suggestions'].append(suggestion)
|
||
suggestions_data['nextId'] = suggestion_id + 1
|
||
_save_suggestions(suggestions_data)
|
||
|
||
self._send_json({'success': True, 'id': suggestion_id})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_suggestions_update(self):
|
||
"""Update a suggestion's fields."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
suggestion_id = data.get('id')
|
||
if suggestion_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
suggestions_data = _load_suggestions()
|
||
suggestion = next((s for s in suggestions_data['suggestions'] if s['id'] == suggestion_id), None)
|
||
if not suggestion:
|
||
self._send_json({'success': False, 'error': 'Suggestion not found'}, 404)
|
||
return
|
||
|
||
for field in ('status', 'title', 'description', 'module'):
|
||
if field in data:
|
||
suggestion[field] = data[field]
|
||
|
||
suggestion['updatedAt'] = int(time.time() * 1000)
|
||
_save_suggestions(suggestions_data)
|
||
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_suggestions_delete(self):
|
||
"""Delete a suggestion by id."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
suggestion_id = data.get('id')
|
||
if suggestion_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
suggestions_data = _load_suggestions()
|
||
original_len = len(suggestions_data['suggestions'])
|
||
suggestions_data['suggestions'] = [s for s in suggestions_data['suggestions'] if s['id'] != suggestion_id]
|
||
|
||
if len(suggestions_data['suggestions']) == original_len:
|
||
self._send_json({'success': False, 'error': 'Suggestion not found'}, 404)
|
||
return
|
||
|
||
_save_suggestions(suggestions_data)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_core_feedback_update(self):
|
||
"""Update a feedback item inside a core player feedback collection."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
collection_id = data.get('collectionId')
|
||
feedback_id = data.get('feedbackId')
|
||
if not collection_id or not feedback_id:
|
||
self._send_json({'success': False, 'error': 'collectionId and feedbackId are required'}, 400)
|
||
return
|
||
|
||
status = data.get('status')
|
||
if status is not None and status not in ('待处理', '已处理', '搁置'):
|
||
self._send_json({'success': False, 'error': 'Invalid status'}, 400)
|
||
return
|
||
|
||
feedback_data = _load_core_feedback()
|
||
collections = feedback_data.get('collections', [])
|
||
collection = next((c for c in collections if c.get('id') == collection_id), None)
|
||
if not collection:
|
||
self._send_json({'success': False, 'error': 'Feedback collection not found'}, 404)
|
||
return
|
||
|
||
items = collection.get('feedback', [])
|
||
item = next((f for f in items if f.get('id') == feedback_id), None)
|
||
if not item:
|
||
self._send_json({'success': False, 'error': 'Feedback item not found'}, 404)
|
||
return
|
||
|
||
for field in ('desc', 'author', 'status', 'solution'):
|
||
if field in data:
|
||
item[field] = data[field]
|
||
item['updatedAt'] = int(time.time() * 1000)
|
||
collection['updatedAt'] = item['updatedAt']
|
||
|
||
_save_core_feedback(feedback_data)
|
||
self._send_json({'success': True, 'item': item})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_official_replies_update(self):
|
||
"""Update a reply item inside an official reply collection."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
collection_id = data.get('collectionId')
|
||
reply_id = data.get('replyId')
|
||
if not collection_id or not reply_id:
|
||
self._send_json({'success': False, 'error': 'collectionId and replyId are required'}, 400)
|
||
return
|
||
|
||
status = data.get('status')
|
||
if status is not None and status not in ('未完成', '已完成'):
|
||
self._send_json({'success': False, 'error': 'Invalid status'}, 400)
|
||
return
|
||
|
||
replies_data = _load_official_replies()
|
||
collections = replies_data.get('collections', [])
|
||
collection = next((c for c in collections if c.get('id') == collection_id), None)
|
||
if not collection:
|
||
self._send_json({'success': False, 'error': 'Official reply collection not found'}, 404)
|
||
return
|
||
|
||
items = collection.get('replies', [])
|
||
item = next((f for f in items if f.get('id') == reply_id), None)
|
||
if not item:
|
||
self._send_json({'success': False, 'error': 'Official reply item not found'}, 404)
|
||
return
|
||
|
||
for field in ('question', 'answer', 'status'):
|
||
if field in data:
|
||
item[field] = data[field]
|
||
item['updatedAt'] = int(time.time() * 1000)
|
||
collection['updatedAt'] = item['updatedAt']
|
||
|
||
_save_official_replies(replies_data)
|
||
self._send_json({'success': True, 'item': item})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_raw_feedback_create_set(self):
|
||
"""Create a raw feedback set."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
title = (data.get('title') or '').strip()
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'title is required'}, 400)
|
||
return
|
||
|
||
source = (data.get('source') or '').strip()
|
||
description = (data.get('description') or '').strip()
|
||
now_ms = int(time.time() * 1000)
|
||
set_id_base = _slugify_feedback_id(title, 'raw-feedback-set')
|
||
|
||
raw_data = _load_raw_feedback()
|
||
existing_ids = {s.get('id') for s in raw_data.get('sets', [])}
|
||
set_id = f'{set_id_base}-{time.strftime("%Y%m%d-%H%M%S", time.localtime())}'
|
||
while set_id in existing_ids:
|
||
set_id = f'{set_id_base}-{uuid.uuid4().hex[:8]}'
|
||
|
||
item = {
|
||
'id': set_id,
|
||
'title': title,
|
||
'source': source,
|
||
'description': description,
|
||
'createdAt': now_ms,
|
||
'updatedAt': now_ms,
|
||
'nextId': 1,
|
||
'items': []
|
||
}
|
||
raw_data.setdefault('sets', []).append(item)
|
||
_save_raw_feedback(raw_data)
|
||
self._send_json({'success': True, 'set': item})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_raw_feedback_create_item(self):
|
||
"""Create an item inside a raw feedback set."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
set_id = data.get('setId')
|
||
content = (data.get('content') or '').strip()
|
||
if not set_id or not content:
|
||
self._send_json({'success': False, 'error': 'setId and content are required'}, 400)
|
||
return
|
||
|
||
source = (data.get('source') or '').strip()
|
||
author = (data.get('author') or '').strip()
|
||
source_time = (data.get('sourceTime') or '').strip()
|
||
tags = data.get('tags') or []
|
||
if isinstance(tags, str):
|
||
tags = [t.strip() for t in tags.replace(',', ',').split(',') if t.strip()]
|
||
elif not isinstance(tags, list):
|
||
tags = []
|
||
|
||
raw_data = _load_raw_feedback()
|
||
target = next((s for s in raw_data.get('sets', []) if s.get('id') == set_id), None)
|
||
if not target:
|
||
self._send_json({'success': False, 'error': 'Raw feedback set not found'}, 404)
|
||
return
|
||
|
||
next_id = int(target.get('nextId', 1))
|
||
now_ms = int(time.time() * 1000)
|
||
item = {
|
||
'id': f'raw-{next_id:04d}',
|
||
'content': content,
|
||
'source': source,
|
||
'author': author,
|
||
'sourceTime': source_time,
|
||
'tags': tags,
|
||
'createdAt': now_ms,
|
||
'updatedAt': now_ms
|
||
}
|
||
target.setdefault('items', []).append(item)
|
||
target['nextId'] = next_id + 1
|
||
target['updatedAt'] = now_ms
|
||
|
||
_save_raw_feedback(raw_data)
|
||
self._send_json({'success': True, 'item': item, 'set': target})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_mkt_todo_create(self):
|
||
"""Create a new marketing TODO item."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
title = data.get('title', '').strip()
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'Title is required'}, 400)
|
||
return
|
||
|
||
todos_data = _load_mkt_todos()
|
||
todo_id = todos_data['nextId']
|
||
now = int(time.time() * 1000)
|
||
|
||
todo = {
|
||
'id': todo_id,
|
||
'title': title,
|
||
'done': False,
|
||
'createdAt': now,
|
||
}
|
||
|
||
todos_data['todos'].append(todo)
|
||
todos_data['nextId'] = todo_id + 1
|
||
_save_mkt_todos(todos_data)
|
||
|
||
self._send_json({'success': True, 'id': todo_id})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_mkt_todo_toggle(self):
|
||
"""Toggle a TODO item's done state."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
todo_id = data.get('id')
|
||
if todo_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
todos_data = _load_mkt_todos()
|
||
todo = next((t for t in todos_data['todos'] if t['id'] == todo_id), None)
|
||
if not todo:
|
||
self._send_json({'success': False, 'error': 'TODO not found'}, 404)
|
||
return
|
||
|
||
todo['done'] = not todo['done']
|
||
if todo['done']:
|
||
todo['doneAt'] = int(time.time() * 1000)
|
||
else:
|
||
todo.pop('doneAt', None)
|
||
|
||
_save_mkt_todos(todos_data)
|
||
self._send_json({'success': True, 'done': todo['done']})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_mkt_todo_delete(self):
|
||
"""Delete a TODO item."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
todo_id = data.get('id')
|
||
if todo_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
todos_data = _load_mkt_todos()
|
||
original_len = len(todos_data['todos'])
|
||
todos_data['todos'] = [t for t in todos_data['todos'] if t['id'] != todo_id]
|
||
|
||
if len(todos_data['todos']) == original_len:
|
||
self._send_json({'success': False, 'error': 'TODO not found'}, 404)
|
||
return
|
||
|
||
_save_mkt_todos(todos_data)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_devplan_update(self):
|
||
"""Update a task's description or other fields."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
task_id = data.get('id')
|
||
if task_id is None:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
tasks = _load_devplan()
|
||
task = next((t for t in tasks if t['id'] == task_id), None)
|
||
if not task:
|
||
self._send_json({'success': False, 'error': 'Task not found'}, 404)
|
||
return
|
||
|
||
for field in ('description', 'title', 'status', 'priority'):
|
||
if field in data:
|
||
task[field] = data[field]
|
||
|
||
_save_devplan(tasks)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_devplan_subtask_create(self):
|
||
"""Create a subtask under a task."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
task_id = data.get('taskId')
|
||
title = data.get('title', '').strip()
|
||
if task_id is None or not title:
|
||
self._send_json({'success': False, 'error': 'taskId and title required'}, 400)
|
||
return
|
||
|
||
tasks = _load_devplan()
|
||
task = next((t for t in tasks if t['id'] == task_id), None)
|
||
if not task:
|
||
self._send_json({'success': False, 'error': 'Task not found'}, 404)
|
||
return
|
||
|
||
if 'subtasks' not in task:
|
||
task['subtasks'] = []
|
||
task['subtaskNextId'] = 1
|
||
|
||
st_id = task.get('subtaskNextId', 1)
|
||
subtask = {
|
||
'id': st_id,
|
||
'title': title,
|
||
'done': False,
|
||
'createdAt': int(time.time() * 1000),
|
||
}
|
||
task['subtasks'].append(subtask)
|
||
task['subtaskNextId'] = st_id + 1
|
||
|
||
_save_devplan(tasks)
|
||
self._send_json({'success': True, 'id': st_id})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_devplan_subtask_toggle(self):
|
||
"""Toggle a subtask's done state."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
task_id = data.get('taskId')
|
||
st_id = data.get('subtaskId')
|
||
if task_id is None or st_id is None:
|
||
self._send_json({'success': False, 'error': 'taskId and subtaskId required'}, 400)
|
||
return
|
||
|
||
tasks = _load_devplan()
|
||
task = next((t for t in tasks if t['id'] == task_id), None)
|
||
if not task:
|
||
self._send_json({'success': False, 'error': 'Task not found'}, 404)
|
||
return
|
||
|
||
subtask = next((s for s in task.get('subtasks', []) if s['id'] == st_id), None)
|
||
if not subtask:
|
||
self._send_json({'success': False, 'error': 'Subtask not found'}, 404)
|
||
return
|
||
|
||
subtask['done'] = not subtask['done']
|
||
if subtask['done']:
|
||
subtask['doneAt'] = int(time.time() * 1000)
|
||
else:
|
||
subtask.pop('doneAt', None)
|
||
|
||
_save_devplan(tasks)
|
||
self._send_json({'success': True, 'done': subtask['done']})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_devplan_subtask_delete(self):
|
||
"""Delete a subtask."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
task_id = data.get('taskId')
|
||
st_id = data.get('subtaskId')
|
||
if task_id is None or st_id is None:
|
||
self._send_json({'success': False, 'error': 'taskId and subtaskId required'}, 400)
|
||
return
|
||
|
||
tasks = _load_devplan()
|
||
task = next((t for t in tasks if t['id'] == task_id), None)
|
||
if not task:
|
||
self._send_json({'success': False, 'error': 'Task not found'}, 404)
|
||
return
|
||
|
||
subs = task.get('subtasks', [])
|
||
original_len = len(subs)
|
||
task['subtasks'] = [s for s in subs if s['id'] != st_id]
|
||
|
||
if len(task['subtasks']) == original_len:
|
||
self._send_json({'success': False, 'error': 'Subtask not found'}, 404)
|
||
return
|
||
|
||
_save_devplan(tasks)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_marketing_save_note(self):
|
||
"""Save a marketing event note."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
event_id = data.get('id', '')
|
||
note = data.get('note', '')
|
||
|
||
if not event_id:
|
||
self._send_json({'success': False, 'error': 'ID is required'}, 400)
|
||
return
|
||
|
||
os.makedirs(MARKETING_DIR, exist_ok=True)
|
||
notes = {}
|
||
if os.path.exists(MARKETING_NOTES):
|
||
with open(MARKETING_NOTES, 'r', encoding='utf-8') as f:
|
||
notes = json.load(f)
|
||
|
||
notes[event_id] = note
|
||
with open(MARKETING_NOTES, 'w', encoding='utf-8') as f:
|
||
json.dump(notes, f, ensure_ascii=False, indent=2)
|
||
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def handle_marketing_create_event(self):
|
||
"""Create a new marketing event."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
title = data.get('title', '').strip()
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'Title is required'}, 400)
|
||
return
|
||
|
||
os.makedirs(MARKETING_DIR, exist_ok=True)
|
||
|
||
# Load existing events
|
||
events = []
|
||
if os.path.exists(MARKETING_EVENTS):
|
||
with open(MARKETING_EVENTS, 'r', encoding='utf-8') as f:
|
||
events = json.load(f)
|
||
|
||
# Generate next ID
|
||
max_num = 0
|
||
for ev in events:
|
||
digits = ''.join(c for c in ev.get('id', '') if c.isdigit())
|
||
if digits:
|
||
max_num = max(max_num, int(digits))
|
||
next_id = 'ev' + str(max_num + 1).zfill(2)
|
||
|
||
new_event = {
|
||
'id': next_id,
|
||
'title': title,
|
||
'date': data.get('date', ''),
|
||
'endDate': data.get('endDate', ''),
|
||
'category': data.get('category', 'video'),
|
||
'platform': data.get('platform', []),
|
||
'tags': data.get('tags', []),
|
||
'status': data.get('status', 'pending'),
|
||
'folder': data.get('folder', ''),
|
||
'summary': data.get('summary', ''),
|
||
'assets': [],
|
||
'notes': '',
|
||
}
|
||
|
||
# Remove empty endDate
|
||
if not new_event['endDate']:
|
||
del new_event['endDate']
|
||
|
||
events.append(new_event)
|
||
|
||
with open(MARKETING_EVENTS, 'w', encoding='utf-8') as f:
|
||
json.dump(events, f, ensure_ascii=False, indent=2)
|
||
|
||
self._send_json({'success': True, 'event': new_event})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
# ── SNS API handlers ──
|
||
|
||
def _get_sns_platform(self, path):
|
||
"""Extract platform name from /api/sns/{platform}..."""
|
||
parts = urlparse(path).path.split('/')
|
||
if len(parts) >= 4:
|
||
return parts[3]
|
||
return None
|
||
|
||
def _get_sns_file(self, platform):
|
||
"""Get path to the platform's JSON file."""
|
||
return os.path.join(SNS_DIR, f'{platform}.json')
|
||
|
||
def _load_sns_data(self, platform):
|
||
"""Load a platform's SNS data."""
|
||
path = self._get_sns_file(platform)
|
||
if os.path.exists(path):
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
# Return default structure
|
||
names = {
|
||
'xiaoheihe': '小黑盒',
|
||
'steam': 'Steam商店',
|
||
'bilibili': 'Bilibili',
|
||
'twitter': 'Twitter',
|
||
'qq': 'Q群',
|
||
'discord': 'Discord'
|
||
}
|
||
return {
|
||
'nextId': 1,
|
||
'platform': platform,
|
||
'name': names.get(platform, platform),
|
||
'comments': []
|
||
}
|
||
|
||
def _save_sns_data(self, platform, data):
|
||
"""Save a platform's SNS data."""
|
||
os.makedirs(SNS_DIR, exist_ok=True)
|
||
path = self._get_sns_file(platform)
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
def _handle_sns_get(self):
|
||
"""Handle GET /api/sns/{platform}"""
|
||
platform = self._get_sns_platform(self.path)
|
||
if not platform:
|
||
self._send_json({'error': 'Platform required'}, 400)
|
||
return
|
||
data = self._load_sns_data(platform)
|
||
self._send_json(data)
|
||
|
||
def _handle_sns_post(self):
|
||
"""Handle POST /api/sns/{platform}/update and /delete"""
|
||
path = self.path
|
||
platform = self._get_sns_platform(path)
|
||
if not platform:
|
||
self._send_json({'error': 'Platform required'}, 400)
|
||
return
|
||
|
||
if path.endswith('/update'):
|
||
self._handle_sns_update(platform)
|
||
elif path.endswith('/delete'):
|
||
self._handle_sns_delete(platform)
|
||
elif path.endswith('/paste'):
|
||
self._handle_sns_paste(platform)
|
||
elif path.endswith('/fetch'):
|
||
self._handle_sns_fetch(platform)
|
||
elif path.endswith('/bulk-create'):
|
||
self._handle_sns_bulk_create(platform)
|
||
else:
|
||
self._send_json({'error': 'Unknown action'}, 404)
|
||
|
||
def _handle_sns_update(self, platform):
|
||
"""Update (create or modify) a comment."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
comment = json.loads(raw.decode('utf-8'))
|
||
|
||
data = self._load_sns_data(platform)
|
||
comments = data.get('comments', [])
|
||
|
||
# Find existing comment
|
||
existing = next((c for c in comments if c.get('id') == comment.get('id')), None)
|
||
if existing:
|
||
# Update
|
||
existing.update(comment)
|
||
existing['updatedAt'] = int(time.time() * 1000)
|
||
else:
|
||
# Create new
|
||
if 'id' not in comment:
|
||
comment['id'] = data.get('nextId', 1)
|
||
data['nextId'] = comment['id'] + 1
|
||
comment['createdAt'] = int(time.time() * 1000)
|
||
comment['updatedAt'] = comment['createdAt']
|
||
if 'status' not in comment:
|
||
comment['status'] = 'pending'
|
||
comments.append(comment)
|
||
|
||
data['comments'] = comments
|
||
self._save_sns_data(platform, data)
|
||
self._send_json({'success': True, 'comment': comment})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def _handle_sns_delete(self, platform):
|
||
"""Delete a comment."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
comment_id = data.get('id')
|
||
|
||
if comment_id is None:
|
||
self._send_json({'success': False, 'error': 'ID required'}, 400)
|
||
return
|
||
|
||
sns_data = self._load_sns_data(platform)
|
||
comments = sns_data.get('comments', [])
|
||
original_len = len(comments)
|
||
sns_data['comments'] = [c for c in comments if c.get('id') != comment_id]
|
||
|
||
if len(sns_data['comments']) == original_len:
|
||
self._send_json({'success': False, 'error': 'Comment not found'}, 404)
|
||
return
|
||
|
||
self._save_sns_data(platform, sns_data)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def _handle_sns_paste(self, platform):
|
||
"""Create a manually pasted SNS record."""
|
||
if platform != 'discord':
|
||
self._send_json({'success': False, 'error': 'Paste import is only supported for Discord'}, 400)
|
||
return
|
||
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
payload = json.loads(raw.decode('utf-8'))
|
||
|
||
title = (payload.get('title') or '').strip()
|
||
channel = (payload.get('channel') or '').strip()
|
||
note = (payload.get('note') or '').strip()
|
||
raw_text = (payload.get('rawText') or '').strip()
|
||
|
||
if not title:
|
||
self._send_json({'success': False, 'error': 'Title required'}, 400)
|
||
return
|
||
if not raw_text:
|
||
self._send_json({'success': False, 'error': 'Discord raw text required'}, 400)
|
||
return
|
||
|
||
parsed_messages = self._parse_discord_paste(raw_text)
|
||
speakers = []
|
||
for msg in parsed_messages:
|
||
author = msg.get('author')
|
||
if author and author not in speakers:
|
||
speakers.append(author)
|
||
|
||
sns_data = self._load_sns_data(platform)
|
||
comments = sns_data.get('comments', [])
|
||
next_id = sns_data.get('nextId', 1)
|
||
now = int(time.time() * 1000)
|
||
|
||
comment = {
|
||
'id': next_id,
|
||
'title': title,
|
||
'status': 'pending',
|
||
'source': 'discord-paste',
|
||
'channel': channel,
|
||
'note': note,
|
||
'rawText': raw_text,
|
||
'parsedMessages': parsed_messages,
|
||
'messageCount': len(parsed_messages) if parsed_messages else self._count_discord_paste_lines(raw_text),
|
||
'speakers': speakers,
|
||
'timestamp': now,
|
||
'createdAt': now,
|
||
'updatedAt': now
|
||
}
|
||
|
||
comments.append(comment)
|
||
sns_data['comments'] = comments
|
||
sns_data['nextId'] = next_id + 1
|
||
self._save_sns_data(platform, sns_data)
|
||
self._send_json({'success': True, 'comment': comment})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def _parse_discord_paste(self, raw_text):
|
||
"""Best-effort parser for copied Discord channel text."""
|
||
import re
|
||
|
||
lines = [line.rstrip() for line in raw_text.replace('\r\n', '\n').replace('\r', '\n').split('\n')]
|
||
messages = []
|
||
current = None
|
||
|
||
patterns = [
|
||
re.compile(r'^(?P<author>.+?)\s+(?P<date>\d{4}/\d{1,2}/\d{1,2})\s+(?P<time>\d{1,2}:\d{2})(?:\s*(?P<content>.*))?$'),
|
||
re.compile(r'^(?P<author>.+?)\s+(?P<date>\d{4}-\d{1,2}-\d{1,2})\s+(?P<time>\d{1,2}:\d{2})(?:\s*(?P<content>.*))?$'),
|
||
re.compile(r'^\[(?P<date>\d{4}-\d{1,2}-\d{1,2})\s+(?P<time>\d{1,2}:\d{2}(?::\d{2})?)\]\s*(?P<author>[^:]+):\s*(?P<content>.*)$'),
|
||
re.compile(r'^(?P<date>\d{4}-\d{1,2}-\d{1,2})\s+(?P<time>\d{1,2}:\d{2}(?::\d{2})?)\s*\|\s*(?P<author>[^:]+):\s*(?P<content>.*)$'),
|
||
]
|
||
|
||
def push_current():
|
||
if not current:
|
||
return
|
||
content = '\n'.join(current.get('contentLines', [])).strip()
|
||
messages.append({
|
||
'author': current.get('author', '').strip(),
|
||
'date': current.get('date', '').strip(),
|
||
'time': current.get('time', '').strip(),
|
||
'content': content
|
||
})
|
||
|
||
for line in lines:
|
||
stripped = line.strip()
|
||
if not stripped:
|
||
if current and current.get('contentLines'):
|
||
current['contentLines'].append('')
|
||
continue
|
||
|
||
match = None
|
||
for pattern in patterns:
|
||
match = pattern.match(stripped)
|
||
if match:
|
||
break
|
||
|
||
if match:
|
||
push_current()
|
||
groups = match.groupdict()
|
||
content = groups.get('content') or ''
|
||
current = {
|
||
'author': groups.get('author') or '',
|
||
'date': groups.get('date') or '',
|
||
'time': groups.get('time') or '',
|
||
'contentLines': [content] if content else []
|
||
}
|
||
elif current:
|
||
current['contentLines'].append(line)
|
||
else:
|
||
current = {
|
||
'author': '',
|
||
'date': '',
|
||
'time': '',
|
||
'contentLines': [line]
|
||
}
|
||
|
||
push_current()
|
||
|
||
return [
|
||
msg for msg in messages
|
||
if msg.get('author') or msg.get('content')
|
||
]
|
||
|
||
def _count_discord_paste_lines(self, raw_text):
|
||
lines = [line.strip() for line in raw_text.splitlines()]
|
||
return len([line for line in lines if line])
|
||
|
||
def _handle_sns_fetch(self, platform):
|
||
"""Fetch new discussions from Steam Community."""
|
||
if platform != 'steam':
|
||
self._send_json({'error': 'Only steam platform supported'}, 400)
|
||
return
|
||
|
||
try:
|
||
# Steam Community URL for the app
|
||
url = 'https://steamcommunity.com/app/3774440/discussions/'
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
|
||
'Cookie': 'Steam_Language=schinese',
|
||
'Connection': 'keep-alive',
|
||
'Upgrade-Insecure-Requests': '1'
|
||
}
|
||
|
||
req = urllib.request.Request(url, headers=headers)
|
||
|
||
# Try with retry logic
|
||
max_retries = 3
|
||
last_error = None
|
||
|
||
for attempt in range(max_retries):
|
||
try:
|
||
# Use TLS 1.2+ and disable SSL verification for better compatibility
|
||
import ssl
|
||
ctx = ssl.create_default_context()
|
||
ctx.check_hostname = False
|
||
ctx.verify_mode = ssl.CERT_NONE
|
||
|
||
with urllib.request.urlopen(req, timeout=30, context=ctx) as response:
|
||
html = response.read().decode('utf-8')
|
||
|
||
# Parse discussions from HTML
|
||
discussions = self._parse_steam_discussions(html)
|
||
|
||
# Limit to reasonable amount
|
||
discussions = discussions[:50]
|
||
|
||
# Load existing data
|
||
existing_data = self._load_sns_data('steam')
|
||
existing_titles = {c.get('title', '') for c in existing_data.get('comments', [])}
|
||
|
||
# Filter out already existing posts
|
||
new_discussions = []
|
||
for d in discussions:
|
||
new_discussions.append({
|
||
'id': d.get('id', str(int(time.time() * 1000))),
|
||
'title': d.get('title', ''),
|
||
'author': d.get('author', ''),
|
||
'url': d.get('url', ''),
|
||
'replies': d.get('replies', 0),
|
||
'timestamp': int(time.time() * 1000)
|
||
})
|
||
|
||
self._send_json({
|
||
'success': True,
|
||
'discussions': new_discussions,
|
||
'count': len(new_discussions)
|
||
})
|
||
return
|
||
|
||
except urllib.error.URLError as ue:
|
||
last_error = ue
|
||
if attempt < max_retries - 1:
|
||
time.sleep(2) # Wait before retry
|
||
continue
|
||
raise
|
||
|
||
except urllib.error.URLError as e:
|
||
# Provide fallback demo data if Steam is unreachable
|
||
import traceback
|
||
traceback.print_exc()
|
||
self._send_json({
|
||
'success': True,
|
||
'discussions': self._get_steam_fallback_data(),
|
||
'count': 3,
|
||
'warning': '无法连接到Steam社区,已返回示例数据。请检查网络连接。'
|
||
})
|
||
except Exception as e:
|
||
import traceback
|
||
traceback.print_exc()
|
||
self._send_json({
|
||
'success': True,
|
||
'discussions': self._get_steam_fallback_data(),
|
||
'count': 3,
|
||
'warning': f'获取Steam数据失败: {str(e)[:50]},已返回示例数据'
|
||
})
|
||
|
||
def _get_steam_fallback_data(self):
|
||
"""Return demo data when Steam is unreachable."""
|
||
return [
|
||
{
|
||
'id': 'demo1',
|
||
'title': '[示例] 游戏新手攻略讨论',
|
||
'author': '玩家A',
|
||
'url': 'https://steamcommunity.com/app/3774440/discussions/',
|
||
'replies': 15,
|
||
'timestamp': int(time.time() * 1000)
|
||
},
|
||
{
|
||
'id': 'demo2',
|
||
'title': '[示例] 联机对战心得分享',
|
||
'author': '玩家B',
|
||
'url': 'https://steamcommunity.com/app/3774440/discussions/',
|
||
'replies': 8,
|
||
'timestamp': int(time.time() * 1000)
|
||
},
|
||
{
|
||
'id': 'demo3',
|
||
'title': '[示例] BUG反馈:闪退问题',
|
||
'author': '玩家C',
|
||
'url': 'https://steamcommunity.com/app/3774440/discussions/',
|
||
'replies': 3,
|
||
'timestamp': int(time.time() * 1000)
|
||
}
|
||
]
|
||
|
||
def _parse_steam_discussions(self, html):
|
||
"""Parse Steam Community discussion list from HTML."""
|
||
discussions = []
|
||
import re
|
||
|
||
# Steam discussion page uses specific HTML structure
|
||
# Look for topic entries with class "forum_topic"
|
||
|
||
# Pattern 1: Modern Steam discussion format
|
||
# Find all rows with class containing "forum_topic"
|
||
topic_blocks = re.findall(
|
||
r'<div[^>]*class="forum_topic[^"]*"[^>]*>(.*?)</div>\s*</div>(?:\s*</div>)?',
|
||
html, re.DOTALL
|
||
)
|
||
|
||
for block in topic_blocks:
|
||
# Extract topic ID
|
||
topic_id_match = re.search(r'data-topic="([^"]+)"', block)
|
||
topic_id = topic_id_match.group(1) if topic_id_match else ''
|
||
|
||
# Extract title
|
||
title_match = re.search(r'class="forum_topic_name[^"]*"[^>]*>([^<]+)', block)
|
||
title = title_match.group(1).strip() if title_match else ''
|
||
|
||
# Extract author
|
||
author_match = re.search(r'class="[^"]*author[^"]*"[^>]*>([^<]+)', block)
|
||
author = author_match.group(1).strip() if author_match else ''
|
||
|
||
# Extract replies count
|
||
replies_match = re.search(r'class="[^"]*replies[^"]*"[^>]*>(\d+)', block)
|
||
replies = int(replies_match.group(1)) if replies_match else 0
|
||
|
||
if title:
|
||
discussions.append({
|
||
'id': topic_id,
|
||
'title': title,
|
||
'author': author,
|
||
'url': f'https://steamcommunity.com/app/3774440/discussions/?t={topic_id}',
|
||
'replies': replies
|
||
})
|
||
|
||
# Pattern 2: Alternative format - look for discussion links
|
||
if not discussions:
|
||
# Find all discussion links
|
||
links = re.findall(
|
||
r'<a[^>]*href="(/app/3774440/discussions/\d+/[^"]+)"[^>]*>(.*?)</a>',
|
||
html, re.DOTALL
|
||
)
|
||
|
||
for url_path, content in links:
|
||
# Extract title from the content
|
||
title_match = re.search(r'>([^<]+)<', content)
|
||
if title_match:
|
||
title = title_match.group(1).strip()
|
||
if title and len(title) > 3: # Filter out short/empty titles
|
||
discussions.append({
|
||
'id': url_path.split('/')[-2] if '/' in url_path else 'unknown',
|
||
'title': title,
|
||
'author': '',
|
||
'url': f'https://steamcommunity.com{url_path}',
|
||
'replies': 0
|
||
})
|
||
|
||
# Pattern 3: Last resort - look for any link containing discussion
|
||
if not discussions:
|
||
all_links = re.findall(
|
||
r'href="(/app/3774440/discussions[^"]+)"[^>]*>([^<]+)</a>',
|
||
html
|
||
)
|
||
seen = set()
|
||
for url_path, title in all_links:
|
||
title = title.strip()
|
||
if title and title not in seen and len(title) > 5:
|
||
seen.add(title)
|
||
discussions.append({
|
||
'id': url_path.split('/')[-1] or url_path,
|
||
'title': title,
|
||
'author': '',
|
||
'url': f'https://steamcommunity.com{url_path}',
|
||
'replies': 0
|
||
})
|
||
|
||
return discussions
|
||
|
||
def _handle_sns_bulk_create(self, platform):
|
||
"""Bulk create comments from selected discussions."""
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
data = json.loads(raw.decode('utf-8'))
|
||
|
||
discussions = data.get('discussions', [])
|
||
if not discussions:
|
||
self._send_json({'success': True, 'created': 0})
|
||
return
|
||
|
||
# Load existing data
|
||
sns_data = self._load_sns_data(platform)
|
||
comments = sns_data.get('comments', [])
|
||
next_id = sns_data.get('nextId', 1)
|
||
|
||
created_count = 0
|
||
for d in discussions:
|
||
# Check if already exists
|
||
exists = any(c.get('title') == d.get('title') for c in comments)
|
||
if not exists:
|
||
comment = {
|
||
'id': next_id,
|
||
'title': d.get('title', 'Untitled'),
|
||
'status': 'pending',
|
||
'author': d.get('author', ''),
|
||
'url': d.get('url', ''),
|
||
'source': 'steam',
|
||
'createdAt': int(time.time() * 1000),
|
||
'updatedAt': int(time.time() * 1000)
|
||
}
|
||
comments.append(comment)
|
||
next_id += 1
|
||
created_count += 1
|
||
|
||
sns_data['comments'] = comments
|
||
sns_data['nextId'] = next_id
|
||
self._save_sns_data(platform, sns_data)
|
||
|
||
self._send_json({'success': True, 'created': created_count})
|
||
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
# SNS API handlers end
|
||
|
||
# ── Quick replies (常用回复) ──
|
||
|
||
def _load_quick_replies(self):
|
||
"""Load quick replies data."""
|
||
if os.path.exists(QUICK_REPLIES_FILE):
|
||
try:
|
||
with open(QUICK_REPLIES_FILE, 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
if not isinstance(data, dict):
|
||
data = {}
|
||
data.setdefault('nextId', 1)
|
||
data.setdefault('replies', [])
|
||
return data
|
||
except Exception:
|
||
pass
|
||
return {'nextId': 1, 'replies': []}
|
||
|
||
def _save_quick_replies(self, data):
|
||
"""Persist quick replies data."""
|
||
os.makedirs(os.path.dirname(QUICK_REPLIES_FILE), exist_ok=True)
|
||
with open(QUICK_REPLIES_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
def _handle_qr_get(self):
|
||
"""GET /api/quick-replies"""
|
||
self._send_json(self._load_quick_replies())
|
||
|
||
def _handle_qr_post(self):
|
||
"""POST /api/quick-replies/{create|update|delete}"""
|
||
path = self.path
|
||
if path.endswith('/create'):
|
||
self._handle_qr_create()
|
||
elif path.endswith('/update'):
|
||
self._handle_qr_update()
|
||
elif path.endswith('/delete'):
|
||
self._handle_qr_delete()
|
||
else:
|
||
self._send_json({'error': 'Unknown action'}, 404)
|
||
|
||
def _handle_qr_create(self):
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
payload = json.loads(raw.decode('utf-8'))
|
||
|
||
title = (payload.get('title') or '').strip()
|
||
content = payload.get('content') or ''
|
||
if not title:
|
||
self._send_json({'success': False, 'error': '标题不能为空'}, 400)
|
||
return
|
||
if not content.strip():
|
||
self._send_json({'success': False, 'error': '内容不能为空'}, 400)
|
||
return
|
||
|
||
data = self._load_quick_replies()
|
||
now = int(time.time() * 1000)
|
||
reply = {
|
||
'id': data.get('nextId', 1),
|
||
'title': title,
|
||
'content': content,
|
||
'createdAt': now,
|
||
'updatedAt': now
|
||
}
|
||
data['nextId'] = reply['id'] + 1
|
||
data['replies'].append(reply)
|
||
self._save_quick_replies(data)
|
||
self._send_json({'success': True, 'reply': reply})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def _handle_qr_update(self):
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
payload = json.loads(raw.decode('utf-8'))
|
||
|
||
reply_id = payload.get('id')
|
||
if reply_id is None:
|
||
self._send_json({'success': False, 'error': 'id required'}, 400)
|
||
return
|
||
|
||
data = self._load_quick_replies()
|
||
target = next((r for r in data['replies'] if r.get('id') == reply_id), None)
|
||
if not target:
|
||
self._send_json({'success': False, 'error': 'not found'}, 404)
|
||
return
|
||
|
||
if 'title' in payload:
|
||
title = (payload.get('title') or '').strip()
|
||
if not title:
|
||
self._send_json({'success': False, 'error': '标题不能为空'}, 400)
|
||
return
|
||
target['title'] = title
|
||
if 'content' in payload:
|
||
target['content'] = payload.get('content') or ''
|
||
target['updatedAt'] = int(time.time() * 1000)
|
||
|
||
self._save_quick_replies(data)
|
||
self._send_json({'success': True, 'reply': target})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def _handle_qr_delete(self):
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
raw = self.rfile.read(length)
|
||
payload = json.loads(raw.decode('utf-8'))
|
||
reply_id = payload.get('id')
|
||
if reply_id is None:
|
||
self._send_json({'success': False, 'error': 'id required'}, 400)
|
||
return
|
||
|
||
data = self._load_quick_replies()
|
||
original_len = len(data['replies'])
|
||
data['replies'] = [r for r in data['replies'] if r.get('id') != reply_id]
|
||
if len(data['replies']) == original_len:
|
||
self._send_json({'success': False, 'error': 'not found'}, 404)
|
||
return
|
||
|
||
self._save_quick_replies(data)
|
||
self._send_json({'success': True})
|
||
except Exception as e:
|
||
self._send_json({'success': False, 'error': str(e)}, 500)
|
||
|
||
def log_message(self, format, *args):
|
||
"""Quieter logging."""
|
||
msg = format % args
|
||
if '/api/' in msg or 'POST' in msg:
|
||
sys.stderr.write(f'[API] {msg}\n')
|
||
|
||
|
||
def kill_stale_servers(port):
|
||
"""Kill any existing processes listening on the target port."""
|
||
if sys.platform == 'win32':
|
||
try:
|
||
# netstat to find PIDs on the port
|
||
result = subprocess.run(
|
||
['netstat', '-ano'],
|
||
capture_output=True, text=True, timeout=5
|
||
)
|
||
my_pid = os.getpid()
|
||
killed = set()
|
||
for line in result.stdout.splitlines():
|
||
# Match lines like: TCP 0.0.0.0:8080 ... LISTENING 12345
|
||
parts = line.split()
|
||
if len(parts) >= 5 and f':{port}' in parts[1] and 'LISTENING' in parts[3]:
|
||
pid = int(parts[4])
|
||
if pid != my_pid and pid not in killed:
|
||
try:
|
||
subprocess.run(
|
||
['taskkill', '/PID', str(pid), '/F'],
|
||
capture_output=True, timeout=5
|
||
)
|
||
killed.add(pid)
|
||
print(f' Killed stale process PID {pid}')
|
||
except Exception:
|
||
pass
|
||
if killed:
|
||
time.sleep(0.3) # brief wait for port release
|
||
print(f' Cleaned up {len(killed)} old process(es)')
|
||
except Exception as e:
|
||
print(f' Warning: cleanup failed: {e}')
|
||
else:
|
||
# Unix: use lsof
|
||
try:
|
||
result = subprocess.run(
|
||
['lsof', '-ti', f':{port}'],
|
||
capture_output=True, text=True, timeout=5
|
||
)
|
||
my_pid = os.getpid()
|
||
for pid_str in result.stdout.strip().split('\n'):
|
||
if pid_str.strip():
|
||
pid = int(pid_str.strip())
|
||
if pid != my_pid:
|
||
os.kill(pid, 9)
|
||
print(f' Killed stale process PID {pid}')
|
||
time.sleep(0.3)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
os.chdir(SCRIPT_DIR)
|
||
|
||
print(f'Checking port {PORT} for stale processes...')
|
||
kill_stale_servers(PORT)
|
||
|
||
with http.server.ThreadingHTTPServer(('', PORT), DashboardHandler) as httpd:
|
||
url = f'http://localhost:{PORT}'
|
||
print(f'TH1 Dashboard serving at {url}')
|
||
print('Press Ctrl+C to stop.')
|
||
webbrowser.open(url)
|
||
try:
|
||
httpd.serve_forever()
|
||
except KeyboardInterrupt:
|
||
print('\nStopped.')
|