TH1/.codex/skills/th1-ai-director/scripts/analyze_ai_batch_quality.py

1287 lines
54 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import math
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from analyze_ai_director_log import (
action_key,
has_no_effect_delta,
is_tactical_repeat,
load_rows,
stable_key,
summarize,
)
# Giant (hero) type names grouped by the role label they map to. The group and
# member order fixes the insertion order of HERO_ROLE_BY_GIANT below.
_GIANTS_BY_HERO_ROLE = (
    ("Assassin", ("EgyptianFlandre", "GermanyMomiji")),
    ("Support", ("EgyptianRemilia", "FrenchTewi", "FrenchEirin", "NorwayReimu")),
    (
        "Caster",
        (
            "EgyptianPatchouli",
            "FrenchReisen",
            "GermanySanae",
            "IndianUtsuho",
            "IndianRin",
            "NorwaySumireko",
        ),
    ),
    ("Mobility", ("GermanyAya",)),
    ("Control", ("IndianSatori", "IndianKoishi", "FrenchKaguya")),
    ("Vanguard", ("GermanySuwako", "FrenchMokou", "IndianYuugi", "NorwayKasen")),
    ("Summoner", ("NorwaySuika",)),
    ("Economy", ("GermanyKanako",)),
    ("Defender", ("EgyptianSakuya", "EgyptianMeiling", "NorwayAunn")),
)

# Lookup from giant (hero) type name to its role label.
HERO_ROLE_BY_GIANT = {
    giant: role
    for role, giants in _GIANTS_BY_HERO_ROLE
    for giant in giants
}
# Unit type names grouped by the role label they map to. "Naval" appears twice
# so that the resulting dictionary keeps its established insertion order.
_UNIT_TYPES_BY_ROLE = (
    (
        "Melee",
        (
            "Warrior",
            "Swordsman",
            "KaguyaFrenchAnimalWarrior",
            "KaguyaFrenchWarrior",
            "NoUseHakureiBerserkWarrior",
            "NoUseHakureiBerserker",
        ),
    ),
    (
        "Mobility",
        (
            "Rider",
            "Knights",
            "MoriyaRider",
            "MoriyaKnight",
            "KomeijiIndianRider",
            "KomeijiIndianKnight",
            "HakureiValkyrie",
        ),
    ),
    ("Ranged", ("Archer", "KomeijiIndianArcher")),
    ("Siege", ("Catapult", "KaguyaFrenchCatapult", "KomeijiIndianCatapult")),
    ("Defender", ("Defender", "HakureiRoundShieldman")),
    ("Naval", ("Boat", "Ship", "RammerShip", "BomberShip", "Juggernaut")),
    ("NavalHero", ("GiantJuggernaut",)),
    (
        "Naval",
        (
            "WolfJuggernaut",
            "KomeijiIndianShip",
            "KomeijiIndianBomberShip",
            "KomeijiIndianJuggernaut",
            "DaggerShip",
            "HakureiKarvi",
            "HakureiLongship",
            "HakureiDragonship",
        ),
    ),
    (
        "Special",
        (
            "Cloak",
            "Minder",
            "Dagger",
            "BigGuy",
            "KaguyaFrenchMokouEgg",
            "KaguyaFrenchReisenIllusion",
            "KaguyaFrenchWolf",
            "RemiliaEgyptianKoakuma",
            "RemiliaEgyptianKoakumaLion",
            "MoriyaHebi",
            "BonePile",
            "KomeijiIndianBigGuy",
            "SumirekoNorwayOrb",
            "SumirekoDenmarkOrb",
            "SumirekoEnglandOrb",
            "KasenBeastGuideMarker",
            "AunnTwin",
        ),
    ),
    ("Summon", ("SuikaMini",)),
)

# Lookup from unit type name to its role label.
UNIT_ROLE_BY_TYPE = {
    unit_type: role
    for role, unit_types in _UNIT_TYPES_BY_ROLE
    for unit_type in unit_types
}
def repo_root() -> Path:
    """Return the directory four levels above the folder holding this script."""
    script_path = Path(__file__).resolve()
    # parents[0] is the script's own folder, so parents[4] is four levels up.
    return script_path.parents[4]
def find_latest_batch(root: Path) -> Path:
    """Return the most recently modified ``batch_summary.json`` under *root*.

    Looks one directory level below ``<root>/Unity/Logs/AI_Batch``.

    Raises:
        FileNotFoundError: if no summary file is found there.
    """
    batch_dir = root / "Unity" / "Logs" / "AI_Batch"
    newest = max(
        batch_dir.glob("*/batch_summary.json"),
        key=lambda candidate: candidate.stat().st_mtime,
        default=None,
    )
    if newest is None:
        raise FileNotFoundError(f"No batch_summary.json found under {batch_dir}")
    return newest
def parse_time(value):
    """Parse an ISO-8601 timestamp string into a ``datetime``.

    A trailing ``Z`` is rewritten to ``+00:00``. The fractional-seconds part is
    normalised to exactly six digits (truncated or zero-padded) so the result
    does not depend on which fraction lengths the running Python version's
    ``datetime.fromisoformat`` accepts.

    Args:
        value: timestamp text; any falsy or non-string value is rejected.

    Returns:
        The parsed ``datetime`` (aware when the text carries an offset, naive
        otherwise), or ``None`` when the value is missing or cannot be parsed.
    """
    if not value or not isinstance(value, str):
        return None
    normalized = value.strip()
    if normalized.endswith("Z"):
        normalized = normalized[:-1] + "+00:00"
    # Only touch a fraction that sits at the end of the text, optionally
    # followed by a "+hh:mm" / "-hh:mm" offset.
    normalized = re.sub(
        r"\.(\d+)(?=(?:[+-]\d\d:\d\d)?$)",
        lambda match: "." + match.group(1)[:6].ljust(6, "0"),
        normalized,
    )
    try:
        return datetime.fromisoformat(normalized)
    except ValueError:
        return None
def seconds_between(start, end) -> float:
    """Return the non-negative number of seconds from *start* to *end*.

    Args:
        start: a ``datetime`` or ``None``.
        end: a ``datetime`` or ``None``.

    Returns:
        ``0.0`` when either side is ``None`` or when *end* precedes *start*;
        otherwise the elapsed seconds.
    """
    if start is None or end is None:
        return 0.0
    start_is_naive = start.utcoffset() is None
    end_is_naive = end.utcoffset() is None
    if start_is_naive != end_is_naive:
        # Subtracting a naive from an aware datetime raises TypeError. When
        # only one side carries an offset, read the naive side as UTC, the
        # same assumption batch_local_window makes for naive timestamps.
        if start_is_naive:
            start = start.replace(tzinfo=timezone.utc)
        else:
            end = end.replace(tzinfo=timezone.utc)
    return max(0.0, (end - start).total_seconds())
def average(values) -> float:
    """Return the arithmetic mean of the non-``None`` items, or 0.0 if none."""
    present = [item for item in values if item is not None]
    return sum(present) / len(present) if present else 0.0
def contains_any(value: str, tokens) -> bool:
    """Return True if any non-empty token occurs in *value*, ignoring case."""
    if not value:
        return False
    haystack = value.lower()
    for token in tokens:
        if token and token.lower() in haystack:
            return True
    return False
def classify_style_bucket(reason: str, action: str) -> str:
    """Classify a decision into a style bucket by keyword matching.

    The reason and action strings are joined and tested case-insensitively
    against each rule in order; the first rule with a matching token wins.
    Returns "General" when no rule matches.
    """
    text = f"{reason or ''} {action or ''}"
    # Order matters: earlier buckets take precedence over later ones.
    ordered_rules = (
        ("Recovery", ("LowHp", "Recover", "Heal", "Eirin", "Sanae", "Patchouli", "Absorb", "Revive")),
        (
            "Defense",
            ("Defense", "Defend", "Protect", "Guard", "MoveToCity", "Aunn", "Meiling", "Sakuya", "Reimu", "ShakeOff", "Unsit"),
        ),
        ("Burst", ("Kill", "Flandre", "Assassin", "AttackValue", "LocalBattle", "Boom", "Mokou", "Reisen", "Yuugi")),
        ("Control", ("Ban", "Fear", "Control", "Satori", "Koishi", "Sumireko", "Orb", "Ground")),
        ("Summon", ("Summon", "CreateMini", "Suwako", "Snake", "BonePile", "Mini")),
        ("Economy", ("Economy", "CityExp", "Corpse", "KanakoSit", "Rin", "Tewi", "Kaguya")),
        ("Mobility", ("MoveAgain", "MoveToFront", "Retreat", "Mobility", "Aya", "Kasen")),
        ("HeroLifecycle", ("SelectHero", "SpawnHero", "FinishLowestTask")),
    )
    for bucket, tokens in ordered_rules:
        if contains_any(text, tokens):
            return bucket
    return "General"
