diff --git a/test/parallel-benchmark/mzcompose.py b/test/parallel-benchmark/mzcompose.py index c8a6a333f3452..53e1a021bad6a 100644 --- a/test/parallel-benchmark/mzcompose.py +++ b/test/parallel-benchmark/mzcompose.py @@ -19,6 +19,7 @@ import threading import time from collections.abc import Iterator, Sequence +from dataclasses import dataclass from pathlib import Path from textwrap import dedent @@ -77,16 +78,23 @@ def __str__(self) -> str: return f"{self.timestamp} {self.duration}" +@dataclass +class State: + measurements: collections.defaultdict[str, list[Measurement]] + load_phase_duration: int | None + periodic_dists: dict[str, int] + + class Action: def run( self, start_time: float, conns: queue.Queue, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ): self._run(conns) duration = time.time() - start_time - measurements[str(self)].append(Measurement(duration, start_time)) + state.measurements[str(self)].append(Measurement(duration, start_time)) def _run(self, conns: queue.Queue): raise NotImplementedError @@ -173,7 +181,9 @@ def sleep_until(timestamp: float) -> None: class Distribution: - def generate(self, duration: int) -> Iterator[float]: + def generate( + self, duration: int, action_name: str, state: State + ) -> Iterator[float]: raise NotImplementedError @@ -183,11 +193,14 @@ class Periodic(Distribution): def __init__(self, per_second: float): self.per_second = per_second - def generate(self, duration: int) -> Iterator[float]: + def generate( + self, duration: int, action_name: str, state: State + ) -> Iterator[float]: + per_second = state.periodic_dists.get(action_name) or self.per_second next_time = time.time() - for i in range(int(duration * self.per_second)): + for i in range(int(duration * per_second)): yield next_time - next_time += 1 / self.per_second + next_time += 1 / per_second sleep_until(next_time) @@ -198,7 +211,9 @@ def __init__(self, mean: float, stddev: float): self.mean = mean self.stddev = stddev - def generate(self, duration: int) -> Iterator[float]: + def generate( + self, duration: int, action_name: str, state: State + ) -> Iterator[float]: end_time = time.time() + duration next_time = time.time() while time.time() < end_time: @@ -213,7 +228,7 @@ def run( duration: int, jobs: queue.Queue, conns: queue.Queue, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ) -> None: raise NotImplementedError @@ -228,10 +243,10 @@ def run( duration: int, jobs: queue.Queue, conns: queue.Queue, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ) -> None: - for start_time in self.dist.generate(duration): - jobs.put(lambda: self.action.run(start_time, conns, measurements)) + for start_time in self.dist.generate(duration, str(self.action), state): + jobs.put(lambda: self.action.run(start_time, conns, state)) class ClosedLoop(PhaseAction): @@ -243,11 +258,11 @@ def run( duration: int, jobs: queue.Queue, conns: queue.Queue, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ) -> None: end_time = time.time() + duration while time.time() < end_time: - self.action.run(time.time(), conns, measurements) + self.action.run(time.time(), conns, state) class Phase: @@ -256,7 +271,7 @@ def run( c: Composition, jobs: queue.Queue, conns: queue.Queue, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ) -> None: raise NotImplementedError @@ -270,7 +285,7 @@ def run( c: Composition, jobs: queue.Queue, conns: queue.Queue, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ) -> None: c.testdrive(self.td, quiet=True) @@ -288,13 +303,14 @@ def run( c: Composition, jobs: queue.Queue, conns: queue.Queue, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ) -> None: - print(f"Load phase for {self.duration}s") + duration = state.load_phase_duration or self.duration + print(f"Load phase for {duration}s") threads = [ threading.Thread( target=phase_action.run, - args=(self.duration, jobs, conns, measurements), + args=(duration, jobs, conns, state), ) for phase_action in self.phase_actions ] @@ -356,10 +372,10 @@ def setup(self, c: Composition, conn_infos: dict[str, PgConnInfo]) -> None: def run( self, c: Composition, - measurements: collections.defaultdict[str, list[Measurement]], + state: State, ) -> None: for phase in self.phases: - phase.run(c, self.jobs, self.conns, measurements) + phase.run(c, self.jobs, self.conns, state) def teardown(self) -> None: while not self.conns.empty(): @@ -595,7 +611,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]): $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; - {"INSERT INTO t3 VALUES (1);" * 100} + {"INSERT INTO t3 VALUES (1); " * 100} """, c, ), @@ -708,7 +724,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]): ], guarantees={ # TODO(def-): Lower when #29235 is fixed to prevent regressions - "SELECT 1 (reuse connection)": {"avg": 5, "max": 500}, + "SELECT 1 (reuse connection)": {"avg": 5, "max": 500, "slope": 0.1}, }, ) @@ -732,7 +748,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]): ), ], guarantees={ - "SELECT 1 (reuse connection)": {"qps": 2000, "max": 100}, + "SELECT 1 (reuse connection)": {"qps": 2000, "max": 100, "slope": 0.1}, }, ) @@ -756,7 +772,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]): ], conn_pool_size=100, guarantees={ - "SELECT 1 (pooled)": {"avg": 5, "max": 200}, + "SELECT 1 (pooled)": {"avg": 5, "max": 200, "slope": 0.1}, }, ) @@ -796,7 +812,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]): ], conn_pool_size=100, guarantees={ - "SELECT 1 (pooled)": {"avg": 5, "max": 200}, + "SELECT 1 (pooled)": {"avg": 5, "max": 200, "slope": 0.1}, }, ) @@ -815,7 +831,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]): actions=[ OpenLoop( action=ReuseConnQuery( - "INSERT INTO insert_table SELECT 1, '1' WHERE NOT EXISTS (SELECT 1 FROM insert_table WHERE a = 100);", + "INSERT INTO insert_table SELECT 1, '1' WHERE NOT EXISTS (SELECT 1 FROM insert_table WHERE a = 100)", conn_infos["materialized"], strict_serializable=False, ), @@ -843,25 +859,26 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]): actions=[ OpenLoop( action=ReuseConnQuery( - "INSERT INTO insert_select_table VALUES (1, '1');", + "INSERT INTO insert_select_table VALUES (1, '1')", conn_infos["materialized"], strict_serializable=False, ), - dist=Periodic(per_second=0.1), + dist=Periodic(per_second=1), ), - OpenLoop( + ClosedLoop( action=ReuseConnQuery( - "SELECT min(a) FROM insert_select_table;", + "SELECT min(a) FROM insert_select_table", conn_infos["materialized"], strict_serializable=False, ), - dist=Periodic(per_second=20), ), ], ), ], conn_pool_size=100, - # TODO(def-): Add guarantees when #29371 is fixed + guarantees={ + "SELECT min(a) FROM insert_select_table (reuse connection)": {"qps": 10, "p99": 350}, + }, ) @@ -1055,6 +1072,7 @@ def __init__(self, times: list[float], durations: list[float]): self.p95 = numpy.percentile(durations, 95) self.p99 = numpy.percentile(durations, 99) self.std = numpy.std(durations, ddof=1) + self.slope = numpy.polyfit(times, durations, 1)[0] def __str__(self) -> str: return f""" queries: {self.queries:>5} @@ -1065,7 +1083,8 @@ def __str__(self) -> str: p50: {self.p50:>7.2f}ms p95: {self.p95:>7.2f}ms p99: {self.p99:>7.2f}ms - std: {self.std:>7.2f}ms""" + std: {self.std:>7.2f}ms + slope: {self.slope:>7.2f}""" def report( @@ -1145,7 +1164,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "--size", metavar="N", type=int, - default=4, + default=1, help="default SIZE", ) @@ -1154,7 +1173,21 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: metavar="SCENARIO", action="append", type=str, - help="Scenario to run.", + help="Scenario to run", + ) + + parser.add_argument( + "--load-phase-duration", + type=int, + help="Override durations of LoadPhases", + ) + + parser.add_argument( + "--periodic-dist", + nargs=2, + metavar=("action", "per_second"), + action="append", + help="Override periodic distribution for an action with specified name", ) parser.add_argument("--mz-url", type=str, help="Remote Mz instance to run against") @@ -1240,10 +1273,15 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: } c.silent = True + for scenario_class in scenarios: scenario_name = scenario_class.__name__ print(f"--- Running scenario {scenario_name}") - measurements = collections.defaultdict(list) + state = State( + measurements=collections.defaultdict(list), + load_phase_duration=args.load_phase_duration, + periodic_dists={pd[0]: int(pd[1]) for pd in args.periodic_dist or []}, + ) scenario = scenario_class(c, conn_infos) scenario.setup(c, conn_infos) start_time = time.time() @@ -1251,18 +1289,19 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: try: # Don't let the garbage collector interfere with our measurements gc.disable() - scenario.run(c, measurements) + scenario.run(c, state) scenario.teardown() gc.collect() gc.enable() finally: failures.extend( - report(mz_string, scenario, measurements, start_time, args.guarantees) + report( + mz_string, scenario, state.measurements, start_time, args.guarantees + ) ) if failures: raise FailedTestExecutionError(errors=failures) - # TODO: Allow parametrization of scenarios (--load-phase-duration, --parallelism, ...) - # TODO: Linear regression slope as a measurement to make sure something doesn't get slower with time # TODO: Choose an existing cluster name (for remote mz) + # TODO: For CI start comparing against older version