chore: add Discord chat export tool

This commit is contained in:
daixiawu 2026-07-02 14:45:24 +08:00
parent 9ff87438fe
commit f2af485dbb
4 changed files with 421 additions and 0 deletions

3
.gitignore vendored
View File

@ -128,6 +128,9 @@ __pycache__/
# Local OSS report viewer credentials and download caches.
/Tools/PlayerBugViewer/config.local.json
# Local Discord chat exports.
/output/discord-export/
/Tools/PlayerBugViewer/Data/
/Tools/PlayerMultilingualReportViewer/config.local.json
/Tools/PlayerMultilingualReportViewer/Data/

View File

@ -0,0 +1,53 @@
# Discord Chat Export
Local exporter for Discord channels using an official bot token and Discord's documented REST API.
## Bot Setup
1. Open the Discord Developer Portal and create an application.
2. Add a bot to the application.
3. Enable the bot's privileged **Message Content Intent** if you need message text, not just metadata.
4. Invite the bot to your server with at least:
- `View Channels`
- `Read Message History`
5. Make sure channel-specific permission overwrites allow the bot to read the target channels.
Do not use a user token or self-bot. Store the bot token only in your local environment.
## Usage
PowerShell:
```powershell
$env:DISCORD_BOT_TOKEN = "YOUR_BOT_TOKEN"
python Tools/DiscordExport/discord_export.py --channel 123456789012345678 --after 2026-07-01 --before 2026-07-02 --format jsonl --format csv
Remove-Item Env:\DISCORD_BOT_TOKEN
```
Multiple channels:
```powershell
$env:DISCORD_BOT_TOKEN = "YOUR_BOT_TOKEN"
python Tools/DiscordExport/discord_export.py --channels-file Tools/DiscordExport/channels.example.txt --out output/discord-export
Remove-Item Env:\DISCORD_BOT_TOKEN
```
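Date arguments are interpreted as UTC; a timestamp without an explicit offset is also treated as UTC. A date without a time means midnight UTC (00:00:00) for `--after` and the end of that UTC day (23:59:59.999999) for `--before`, so `--after 2026-07-01 --before 2026-07-01` covers exactly one full UTC day.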
## Outputs
Supported formats:
- `jsonl`: one normalized message per line, with the raw Discord message embedded.
- `json`: the same normalized messages as a JSON array.
- `csv`: spreadsheet-friendly summary with message text and attachment URLs.
- `md`: human-readable Markdown transcript.
The default output directory is `output/discord-export`.
## Notes
- Discord returns messages newest-first; the exporter writes them oldest-first.
- The exporter honors `429` rate-limit responses by waiting for the retry interval Discord reports before retrying, and also pauses briefly between pages (configurable with `--sleep`).
- If `content` is empty, check the bot's Message Content Intent and channel permissions.
- This tool exports messages the bot can legitimately access; deleted messages are not available through normal history export.

View File

@ -0,0 +1,3 @@
# One Discord channel id per line. Comments and blank lines are ignored.
# Enable Developer Mode in Discord, then right-click a channel and Copy Channel ID.
123456789012345678

View File

@ -0,0 +1,362 @@
#!/usr/bin/env python3
"""
Export Discord channel history with an official bot token.
This tool uses Discord's documented REST API. It does not use user tokens,
self-bot flows, browser scraping, or private endpoints.
"""
from __future__ import annotations
import argparse
import csv
import datetime as dt
import json
import os
import pathlib
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
# Discord snowflake epoch, 2015-01-01T00:00:00Z, expressed in Unix milliseconds.
DISCORD_EPOCH_MS = 1420070400000
# Base URL of the versioned (v10) Discord REST API; request paths are appended to it.
API_BASE = "https://discord.com/api/v10"
class DiscordApiError(RuntimeError):
    """Raised when the Discord REST API returns an error or an unexpected payload."""
def parse_utc_datetime(value: str | None, *, end_of_day: bool = False) -> dt.datetime | None:
if not value:
return None
text = value.strip()
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", text):
suffix = "T23:59:59.999999+00:00" if end_of_day else "T00:00:00+00:00"
text = text + suffix
elif text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = dt.datetime.fromisoformat(text)
except ValueError as exc:
raise ValueError(f"Invalid datetime: {value}") from exc
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.timezone.utc)
return parsed.astimezone(dt.timezone.utc)
def snowflake_from_datetime(value: dt.datetime) -> int:
    """Convert a datetime into a snowflake-style id usable as a pagination bound.

    The result is the number of milliseconds between ``DISCORD_EPOCH_MS`` and
    *value*, shifted left by 22 bits (so the low 22 bits are zero).
    """
    millis_since_unix_epoch = int(value.timestamp() * 1000)
    millis_since_discord_epoch = millis_since_unix_epoch - DISCORD_EPOCH_MS
    return millis_since_discord_epoch << 22
def datetime_from_discord_timestamp(value: str) -> dt.datetime:
    """Parse an ISO-8601 timestamp string from a message payload into a UTC datetime."""
    iso_text = value.replace("Z", "+00:00")
    parsed = dt.datetime.fromisoformat(iso_text)
    return parsed.astimezone(dt.timezone.utc)
def safe_filename(value: str) -> str:
    """Reduce *value* to characters that are safe in a file name.

    Every run of characters outside ``A-Z a-z 0-9 _ . -`` becomes a single
    underscore, and leading/trailing underscores are trimmed. If nothing is
    left, ``"discord_export"`` is returned instead.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "_", value)
    trimmed = collapsed.strip("_")
    if trimmed:
        return trimmed
    return "discord_export"
def request_json(path: str, token: str, params: dict[str, Any] | None = None) -> Any:
    """Send an authenticated GET to the Discord REST API and decode the JSON reply.

    Args:
        path: API path appended to ``API_BASE``, e.g. ``/channels/<id>/messages``.
        token: Bot token, sent as ``Authorization: Bot <token>``.
        params: Optional query parameters; entries whose value is ``None`` are
            dropped before the query string is built.

    Returns:
        The decoded JSON payload, or ``None`` when the response body is empty.

    Raises:
        DiscordApiError: For any HTTP error status, except a 429 that carries a
            usable retry delay (those are retried after sleeping).
    """
    # Filter first so that a dict holding only None values does not leave a
    # dangling "?" on the URL.
    query_items = {key: item for key, item in (params or {}).items() if item is not None}
    url = f"{API_BASE}{path}"
    if query_items:
        url = f"{url}?{urllib.parse.urlencode(query_items)}"
    headers = {
        "Authorization": f"Bot {token}",
        "User-Agent": "TH1 Discord Exporter (official bot REST API)",
    }
    while True:
        request = urllib.request.Request(url, headers=headers)
        try:
            with urllib.request.urlopen(request, timeout=60) as response:
                payload = response.read().decode("utf-8")
                return json.loads(payload) if payload else None
        except urllib.error.HTTPError as exc:
            body = exc.read().decode("utf-8", errors="replace")
            if exc.code == 429:
                retry_after = parse_retry_after(exc, body)
                if retry_after is not None:
                    # Small safety margin on top of the server-provided delay.
                    time.sleep(retry_after + 0.25)
                    continue
            detail = body
            try:
                parsed = json.loads(body)
            except json.JSONDecodeError:
                pass
            else:
                # Only a JSON object can carry a "message" field; any other
                # JSON value (list, string, number) keeps the raw body.
                if isinstance(parsed, dict):
                    detail = parsed.get("message", body)
            raise DiscordApiError(f"Discord API error {exc.code} for {path}: {detail}") from exc
def parse_retry_after(exc: urllib.error.HTTPError, body: str) -> float | None:
header_value = exc.headers.get("Retry-After")
if header_value:
try:
return float(header_value)
except ValueError:
pass
try:
payload = json.loads(body)
except json.JSONDecodeError:
return None
value = payload.get("retry_after")
return float(value) if isinstance(value, int | float) else None
def normalize_message(message: dict[str, Any], channel_id: str) -> dict[str, Any]:
    """Flatten a raw message payload into the exporter's normalized record.

    Args:
        message: One message object as returned by the API.
        channel_id: Id of the channel the message was fetched from.

    Returns:
        A dict with selected top-level fields, flattened author/member fields,
        trimmed attachment and mention entries, and the untouched payload
        under ``"raw"``.
    """
    author = message.get("author") or {}
    member = message.get("member") or {}

    attachment_keys = ("id", "filename", "url", "proxy_url", "content_type", "size")
    mention_keys = ("id", "username", "global_name")
    author_fields = (
        ("author_id", "id"),
        ("author_username", "username"),
        ("author_global_name", "global_name"),
        ("author_discriminator", "discriminator"),
    )

    # Keys are inserted in the same order as the serialized record layout.
    record: dict[str, Any] = {"id": message.get("id"), "channel_id": channel_id}
    for key in ("timestamp", "edited_timestamp", "type"):
        record[key] = message.get(key)
    record["pinned"] = message.get("pinned", False)
    for target_key, source_key in author_fields:
        record[target_key] = author.get(source_key)
    record["member_nick"] = member.get("nick")
    record["content"] = message.get("content", "")
    record["attachments"] = [
        {key: item.get(key) for key in attachment_keys}
        for item in message.get("attachments", [])
    ]
    record["embeds"] = message.get("embeds", [])
    record["reactions"] = message.get("reactions", [])
    record["mentions"] = [
        {key: user.get(key) for key in mention_keys}
        for user in message.get("mentions", [])
    ]
    record["raw"] = message
    return record
def iter_channel_messages(
    channel_id: str,
    token: str,
    after: dt.datetime | None,
    before: dt.datetime | None,
    sleep_seconds: float,
) -> list[dict[str, Any]]:
    """Fetch and normalize a channel's messages within an optional time window.

    History is paged backwards, 100 messages per request, using the oldest id
    of each page as the next ``before`` cursor. Paging stops on an empty page,
    a short page, or a page containing a message older than *after*. This
    relies on each page arriving newest-first.

    Args:
        channel_id: Channel to export.
        token: Bot token passed through to ``request_json``.
        after: Inclusive lower time bound, or ``None`` for no lower bound.
        before: Upper time bound used for the first ``before`` cursor, or ``None``.
        sleep_seconds: Pause between page requests; values <= 0 disable it.

    Returns:
        Normalized messages sorted oldest-first by numeric message id.

    Raises:
        DiscordApiError: If the API response is not a list.
    """
    before_snowflake = snowflake_from_datetime(before) if before else None
    messages: list[dict[str, Any]] = []
    while True:
        params: dict[str, Any] = {"limit": 100, "before": before_snowflake}
        batch = request_json(f"/channels/{channel_id}/messages", token, params)
        if not isinstance(batch, list):
            raise DiscordApiError(f"Unexpected response for channel {channel_id}: {batch!r}")
        if not batch:
            break
        stop = False
        for message in batch:
            timestamp = datetime_from_discord_timestamp(message["timestamp"])
            if after and timestamp < after:
                # Older than the window: skip it and finish after this page.
                stop = True
                continue
            messages.append(normalize_message(message, channel_id))
        # Continue from the oldest message of this page.
        before_snowflake = int(batch[-1]["id"])
        print(f"channel={channel_id} fetched={len(messages)} oldest={batch[-1]['timestamp']}", flush=True)
        if stop or len(batch) < 100:
            break
        if sleep_seconds > 0:
            time.sleep(sleep_seconds)
    # Ids are decimal strings; compare them numerically so that ids with a
    # different number of digits still sort chronologically.
    messages.sort(key=lambda item: int(item["id"]))
    return messages
def load_channels(args: argparse.Namespace) -> list[str]:
    """Collect channel ids from ``--channel`` and ``--channels-file``.

    In the channels file, blank lines and lines starting with ``#`` are
    ignored; only the first whitespace-separated token of each remaining line
    is used, so trailing notes are allowed.

    Args:
        args: Parsed arguments providing ``channel`` (list or ``None``) and
            ``channels_file`` (path or ``None``).

    Returns:
        Unique channel ids in first-seen order, command-line ids first.

    Raises:
        ValueError: If any id is not a 17-22 digit number.
        OSError: If the channels file cannot be read.
    """
    channels: list[str] = []
    channels.extend(args.channel or [])
    if args.channels_file:
        path = pathlib.Path(args.channels_file)
        # "utf-8-sig" drops a leading byte-order mark (common in files saved
        # on Windows) so the first line is still recognised as a comment or id.
        for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            channels.append(line.split()[0])
    seen: set[str] = set()
    unique: list[str] = []
    for channel_id in channels:
        if not re.fullmatch(r"\d{17,22}", channel_id):
            raise ValueError(f"Invalid Discord channel id: {channel_id}")
        if channel_id not in seen:
            unique.append(channel_id)
            seen.add(channel_id)
    return unique
def write_json(path: pathlib.Path, messages: list[dict[str, Any]]) -> None:
    """Write all messages to *path* as one indented JSON array (UTF-8, non-ASCII kept)."""
    serialized = json.dumps(messages, indent=2, ensure_ascii=False)
    path.write_text(serialized, encoding="utf-8")
def write_jsonl(path: pathlib.Path, messages: list[dict[str, Any]]) -> None:
    """Write one compact JSON object per line to *path* (UTF-8, ``\\n`` line endings)."""
    with path.open("w", encoding="utf-8", newline="\n") as sink:
        sink.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in messages
        )
def write_csv(path: pathlib.Path, messages: list[dict[str, Any]]) -> None:
    """Write a summary CSV (UTF-8 with BOM) with one row per normalized message.

    Besides the copied-through columns, ``attachment_urls`` holds the
    space-joined attachment URLs and ``reaction_count`` the sum of all
    reaction counts.
    """
    copied_columns = (
        "id",
        "channel_id",
        "timestamp",
        "edited_timestamp",
        "author_id",
        "author_username",
        "author_global_name",
        "member_nick",
        "content",
    )
    columns = [*copied_columns, "attachment_urls", "reaction_count"]

    with path.open("w", encoding="utf-8-sig", newline="") as sink:
        table = csv.DictWriter(sink, fieldnames=columns)
        table.writeheader()
        for record in messages:
            row = {column: record[column] for column in copied_columns}
            urls = [(item.get("url") or "") for item in record["attachments"]]
            row["attachment_urls"] = " ".join(urls).strip()
            row["reaction_count"] = sum(
                entry.get("count", 0) for entry in record["reactions"]
            )
            table.writerow(row)
def write_markdown(path: pathlib.Path, messages: list[dict[str, Any]]) -> None:
    """Write a human-readable Markdown transcript of *messages* to *path*.

    Each message becomes a ``## <timestamp> - <display name>`` section holding
    its text (or a placeholder when there is none) and a bullet list of
    attachment links.
    """
    name_keys = ("member_nick", "author_global_name", "author_username", "author_id")
    transcript = ["# Discord Export", ""]
    for entry in messages:
        # First non-empty identity field wins, most specific first.
        speaker = next(
            (entry.get(key) for key in name_keys if entry.get(key)),
            "Unknown",
        )
        transcript += [f"## {entry['timestamp']} - {speaker}", ""]
        transcript.append(entry.get("content") or "_No message content returned._")
        files = entry["attachments"]
        if files:
            transcript += ["", "Attachments:"]
            for item in files:
                label = item.get("filename") or item.get("id")
                link = item.get("url") or ""
                transcript.append(f"- [{label}]({link})")
        # Blank line between consecutive message sections.
        transcript.append("")
    path.write_text("\n".join(transcript), encoding="utf-8")
def write_outputs(
    output_dir: pathlib.Path,
    channel_id: str,
    messages: list[dict[str, Any]],
    formats: list[str],
) -> None:
    """Write *messages* to *output_dir* once per requested format.

    Files are named ``discord_<channel_id>_<UTC timestamp>.<format>``; the
    directory is created if needed.

    Args:
        output_dir: Destination directory.
        channel_id: Channel id used in the file name.
        messages: Normalized messages to serialize.
        formats: Any of ``json``, ``jsonl``, ``csv``, ``md``; repeated entries
            are written only once.

    Raises:
        KeyError: If *formats* contains an unsupported format name.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    # dt.timezone.utc, as used elsewhere in this file, also works on Python
    # versions that predate the dt.UTC alias.
    stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d_%H%M%S")
    basename = safe_filename(f"discord_{channel_id}_{stamp}")
    writers = {
        "json": write_json,
        "jsonl": write_jsonl,
        "csv": write_csv,
        "md": write_markdown,
    }
    # dict.fromkeys keeps first-seen order while dropping repeated formats,
    # so the same file is not written twice.
    for fmt in dict.fromkeys(formats):
        path = output_dir / f"{basename}.{fmt}"
        writers[fmt](path, messages)
        print(f"wrote {path}", flush=True)
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``."""
    option_specs = (
        ("--channel", {"action": "append", "help": "Discord channel id. May be repeated."}),
        ("--channels-file", {"help": "Text file with one channel id per line."}),
        ("--after", {"help": "UTC start time, e.g. 2026-07-01 or 2026-07-01T12:00:00Z."}),
        ("--before", {"help": "UTC end time, e.g. 2026-07-02 or 2026-07-02T12:00:00Z."}),
        (
            "--format",
            {
                "action": "append",
                "choices": ["json", "jsonl", "csv", "md"],
                "help": "Output format. May be repeated. Defaults to jsonl and csv.",
            },
        ),
        ("--out", {"default": "output/discord-export", "help": "Output directory."}),
        (
            "--token-env",
            {"default": "DISCORD_BOT_TOKEN", "help": "Environment variable holding the bot token."},
        ),
        (
            "--sleep",
            {"type": float, "default": 0.35, "help": "Delay between successful page requests."},
        ),
    )
    cli = argparse.ArgumentParser(description="Export Discord channel history with a bot token.")
    # Register options in table order so the generated --help keeps its layout.
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def main() -> int:
    """Run the exporter from the command line.

    Returns:
        ``0`` when every channel was exported, ``1`` when at least one channel
        failed, ``2`` for usage or configuration errors (missing token,
        invalid or unreadable input, no channels, inverted time window).
    """
    args = parse_args()
    token = os.environ.get(args.token_env)
    if not token:
        print(f"Missing bot token. Set ${args.token_env} first.", file=sys.stderr)
        return 2
    try:
        channels = load_channels(args)
        after = parse_utc_datetime(args.after)
        before = parse_utc_datetime(args.before, end_of_day=True)
    except (ValueError, OSError) as exc:
        # OSError covers a missing or unreadable --channels-file.
        print(str(exc), file=sys.stderr)
        return 2
    if not channels:
        print("No channels specified. Use --channel or --channels-file.", file=sys.stderr)
        return 2
    if after and before and after > before:
        print("--after must be earlier than --before.", file=sys.stderr)
        return 2
    formats = args.format or ["jsonl", "csv"]
    output_dir = pathlib.Path(args.out)
    failed = 0
    for channel_id in channels:
        try:
            messages = iter_channel_messages(channel_id, token, after, before, args.sleep)
            write_outputs(output_dir, channel_id, messages, formats)
        except (DiscordApiError, OSError) as exc:
            # OSError also covers network failures (urllib.error.URLError) and
            # file-write errors. Report the channel and carry on with the rest
            # instead of aborting the whole run with a traceback.
            print(f"channel={channel_id} failed: {exc}", file=sys.stderr)
            failed += 1
            continue
        print(f"channel={channel_id} total={len(messages)}", flush=True)
    return 1 if failed else 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())