149 lines
5.7 KiB
Python
149 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Aggregate raw per-run data into tidy unit-of-observation CSVs.
|
|
|
|
Outputs four CSVs into --out:
|
|
runs.csv one row per run: setup, solution, run, throughput_bps, cpu_sender, cpu_receiver
|
|
rtts.csv one row per RTT sample: setup, solution, rtt_us
|
|
idts.csv one row per per-flow inter-departure-time: setup, solution, run, stream_id, idt_us
|
|
firstflow_bins.csv one row per 50 us bin (first stream of run 1): setup, solution, t_ms, packets
|
|
"""
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
# Width, in microseconds, of the time bins written to firstflow_bins.csv
# (passed as bin_us to firstflow_bins() from main()).
BIN_US: int = 50
|
|
# Minimum number of packets a stream needs in order to be kept by
# load_packets(); firstflow_bins() also checks its selected flow against it.
MIN_PACKETS_PER_STREAM: int = 100
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        Namespace with ``raw`` (Path), ``setup_prefix`` (str),
        ``solutions`` (list of str) and ``out`` (Path). All four options
        are required.
    """
    solutions_help = (
        "solution names. Default lookup: <setup-prefix>_<solution>/. "
        "Use 'label:dirname' to point a solution at a non-conforming directory "
        "(e.g. cake:datacenter_cake_no-tso)."
    )
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Registration order determines the order options appear in --help.
    parser.add_argument("--raw", type=Path, required=True, help="raw aggregates directory")
    parser.add_argument("--setup-prefix", required=True, help="directory prefix before _<solution>")
    parser.add_argument("--solutions", nargs="+", required=True, help=solutions_help)
    parser.add_argument("--out", type=Path, required=True, help="derived CSV output directory")
    namespace = parser.parse_args()
    return namespace
|
|
|
|
|
|
def solution_spec(spec: str, setup_prefix: str) -> tuple[str, str]:
    """Resolve a ``--solutions`` entry into ``(label, directory name)``.

    ``"name"`` maps to ``("name", "<setup_prefix>_name")``.
    ``"label:dirname"`` maps to ``("label", "dirname")``. Only the first
    ``:`` separates the parts, so ``dirname`` may itself contain colons.

    Raises:
        ValueError: if ``spec`` uses the ``label:dirname`` form but the label
            or the directory name is empty.
    """
    label, sep, dirname = spec.partition(":")
    if not sep:
        return spec, f"{setup_prefix}_{spec}"
    if not label or not dirname:
        # An empty dirname would make ``raw / dirname`` resolve to the raw root
        # itself, which passes an is_dir() check and silently loads wrong data;
        # an empty label would produce blank solution names in every CSV.
        raise ValueError(f"invalid solution spec {spec!r}: expected 'label:dirname'")
    return label, dirname
|
|
|
|
|
|
def load_metrics(sol_dir: Path) -> pd.DataFrame:
    """Read ``metrics.csv`` from *sol_dir*, keeping only the per-run metrics.

    Returns a frame with columns ``run`` (renamed from ``run_num``),
    ``throughput_bps``, ``cpu_sender`` and ``cpu_receiver``, in that order.
    Any other columns in the file are discarded; a missing one raises KeyError.
    """
    wanted = ["run_num", "throughput_bps", "cpu_sender", "cpu_receiver"]
    metrics = pd.read_csv(sol_dir / "metrics.csv")
    trimmed = metrics.loc[:, wanted]
    return trimmed.rename(columns={"run_num": "run"})
|
|
|
|
|
|
def load_rtts(sol_dir: Path) -> pd.DataFrame:
    """Load the RTT samples stored in ``rtt.json`` under *sol_dir*.

    The JSON payload is handed to ``pd.Series`` with a float64 dtype and
    returned as a single-column frame named ``rtt_us``.
    """
    payload = json.loads((sol_dir / "rtt.json").read_text())
    samples = pd.Series(payload, dtype="float64")
    return pd.DataFrame({"rtt_us": samples})
