TH1/Tools/CrashSight/crashsight_api.py
2026-06-02 17:57:13 +08:00

389 lines
13 KiB
Python

#!/usr/bin/env python3
"""CrashSight OpenAPI helper for TH1 crash/error triage.
Credentials are read from Temp/CrashSight/crashsight_api_credentials.json by
default. Keep that file out of source control.
"""
from __future__ import annotations
import argparse
import base64
import datetime as dt
import hashlib
import hmac
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[2]
DEFAULT_CREDENTIALS = ROOT / "Temp" / "CrashSight" / "crashsight_api_credentials.json"
DEFAULT_BASE_URL = "https://crashsight.qq.com/uniform/openapi"
DEFAULT_APP_ID = "01076c49ce"
DEFAULT_PLATFORM_ID = 10
DEFAULT_PID = "10"
def load_json(path: Path) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def write_json(path: Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8", newline="\n") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
f.write("\n")
def load_credentials(path: Path) -> dict[str, Any]:
if not path.exists():
raise SystemExit(f"Credentials file not found: {path}")
creds = load_json(path)
missing = [k for k in ("localUserId", "user_key") if not creds.get(k)]
if missing:
raise SystemExit(f"Credentials file missing keys: {', '.join(missing)}")
return creds
def sign(local_user_id: str, user_key: str, timestamp_seconds: int) -> str:
message = f"{local_user_id}_{timestamp_seconds}".encode("utf-8")
digest_hex = hmac.new(user_key.encode("utf-8"), message, hashlib.sha256).hexdigest()
return base64.b64encode(digest_hex.encode("utf-8")).decode("ascii")
def auth_query(creds: dict[str, Any]) -> str:
timestamp_seconds = int(time.time())
local_user_id = str(creds["localUserId"])
user_key = str(creds["user_key"])
query = {
"localUserId": local_user_id,
"t": str(timestamp_seconds),
"userSecret": sign(local_user_id, user_key, timestamp_seconds),
}
return urllib.parse.urlencode(query)
def request_api(
path: str,
body: dict[str, Any],
*,
creds: dict[str, Any],
base_url: str = DEFAULT_BASE_URL,
method: str = "POST",
timeout: int = 60,
) -> dict[str, Any]:
if not path.startswith("/"):
path = "/" + path
auth = auth_query(creds)
method = method.upper()
data = None
if method == "GET":
query = urllib.parse.urlencode({k: v for k, v in body.items() if v is not None})
sep = "&" if "?" in path else "?"
path = f"{path}{sep}{query}" if query else path
url = f"{base_url.rstrip('/')}{path}&{auth}" if "?" in path else f"{base_url.rstrip('/')}{path}?{auth}"
else:
url = f"{base_url.rstrip('/')}{path}?{auth}"
data = json.dumps(body, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={
"Content-Type": "application/json;charset=UTF-8",
"Accept": "application/json, text/plain, */*",
"User-Agent": "TH1-CrashSight-OpenAPI/1.0",
},
method=method,
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8", errors="replace")
status = resp.status
content_type = resp.headers.get("content-type", "")
except urllib.error.HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code} for {url}: {raw[:1000]}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"Request failed for {url}: {exc}") from exc
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
raise RuntimeError(
f"Non-JSON response from {url} status={status} content-type={content_type}: {raw[:1000]}"
) from exc
return parsed
def normalize_response(resp: dict[str, Any]) -> Any:
if isinstance(resp, dict):
if "data" in resp:
return resp["data"]
if "ret" in resp and "msg" in resp:
return resp
return resp
def common_body(args: argparse.Namespace) -> dict[str, Any]:
body: dict[str, Any] = {
"appId": args.app_id,
"platformId": args.platform_id,
"pid": str(args.pid),
}
if getattr(args, "version", None):
body["version"] = args.version
if getattr(args, "date", None):
body["date"] = args.date
return body
def cmd_init(args: argparse.Namespace) -> int:
data = {
"localUserId": str(args.local_user_id),
"user_key": args.user_key,
"base_url": args.base_url,
"app_id": args.app_id,
"platform_id": args.platform_id,
"created_at": dt.datetime.now(dt.timezone.utc).isoformat(),
}
write_json(args.credentials, data)
print(f"Wrote credentials: {args.credentials}")
return 0
def cmd_call(args: argparse.Namespace) -> int:
creds = load_credentials(args.credentials)
body = json.loads(args.body) if args.body else {}
resp = request_api(args.path, body, creds=creds, base_url=creds.get("base_url", args.base_url))
print(json.dumps(resp, ensure_ascii=False, indent=2))
return 0
def cmd_issue_list(args: argparse.Namespace) -> int:
creds = load_credentials(args.credentials)
body = common_body(args)
body.update(
{
"start": args.start,
"rows": args.rows,
"sortField": args.sort_field,
"sortOrder": args.sort_order,
}
)
if args.status:
body["status"] = args.status
if args.exception_type:
body["exceptionTypeList"] = args.exception_type
if args.extra:
body.update(json.loads(args.extra))
resp = request_api(
"/queryIssueList",
body,
creds=creds,
base_url=creds.get("base_url", args.base_url),
timeout=args.timeout,
)
if args.output:
write_json(args.output, resp)
print(json.dumps(resp, ensure_ascii=False, indent=2))
return 0
def cmd_issue_info(args: argparse.Namespace) -> int:
creds = load_credentials(args.credentials)
body = common_body(args)
body["issueId"] = args.issue_id
if args.extra:
body.update(json.loads(args.extra))
resp = request_api(
"/issueInfo",
body,
creds=creds,
base_url=creds.get("base_url", args.base_url),
method="GET",
timeout=args.timeout,
)
if args.output:
write_json(args.output, resp)
print(json.dumps(resp, ensure_ascii=False, indent=2))
return 0
def cmd_crash_list(args: argparse.Namespace) -> int:
creds = load_credentials(args.credentials)
body = common_body(args)
body.update({"issueId": args.issue_id, "start": args.start, "rows": args.rows})
if args.extra:
body.update(json.loads(args.extra))
resp = request_api(
"/crashList/crashDataType/undefined",
body,
creds=creds,
base_url=creds.get("base_url", args.base_url),
method="GET",
timeout=args.timeout,
)
if args.output:
write_json(args.output, resp)
print(json.dumps(resp, ensure_ascii=False, indent=2))
return 0
def cmd_crash_info(args: argparse.Namespace) -> int:
creds = load_credentials(args.credentials)
body = common_body(args)
body["crashId"] = args.crash_id
if args.issue_id:
body["issueId"] = args.issue_id
if args.extra:
body.update(json.loads(args.extra))
resp = request_api(
"/crashDoc/appId/{}/platformId/{}/crashHash/{}".format(
urllib.parse.quote(str(args.app_id), safe=""),
urllib.parse.quote(str(args.platform_id), safe=""),
urllib.parse.quote(str(args.crash_id), safe=""),
),
body,
creds=creds,
base_url=creds.get("base_url", args.base_url),
method="GET",
timeout=args.timeout,
)
if args.output:
write_json(args.output, resp)
print(json.dumps(resp, ensure_ascii=False, indent=2))
return 0
def cmd_query_access(args: argparse.Namespace) -> int:
creds = load_credentials(args.credentials)
if not args.device_id and not args.user_id:
raise SystemExit("Provide --device-id or --user-id.")
body: dict[str, Any] = {
"appId": args.app_id,
"platformId": args.platform_id,
"pid": str(args.pid),
"skipDistinctQuery": True,
"pageNumber": args.page_number,
"pageSize": args.page_size,
"exceptionCategoryList": args.exception_category,
}
if args.upload_time_begin_millis is not None:
body["uploadTimeBeginMillis"] = args.upload_time_begin_millis
else:
body["uploadTimeBeginMillis"] = int((time.time() - args.since_days * 86400) * 1000)
if args.device_id:
body["deviceIdList"] = [args.device_id]
if args.user_id:
body["userIdList"] = [args.user_id]
if args.extra:
body.update(json.loads(args.extra))
resp = request_api(
"/redir/api/queryAccess/queryAccessList",
body,
creds=creds,
base_url="https://crashsight.qq.com",
method="POST",
timeout=args.timeout,
)
if args.output:
write_json(args.output, resp)
print(json.dumps(resp, ensure_ascii=False, indent=2))
return 0
def add_common(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--credentials", type=Path, default=DEFAULT_CREDENTIALS)
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--app-id", default=DEFAULT_APP_ID)
parser.add_argument("--platform-id", type=int, default=DEFAULT_PLATFORM_ID)
parser.add_argument("--pid", default=DEFAULT_PID)
def add_query_common(parser: argparse.ArgumentParser) -> None:
add_common(parser)
parser.add_argument("--version")
parser.add_argument("--date", default="last_1_day")
parser.add_argument("--extra", help="JSON object merged into the request body")
parser.add_argument("--output", type=Path)
parser.add_argument("--timeout", type=int, default=60)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=__doc__)
sub = parser.add_subparsers(required=True)
p = sub.add_parser("init", help="Write local credentials file")
add_common(p)
p.add_argument("--local-user-id", required=True)
p.add_argument("--user-key", required=True)
p.set_defaults(func=cmd_init)
p = sub.add_parser("call", help="Raw API call for endpoint exploration")
add_common(p)
p.add_argument("path")
p.add_argument("--body", default="{}")
p.set_defaults(func=cmd_call)
p = sub.add_parser("issue-list", help="List CrashSight issues")
add_query_common(p)
p.add_argument("--start", type=int, default=0)
p.add_argument("--rows", type=int, default=100)
p.add_argument("--status", default="0,2")
p.add_argument("--exception-type")
p.add_argument("--sort-field", default="uploadTime")
p.add_argument("--sort-order", default="desc")
p.set_defaults(func=cmd_issue_list)
p = sub.add_parser("issue-info", help="Fetch issue details")
add_query_common(p)
p.add_argument("issue_id")
p.set_defaults(func=cmd_issue_info)
p = sub.add_parser("crash-list", help="List reports/samples under an issue")
add_query_common(p)
p.add_argument("issue_id")
p.add_argument("--start", type=int, default=0)
p.add_argument("--rows", type=int, default=20)
p.set_defaults(func=cmd_crash_list)
p = sub.add_parser("crash-info", help="Fetch one report/sample detail")
add_query_common(p)
p.add_argument("crash_id")
p.add_argument("--issue-id")
p.set_defaults(func=cmd_crash_info)
p = sub.add_parser("query-access", help="List recent reports for a device/user")
add_common(p)
p.add_argument("--device-id")
p.add_argument("--user-id")
p.add_argument("--since-days", type=int, default=3)
p.add_argument("--upload-time-begin-millis", type=int)
p.add_argument("--page-number", type=int, default=1)
p.add_argument("--page-size", type=int, default=100)
p.add_argument("--exception-category", action="append", default=[])
p.add_argument("--extra", help="JSON object merged into the request body")
p.add_argument("--output", type=Path)
p.add_argument("--timeout", type=int, default=60)
p.set_defaults(func=cmd_query_access)
return parser
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())