Skip to content

Commit

Permalink
parallel-benchmark: Implement overwriting and linear regression
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Sep 5, 2024
1 parent 9fdd901 commit 4cd6b7f
Showing 1 changed file with 76 additions and 38 deletions.
114 changes: 76 additions & 38 deletions test/parallel-benchmark/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import threading
import time
from collections.abc import Iterator, Sequence
from dataclasses import dataclass
from pathlib import Path
from textwrap import dedent

Expand Down Expand Up @@ -77,16 +78,23 @@ def __str__(self) -> str:
return f"{self.timestamp} {self.duration}"


@dataclass
class State:
    """Mutable state handed to every phase and action while a scenario runs.

    It bundles the collected measurements together with the optional
    command-line overrides, so they travel through the call chain as a
    single argument.
    """

    # Recorded measurements, keyed by ``str(action)``; ``Action.run`` appends
    # one ``Measurement`` per executed action.
    measurements: collections.defaultdict[str, list[Measurement]]
    # When set (and non-zero), used instead of a LoadPhase's own duration.
    load_phase_duration: int | None
    # Per-action override of ``Periodic.per_second``, keyed by action name.
    # ``Periodic`` takes its rate as a float (scenarios use values such as
    # 0.1), so the override is typed as float rather than int.
    periodic_dists: dict[str, float]


class Action:
def run(
self,
start_time: float,
conns: queue.Queue,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
):
self._run(conns)
duration = time.time() - start_time
measurements[str(self)].append(Measurement(duration, start_time))
state.measurements[str(self)].append(Measurement(duration, start_time))

def _run(self, conns: queue.Queue):
raise NotImplementedError
Expand Down Expand Up @@ -173,7 +181,9 @@ def sleep_until(timestamp: float) -> None:


class Distribution:
def generate(self, duration: int) -> Iterator[float]:
def generate(
self, duration: int, action_name: str, state: State
) -> Iterator[float]:
raise NotImplementedError


Expand All @@ -183,11 +193,14 @@ class Periodic(Distribution):
def __init__(self, per_second: float):
self.per_second = per_second

def generate(self, duration: int) -> Iterator[float]:
def generate(
self, duration: int, action_name: str, state: State
) -> Iterator[float]:
per_second = state.periodic_dists.get(action_name) or self.per_second
next_time = time.time()
for i in range(int(duration * self.per_second)):
for i in range(int(duration * per_second)):
yield next_time
next_time += 1 / self.per_second
next_time += 1 / per_second
sleep_until(next_time)


Expand All @@ -198,7 +211,9 @@ def __init__(self, mean: float, stddev: float):
self.mean = mean
self.stddev = stddev

def generate(self, duration: int) -> Iterator[float]:
def generate(
self, duration: int, action_name: str, state: State
) -> Iterator[float]:
end_time = time.time() + duration
next_time = time.time()
while time.time() < end_time:
Expand All @@ -213,7 +228,7 @@ def run(
duration: int,
jobs: queue.Queue,
conns: queue.Queue,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
) -> None:
raise NotImplementedError

Expand All @@ -228,10 +243,10 @@ def run(
duration: int,
jobs: queue.Queue,
conns: queue.Queue,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
) -> None:
for start_time in self.dist.generate(duration):
jobs.put(lambda: self.action.run(start_time, conns, measurements))
for start_time in self.dist.generate(duration, str(self.action), state):
jobs.put(lambda: self.action.run(start_time, conns, state))


class ClosedLoop(PhaseAction):
Expand All @@ -243,11 +258,11 @@ def run(
duration: int,
jobs: queue.Queue,
conns: queue.Queue,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
) -> None:
end_time = time.time() + duration
while time.time() < end_time:
self.action.run(time.time(), conns, measurements)
self.action.run(time.time(), conns, state)


class Phase:
Expand All @@ -256,7 +271,7 @@ def run(
c: Composition,
jobs: queue.Queue,
conns: queue.Queue,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
) -> None:
raise NotImplementedError

Expand All @@ -270,7 +285,7 @@ def run(
c: Composition,
jobs: queue.Queue,
conns: queue.Queue,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
) -> None:
c.testdrive(self.td, quiet=True)

Expand All @@ -288,13 +303,14 @@ def run(
c: Composition,
jobs: queue.Queue,
conns: queue.Queue,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
) -> None:
print(f"Load phase for {self.duration}s")
duration = state.load_phase_duration or self.duration
print(f"Load phase for {duration}s")
threads = [
threading.Thread(
target=phase_action.run,
args=(self.duration, jobs, conns, measurements),
args=(duration, jobs, conns, state),
)
for phase_action in self.phase_actions
]
Expand Down Expand Up @@ -356,10 +372,10 @@ def setup(self, c: Composition, conn_infos: dict[str, PgConnInfo]) -> None:
def run(
self,
c: Composition,
measurements: collections.defaultdict[str, list[Measurement]],
state: State,
) -> None:
for phase in self.phases:
phase.run(c, self.jobs, self.conns, measurements)
phase.run(c, self.jobs, self.conns, state)