|
|
|
|
|
|
def load_packets(sol_dir: Path, min_packets: "int | None" = None) -> pd.DataFrame:
    """Load per-packet timestamps from ``packets.csv`` and drop undersized flows.

    A flow is one ``(run, stream_id)`` pair. That is the same unit
    ``per_flow_idts`` and ``firstflow_bins`` operate on.

    Args:
        sol_dir: solution directory containing ``packets.csv`` with at least
            ``run_num``, ``stream_id`` and ``p4_timestamp_ns`` columns.
        min_packets: flows with fewer packets than this are discarded.
            Defaults to ``MIN_PACKETS_PER_STREAM``.

    Returns:
        Columns ``run``, ``stream_id`` and ``p4_timestamp_ns`` (all int64),
        sorted by timestamp with a stable sort (ties keep file order) and
        re-indexed from 0. May be empty if no flow meets the threshold.
    """
    if min_packets is None:
        min_packets = MIN_PACKETS_PER_STREAM

    df = pd.read_csv(sol_dir / "packets.csv")
    df = df[["run_num", "stream_id", "p4_timestamp_ns"]].rename(columns={"run_num": "run"})
    df = df.astype({"run": "int64", "stream_id": "int64", "p4_timestamp_ns": "int64"})

    # Count packets per (run, stream_id), not per stream_id alone. If stream
    # ids repeat across runs, a per-stream_id count pools packets from several
    # runs and lets through flows that are individually below the threshold.
    # "count" equals the group size here because the int64 cast rules out NaN.
    flow_sizes = df.groupby(["run", "stream_id"])["p4_timestamp_ns"].transform("count")
    df = df[(flow_sizes >= min_packets).to_numpy(dtype=bool)]

    return df.sort_values("p4_timestamp_ns", kind="mergesort").reset_index(drop=True)
|
|
|
|
|
|
def per_flow_idts(packets: pd.DataFrame) -> pd.DataFrame:
    """Compute inter-departure times (IDTs) between consecutive packets of each flow.

    A flow is one ``(run, stream_id)`` pair. Within each flow the
    ``p4_timestamp_ns`` values are sorted and differenced. Flows with fewer
    than two packets contribute no rows.

    Returns:
        Columns ``run``, ``stream_id`` and ``idt_us`` (microseconds, float64),
        ordered by ``(run, stream_id)`` and then by time within each flow.
        When no flow has two or more packets, the result is empty but still
        has those columns.
    """
    pieces = []
    for (run, stream_id), flow in packets.groupby(["run", "stream_id"], sort=True):
        # Sort so the result does not depend on the caller's row order, and
        # difference in the column's native (integer) dtype before converting
        # to float. Absolute ns timestamps can exceed 2**53, where float64 can
        # no longer represent every nanosecond and short gaps get quantised.
        ts = np.sort(flow["p4_timestamp_ns"].to_numpy())
        if ts.size < 2:
            continue
        idt_us = np.diff(ts) / 1e3
        pieces.append(pd.DataFrame({"run": run, "stream_id": stream_id, "idt_us": idt_us}))

    if not pieces:
        # pd.concat([]) raises, so return a typed empty frame instead.
        return pd.DataFrame({
            "run": pd.Series(dtype="int64"),
            "stream_id": pd.Series(dtype="int64"),
            "idt_us": pd.Series(dtype="float64"),
        })
    return pd.concat(pieces, ignore_index=True)