def is_defensive_style(bucket: str) -> bool:
    """Return True for the "Defense" and "Recovery" style buckets."""
    return bucket == "Defense" or bucket == "Recovery"
def is_hero_action(action: dict) -> bool:
    """Return True when the action record refers to a giant (hero).

    That is the case when ``actorUnitType`` is "Giant", or when either
    ``actorGiantType`` or ``giantType`` holds something other than an
    empty/"None" value.
    """
    if not action:
        return False
    if (action.get("actorUnitType") or "") == "Giant":
        return True
    blank_markers = ("", "None")
    for field in ("actorGiantType", "giantType"):
        if (action.get(field) or "") not in blank_markers:
            return True
    return False
def non_empty(value: str) -> str:
    """Return ``str(value)``, or "" for None, "", "None" and "NONE"."""
    return "" if value in (None, "", "None", "NONE") else str(value)
def hero_identity(action: dict) -> str:
    """Return the first usable giant type found on the action, else "".

    Fields are checked in order: ``actorGiantType``, ``giantType``,
    ``targetActorGiantType``.
    """
    if not action:
        return ""
    for field in ("actorGiantType", "giantType", "targetActorGiantType"):
        candidate = non_empty(action.get(field))
        if candidate:
            return candidate
    return ""
def hero_role(hero: str) -> str:
    """Return the role for *hero*.

    Unlisted but non-empty hero names fall back to "Vanguard"; an empty
    name yields "".
    """
    fallback = "Vanguard" if hero else ""
    return HERO_ROLE_BY_GIANT.get(hero, fallback)
def actor_unit_type(action: dict) -> str:
    """Return the acting unit's type: ``actorUnitType``, else ``unitType``, else ""."""
    if not action:
        return ""
    primary = non_empty(action.get("actorUnitType"))
    if primary:
        return primary
    return non_empty(action.get("unitType"))
def unit_role(unit_type: str) -> str:
    """Return the role for a unit type.

    "" for an empty type, "Hero" for "Giant", the mapped role for listed
    types, and "Other" for anything else.
    """
    if not unit_type:
        return ""
    return "Hero" if unit_type == "Giant" else UNIT_ROLE_BY_TYPE.get(unit_type, "Other")
def delta_damage_to_target(delta: dict) -> int:
    """Return the target's health loss as a non-negative int (0 if it did not drop)."""
    change = int(delta.get("targetUnitHealthDelta") or 0)
    return -change if change < 0 else 0
def delta_damage_taken(delta: dict) -> int:
    """Return the acting unit's health loss as a non-negative int (0 if it did not drop)."""
    change = int(delta.get("unitHealthDelta") or 0)
    return -change if change < 0 else 0
def delta_self_heal(delta: dict) -> int:
    """Return the acting unit's health gain as a non-negative int (0 if it did not rise)."""
    change = int(delta.get("unitHealthDelta") or 0)
    return change if change > 0 else 0
def delta_has_skill_change(delta: dict) -> bool:
    """Return True when the delta reports any skill change for the unit or its target.

    Checks the two signature-changed flags first, then the two numeric skill
    deltas, stopping at the first positive finding.
    """
    for flag in ("unitSkillSignatureChanged", "targetUnitSkillSignatureChanged"):
        if delta.get(flag):
            return True
    for counter in ("unitSkillDelta", "targetUnitSkillDelta"):
        if int(delta.get(counter) or 0) != 0:
            return True
    return False
def update_expression_stats(stats: Counter, action_key_value: str, delta: dict, execution: dict):
    """Fold one executed action into a per-entity statistics counter.

    Args:
        stats: counter updated in place.
        action_key_value: action key string; its prefix selects the
            "recoverActions" / "captures" counters.
        delta: the execution's delta record (health changes and event flags).
        execution: the full execution record, passed to has_no_effect_delta.
    """
    stats["executions"] += 1
    for counter_name, measure in (
        ("damage", delta_damage_to_target),
        ("taken", delta_damage_taken),
        ("healed", delta_self_heal),
    ):
        stats[counter_name] += measure(delta)

    # Simple boolean flags that each bump exactly one counter.
    for counter_name, flag in (
        ("moves", "unitMoved"),
        ("kills", "targetUnitDied"),
        ("selfDeaths", "unitDied"),
        ("threatResolved", "cityThreatResolved"),
        ("threatWorsened", "cityThreatWorsened"),
    ):
        if delta.get(flag):
            stats[counter_name] += 1

    if delta.get("cityOwnerChanged") or delta.get("targetCityOwnerChanged"):
        stats["cityOwnerChanged"] += 1
    if delta_has_skill_change(delta):
        stats["skillChanged"] += 1
    if action_key_value.startswith("UnitAction:Recover"):
        stats["recoverActions"] += 1
    if action_key_value.startswith("UnitAction:Capture"):
        stats["captures"] += 1
    if has_no_effect_delta(execution):
        stats["noEffect"] += 1
def compact_skill_signature(signature: str) -> str:
    """Shorten a "|"-separated signature to its first four parts.

    Signatures with four or fewer parts are returned unchanged; longer ones
    keep the first four parts followed by "|...". Empty input yields "".
    """
    if not signature:
        return ""
    # Splitting at most four times is enough to tell whether a fifth part exists.
    pieces = signature.split("|", 4)
    if len(pieces) < 5:
        return signature
    return "|".join(pieces[:4]) + "|..."
def expression_rows(stats_by_key, action_counters, top, role_lookup=None, style_counters=None):
    """Build the top report rows from per-key statistics.

    Args:
        stats_by_key: mapping of key -> stats mapping (counter name -> value).
        action_counters: mapping of key -> Counter of action keys.
        top: maximum number of rows to return.
        role_lookup: optional callable mapping a key to its "role" value.
        style_counters: optional mapping of key -> Counter of style buckets.

    Returns:
        Rows sorted by executions, decisions, kills and damage (all
        descending), truncated to *top*. Zero-valued counters are omitted.
    """
    reported_counters = (
        "decisions",
        "executions",
        "kills",
        "selfDeaths",
        "damage",
        "taken",
        "healed",
        "moves",
        "captures",
        "recoverActions",
        "skillChanged",
        "threatResolved",
        "threatWorsened",
        "cityOwnerChanged",
        "selected",
        "spawned",
        "tasks",
        "noEffect",
    )

    def ranking(entry):
        return (
            entry.get("executions", 0),
            entry.get("decisions", 0),
            entry.get("kills", 0),
            entry.get("damage", 0),
        )

    collected = []
    for entity, stats in stats_by_key.items():
        entry = {"key": entity}
        if role_lookup is not None:
            entry["role"] = role_lookup(entity)
        for counter_name in reported_counters:
            amount = stats.get(counter_name, 0)
            if amount:
                entry[counter_name] = amount
        entry["top_actions"] = action_counters.get(entity, Counter()).most_common(5)
        if style_counters is not None:
            entry["top_styles"] = style_counters.get(entity, Counter()).most_common(5)
        collected.append(entry)

    return sorted(collected, key=ranking, reverse=True)[:top]
def percentile(values, ratio: float) -> float:
    """Return the *ratio* quantile of the non-``None`` values.

    Uses linear interpolation between the two nearest ranks of the sorted
    values. Returns 0.0 for an empty input.
    """
    ordered = sorted(item for item in values if item is not None)
    count = len(ordered)
    if count == 0:
        return 0.0
    if count == 1:
        return float(ordered[0])
    position = (count - 1) * ratio
    lower_index = math.floor(position)
    upper_index = math.ceil(position)
    if lower_index == upper_index:
        return float(ordered[lower_index])
    fraction = position - lower_index
    return float(ordered[lower_index] * (1 - fraction) + ordered[upper_index] * fraction)
