#!/usr/bin/env python3
"""Aggregate raw per-run data into tidy unit-of-observation CSVs.

Outputs four CSVs into --out:

  runs.csv            one row per run:
                      setup, solution, run, throughput_bps, cpu_sender, cpu_receiver
  rtts.csv            one row per RTT sample:
                      setup, solution, rtt_us
  idts.csv            one row per per-flow inter-departure-time:
                      setup, solution, run, stream_id, idt_us
  firstflow_bins.csv  one row per 50 us bin (lowest stream id of the lowest run):
                      setup, solution, t_ms, packets
"""

import argparse
import json
from pathlib import Path

import numpy as np
import pandas as pd

# Width of one histogram bin in firstflow_bins.csv, in microseconds.
BIN_US = 50
# A (run, stream_id) flow with fewer packets than this is dropped on load.
MIN_PACKETS_PER_STREAM = 100


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments.

    Returns:
        Namespace with ``raw`` (Path), ``setup_prefix`` (str),
        ``solutions`` (list of str, at least one) and ``out`` (Path).
    """
    p = argparse.ArgumentParser(
        description=__doc__,
        # Keep the column layout of the module docstring in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument("--raw", required=True, type=Path,
                   help="raw aggregates directory")
    p.add_argument("--setup-prefix", required=True,
                   help="directory prefix before _<solution>")
    p.add_argument("--solutions", required=True, nargs="+",
                   help="solution names. Default lookup: <setup-prefix>_<solution>/. "
                        "Use 'label:dirname' to point a solution at a non-conforming directory "
                        "(e.g. cake:datacenter_cake_no-tso).")
    p.add_argument("--out", required=True, type=Path,
                   help="derived CSV output directory")
    return p.parse_args()


def solution_spec(spec: str, setup_prefix: str) -> tuple[str, str]:
    """Resolve one --solutions entry to ``(label, directory name)``.

    ``"label:dirname"`` is split on the first colon only; a plain ``"name"``
    maps to the directory ``"<setup_prefix>_<name>"``.
    """
    label, sep, dirname = spec.partition(":")
    if sep:
        return label, dirname
    return spec, f"{setup_prefix}_{spec}"


def load_metrics(sol_dir: Path) -> pd.DataFrame:
    """Read ``metrics.csv`` from *sol_dir*.

    Returns:
        Columns ``run``, ``throughput_bps``, ``cpu_sender``, ``cpu_receiver``
        (``run`` is the file's ``run_num`` column, renamed).
    """
    df = pd.read_csv(sol_dir / "metrics.csv")
    wanted = ["run_num", "throughput_bps", "cpu_sender", "cpu_receiver"]
    return df[wanted].rename(columns={"run_num": "run"})


def load_rtts(sol_dir: Path) -> pd.DataFrame:
    """Read ``rtt.json`` from *sol_dir* into a one-column float64 frame ``rtt_us``."""
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(sol_dir / "rtt.json", encoding="utf-8") as f:
        rtts = json.load(f)
    return pd.DataFrame({"rtt_us": pd.Series(rtts, dtype="float64")})


def load_packets(sol_dir: Path) -> pd.DataFrame:
    """Read ``packets.csv`` from *sol_dir*, dropping undersized flows.

    A flow is one ``(run, stream_id)`` pair. Flows with fewer than
    ``MIN_PACKETS_PER_STREAM`` packets are removed.

    Returns:
        int64 columns ``run``, ``stream_id``, ``p4_timestamp_ns``, sorted by
        timestamp (stable sort) with a fresh 0..n-1 index.
    """
    df = pd.read_csv(sol_dir / "packets.csv")
    df = df[["run_num", "stream_id", "p4_timestamp_ns"]].rename(columns={"run_num": "run"})
    df = df.astype({"run": "int64", "stream_id": "int64", "p4_timestamp_ns": "int64"})
    if df.empty:
        return df.reset_index(drop=True)
    # Count per (run, stream_id), the same unit the downstream functions use.
    # Counting per stream_id alone would let a stream id that recurs across
    # runs pass the threshold even when each of its per-run flows is tiny.
    flow_size = df.groupby(["run", "stream_id"])["p4_timestamp_ns"].transform("size")
    df = df[flow_size >= MIN_PACKETS_PER_STREAM]
    return df.sort_values("p4_timestamp_ns", kind="mergesort").reset_index(drop=True)


def per_flow_idts(packets: pd.DataFrame) -> pd.DataFrame:
    """Compute inter-departure times within each ``(run, stream_id)`` flow.

    Args:
        packets: frame with columns ``run``, ``stream_id``, ``p4_timestamp_ns``.

    Returns:
        Columns ``run``, ``stream_id``, ``idt_us``: one row per consecutive
        packet pair, flows ordered by ``(run, stream_id)``. Flows with fewer
        than two packets contribute nothing; if no flow qualifies the result
        is an empty frame with those three columns.
    """
    frames = []
    for (run, stream_id), flow in packets.groupby(["run", "stream_id"], sort=True):
        if len(flow) < 2:
            continue
        # Sort locally so the result does not depend on the caller's ordering.
        t_ns = np.sort(flow["p4_timestamp_ns"].to_numpy())
        # Subtract in the column's native dtype before going to float: casting
        # large integer ns timestamps to float64 first would round them.
        idt_us = np.diff(t_ns).astype(np.float64) / 1e3
        frames.append(pd.DataFrame({"run": run, "stream_id": stream_id, "idt_us": idt_us}))
    if not frames:
        # pd.concat([]) raises; hand back an empty, correctly-typed frame.
        return pd.DataFrame({
            "run": pd.Series(dtype="int64"),
            "stream_id": pd.Series(dtype="int64"),
            "idt_us": pd.Series(dtype="float64"),
        })
    return pd.concat(frames, ignore_index=True)


def firstflow_bins(packets: pd.DataFrame, bin_us: int) -> pd.DataFrame:
    """Histogram the packets of one flow into fixed-width time bins.

    The flow is the smallest ``stream_id`` within the smallest ``run`` present
    in *packets*. Bin 0 starts at that flow's earliest timestamp.

    Args:
        packets: frame with columns ``run``, ``stream_id``, ``p4_timestamp_ns``.
        bin_us: bin width in microseconds.

    Returns:
        Columns ``t_ms`` (bin start, milliseconds from the flow's first
        packet) and ``packets`` (count in that bin), including empty bins.

    Raises:
        ValueError: if *packets* is empty, the bin width is not positive, or
            the selected flow has fewer than ``MIN_PACKETS_PER_STREAM`` packets.
    """
    if packets.empty:
        raise ValueError("no packets to bin")
    bin_ns = int(round(bin_us * 1000))
    if bin_ns <= 0:
        raise ValueError(f"bin width must be positive, got {bin_us} us")

    first_run = int(packets["run"].min())
    df_run = packets[packets["run"] == first_run]
    first_stream = int(df_run["stream_id"].min())
    df_flow = df_run[df_run["stream_id"] == first_stream]
    # Raised explicitly rather than asserted so the check survives `python -O`.
    if len(df_flow) < MIN_PACKETS_PER_STREAM:
        raise ValueError(f"first flow (run={first_run}, stream={first_stream}) too small")

    # Integer floor division keeps packets that land exactly on a bin boundary
    # in the right bin; float seconds can round them into the neighbour.
    t_ns = df_flow["p4_timestamp_ns"].to_numpy(dtype=np.int64)
    idx = (t_ns - t_ns.min()) // bin_ns
    counts = np.bincount(idx)
    t_ms = np.arange(len(counts)) * bin_us / 1000.0
    return pd.DataFrame({"t_ms": t_ms, "packets": counts})


def _tag(frame: pd.DataFrame, setup: str, solution: str) -> pd.DataFrame:
    """Prepend ``setup`` and ``solution`` columns to *frame* in place and return it."""
    frame.insert(0, "solution", solution)
    frame.insert(0, "setup", setup)
    return frame


def _write_csv(frames: list[pd.DataFrame], path: Path) -> None:
    """Concatenate *frames*, write them to *path* without the index, and report it."""
    pd.concat(frames, ignore_index=True).to_csv(path, index=False)
    print(f"wrote {path}")


def main() -> None:
    """Build the derived CSVs for every requested solution.

    runs.csv and rtts.csv are always written. idts.csv and firstflow_bins.csv
    are written only if at least one solution has usable packet data.

    Raises:
        SystemExit: if a solution directory does not exist.
    """
    args = parse_args()
    args.out.mkdir(parents=True, exist_ok=True)
    setup = args.setup_prefix

    runs_frames = []
    rtts_frames = []
    idts_frames = []
    bins_frames = []

    for spec in args.solutions:
        label, dirname = solution_spec(spec, setup)
        sol_dir = args.raw / dirname
        if not sol_dir.is_dir():
            raise SystemExit(f"missing solution directory: {sol_dir}")
        print(f" {label} (from {dirname})")

        runs_frames.append(_tag(load_metrics(sol_dir), setup, label))
        rtts_frames.append(_tag(load_rtts(sol_dir), setup, label))

        # Packet captures are optional per solution.
        if not (sol_dir / "packets.csv").exists():
            print(" no packets.csv, skipping idts + firstflow_bins")
            continue
        packets = load_packets(sol_dir)
        if packets.empty:
            print(" no packet data, skipping idts + firstflow_bins")
            continue

        idts_frames.append(_tag(per_flow_idts(packets), setup, label))
        bins_frames.append(_tag(firstflow_bins(packets, BIN_US), setup, label))

    _write_csv(runs_frames, args.out / "runs.csv")
    _write_csv(rtts_frames, args.out / "rtts.csv")
    if idts_frames:
        _write_csv(idts_frames, args.out / "idts.csv")
    if bins_frames:
        _write_csv(bins_frames, args.out / "firstflow_bins.csv")


if __name__ == "__main__":
    main()