|
|
|
|
|
|
def firstflow_bins(packets: pd.DataFrame, bin_us: int, min_packets: "int | None" = None) -> pd.DataFrame:
    """Histogram the packets of one reference flow into fixed-width time bins.

    The reference flow is the lowest ``stream_id`` within the lowest ``run``.
    Bins start at that flow's earliest timestamp. Empty bins in between are
    kept with a count of 0.

    Args:
        packets: frame with ``run``, ``stream_id`` and ``p4_timestamp_ns``
            (integer nanoseconds) columns.
        bin_us: bin width in microseconds; must be positive.
        min_packets: minimum number of packets the reference flow must have.
            Defaults to ``MIN_PACKETS_PER_STREAM``.

    Returns:
        Columns ``t_ms`` (bin start in milliseconds, relative to the flow's
        first packet) and ``packets`` (count per bin).

    Raises:
        ValueError: if *packets* is empty, *bin_us* is not positive, or the
            reference flow has fewer than *min_packets* packets.
    """
    if min_packets is None:
        min_packets = MIN_PACKETS_PER_STREAM
    if packets.empty:
        raise ValueError("no packets to bin")

    # Bin in exact integer nanoseconds. Converting absolute ns timestamps to
    # float seconds before subtracting loses precision and can push a packet
    # that sits exactly on a bin edge into the previous bin.
    bin_ns = int(round(bin_us * 1000))
    if bin_ns <= 0:
        raise ValueError(f"bin_us must be positive, got {bin_us!r}")

    first_run = int(packets["run"].min())
    df_run = packets[packets["run"] == first_run]
    first_stream = int(df_run["stream_id"].min())
    ts_ns = df_run.loc[df_run["stream_id"] == first_stream, "p4_timestamp_ns"].to_numpy(dtype=np.int64)
    # Raise rather than assert: assertions are stripped under `python -O`.
    if ts_ns.size < min_packets:
        raise ValueError(f"first flow (run={first_run}, stream={first_stream}) too small")

    idx = (ts_ns - ts_ns.min()) // bin_ns
    counts = np.bincount(idx)
    t_ms = np.arange(counts.size, dtype=np.int64) * bin_ns / 1e6

    return pd.DataFrame({"t_ms": t_ms, "packets": counts})
|
|
|
|
|
|
def _tag(frame: pd.DataFrame, setup: str, label: str) -> pd.DataFrame:
    """Prepend ``setup`` and ``solution`` columns to *frame* in place and return it."""
    # Inserting "solution" first and then "setup" at position 0 yields the
    # column order: setup, solution, <original columns>.
    frame.insert(0, "solution", label)
    frame.insert(0, "setup", setup)
    return frame


def _write_csv(frames: list[pd.DataFrame], path: Path) -> None:
    """Concatenate *frames* and write them to *path* without an index column.

    When *frames* is empty nothing is written. Any existing file at *path*,
    left over from an earlier invocation, is removed. Otherwise downstream
    readers could pair it with freshly written CSVs from this run.
    """
    if frames:
        pd.concat(frames, ignore_index=True).to_csv(path, index=False)
        print(f"wrote {path}")
    elif path.is_file():
        path.unlink()
        print(f"removed stale {path}")


def main() -> None:
    """Aggregate every requested solution directory and write the derived CSVs.

    Each ``--solutions`` entry is resolved to a directory under ``--raw``; the
    script exits with an error if that directory is missing. From it,
    ``metrics.csv`` and ``rtt.json`` are read. When ``packets.csv`` exists and
    still has rows after ``load_packets`` filtering, per-flow IDTs and
    first-flow bins are derived as well. Every output row is tagged with
    ``setup`` (the --setup-prefix value) and ``solution`` (the label).
    """
    args = parse_args()
    args.out.mkdir(parents=True, exist_ok=True)
    setup = args.setup_prefix

    runs_frames = []
    rtts_frames = []
    idts_frames = []
    bins_frames = []

    for spec in args.solutions:
        label, dirname = solution_spec(spec, setup)
        sol_dir = args.raw / dirname
        if not sol_dir.is_dir():
            raise SystemExit(f"missing solution directory: {sol_dir}")
        print(f" {label} (from {dirname})")

        runs_frames.append(_tag(load_metrics(sol_dir), setup, label))
        rtts_frames.append(_tag(load_rtts(sol_dir), setup, label))

        # Packet-level outputs are optional per solution.
        if not (sol_dir / "packets.csv").exists():
            print(" no packets.csv, skipping idts + firstflow_bins")
            continue
        packets = load_packets(sol_dir)
        if packets.empty:
            print(" no packet data, skipping idts + firstflow_bins")
            continue

        idts_frames.append(_tag(per_flow_idts(packets), setup, label))
        bins_frames.append(_tag(firstflow_bins(packets, BIN_US), setup, label))

    _write_csv(runs_frames, args.out / "runs.csv")
    _write_csv(rtts_frames, args.out / "rtts.csv")
    _write_csv(idts_frames, args.out / "idts.csv")
    _write_csv(bins_frames, args.out / "firstflow_bins.csv")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the aggregation only when executed as a script, not on import.
    main()
|