def load_json(path: Path):
    """Read and parse a JSON file, accepting UTF-8 with or without a BOM."""
    return json.loads(path.read_text(encoding="utf-8-sig"))
def player_rows(batch):
    """Yield ``(game, player)`` pairs for every player of every game result.

    A missing or null ``results`` list, and a missing or null ``players``
    list on a game, are both treated as empty. This matches how the other
    readers of ``results`` in this file guard the field with ``or []``.
    """
    for game in batch.get("results") or []:
        for player in game.get("players") or []:
            yield game, player
def game_elapsed(game) -> float:
    """Return the seconds between a game's ``startedAt`` and ``endedAt`` timestamps."""
    started = parse_time(game.get("startedAt", ""))
    ended = parse_time(game.get("endedAt", ""))
    return seconds_between(started, ended)
def local_session_time_from_name(path: Path):
    """Extract the timestamp embedded in an ``ai_director_<stamp>`` file name.

    Returns a naive ``datetime`` parsed from ``YYYYmmdd_HHMMSS``, or ``None``
    when the stem lacks the prefix or the stamp does not parse.
    """
    marker = "ai_director_"
    name = path.stem
    if name[: len(marker)] != marker:
        return None
    stamp = name[len(marker):]
    try:
        return datetime.strptime(stamp, "%Y%m%d_%H%M%S")
    except ValueError:
        return None
def batch_local_window(batch_path: Path, batch):
    """Estimate the naive local-time window during which a batch ran.

    The start comes from the output directory's name (``YYYYmmdd_HHMMSS``);
    the end comes from ``generatedAt``, read as UTC when it carries no offset
    and converted to local time. If only one end is known, the other is
    placed two hours away. A two-minute margin is added on both sides.

    Returns:
        ``(start, end)`` as naive datetimes, or ``(None, None)`` when neither
        end can be determined.
    """
    options = batch.get("options") or {}
    output_dir = options.get("OutputDirectory") or str(batch_path.parent)
    try:
        start = datetime.strptime(Path(output_dir).name, "%Y%m%d_%H%M%S")
    except ValueError:
        start = None

    end = None
    generated = parse_time(batch.get("generatedAt", ""))
    if generated is not None:
        if generated.tzinfo is None:
            generated = generated.replace(tzinfo=timezone.utc)
        # Drop the offset after converting so the value is comparable with the
        # naive timestamps parsed from directory and file names.
        end = generated.astimezone().replace(tzinfo=None)

    if start is None and end is None:
        return None, None
    fallback_span = timedelta(hours=2)
    if start is None:
        start = end - fallback_span
    elif end is None:
        end = start + fallback_span
    margin = timedelta(minutes=2)
    return start - margin, end + margin
def find_matching_logs(root: Path, batch_path: Path, batch, explicit_logs):
    """Pick the diagnostics log files that belong to a batch.

    Resolution order:
      1. *explicit_logs*, resolved against *root* when relative (existence is
         not checked).
      2. Each game's ``diagnosticsLogPath`` that exists on disk.
      3. ``ai_director_*.jsonl`` files whose file-name timestamp falls inside
         the batch's local time window, capped at the number of game results.
         If no window can be determined, only the most recently modified log
         is returned.
    """

    def under_root(raw) -> Path:
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else root / candidate

    if explicit_logs:
        return [under_root(raw) for raw in explicit_logs]

    games = batch.get("results", []) or []
    direct_logs = []
    for game in games:
        raw = game.get("diagnosticsLogPath") or ""
        if raw:
            candidate = under_root(raw)
            if candidate.exists():
                direct_logs.append(candidate)
    if direct_logs:
        return direct_logs

    window_start, window_end = batch_local_window(batch_path, batch)
    log_dir = root / "Unity" / "Logs" / "AI_Director_Diagnostics"
    logs = sorted(log_dir.glob("ai_director_*.jsonl"), key=lambda p: p.stat().st_mtime)
    if window_start is None or window_end is None:
        return logs[-1:]

    matched = []
    for log_path in logs:
        session_time = local_session_time_from_name(log_path)
        if session_time is not None and window_start <= session_time <= window_end:
            matched.append(log_path)
    completed_games = len(games)
    if 0 < completed_games < len(matched):
        matched = matched[:completed_games]
    return matched