def teardown(self) -> None:
while not self.conns.empty():
Expand Down Expand Up @@ -595,7 +611,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
USE public;
{"INSERT INTO t3 VALUES (1);" * 100}
{"INSERT INTO t3 VALUES (1); " * 100}
""",
c,
),
Expand Down Expand Up @@ -708,7 +724,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
],
guarantees={
# TODO(def-): Lower when #29235 is fixed to prevent regressions
"SELECT 1 (reuse connection)": {"avg": 5, "max": 500},
"SELECT 1 (reuse connection)": {"avg": 5, "max": 500, "slope": 0.1},
},
)

Expand All @@ -732,7 +748,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
),
],
guarantees={
"SELECT 1 (reuse connection)": {"qps": 2000, "max": 100},
"SELECT 1 (reuse connection)": {"qps": 2000, "max": 100, "slope": 0.1},
},
)

Expand All @@ -756,7 +772,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
],
conn_pool_size=100,
guarantees={
"SELECT 1 (pooled)": {"avg": 5, "max": 200},
"SELECT 1 (pooled)": {"avg": 5, "max": 200, "slope": 0.1},
},
)

Expand Down Expand Up @@ -796,7 +812,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
],
conn_pool_size=100,
guarantees={
"SELECT 1 (pooled)": {"avg": 5, "max": 200},
"SELECT 1 (pooled)": {"avg": 5, "max": 200, "slope": 0.1},
},
)

Expand All @@ -815,7 +831,7 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
actions=[
OpenLoop(
action=ReuseConnQuery(
"INSERT INTO insert_table SELECT 1, '1' WHERE NOT EXISTS (SELECT 1 FROM insert_table WHERE a = 100);",
"INSERT INTO insert_table SELECT 1, '1' WHERE NOT EXISTS (SELECT 1 FROM insert_table WHERE a = 100)",
conn_infos["materialized"],
strict_serializable=False,
),
Expand Down Expand Up @@ -843,19 +859,19 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
actions=[
OpenLoop(
action=ReuseConnQuery(
"INSERT INTO insert_select_table VALUES (1, '1');",
"INSERT INTO insert_select_table VALUES (1, '1')",
conn_infos["materialized"],
strict_serializable=False,
),
dist=Periodic(per_second=0.1),
dist=Periodic(per_second=1),
),
OpenLoop(
action=ReuseConnQuery(
"SELECT min(a) FROM insert_select_table;",
"SELECT min(a) FROM insert_select_table",
conn_infos["materialized"],
strict_serializable=False,
),
dist=Periodic(per_second=20),
dist=Periodic(per_second=15),
),
],
),
Expand Down Expand Up @@ -1055,6 +1071,7 @@ def __init__(self, times: list[float], durations: list[float]):
self.p95 = numpy.percentile(durations, 95)
self.p99 = numpy.percentile(durations, 99)
self.std = numpy.std(durations, ddof=1)
self.slope = numpy.polyfit(times, durations, 1)[0]

def __str__(self) -> str:
return f""" queries: {self.queries:>5}
Expand All @@ -1065,7 +1082,8 @@ def __str__(self) -> str:
p50: {self.p50:>7.2f}ms
p95: {self.p95:>7.2f}ms
p99: {self.p99:>7.2f}ms
std: {self.std:>7.2f}ms"""
std: {self.std:>7.2f}ms
slope: {self.slope:>7.2f}"""


def report(
Expand Down Expand Up @@ -1145,7 +1163,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
"--size",
metavar="N",
type=int,
default=4,
default=1,
help="default SIZE",
)

Expand All @@ -1154,7 +1172,21 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
metavar="SCENARIO",
action="append",
type=str,
help="Scenario to run.",
help="Scenario to run",
)

parser.add_argument(
"--load-phase-duration",
type=int,
help="Override durations of LoadPhases",
)

parser.add_argument(
"--periodic-dist",
nargs=2,
metavar=("action", "per_second"),
action="append",
help="Override periodic distribution for an action with specified name",
)

parser.add_argument("--mz-url", type=str, help="Remote Mz instance to run against")
Expand Down Expand Up @@ -1240,29 +1272,35 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
}

c.silent = True

for scenario_class in scenarios:
scenario_name = scenario_class.__name__
print(f"--- Running scenario {scenario_name}")
measurements = collections.defaultdict(list)
state = State(
measurements=collections.defaultdict(list),
load_phase_duration=args.load_phase_duration,
periodic_dists={pd[0]: int(pd[1]) for pd in args.periodic_dist or []},
)
scenario = scenario_class(c, conn_infos)
scenario.setup(c, conn_infos)
start_time = time.time()
Path(MZ_ROOT / "plots").mkdir(parents=True, exist_ok=True)
try:
# Don't let the garbage collector interfere with our measurements
gc.disable()
scenario.run(c, measurements)
scenario.run(c, state)
scenario.teardown()
gc.collect()
gc.enable()
finally:
failures.extend(
report(mz_string, scenario, measurements, start_time, args.guarantees)
report(
mz_string, scenario, state.measurements, start_time, args.guarantees
)
)

if failures:
raise FailedTestExecutionError(errors=failures)

# TODO: Allow parametrization of scenarios (--load-phase-duration, --parallelism, ...)
# TODO: Linear regression slope as a measurement to make sure something doesn't get slower with time
# TODO: Choose an existing cluster name (for remote mz)
# TODO: For CI start comparing against older version

0 comments on commit 4cd6b7f

Please sign in to comment.