#!/usr/bin/env python3
"""Check raw_data for duplicate per-run metrics rows."""
from __future__ import annotations

import argparse
import csv
import hashlib
from pathlib import Path

RAW = Path(__file__).parent.parent / "raw_data"

# Directory-name prefix of a single run inside a solution directory.
_RUN_PREFIX = "tmp_run_"


def metrics_payload(p: Path) -> tuple[str, ...]:
    """Return the comparable part of the first data row of a metrics CSV.

    The first CSV row is skipped as a header.  From the row that follows, the
    first two columns are dropped and the remaining cells are returned
    (presumably the dropped columns identify the run -- confirm against the
    code that writes the file).

    Args:
        p: Path of the metrics CSV file.

    Returns:
        The cells of the second CSV row from column index 2 onwards.

    Raises:
        FileNotFoundError: If *p* does not exist.
        IndexError: If the file has no row after the header.
    """
    # newline="" is what the csv module asks for when handed a file object.
    with p.open(newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # header
        row = next(reader, None)  # only this row is needed; do not read the rest
    if row is None:
        raise IndexError(f"{p}: no data row after the header")
    return tuple(row[2:])


def packet_signature(p: Path) -> tuple[int, int, tuple[int, ...], int, str]:
    """Summarise a parsed-packets CSV so that two captures can be compared.

    The first line is skipped as a header.  For every other non-empty row,
    column index 2 is read as an integer stream id and column index 3 as an
    integer timestamp.

    Args:
        p: Path of the parsed-packets CSV file.

    Returns:
        A tuple ``(size, rows, streams, span, ts_hash)``: file size in bytes,
        number of data rows, sorted distinct stream ids, difference between
        the largest and smallest timestamp (0 when there are no rows), and the
        first 12 hex digits of the SHA-1 of the sorted, comma-joined
        timestamps.

    Raises:
        FileNotFoundError: If *p* does not exist.
        IndexError: If a non-empty data row has fewer than four columns.
        ValueError: If a stream id or timestamp is not an integer.
    """
    size = p.stat().st_size
    stream_ids: set[int] = set()
    timestamps: list[int] = []
    with p.open(newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # header; tolerate a completely empty file
        for row in reader:
            if not row:
                # Blank line (typically a trailing newline at end of file).
                continue
            stream_ids.add(int(row[2]))
            timestamps.append(int(row[3]))
    # Sorting first makes the hash independent of the row order in the file.
    timestamps.sort()
    span = timestamps[-1] - timestamps[0] if timestamps else 0
    joined = ",".join(map(str, timestamps))
    ts_hash = hashlib.sha1(joined.encode()).hexdigest()[:12]
    return size, len(timestamps), tuple(sorted(stream_ids)), span, ts_hash


def _run_number(run_dir: Path) -> int | None:
    """Return the integer suffix of a ``tmp_run_<n>`` name, or None if not numeric."""
    suffix = run_dir.name.removeprefix(_RUN_PREFIX)
    try:
        return int(suffix)
    except ValueError:
        return None


def _compare_runs(base: int, base_dir: Path, other: int, other_dir: Path) -> str:
    """Build the report entry for two runs whose metrics payloads are equal."""
    base_pp = base_dir / "parsed_packets.csv"
    other_pp = other_dir / "parsed_packets.csv"
    if not base_pp.exists() or not other_pp.exists():
        return (
            f" run {base:>2} = run {other:<2} metrics: same "
            f"parsed_packets: MISSING -> duplicate confirmed via metrics only"
        )

    sig_base = packet_signature(base_pp)
    sig_other = packet_signature(other_pp)
    if sig_base[4] == sig_other[4]:
        # Same timestamp hash: the packet captures themselves coincide.
        verdict = "DEEPER BUG (timestamps identical despite different stream IDs)"
    elif sig_base[1] == sig_other[1] and sig_base[3] == sig_other[3]:
        verdict = "BUG + suspicious (rows + span match but timestamps differ)"
    else:
        verdict = "BUG (independent captures, metrics row stamped from another run)"
    return (
        f" run {base:>2} = run {other:<2} metrics: same -> {verdict}\n"
        f" run {base:>2}: size={sig_base[0]:>10} rows={sig_base[1]:>7} "
        f"streams={list(sig_base[2])} span_ns={sig_base[3]} ts_hash={sig_base[4]}\n"
        f" run {other:>2}: size={sig_other[0]:>10} rows={sig_other[1]:>7} "
        f"streams={list(sig_other[2])} span_ns={sig_other[3]} ts_hash={sig_other[4]}"
    )


def check_solution(sol_dir: Path) -> list[str]:
    """Report runs in *sol_dir* that share an identical metrics payload.

    Every ``tmp_run_<n>`` sub-directory with a readable ``metrics_row.csv`` is
    grouped by :func:`metrics_payload`.  Within a group, the lowest-numbered
    run is compared against each of the others, using the runs'
    ``parsed_packets.csv`` files when both exist.

    Entries that are not directories, whose suffix is not an integer, or whose
    metrics file is missing or has no data row are skipped, so that one
    malformed run cannot abort the scan.

    Args:
        sol_dir: Directory that contains the ``tmp_run_*`` run directories.

    Returns:
        One (possibly multi-line) report string per duplicate pair, ordered by
        run number.  Empty when no duplicates are found.
    """
    # Keep the real directory next to its number: rebuilding the path from the
    # integer would lose zero padding ("tmp_run_01" -> "tmp_run_1").
    numbered: list[tuple[int, Path]] = []
    for run_dir in sol_dir.glob(f"{_RUN_PREFIX}*"):
        n = _run_number(run_dir)
        if n is not None and run_dir.is_dir():
            numbered.append((n, run_dir))
    # glob() order is filesystem dependent; sort for a deterministic report.
    numbered.sort(key=lambda item: (item[0], item[1].name))

    payloads: dict[tuple[str, ...], list[tuple[int, Path]]] = {}
    for n, run_dir in numbered:
        try:
            payload = metrics_payload(run_dir / "metrics_row.csv")
        except (FileNotFoundError, IndexError):
            # No metrics file, or a file without a data row: nothing to compare.
            continue
        payloads.setdefault(payload, []).append((n, run_dir))

    report: list[str] = []
    for runs in payloads.values():
        # `runs` is already in ascending run order; a single-run group has no
        # `others` and therefore contributes nothing.
        (base, base_dir), *others = runs
        for other, other_dir in others:
            report.append(_compare_runs(base, base_dir, other, other_dir))
    return report


def main() -> None:
    """Scan ``<data>/<experiment>/aggregates/<solution>`` and print duplicates.

    Prints one section per solution directory that has findings, followed by
    the number of report entries that contain the text ``BUG``.  Exits through
    ``argparse`` error handling when the data root is not a directory.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--data", type=Path, default=RAW, help="raw_data root directory")
    args = parser.parse_args()
    if not args.data.is_dir():
        parser.error(f"data directory not found: {args.data}")

    total_bugs = 0
    for exp_dir in sorted(args.data.iterdir()):
        agg = exp_dir / "aggregates"
        if not agg.is_dir():
            continue
        for sol_dir in sorted(agg.iterdir()):
            if not sol_dir.is_dir():
                continue
            lines = check_solution(sol_dir)
            if not lines:
                continue
            print(f"\n== {exp_dir.name}/{sol_dir.name} ==")
            for line in lines:
                print(line)
                # Entries confirmed "via metrics only" carry no BUG verdict
                # and are deliberately left out of the count.
                if "BUG" in line:
                    total_bugs += 1
    print(f"\nconfirmed BUG count: {total_bugs}")


if __name__ == "__main__":
    main()