def analyze_logs(log_paths):
    """Aggregate AI-director diagnostics logs into one metrics dictionary.

    For every path, the rows are loaded with ``load_rows`` and summarised with
    ``summarize`` (both imported from ``analyze_ai_director_log``); the rows
    are then walked once to accumulate decision, turn-start and execution
    statistics.

    Args:
        log_paths: iterable of log file paths.

    Returns:
        A dict of plain counts, lists, ``Counter`` objects and
        ``defaultdict(Counter)`` tables, keyed as initialised below.

    NOTE(review): the nesting in this function was reconstructed from a copy
    of the source that had lost its indentation; the spots where the original
    nesting could not be determined from syntax are marked below.
    """
    aggregate = {
        "logs": [],
        "rows": 0,
        "decisions": 0,
        "executions": 0,
        "no_effect": 0,
        "repeated_stable": 0,
        "max_actions_per_player_turn": 0,
        "kills": 0,
        "self_deaths": 0,
        "city_owner_changes": 0,
        "target_city_owner_changes": 0,
        "action_counts": Counter(),
        "lane_counts": Counter(),
        "player_actions": Counter(),
        "player_kills": Counter(),
        "player_self_deaths": Counter(),
        "decide_ms": [],
        "top_no_effect": Counter(),
        "top_repeated": Counter(),
        "hero_reasons": Counter(),
        "hero_actions": Counter(),
        "hero_executed_actions": Counter(),
        "hero_lane_counts": Counter(),
        "fallback_reasons": Counter(),
        "fallback_actions": Counter(),
        "fallback_executed_actions": Counter(),
        "fallback_no_effect_actions": Counter(),
        "hero_delta": 0,
        "selected_hero_delta": 0,
        "hero_task_delta": 0,
        "ready_hero_task_delta": 0,
        "forced_hero_task_delta": 0,
        "hero_task_progress_delta": 0,
        "final_selected_hero_counts": [],
        "final_spawned_hero_counts": [],
        "final_max_hero_counts": [],
        "critical_city_threat_turns": 0,
        "capital_threat_turns": 0,
        "empty_threatened_city_turns": 0,
        "empty_threatened_city_total": 0,
        "city_threat_resolved": 0,
        "city_threat_worsened": 0,
        "city_lost": 0,
        "city_gained": 0,
        "capital_ownership_changed": 0,
        "emergency_decisions": 0,
        "emergency_executions": 0,
        "defender_return": 0,
        "hero_defensive_use": 0,
        "defense_reasons": Counter(),
        "defense_actions": Counter(),
        "hero_style_buckets": Counter(),
        "hero_style_reasons": Counter(),
        "hero_style_actions": Counter(),
        "unit_skill_actions": Counter(),
        "unit_skill_changed_actions": Counter(),
        "actor_skill_signatures": Counter(),
        "hero_personal": defaultdict(Counter),
        "hero_personal_actions": defaultdict(Counter),
        "hero_personal_styles": defaultdict(Counter),
        "unit_role_stats": defaultdict(Counter),
        "unit_role_actions": defaultdict(Counter),
        "unit_type_stats": defaultdict(Counter),
        "unit_type_actions": defaultdict(Counter),
        "skill_signature_stats": defaultdict(Counter),
        "skill_signature_actions": defaultdict(Counter),
    }
    for path in log_paths:
        # --- Per-log totals taken from the imported summarize() helper. ---
        rows = load_rows(path)
        summary = summarize(rows, top=50, last=0)
        aggregate["logs"].append(str(path))
        aggregate["rows"] += len(rows)
        aggregate["decisions"] += summary["decision_count"]
        aggregate["executions"] += summary["execution_count"]
        no_effect_items = summary["no_effect_actions"]
        aggregate["no_effect"] += sum(count for _, count in no_effect_items)
        aggregate["repeated_stable"] += sum(count for _, count in summary["repeated_stable_keys"])
        if summary["max_actions_per_player_turn"]:
            aggregate["max_actions_per_player_turn"] = max(
                aggregate["max_actions_per_player_turn"],
                max(count for _, count in summary["max_actions_per_player_turn"]),
            )
        aggregate["action_counts"].update(dict(summary["execution_actions"]))
        aggregate["top_no_effect"].update(dict(no_effect_items))
        for key, count in summary["repeated_stable_keys"]:
            aggregate["top_repeated"][str(key)] += count
        # --- Per-log working state; reset for every file. ---
        stable_rows_by_turn = defaultdict(list)
        turn_rows = defaultdict(list)
        # (playerId, playerTurn, stable_key(action)) -> lane / reason of the
        # decision, so executions can be attributed back to it.
        decision_lane_by_turn_action = {}
        decision_reason_by_turn_action = {}
        last_turn_summary_by_player = {}
        last_probe_by_player = {}
        for row in rows:
            event_type = row.get("eventType", "")
            if event_type == "Decision":
                decision = row.get("decision") or {}
                lane = decision.get("lane") or ""
                reason = decision.get("reason") or ""
                action = decision.get("action") or {}
                if lane and lane != "None":
                    aggregate["lane_counts"][lane] += 1
                decide_ms = decision.get("decideMs")
                if isinstance(decide_ms, (int, float)):
                    aggregate["decide_ms"].append(float(decide_ms))
                if decision.get("hasAction"):
                    turn_action_key = (
                        row.get("playerId", 0),
                        row.get("playerTurn", 0),
                        stable_key(action),
                    )
                    decision_lane_by_turn_action[turn_action_key] = lane
                    decision_reason_by_turn_action[turn_action_key] = reason
                # NOTE(review): the checks below are placed at decision level;
                # the flattened source does not show whether the Emergency
                # block was nested under the hasAction check above - confirm.
                if lane == "Emergency":
                    aggregate["emergency_decisions"] += 1
                    aggregate["defense_reasons"][reason] += 1
                    aggregate["defense_actions"][action_key(action)] += 1
                hero = hero_identity(action)
                if hero:
                    hero_key = action_key(action)
                    hero_bucket = classify_style_bucket(reason, hero_key)
                    aggregate["hero_personal"][hero]["decisions"] += 1
                    aggregate["hero_personal_styles"][hero][hero_bucket] += 1
                if lane in ("HeroManagement", "HeroPlaybook"):
                    aggregate["hero_lane_counts"][lane] += 1
                    aggregate["hero_reasons"][reason] += 1
                    key = action_key(action)
                    aggregate["hero_actions"][key] += 1
                    bucket = classify_style_bucket(reason, key)
                    aggregate["hero_style_buckets"][bucket] += 1
                    aggregate["hero_style_reasons"][f"{bucket}:{reason}"] += 1
                    aggregate["hero_style_actions"][f"{bucket}:{key}"] += 1
                if decision.get("isFallback"):
                    aggregate["fallback_reasons"][reason] += 1
                    aggregate["fallback_actions"][action_key(action)] += 1
                continue
            if event_type == "TurnStart":
                turn_summary = row.get("turnSummary") or {}
                player_id = row.get("playerId", 0)
                if turn_summary:
                    last_probe_by_player[player_id] = turn_summary
                    if int(turn_summary.get("criticalCityThreatCount") or 0) > 0:
                        aggregate["critical_city_threat_turns"] += 1
                    if int(turn_summary.get("capitalThreatCount") or 0) > 0:
                        aggregate["capital_threat_turns"] += 1
                    empty_threatened = int(turn_summary.get("emptyThreatenedCityCount") or 0)
                    if empty_threatened > 0:
                        aggregate["empty_threatened_city_turns"] += 1
                        aggregate["empty_threatened_city_total"] += empty_threatened
                    # Compare with this player's previous turn summary to
                    # detect city-count and capital-signature changes.
                    previous = last_turn_summary_by_player.get(player_id)
                    if previous:
                        previous_cities = int(previous.get("cityCount") or 0)
                        current_cities = int(turn_summary.get("cityCount") or 0)
                        if current_cities < previous_cities:
                            aggregate["city_lost"] += previous_cities - current_cities
                        if current_cities > previous_cities:
                            aggregate["city_gained"] += current_cities - previous_cities
                        if (previous.get("capitalCityIdsSignature") or "") != (turn_summary.get("capitalCityIdsSignature") or ""):
                            aggregate["capital_ownership_changed"] += 1
                    last_turn_summary_by_player[player_id] = turn_summary
                continue
            if event_type != "Execution":
                continue
            execution = row.get("execution") or {}
            # Only executions flagged as executed contribute below.
            if not execution.get("executed"):
                continue
            action = execution.get("action") or {}
            delta = execution.get("delta") or {}
            after_probe = execution.get("after") or {}
            player_id = row.get("playerId", 0)
            if after_probe:
                last_probe_by_player[player_id] = after_probe
            key = action_key(action)
            turn_action_key = (player_id, row.get("playerTurn", 0), stable_key(action))
            # Lane/reason recorded by a decision with the same key; "" when
            # no such decision was seen.
            decision_lane = decision_lane_by_turn_action.get(turn_action_key, "")
            decision_reason = decision_reason_by_turn_action.get(turn_action_key, "")
            aggregate["player_actions"][player_id] += 1
            hero = hero_identity(action)
            if hero:
                update_expression_stats(aggregate["hero_personal"][hero], key, delta, execution)
                aggregate["hero_personal_actions"][hero][key] += 1
                aggregate["hero_personal_styles"][hero][classify_style_bucket(decision_reason, key)] += 1
                if int(delta.get("selectedHeroDelta") or 0) > 0:
                    aggregate["hero_personal"][hero]["selected"] += int(delta.get("selectedHeroDelta") or 0)
                if int(delta.get("heroDelta") or 0) > 0:
                    aggregate["hero_personal"][hero]["spawned"] += int(delta.get("heroDelta") or 0)
                if int(delta.get("heroTaskDelta") or 0) != 0:
                    aggregate["hero_personal"][hero]["tasks"] += int(delta.get("heroTaskDelta") or 0)
            if decision_lane in ("HeroManagement", "HeroPlaybook"):
                aggregate["hero_executed_actions"][key] += 1
            if decision_lane == "Fallback":
                aggregate["fallback_executed_actions"][key] += 1
                if has_no_effect_delta(execution):
                    aggregate["fallback_no_effect_actions"][key] += 1
            aggregate["hero_delta"] += int(delta.get("heroDelta") or 0)
            aggregate["selected_hero_delta"] += int(delta.get("selectedHeroDelta") or 0)
            aggregate["hero_task_delta"] += int(delta.get("heroTaskDelta") or 0)
            aggregate["ready_hero_task_delta"] += int(delta.get("readyHeroTaskDelta") or 0)
            aggregate["forced_hero_task_delta"] += int(delta.get("forcedHeroTaskDelta") or 0)
            aggregate["hero_task_progress_delta"] += int(delta.get("heroTaskProgressDelta") or 0)
            if delta.get("cityThreatResolved"):
                aggregate["city_threat_resolved"] += 1
            if delta.get("cityThreatWorsened"):
                aggregate["city_threat_worsened"] += 1
            if decision_lane == "Emergency":
                aggregate["emergency_executions"] += 1
            if decision_lane == "Emergency" and key.startswith("UnitMove"):
                aggregate["defender_return"] += 1
            if is_hero_action(action):
                bucket = classify_style_bucket(decision_reason, key)
                if is_defensive_style(bucket) or decision_lane == "Emergency":
                    aggregate["hero_defensive_use"] += 1
            before = execution.get("before") or {}
            skill_signature = before.get("unitSkillSignature") or ""
            unit_type = actor_unit_type(action)
            role = unit_role(unit_type)
            if unit_type:
                update_expression_stats(aggregate["unit_type_stats"][unit_type], key, delta, execution)
                aggregate["unit_type_actions"][unit_type][key] += 1
            # Heroes are tracked in hero_personal, so they are excluded from
            # the per-role tables.
            if role and role != "Hero":
                update_expression_stats(aggregate["unit_role_stats"][role], key, delta, execution)
                aggregate["unit_role_actions"][role][key] += 1
            if skill_signature:
                compact_signature = compact_skill_signature(skill_signature)
                aggregate["unit_skill_actions"][key] += 1
                aggregate["actor_skill_signatures"][compact_signature] += 1
                update_expression_stats(aggregate["skill_signature_stats"][compact_signature], key, delta, execution)
                aggregate["skill_signature_actions"][compact_signature][key] += 1
                # NOTE(review): nested under the signature check here; the
                # flattened source does not show whether this was nested or
                # a sibling of that check - confirm.
                if delta_has_skill_change(delta):
                    aggregate["unit_skill_changed_actions"][key] += 1
            if delta.get("targetUnitDied"):
                aggregate["kills"] += 1
                aggregate["player_kills"][player_id] += 1
            if delta.get("unitDied"):
                aggregate["self_deaths"] += 1
                aggregate["player_self_deaths"][player_id] += 1
            if delta.get("cityOwnerChanged"):
                aggregate["city_owner_changes"] += 1
            if delta.get("targetCityOwnerChanged"):
                aggregate["target_city_owner_changes"] += 1
            turn_key = (player_id, row.get("playerTurn", 0))
            turn_rows[turn_key].append(row)
            stable_rows_by_turn[(turn_key[0], turn_key[1], stable_key(action))].append(row)
        # --- Repeat detection over executions sharing a stable key in a turn. ---
        for key, grouped_rows in stable_rows_by_turn.items():
            turn_key = (key[0], key[1])
            for row in grouped_rows:
                # Attach the turn's executed rows to each row before calling
                # is_tactical_repeat; presumably that helper reads this
                # field - verify against analyze_ai_director_log.
                row["_turn_rows"] = turn_rows[turn_key]
            if len(grouped_rows) > 1 and not is_tactical_repeat(grouped_rows):
                aggregate["top_repeated"][str(key)] += len(grouped_rows)
        # --- Final hero counts from each player's last seen probe. ---
        for probe in last_probe_by_player.values():
            aggregate["final_selected_hero_counts"].append(int(probe.get("selectedHeroCount") or 0))
            aggregate["final_spawned_hero_counts"].append(int(probe.get("heroCount") or 0))
            aggregate["final_max_hero_counts"].append(int(probe.get("maxHeroCount") or 0))
    return aggregate
