From 733f7f3a4380c756fd5702d2c9db504c87f977c8 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Thu, 12 Dec 2024 20:16:47 +0000 Subject: [PATCH] DNM: Try to reproduce 8798 --- ci/nightly/pipeline.template.yml | 1 + .../materialize/parallel_workload/action.py | 12 +++++------ .../materialize/parallel_workload/database.py | 20 +++++++++---------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 7409938a1f91a..2ac13467cd00e 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1533,6 +1533,7 @@ steps: timeout_in_minutes: 90 agents: queue: hetzner-x86-64-dedi-16cpu-64gb + parallelism: 20 plugins: - ./ci/plugins/mzcompose: composition: parallel-workload diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 8824a081d4d05..1115b72e3d142 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -223,7 +223,7 @@ def run(self, exe: Executor) -> bool: ) query = f"SUBSCRIBE {obj}" if self.rng.choice([True, False]): - envelope = "UPSERT" if self.rng.choice([True, False]) else "DEBEZIUM" + envelope = "UPSERT" columns = self.rng.sample(obj.columns, len(obj.columns)) key = ", ".join(column.name(True) for column in columns) query += f" ENVELOPE {envelope} (KEY ({key}))" @@ -1677,7 +1677,7 @@ def run(self, exe: Executor) -> bool: DeploymentStatus.IS_LEADER, mz_service ) - time.sleep(self.rng.uniform(60, 120)) + time.sleep(60) return True @@ -2197,7 +2197,7 @@ def __init__( (HttpPostAction, 5), (CommitRollbackAction, 10), (ReconnectAction, 1), - (SourceInsertAction, 5), + (SourceInsertAction, 500), (FlipFlagsAction, 2), ], autocommit=False, @@ -2236,9 +2236,9 @@ def __init__( (CreateWebhookSourceAction, 2), (DropWebhookSourceAction, 2), (CreateKafkaSinkAction, 4), - (DropKafkaSinkAction, 4), - (CreateKafkaSourceAction, 4), - (DropKafkaSourceAction, 4), + (DropKafkaSinkAction, 0), + (CreateKafkaSourceAction, 100), + (DropKafkaSourceAction, 0), # TODO: Reenable when database-issues#8237 is fixed # (CreateMySqlSourceAction, 4), # (DropMySqlSourceAction, 4), diff --git a/misc/python/materialize/parallel_workload/database.py b/misc/python/materialize/parallel_workload/database.py index 97504f939bc70..18469298e1e2a 100644 --- a/misc/python/materialize/parallel_workload/database.py +++ b/misc/python/materialize/parallel_workload/database.py @@ -40,20 +40,20 @@ MAX_COLUMNS = 5 MAX_INCLUDE_HEADERS = 5 -MAX_ROWS = 50 +MAX_ROWS = 500 MAX_CLUSTERS = 4 -MAX_CLUSTER_REPLICAS = 2 +MAX_CLUSTER_REPLICAS = 1 MAX_DBS = 5 MAX_SCHEMAS = 5 MAX_TABLES = 5 MAX_VIEWS = 15 MAX_INDEXES = 15 MAX_ROLES = 15 -MAX_WEBHOOK_SOURCES = 5 -MAX_KAFKA_SOURCES = 5 -MAX_MYSQL_SOURCES = 5 -MAX_POSTGRES_SOURCES = 5 -MAX_KAFKA_SINKS = 5 +MAX_WEBHOOK_SOURCES = 2 +MAX_KAFKA_SOURCES = 10 +MAX_MYSQL_SOURCES = 2 +MAX_POSTGRES_SOURCES = 2 +MAX_KAFKA_SINKS = 2 MAX_INITIAL_DBS = 1 MAX_INITIAL_SCHEMAS = 1 @@ -62,10 +62,10 @@ MAX_INITIAL_VIEWS = 2 MAX_INITIAL_ROLES = 1 MAX_INITIAL_WEBHOOK_SOURCES = 1 -MAX_INITIAL_KAFKA_SOURCES = 1 +MAX_INITIAL_KAFKA_SOURCES = 10 MAX_INITIAL_MYSQL_SOURCES = 1 MAX_INITIAL_POSTGRES_SOURCES = 1 -MAX_INITIAL_KAFKA_SINKS = 1 +MAX_INITIAL_KAFKA_SINKS = 10 NAUGHTY_IDENTIFIERS = False @@ -567,7 +567,7 @@ def __init__( formats.extend(single_column_formats) self.format = rng.choice(formats) self.envelope = ( - "UPSERT" if self.format == "JSON" else rng.choice(["DEBEZIUM", "UPSERT"]) + "UPSERT" if self.format == "JSON" else rng.choice(["UPSERT"]) ) if self.envelope == "UPSERT" or rng.choice([True, False]): key_cols = [