Files
2026-05-27 21:00:28 +02:00

149 lines
5.7 KiB
Python

#!/usr/bin/env python3
"""Aggregate raw per-run data into tidy unit-of-observation CSVs.
Outputs four CSVs into --out:
  runs.csv            one row per run: setup, solution, run, throughput_bps, cpu_sender, cpu_receiver
  rtts.csv            one row per RTT sample: setup, solution, rtt_us
  idts.csv            one row per per-flow inter-departure-time: setup, solution, run, stream_id, idt_us
  firstflow_bins.csv  one row per 50 us bin (first stream of run 1): setup, solution, t_ms, packets
"""
import argparse
import json
from pathlib import Path
import numpy as np
import pandas as pd
# Bin width, in microseconds, passed to firstflow_bins() by main().
BIN_US = 50
# Minimum packet count used by load_packets() to drop small streams and
# asserted again for the flow selected in firstflow_bins().
MIN_PACKETS_PER_STREAM = 100
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the aggregation script.

    All four options are required. The module docstring is used verbatim as
    the ``--help`` description (raw formatting, no re-wrapping).

    Returns:
        Namespace with ``raw`` and ``out`` as ``Path`` objects,
        ``setup_prefix`` as a string and ``solutions`` as a non-empty list of
        strings (each either ``name`` or ``label:dirname``).
    """
    solutions_help = (
        "solution names. Default lookup: <setup-prefix>_<solution>/. "
        "Use 'label:dirname' to point a solution at a non-conforming directory "
        "(e.g. cake:datacenter_cake_no-tso)."
    )
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    parser.add_argument(
        "--raw", type=Path, required=True, help="raw aggregates directory"
    )
    parser.add_argument(
        "--setup-prefix",
        required=True,
        help="directory prefix before _<solution>",
    )
    parser.add_argument(
        "--solutions", nargs="+", required=True, help=solutions_help
    )
    parser.add_argument(
        "--out", type=Path, required=True, help="derived CSV output directory"
    )
    return parser.parse_args()
def solution_spec(spec: str, setup_prefix: str) -> tuple[str, str]:
    """Resolve a ``--solutions`` entry into ``(label, directory name)``.

    Args:
        spec: Either a bare solution name or ``label:dirname``. Only the first
            colon separates the two parts; later colons stay in ``dirname``.
        setup_prefix: Prefix used to build the default directory name.

    Returns:
        ``(label, dirname)`` for the explicit form, otherwise
        ``(spec, "<setup_prefix>_<spec>")``.
    """
    label, separator, dirname = spec.partition(":")
    if not separator:
        # No explicit directory given: fall back to the naming convention.
        return spec, f"{setup_prefix}_{spec}"
    return label, dirname
def load_metrics(sol_dir: Path) -> pd.DataFrame:
    """Read the per-run metrics of one solution directory.

    Args:
        sol_dir: Directory containing ``metrics.csv``.

    Returns:
        DataFrame with the columns ``run``, ``throughput_bps``,
        ``cpu_sender`` and ``cpu_receiver`` (in that order); ``run`` is the
        file's ``run_num`` column renamed. Other columns are dropped.
    """
    wanted = ["run_num", "throughput_bps", "cpu_sender", "cpu_receiver"]
    metrics = pd.read_csv(sol_dir / "metrics.csv")
    selected = metrics.loc[:, wanted]
    return selected.rename(columns={"run_num": "run"})
def load_rtts(sol_dir: Path) -> pd.DataFrame:
    """Read the RTT samples of one solution directory.

    Args:
        sol_dir: Directory containing ``rtt.json``; the samples are loaded
            from that JSON document into a single column.

    Returns:
        Single-column DataFrame ``rtt_us`` of dtype float64, one row per
        sample.
    """
    rtt_path = sol_dir / "rtt.json"
    with open(rtt_path) as handle:
        samples = json.load(handle)
    column = pd.Series(samples, dtype="float64")
    return pd.DataFrame({"rtt_us": column})
def load_packets(sol_dir: Path, min_packets: "int | None" = None) -> pd.DataFrame:
    """Read the packet trace of one solution and drop undersized flows.

    A flow is one ``(run, stream_id)`` pair, the same unit that
    ``per_flow_idts`` groups by and whose size ``firstflow_bins`` asserts.
    Counting per ``stream_id`` alone would let a stream id that recurs in
    several runs pass the threshold even though each per-run flow is too
    small.

    Args:
        sol_dir: Directory containing ``packets.csv`` with at least the
            columns ``run_num``, ``stream_id`` and ``p4_timestamp_ns``.
        min_packets: Minimum number of packets a flow needs to be kept.
            Defaults to the module constant ``MIN_PACKETS_PER_STREAM``.

    Returns:
        DataFrame with int64 columns ``run``, ``stream_id`` and
        ``p4_timestamp_ns``, stably sorted by timestamp, with a fresh
        0..n-1 index.
    """
    if min_packets is None:
        min_packets = MIN_PACKETS_PER_STREAM
    df = pd.read_csv(sol_dir / "packets.csv")
    df = df[["run_num", "stream_id", "p4_timestamp_ns"]].rename(columns={"run_num": "run"})
    df = df.astype({"run": "int64", "stream_id": "int64", "p4_timestamp_ns": "int64"})
    # Per-row size of the flow the row belongs to, aligned with df's index.
    flow_sizes = df.groupby(["run", "stream_id"])["p4_timestamp_ns"].transform("size")
    df = df[flow_sizes >= min_packets]
    # Stable sort keeps the file order of packets that share a timestamp.
    return df.sort_values("p4_timestamp_ns", kind="mergesort").reset_index(drop=True)
def per_flow_idts(packets: pd.DataFrame) -> pd.DataFrame:
    """Compute inter-departure times for every ``(run, stream_id)`` flow.

    Consecutive ``p4_timestamp_ns`` values of each flow are differenced in
    the row order in which they appear in ``packets``; no sorting is done
    here, so the caller is expected to pass rows ordered by timestamp.

    Args:
        packets: DataFrame with the columns ``run``, ``stream_id`` and
            ``p4_timestamp_ns``.

    Returns:
        DataFrame with the columns ``run``, ``stream_id`` and ``idt_us``
        (float64 microseconds), flows ordered by ``(run, stream_id)``.
        Flows with fewer than two packets contribute no rows; if no flow
        qualifies an empty frame with the same columns is returned.
    """
    frames = []
    for (run, stream_id), df_flow in packets.groupby(["run", "stream_id"], sort=True):
        if len(df_flow) < 2:
            continue
        t_ns = df_flow["p4_timestamp_ns"].to_numpy()
        # Difference in the column's native dtype before converting to float:
        # nanosecond timestamps above 2**53 are not exactly representable in
        # float64, so casting first would corrupt the (small) differences.
        idts = np.diff(t_ns).astype(np.float64) / 1e3
        frames.append(pd.DataFrame({"run": run, "stream_id": stream_id, "idt_us": idts}))
    if not frames:
        # pd.concat([]) raises; hand back an empty, correctly-typed frame.
        return pd.DataFrame(
            {
                "run": pd.Series(dtype="int64"),
                "stream_id": pd.Series(dtype="int64"),
                "idt_us": pd.Series(dtype="float64"),
            }
        )
    return pd.concat(frames, ignore_index=True)
def firstflow_bins(packets: pd.DataFrame, bin_us: int) -> pd.DataFrame:
    """Histogram the packets of the first flow into fixed-width time bins.

    The "first flow" is the smallest ``stream_id`` within the smallest
    ``run`` present in ``packets``. Bin 0 starts at that flow's earliest
    timestamp.

    Args:
        packets: Non-empty DataFrame with the columns ``run``, ``stream_id``
            and ``p4_timestamp_ns`` (integer nanoseconds).
        bin_us: Bin width in microseconds.

    Returns:
        DataFrame with one row per bin: ``t_ms`` (bin start relative to the
        first packet, in milliseconds) and ``packets`` (count in that bin).
        Empty bins between the first and last packet are included.

    Raises:
        AssertionError: If the selected flow has fewer than
            ``MIN_PACKETS_PER_STREAM`` packets.
        ValueError: If ``bin_us`` is narrower than one nanosecond.
    """
    first_run = int(packets["run"].min())
    df_run = packets[packets["run"] == first_run]
    first_stream = int(df_run["stream_id"].min())
    df_flow = df_run[df_run["stream_id"] == first_stream]
    assert len(df_flow) >= MIN_PACKETS_PER_STREAM, f"first flow (run={first_run}, stream={first_stream}) too small"
    bin_ns = int(round(bin_us * 1000))
    if bin_ns < 1:
        raise ValueError(f"bin_us must be at least 0.001, got {bin_us!r}")
    # Bin with exact integer nanosecond arithmetic. Going through float
    # seconds can move packets that sit on or near a bin edge into the
    # neighbouring bin and loses resolution for large timestamps.
    t_ns = df_flow["p4_timestamp_ns"].to_numpy(dtype=np.int64)
    idx = (t_ns - t_ns.min()) // bin_ns
    n_bins = int(idx.max() + 1)
    counts = np.bincount(idx, minlength=n_bins)
    t_ms = np.arange(n_bins) * bin_ns / 1e6
    return pd.DataFrame({"t_ms": t_ms, "packets": counts})
def main() -> None:
    """Aggregate every requested solution and write the derived CSVs.

    For each ``--solutions`` entry the metrics and RTT samples are loaded
    unconditionally; inter-departure times and first-flow bins are added only
    when ``packets.csv`` exists and still has rows after loading. Every frame
    gets leading ``setup`` and ``solution`` columns. ``runs.csv`` and
    ``rtts.csv`` are always written; ``idts.csv`` and ``firstflow_bins.csv``
    only when at least one solution produced packet data.

    Raises:
        SystemExit: If a solution directory does not exist.
    """
    args = parse_args()
    args.out.mkdir(parents=True, exist_ok=True)

    def tag(frame: pd.DataFrame, label: str) -> pd.DataFrame:
        # Prepend the identifying columns so the result reads: setup, solution, ...
        frame.insert(0, "solution", label)
        frame.insert(0, "setup", args.setup_prefix)
        return frame

    def write_csv(frames: list, filename: str) -> None:
        # Stack the per-solution frames and report the file that was written.
        target = args.out / filename
        pd.concat(frames, ignore_index=True).to_csv(target, index=False)
        print(f"wrote {target}")

    collected_runs = []
    collected_rtts = []
    collected_idts = []
    collected_bins = []

    for spec in args.solutions:
        label, dirname = solution_spec(spec, args.setup_prefix)
        sol_dir = args.raw / dirname
        if not sol_dir.is_dir():
            raise SystemExit(f"missing solution directory: {sol_dir}")
        print(f" {label} (from {dirname})")

        collected_runs.append(tag(load_metrics(sol_dir), label))
        collected_rtts.append(tag(load_rtts(sol_dir), label))

        # Packet-level outputs are optional per solution.
        if not (sol_dir / "packets.csv").exists():
            print(" no packets.csv, skipping idts + firstflow_bins")
            continue
        packets = load_packets(sol_dir)
        if packets.empty:
            print(" no packet data, skipping idts + firstflow_bins")
            continue

        collected_idts.append(tag(per_flow_idts(packets), label))
        collected_bins.append(tag(firstflow_bins(packets, BIN_US), label))

    write_csv(collected_runs, "runs.csv")
    write_csv(collected_rtts, "rtts.csv")
    if collected_idts:
        write_csv(collected_idts, "idts.csv")
    if collected_bins:
        write_csv(collected_bins, "firstflow_bins.csv")
# Run the aggregation only when the file is executed as a script.
if __name__ == "__main__":
    main()