def quality_warnings(batch, metrics, log_metrics):
    """Return human-readable warning strings for a batch.

    Args:
        batch: parsed batch summary (reads ``options.MaxTurns`` and
            ``failedGames``).
        metrics: batch-level metrics dict.
        log_metrics: aggregated log metrics dict.

    Returns:
        A list of ``"CODE: details."`` strings, in check order.
    """
    found = []
    report = found.append
    max_turns = int((batch.get("options") or {}).get("MaxTurns") or 0)
    failed = int(batch.get("failedGames") or 0)

    if failed:
        report(f"FAILED_GAMES: {failed} games failed.")

    # Runtime checks only make sense when elapsed time was actually measured.
    if metrics["games"] and metrics["avg_elapsed_sec"] > 0:
        if metrics["avg_actions_per_sec"] < 15:
            report(f"SLOW_ACTION_THROUGHPUT: actions/sec={metrics['avg_actions_per_sec']:.1f}.")
        if metrics["avg_elapsed_sec"] > max(90, max_turns * 8):
            report(f"SLOW_GAME_RUNTIME: avg seconds/game={metrics['avg_elapsed_sec']:.1f}.")

    # Expansion and survival checks.
    if metrics["avg_alive_city_count"] < 1.35 and max_turns >= 12:
        report(f"LOW_EXPANSION: alive avg city count={metrics['avg_alive_city_count']:.2f}.")
    if metrics["alive_city_ge_2_ratio"] < 0.25 and max_turns >= 12:
        report(f"FEW_SECOND_CITIES: alive players with >=2 cities={metrics['alive_city_ge_2_ratio']:.1%}.")
    if metrics["avg_surviving_players"] < metrics["players_per_game"] * 0.65 and max_turns <= 20:
        report(f"EARLY_ELIMINATION: avg survivors={metrics['avg_surviving_players']:.1f}/{metrics['players_per_game']:.1f}.")

    # Action-quality checks from the diagnostics logs.
    if log_metrics["max_actions_per_player_turn"] > 80:
        report(f"HIGH_ACTIONS_PER_TURN: max={log_metrics['max_actions_per_player_turn']}.")
    if log_metrics["no_effect"] > 0:
        report(f"NO_EFFECT_ACTIONS: count={log_metrics['no_effect']}.")
    if log_metrics["repeated_stable"] > 0:
        report(f"REPEATED_ACTIONS: count={log_metrics['repeated_stable']}.")

    # Defense checks.
    if log_metrics["city_lost"] > 0:
        report(f"CITY_LOSS: cityLost={log_metrics['city_lost']}.")
    if log_metrics["capital_ownership_changed"] > 0:
        report(f"CAPITAL_CHANGED: count={log_metrics['capital_ownership_changed']}.")
    defense_opportunities = log_metrics["critical_city_threat_turns"] + log_metrics["empty_threatened_city_turns"]
    if defense_opportunities > 0 and log_metrics["emergency_decisions"] == 0:
        report(f"NO_EMERGENCY_RESPONSE: defenseOpportunities={defense_opportunities}.")
    worsening_allowance = max(2, defense_opportunities // 4)
    if log_metrics["city_threat_worsened"] > log_metrics["city_threat_resolved"] + worsening_allowance:
        report(
            f"DEFENSE_WORSENING: worsened={log_metrics['city_threat_worsened']} resolved={log_metrics['city_threat_resolved']}."
        )

    # Decision latency.
    if log_metrics["decide_ms"]:
        p95 = percentile(log_metrics["decide_ms"], 0.95)
        if p95 > 60:
            report(f"SLOW_DECISION_P95: p95={p95:.1f}ms.")
    return found
def summarize_batch(batch):
    """Compute batch-level metrics from a parsed batch summary.

    Reads the per-game ``results`` entries and their ``players`` and returns
    a flat dict of counts, averages, ratios, score percentiles and the list
    of winning players.
    """
    games = batch.get("results", []) or []
    players = list(player_rows(batch))
    alive_players = [pair for pair in players if pair[1].get("alive")]
    # A player counts as hero-eligible when flagged so, or when any hero
    # counter shows hero activity.
    eligible_players = [
        pair
        for pair in players
        if pair[1].get("heroEligible")
        or pair[1].get("selectedHeroCount", 0) > 0
        or pair[1].get("spawnedHeroCount", 0) > 0
        or pair[1].get("maxHeroCount", 0) > 1
    ]

    def game_field(name):
        return [game.get(name, 0) for game in games]

    def player_field(pairs, name):
        return [player.get(name, 0) for _, player in pairs]

    def share_at_least(values, threshold, population):
        return sum(1 for value in values if value >= threshold) / max(1, population)

    succeeded = [game for game in games if game.get("success")]
    elapsed_values = [game_elapsed(game) for game in games]
    net_actions = game_field("netActions")
    frames = game_field("frames")
    max_turns = game_field("maxPlayerTurn")
    survivors = game_field("survivingPlayers")
    players_per_game = average(game_field("playerCount"))

    city_counts = player_field(players, "cityCount")
    alive_city_counts = player_field(alive_players, "cityCount")
    unit_counts = player_field(players, "unitCount")
    alive_unit_counts = player_field(alive_players, "unitCount")
    scores = player_field(players, "score")
    selected_hero_counts = player_field(players, "selectedHeroCount")
    spawned_hero_counts = player_field(players, "spawnedHeroCount")
    max_hero_counts = player_field(players, "maxHeroCount")
    eligible_selected_hero_counts = player_field(eligible_players, "selectedHeroCount")
    eligible_spawned_hero_counts = player_field(eligible_players, "spawnedHeroCount")
    eligible_max_hero_counts = player_field(eligible_players, "maxHeroCount")

    total_elapsed = sum(elapsed_values)
    total_actions = sum(net_actions)
    total_frames = sum(frames)
    player_count = len(players)
    alive_count = len(alive_players)
    eligible_count = len(eligible_players)

    winners = [
        {
            "gameIndex": game.get("gameIndex"),
            "playerId": player.get("id"),
            "civId": player.get("civId"),
            "score": player.get("score"),
            "cityCount": player.get("cityCount"),
            "unitCount": player.get("unitCount"),
        }
        for game, player in players
        if player.get("isWin")
    ]

    return {
        "games": len(games),
        "succeeded": len(succeeded),
        "failed": int(batch.get("failedGames") or 0),
        "players_per_game": players_per_game,
        "avg_elapsed_sec": average(elapsed_values),
        "total_elapsed_sec": total_elapsed,
        "avg_turn": average(max_turns),
        "avg_net_actions": average(net_actions),
        "avg_actions_per_turn": total_actions / max(1.0, sum(max_turns)),
        "avg_actions_per_sec": total_actions / max(0.001, total_elapsed),
        "avg_frames_per_sec": total_frames / max(0.001, total_elapsed),
        "avg_surviving_players": average(survivors),
        "avg_eliminations": average([max(0.0, players_per_game - value) for value in survivors]),
        "avg_city_count": average(city_counts),
        "avg_alive_city_count": average(alive_city_counts),
        "max_city_count": max(city_counts) if city_counts else 0,
        "alive_city_ge_2_ratio": share_at_least(alive_city_counts, 2, alive_count),
        "alive_city_ge_3_ratio": share_at_least(alive_city_counts, 3, alive_count),
        "avg_unit_count": average(unit_counts),
        "avg_alive_unit_count": average(alive_unit_counts),
        "avg_score": average(scores),
        "score_p10": percentile(scores, 0.10),
        "score_p50": percentile(scores, 0.50),
        "score_p90": percentile(scores, 0.90),
        "hero_eligible_count": eligible_count,
        "hero_eligible_ratio": eligible_count / max(1, player_count),
        "avg_selected_hero_count": average(selected_hero_counts),
        "avg_spawned_hero_count": average(spawned_hero_counts),
        "avg_max_hero_count": average(max_hero_counts),
        "selected_hero_ge_2_ratio": share_at_least(selected_hero_counts, 2, player_count),
        "spawned_hero_ge_1_ratio": share_at_least(spawned_hero_counts, 1, player_count),
        "spawned_hero_ge_2_ratio": share_at_least(spawned_hero_counts, 2, player_count),
        "eligible_avg_selected_hero_count": average(eligible_selected_hero_counts),
        "eligible_avg_spawned_hero_count": average(eligible_spawned_hero_counts),
        "eligible_avg_max_hero_count": average(eligible_max_hero_counts),
        "eligible_selected_hero_ge_2_ratio": share_at_least(eligible_selected_hero_counts, 2, eligible_count),
        "eligible_spawned_hero_ge_1_ratio": share_at_least(eligible_spawned_hero_counts, 1, eligible_count),
        "eligible_spawned_hero_ge_2_ratio": share_at_least(eligible_spawned_hero_counts, 2, eligible_count),
        "win_players": winners,
    }
def to_jsonable(value):
    """Recursively convert ``Counter`` objects into ``most_common()`` lists.

    Dicts (including subclasses) become plain dicts with converted values,
    lists are converted element-wise, and anything else is returned as is.
    """
    if isinstance(value, Counter):
        return value.most_common()
    if isinstance(value, dict):
        converted = {}
        for name, item in value.items():
            converted[name] = to_jsonable(item)
        return converted
    if isinstance(value, list):
        return list(map(to_jsonable, value))
    return value
def compact_log_metrics(log_metrics, top):
    """Condense the aggregated log metrics into a nested summary dict.

    Args:
        log_metrics: Mapping of aggregated log statistics; the keys read are
            exactly those indexed in the body below.
        top: Maximum number of entries taken from each ``most_common``
            ranking; also forwarded to ``expression_rows``.

    Returns:
        A dict holding the pass-through counters followed by the ``defense``,
        ``decision_time``, ``top_actions``, ``top_lanes``, ``hero``,
        ``fallback`` and ``*_expression`` sections, in that order.
    """

    def ranked(name):
        # Top-N (key, count) pairs of the Counter stored under ``name``.
        return log_metrics[name].most_common(top)

    def share_at_least(name, minimum):
        # Fraction of the samples under ``name`` that are >= minimum.
        # max(1, ...) makes an empty sample list evaluate to 0.0.
        samples = log_metrics[name]
        return sum(1 for sample in samples if sample >= minimum) / max(1, len(samples))

    timings = log_metrics["decide_ms"]

    critical_turns = log_metrics["critical_city_threat_turns"]
    capital_turns = log_metrics["capital_threat_turns"]
    empty_turns = log_metrics["empty_threatened_city_turns"]
    resolved = log_metrics["city_threat_resolved"]
    worsened = log_metrics["city_threat_worsened"]
    cities_lost = log_metrics["city_lost"]
    emergency_decisions = log_metrics["emergency_decisions"]
    emergency_executions = log_metrics["emergency_executions"]
    defender_returns = log_metrics["defender_return"]
    hero_defenses = log_metrics["hero_defensive_use"]

    # Turns on which a defensive reaction was called for.
    opportunity_turns = critical_turns + empty_turns
    response_rate = 0.0
    if opportunity_turns > 0:
        # Capped at 100% because decisions may outnumber opportunity turns.
        response_rate = min(1.0, emergency_decisions / opportunity_turns)

    # Weighted balance: rewards for defensive outcomes minus penalties for
    # lost cities and unresolved threats.
    score = (
        3 * resolved
        + emergency_executions
        + defender_returns
        + 2 * hero_defenses
        - 8 * cities_lost
        - 2 * capital_turns
        - 2 * empty_turns
        - 2 * worsened
    )

    summary = {"logs": [str(path) for path in log_metrics["logs"]]}
    for name in (
        "rows",
        "decisions",
        "executions",
        "no_effect",
        "repeated_stable",
        "max_actions_per_player_turn",
        "kills",
        "self_deaths",
        "city_owner_changes",
        "target_city_owner_changes",
    ):
        summary[name] = log_metrics[name]

    summary["defense"] = {
        "critical_city_threat_turns": critical_turns,
        "capital_threat_turns": capital_turns,
        "empty_threatened_city_turns": empty_turns,
        "empty_threatened_city_total": log_metrics["empty_threatened_city_total"],
        "defense_opportunity_turns": opportunity_turns,
        "city_threat_resolved": resolved,
        "city_threat_worsened": worsened,
        "city_lost": cities_lost,
        "city_gained": log_metrics["city_gained"],
        "capital_ownership_changed": log_metrics["capital_ownership_changed"],
        "emergency_decisions": emergency_decisions,
        "emergency_executions": emergency_executions,
        "defender_return": defender_returns,
        "hero_defensive_use": hero_defenses,
        "emergency_response_rate": response_rate,
        "defense_score": score,
        "top_reasons": ranked("defense_reasons"),
        "top_actions": ranked("defense_actions"),
    }

    summary["decision_time"] = {
        "avg_ms": average(timings),
        "p95_ms": percentile(timings, 0.95),
        "max_ms": max(timings) if timings else 0,
    }
    summary["top_actions"] = ranked("action_counts")
    summary["top_lanes"] = ranked("lane_counts")

    hero = {
        "lane_counts": ranked("hero_lane_counts"),
        "top_reasons": ranked("hero_reasons"),
        "top_actions": ranked("hero_actions"),
        "top_executed_actions": ranked("hero_executed_actions"),
        "style_buckets": ranked("hero_style_buckets"),
        "style_reasons": ranked("hero_style_reasons"),
        "style_actions": ranked("hero_style_actions"),
    }
    for name in (
        "hero_delta",
        "selected_hero_delta",
        "hero_task_delta",
        "ready_hero_task_delta",
        "forced_hero_task_delta",
        "hero_task_progress_delta",
    ):
        hero[name] = log_metrics[name]
    hero["final_selected_avg"] = average(log_metrics["final_selected_hero_counts"])
    hero["final_spawned_avg"] = average(log_metrics["final_spawned_hero_counts"])
    hero["final_max_slots_avg"] = average(log_metrics["final_max_hero_counts"])
    hero["final_selected_ge_2_ratio"] = share_at_least("final_selected_hero_counts", 2)
    hero["final_selected_ge_3_ratio"] = share_at_least("final_selected_hero_counts", 3)
    hero["final_spawned_ge_1_ratio"] = share_at_least("final_spawned_hero_counts", 1)
    hero["final_spawned_ge_2_ratio"] = share_at_least("final_spawned_hero_counts", 2)
    hero["final_max_slots_ge_2_ratio"] = share_at_least("final_max_hero_counts", 2)
    hero["final_max_slots_ge_3_ratio"] = share_at_least("final_max_hero_counts", 3)
    summary["hero"] = hero

    summary["fallback"] = {
        "top_reasons": ranked("fallback_reasons"),
        "top_actions": ranked("fallback_actions"),
        "top_executed_actions": ranked("fallback_executed_actions"),
        "top_no_effect_actions": ranked("fallback_no_effect_actions"),
    }
    summary["unit_skill_expression"] = {
        "top_actions": ranked("unit_skill_actions"),
        "top_changed_actions": ranked("unit_skill_changed_actions"),
        "top_actor_signatures": ranked("actor_skill_signatures"),
        "skill_signature_expression": expression_rows(
            log_metrics["skill_signature_stats"],
            log_metrics["skill_signature_actions"],
            top,
        ),
    }
    summary["hero_personal_expression"] = expression_rows(
        log_metrics["hero_personal"],
        log_metrics["hero_personal_actions"],
        top,
        role_lookup=hero_role,
        style_counters=log_metrics["hero_personal_styles"],
    )
    summary["unit_role_expression"] = expression_rows(
        log_metrics["unit_role_stats"],
        log_metrics["unit_role_actions"],
        top,
    )
    summary["unit_type_expression"] = expression_rows(
        log_metrics["unit_type_stats"],
        log_metrics["unit_type_actions"],
        top,
        role_lookup=unit_role,
    )
    return summary
def _print_ranked(counter, top):
    """Print up to ``top`` most common entries of ``counter``, one row each."""
    for key, count in counter.most_common(top):
        print(f" {count:>5} {key}")


def _print_ranked_section(label, counter, top):
    """Print ``counter`` under ``label``, or ``label: <none>`` when it is empty."""
    if counter:
        print(f" {label}:")
        _print_ranked(counter, top)
    else:
        print(f" {label}: <none>")


def _joined_top_pairs(row, field):
    """Render the first three ``(key, count)`` pairs of ``row[field]`` as ``k=v`` text."""
    return ", ".join(f"{key}={count}" for key, count in row.get(field, [])[:3]) or "-"


def _share_at_least(values, minimum):
    """Return the fraction of ``values`` that are >= ``minimum`` (0.0 when empty)."""
    return sum(1 for value in values if value >= minimum) / max(1, len(values))


def print_report(batch_path, batch, metrics, log_metrics, warnings, top):
    """Print the human-readable batch quality report to stdout.

    Args:
        batch_path: Path of the batch summary, echoed on the first line.
        batch: Parsed batch summary. It is not read here; the parameter is
            kept so the signature stays compatible with existing callers.
        metrics: Batch-level metrics dict (keys as indexed in the body).
        log_metrics: Aggregated log metrics dict (keys as indexed in the body).
        warnings: Warning strings to list; ``<none>`` is printed when empty.
        top: Maximum number of entries shown for each ranking.
    """
    print(f"Batch: {batch_path}")
    print(f"Games: {metrics['games']} success={metrics['succeeded']} failed={metrics['failed']}")
    print(
        "Runtime: "
        f"avgGame={metrics['avg_elapsed_sec']:.1f}s "
        f"actions/sec={metrics['avg_actions_per_sec']:.1f} "
        f"frames/sec={metrics['avg_frames_per_sec']:.1f} "
        f"actions/turn={metrics['avg_actions_per_turn']:.1f}"
    )
    print(
        "Outcome: "
        f"avgTurn={metrics['avg_turn']:.1f} "
        f"avgSurvivors={metrics['avg_surviving_players']:.1f}/{metrics['players_per_game']:.1f} "
        f"avgElims={metrics['avg_eliminations']:.1f}"
    )
    print(
        "Expansion: "
        f"aliveAvgCities={metrics['avg_alive_city_count']:.2f} "
        f"allAvgCities={metrics['avg_city_count']:.2f} "
        f"maxCities={metrics['max_city_count']} "
        f"alive>=2={metrics['alive_city_ge_2_ratio']:.1%} "
        f"alive>=3={metrics['alive_city_ge_3_ratio']:.1%}"
    )
    print(
        "Power: "
        f"aliveAvgUnits={metrics['avg_alive_unit_count']:.2f} "
        f"allAvgUnits={metrics['avg_unit_count']:.2f} "
        f"score(p10/p50/p90)={metrics['score_p10']:.0f}/{metrics['score_p50']:.0f}/{metrics['score_p90']:.0f}"
    )
    print(
        "HeroSummary: "
        f"eligible={metrics['hero_eligible_count']} ({metrics['hero_eligible_ratio']:.1%}) "
        f"allAvgSelected={metrics['avg_selected_hero_count']:.2f} "
        f"allAvgSpawned={metrics['avg_spawned_hero_count']:.2f} "
        f"eligibleAvgSelected={metrics['eligible_avg_selected_hero_count']:.2f} "
        f"eligibleAvgSpawned={metrics['eligible_avg_spawned_hero_count']:.2f} "
        f"eligibleAvgMaxSlots={metrics['eligible_avg_max_hero_count']:.2f} "
        f"eligibleSpawned>=1={metrics['eligible_spawned_hero_ge_1_ratio']:.1%} "
        f"eligibleSpawned>=2={metrics['eligible_spawned_hero_ge_2_ratio']:.1%} "
        f"eligibleSelected>=2={metrics['eligible_selected_hero_ge_2_ratio']:.1%}"
    )

    if metrics["win_players"]:
        print("Wins:")
        for winner in metrics["win_players"]:
            print(
                " "
                f"game={winner['gameIndex']} player={winner['playerId']} civ={winner['civId']} "
                f"score={winner['score']} cities={winner['cityCount']} units={winner['unitCount']}"
            )
    else:
        print("Wins: <none>")

    print(f"Logs: {len(log_metrics['logs'])} rows={log_metrics['rows']}")
    print(
        "Action quality: "
        f"executions={log_metrics['executions']} "
        f"maxActions/playerTurn={log_metrics['max_actions_per_player_turn']} "
        f"noEffect={log_metrics['no_effect']} "
        f"repeated={log_metrics['repeated_stable']}"
    )
    print(
        "Attrition proxy: "
        f"kills={log_metrics['kills']} "
        f"actingUnitDeaths={log_metrics['self_deaths']} "
        f"cityOwnerChanges={log_metrics['city_owner_changes']} "
        f"targetCityOwnerChanges={log_metrics['target_city_owner_changes']}"
    )

    # Turns on which a defensive reaction was called for.
    defense_opportunities = log_metrics["critical_city_threat_turns"] + log_metrics["empty_threatened_city_turns"]
    # Capped at 100% because decisions may outnumber opportunity turns.
    emergency_response_rate = (
        min(1.0, log_metrics["emergency_decisions"] / defense_opportunities)
        if defense_opportunities > 0
        else 0.0
    )
    # Weighted balance of defensive successes against losses and open threats.
    defense_score = (
        log_metrics["city_threat_resolved"] * 3
        + log_metrics["emergency_executions"]
        + log_metrics["defender_return"]
        + log_metrics["hero_defensive_use"] * 2
        - log_metrics["city_lost"] * 8
        - log_metrics["capital_threat_turns"] * 2
        - log_metrics["empty_threatened_city_turns"] * 2
        - log_metrics["city_threat_worsened"] * 2
    )
    print(
        "Defense: "
        f"score={defense_score:.1f} "
        f"criticalThreatTurns={log_metrics['critical_city_threat_turns']} "
        f"capitalThreatTurns={log_metrics['capital_threat_turns']} "
        f"emptyThreatTurns={log_metrics['empty_threatened_city_turns']} "
        f"resolved={log_metrics['city_threat_resolved']} "
        f"worsened={log_metrics['city_threat_worsened']} "
        f"cityLost={log_metrics['city_lost']} "
        f"cityGained={log_metrics['city_gained']} "
        f"emergencyResponse={emergency_response_rate:.1%} "
        f"defenderReturn={log_metrics['defender_return']} "
        f"heroDefense={log_metrics['hero_defensive_use']}"
    )

    decide_ms = log_metrics["decide_ms"]
    if decide_ms:
        print(
            "Decision time: "
            f"avg={average(decide_ms):.1f}ms "
            f"p95={percentile(decide_ms, 0.95):.1f}ms "
            f"max={max(decide_ms):.1f}ms"
        )

    print("Top actions:")
    _print_ranked(log_metrics["action_counts"], top)
    print("Top lanes:")
    _print_ranked(log_metrics["lane_counts"], top)

    print("Hero:")
    print(
        " deltas: "
        f"selected={log_metrics['selected_hero_delta']} "
        f"spawned={log_metrics['hero_delta']} "
        f"task={log_metrics['hero_task_delta']} "
        f"readyTask={log_metrics['ready_hero_task_delta']} "
        f"forcedTask={log_metrics['forced_hero_task_delta']} "
        f"taskProgress={log_metrics['hero_task_progress_delta']}"
    )
    selected_counts = log_metrics["final_selected_hero_counts"]
    spawned_counts = log_metrics["final_spawned_hero_counts"]
    slot_counts = log_metrics["final_max_hero_counts"]
    if len(selected_counts) > 0:
        # Each ratio is taken over its own sample list, matching the JSON
        # summary built by compact_log_metrics, so both outputs agree even if
        # the three lists were ever to differ in length.
        print(
            " final counts: "
            f"avgSelected={average(selected_counts):.2f} "
            f"avgSpawned={average(spawned_counts):.2f} "
            f"avgMaxSlots={average(slot_counts):.2f} "
            f"selected>=2={_share_at_least(selected_counts, 2):.1%} "
            f"selected>=3={_share_at_least(selected_counts, 3):.1%} "
            f"spawned>=1={_share_at_least(spawned_counts, 1):.1%} "
            f"spawned>=2={_share_at_least(spawned_counts, 2):.1%} "
            f"slots>=2={_share_at_least(slot_counts, 2):.1%} "
            f"slots>=3={_share_at_least(slot_counts, 3):.1%}"
        )
    _print_ranked_section("top reasons", log_metrics["hero_reasons"], top)
    _print_ranked_section("executed actions", log_metrics["hero_executed_actions"], top)
    _print_ranked_section("style buckets", log_metrics["hero_style_buckets"], top)

    print("Hero personal expression:")
    hero_rows = expression_rows(
        log_metrics["hero_personal"],
        log_metrics["hero_personal_actions"],
        top,
        role_lookup=hero_role,
        style_counters=log_metrics["hero_personal_styles"],
    )
    if hero_rows:
        for row in hero_rows:
            top_actions = _joined_top_pairs(row, "top_actions")
            top_styles = _joined_top_pairs(row, "top_styles")
            print(
                f" {row['key']}[{row.get('role', '')}]: "
                f"dec={row.get('decisions', 0)} exec={row.get('executions', 0)} "
                f"sel={row.get('selected', 0)} spawn={row.get('spawned', 0)} "
                f"kill={row.get('kills', 0)} dmg={row.get('damage', 0)} "
                f"move={row.get('moves', 0)} heal={row.get('healed', 0)} "
                f"skill={row.get('skillChanged', 0)} noEff={row.get('noEffect', 0)} "
                f"styles=[{top_styles}] actions=[{top_actions}]"
            )
    else:
        print(" <none>")

    print("Unit skill expression:")
    _print_ranked_section("skill-bearing actor actions", log_metrics["unit_skill_actions"], top)
    _print_ranked_section("signature-changing actions", log_metrics["unit_skill_changed_actions"], top)
    _print_ranked_section("actor signatures", log_metrics["actor_skill_signatures"], top)

    print("Unit role expression:")
    role_rows = expression_rows(log_metrics["unit_role_stats"], log_metrics["unit_role_actions"], top)
    if role_rows:
        for row in role_rows:
            top_actions = _joined_top_pairs(row, "top_actions")
            print(
                f" {row['key']}: exec={row.get('executions', 0)} "
                f"kill={row.get('kills', 0)} death={row.get('selfDeaths', 0)} "
                f"dmg={row.get('damage', 0)} taken={row.get('taken', 0)} "
                f"move={row.get('moves', 0)} cap={row.get('captures', 0)} "
                f"skill={row.get('skillChanged', 0)} noEff={row.get('noEffect', 0)} "
                f"actions=[{top_actions}]"
            )
    else:
        print(" <none>")

    print("Unit type expression:")
    type_rows = expression_rows(
        log_metrics["unit_type_stats"],
        log_metrics["unit_type_actions"],
        top,
        role_lookup=unit_role,
    )
    if type_rows:
        for row in type_rows:
            top_actions = _joined_top_pairs(row, "top_actions")
            print(
                f" {row['key']}[{row.get('role', '')}]: exec={row.get('executions', 0)} "
                f"kill={row.get('kills', 0)} death={row.get('selfDeaths', 0)} "
                f"dmg={row.get('damage', 0)} move={row.get('moves', 0)} "
                f"cap={row.get('captures', 0)} skill={row.get('skillChanged', 0)} "
                f"noEff={row.get('noEffect', 0)} actions=[{top_actions}]"
            )
    else:
        print(" <none>")

    print("Fallback:")
    _print_ranked_section("selected actions", log_metrics["fallback_actions"], top)
    _print_ranked_section("no-effect actions", log_metrics["fallback_no_effect_actions"], top)

    print("Warnings:")
    if warnings:
        for warning in warnings:
            print(f" {warning}")
    else:
        print(" <none>")
def main():
    """Command-line entry point: analyze one batch and print its quality report.

    Parses the CLI options, resolves the batch summary path (``--batch`` or
    the result of ``find_latest_batch``), loads and summarizes the batch and
    its matching logs, then prints either the JSON summary (``--json``) or
    the text report.
    """
    parser = argparse.ArgumentParser(description="Analyze TH1 AI Director batch quality metrics.")
    parser.add_argument("--batch", type=Path, help="Specific AI_Batch/*/batch_summary.json path.")
    parser.add_argument("--log", action="append", default=[], help="Specific AI_Director_Diagnostics jsonl log path. Can repeat.")
    parser.add_argument("--top", type=int, default=12, help="Number of top actions/lanes to print.")
    parser.add_argument("--json", action="store_true", help="Emit JSON summary.")
    args = parser.parse_args()

    root = repo_root()
    batch_path = args.batch if args.batch else find_latest_batch(root)
    # NOTE(review): find_latest_batch is defined elsewhere; this guard only
    # matters if it can return None when no batch exists - confirm. Without it
    # such a case would surface as an AttributeError on the line below.
    if batch_path is None:
        parser.error("no batch summary found; pass --batch explicitly")
    # Relative paths are interpreted against the repository root.
    if not batch_path.is_absolute():
        batch_path = root / batch_path

    batch = load_json(batch_path)
    metrics = summarize_batch(batch)
    log_paths = find_matching_logs(root, batch_path, batch, args.log)
    log_metrics = analyze_logs(log_paths)
    warnings = quality_warnings(batch, metrics, log_metrics)

    if not args.json:
        print_report(batch_path, batch, metrics, log_metrics, warnings, args.top)
        return

    # The compact summary is only needed for JSON output, so it is built here
    # rather than unconditionally and discarded in text mode.
    result = {
        "batch": str(batch_path),
        "metrics": metrics,
        "logs": compact_log_metrics(log_metrics, args.top),
        "warnings": warnings,
    }
    print(json.dumps(to_jsonable(result), ensure_ascii=False, indent=2))


# Run the CLI only when the file is executed as a script.
if __name__ == "__main__":
    main()