From d07d41774a0202b74f4db88da492c4fa7830252d Mon Sep 17 00:00:00 2001
From: igor-aptos <110557261+igor-aptos@users.noreply.github.com>
Date: Tue, 3 Oct 2023 13:01:22 -0700
Subject: [PATCH] previewnet flow in the benchmark (#10305)

---
 execution/executor-benchmark/src/lib.rs |   2 +-
 testsuite/single_node_performance.py    | 206 ++++++++++++++++++------
 2 files changed, 154 insertions(+), 54 deletions(-)

diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs
index c10eaa5c5af901..baac53d6bda10b 100644
--- a/execution/executor-benchmark/src/lib.rs
+++ b/execution/executor-benchmark/src/lib.rs
@@ -480,7 +480,7 @@ fn add_accounts_impl(
     let now_version = db.reader.get_latest_version().unwrap();
     let delta_v = now_version - start_version;
     info!(
-        "Overall TPS: account creation: {} txn/s",
+        "Overall TPS: create_db: account creation: {} txn/s",
         delta_v as f32 / elapsed,
     );
 
diff --git a/testsuite/single_node_performance.py b/testsuite/single_node_performance.py
index c84dee65e6933f..d493b96838d5e7 100755
--- a/testsuite/single_node_performance.py
+++ b/testsuite/single_node_performance.py
@@ -22,6 +22,10 @@ class Flow(Flag):
     # Tests that are run manually when using a smaller representative mode.
     # (i.e. for measuring speed of the machine)
     REPRESENTATIVE = auto()
+    # Tests used for previewnet evaluation
+    PREVIEWNET = auto()
+    # Tests used for previewnet evaluation, against a large (100M account) DB
+    PREVIEWNET_LARGE_DB = auto()
 
 
 @dataclass
@@ -34,6 +38,8 @@ class RunGroupKey:
     transaction_weights_override: Optional[str] = field(default=None)
     sharding_traffic_flags: Optional[str] = field(default=None)
 
+    smaller_working_set: bool = field(default=False)
+
 
 @dataclass
 class RunGroupConfig:
@@ -43,6 +49,25 @@ class RunGroupConfig:
     waived: bool = field(default=False)
 
 
+SELECTED_FLOW = Flow[os.environ.get("FLOW", default="LAND_BLOCKING")]
+IS_PREVIEWNET = SELECTED_FLOW in [Flow.PREVIEWNET, Flow.PREVIEWNET_LARGE_DB]
+
+DEFAULT_NUM_INIT_ACCOUNTS = (
+    "100000000" if SELECTED_FLOW == Flow.PREVIEWNET_LARGE_DB else "2000000"
+)
+DEFAULT_MAX_BLOCK_SIZE = "25000" if IS_PREVIEWNET else "10000"
+
+MAX_BLOCK_SIZE = int(os.environ.get("MAX_BLOCK_SIZE", default=DEFAULT_MAX_BLOCK_SIZE))
+NUM_BLOCKS = int(os.environ.get("NUM_BLOCKS_PER_TEST", default=15))
+NUM_BLOCKS_DETAILED = 10
+NUM_ACCOUNTS = max(
+    [
+        int(os.environ.get("NUM_INIT_ACCOUNTS", default=DEFAULT_NUM_INIT_ACCOUNTS)),
+        (2 + 2 * NUM_BLOCKS) * MAX_BLOCK_SIZE,
+    ]
+)
+MAIN_SIGNER_ACCOUNTS = 2 * MAX_BLOCK_SIZE
+
 # numbers are based on the machine spec used by github action
 # Calibrate from https://gist.github.com/igor-aptos/7b12ca28de03894cddda8e415f37889e
 # Local machine numbers will be higher.
@@ -85,30 +110,33 @@ class RunGroupConfig:
     RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded", sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.REPRESENTATIVE),
     RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded", sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.REPRESENTATIVE),
+
+    # set separately for previewnet, as we run on a different number of cores
+    RunGroupConfig(expected_tps=29000 if NUM_ACCOUNTS < 5000000 else 20000, key=RunGroupKey("coin-transfer", smaller_working_set=True), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
+    RunGroupConfig(expected_tps=23000 if NUM_ACCOUNTS < 5000000 else 15000, key=RunGroupKey("account-generation"), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
+    RunGroupConfig(expected_tps=130 if NUM_ACCOUNTS < 5000000 else 60, key=RunGroupKey("publish-package"), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
+    RunGroupConfig(expected_tps=1500, key=RunGroupKey("token-v2-ambassador-mint"), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
+    RunGroupConfig(expected_tps=12000 if NUM_ACCOUNTS < 5000000 else 7000, key=RunGroupKey("token-v2-ambassador-mint", module_working_set_size=100), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
+    RunGroupConfig(expected_tps=35000 if NUM_ACCOUNTS < 5000000 else 28000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded", sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB, waived=True),
+    RunGroupConfig(expected_tps=27000 if NUM_ACCOUNTS < 5000000 else 23000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded", sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB, waived=True),
 ]
 # fmt: on
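# --- A minimal sketch (names here are illustrative, not from the patch) of how
# the FLOW selection above sizes the benchmark DB. With the previewnet defaults
# (MAX_BLOCK_SIZE=25000, NUM_BLOCKS=15) the traffic-derived floor is
# (2 + 2*15) * 25000 = 800000 accounts, so NUM_ACCOUNTS resolves to the
# 2000000 default for PREVIEWNET and to 100000000 for PREVIEWNET_LARGE_DB;
# the `NUM_ACCOUNTS < 5000000` conditionals in the previewnet entries above
# then pick the small-DB vs large-DB TPS calibration.
from enum import Flag, auto

class FlowSketch(Flag):  # trimmed stand-in for the real Flow enum
    LAND_BLOCKING = auto()
    PREVIEWNET = auto()
    PREVIEWNET_LARGE_DB = auto()

def resolve_num_accounts(flow, max_block_size, num_blocks):
    default_init = 100_000_000 if flow == FlowSketch.PREVIEWNET_LARGE_DB else 2_000_000
    # enough accounts for every block to draw fresh signers/recipients
    return max(default_init, (2 + 2 * num_blocks) * max_block_size)

assert resolve_num_accounts(FlowSketch.PREVIEWNET, 25_000, 15) == 2_000_000
assert resolve_num_accounts(FlowSketch.PREVIEWNET_LARGE_DB, 25_000, 15) == 100_000_000
# ---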
 
-NOISE_LOWER_LIMIT = 0.8
-NOISE_LOWER_LIMIT_WARN = 0.9
+NOISE_LOWER_LIMIT = 0.98 if IS_PREVIEWNET else 0.8
+NOISE_LOWER_LIMIT_WARN = None if IS_PREVIEWNET else 0.9
 # If you want to calibrate the upper limit for perf improvement, you can
 # increase this value temporarily (e.g. to 1.3) and readjust back after a day or two of runs
-NOISE_UPPER_LIMIT = 1.15
-NOISE_UPPER_LIMIT_WARN = 1.05
+NOISE_UPPER_LIMIT = 5 if IS_PREVIEWNET else 1.15
+NOISE_UPPER_LIMIT_WARN = None if IS_PREVIEWNET else 1.05
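# --- A minimal sketch (the helper name classify_tps is hypothetical) of how
# these limits gate a run; the real checks near the end of the patch skip any
# limit that is None, so previewnet keeps a tight 0.98x regression floor and a
# loose 5x improvement ceiling, with both "warn" bands switched off. Waived
# tests downgrade errors to warnings.
def classify_tps(tps, expected, lower=0.98, lower_warn=None, upper=5, upper_warn=None):
    if lower is not None and tps < expected * lower:
        return "error: regression"
    elif lower_warn is not None and tps < expected * lower_warn:
        return "warning: possible regression"
    elif upper is not None and tps > expected * upper:
        return "error: improvement, recalibrate expected_tps upward"
    elif upper_warn is not None and tps > expected * upper_warn:
        return "warning: possible improvement"
    return "ok"

assert classify_tps(28_000, expected=29_000) == "error: regression"  # 28000 < 29000 * 0.98
assert classify_tps(30_000, expected=29_000) == "ok"  # warn bands disabled
# ---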
 
 # bump after a perf improvement, so you can easily distinguish runs
 # that are on top of this commit
 CODE_PERF_VERSION = "v4"
 
-NUMBER_OF_EXECUTION_THREADS = 8
-MAX_BLOCK_SIZE = int(os.environ.get("MAX_BLOCK_SIZE", default="10000"))
-NUM_BLOCKS = 15
-NUM_BLOCKS_DETAILED = 10
-NUM_ACCOUNTS = max([2000000, 4 * NUM_BLOCKS * MAX_BLOCK_SIZE])
-ADDITIONAL_DST_POOL_ACCOUNTS = 2 * NUM_BLOCKS * MAX_BLOCK_SIZE
-MAIN_SIGNER_ACCOUNTS = 2 * MAX_BLOCK_SIZE
-
 # default to using production number of execution threads for assertions
-NUMBER_OF_EXECUTION_THREADS = os.environ.get("NUMBER_OF_EXECUTION_THREADS", default=8)
+NUMBER_OF_EXECUTION_THREADS = int(
+    os.environ.get("NUMBER_OF_EXECUTION_THREADS", default=8)
+)
 
 if os.environ.get("DETAILED"):
     EXECUTION_ONLY_NUMBER_OF_THREADS = [1, 2, 4, 8, 16, 32, 60]
@@ -120,7 +148,6 @@ class RunGroupConfig:
 else:
     BUILD_FLAG = "--profile performance"
 
-SELECTED_FLOW = Flow[os.environ.get("FLOW", default="LAND_BLOCKING")]
 
 if os.environ.get("PROD_DB_FLAGS"):
     DB_CONFIG_FLAGS = ""
@@ -129,6 +156,13 @@ class RunGroupConfig:
     "--split-ledger-db --use-sharded-state-merkle-db --skip-index-and-usage"
 )
 
+if os.environ.get("ENABLE_PRUNER"):
+    DB_PRUNER_FLAGS = "--enable-state-pruner --enable-ledger-pruner --enable-epoch-snapshot-pruner --ledger-pruning-batch-size 10000 --state-prune-window 3000000 --epoch-snapshot-prune-window 3000000 --ledger-prune-window 3000000"
+else:
+    DB_PRUNER_FLAGS = ""
+
+HIDE_OUTPUT = os.environ.get("HIDE_OUTPUT")
+
 # Run the single node with performance optimizations enabled
 target_directory = "execution/executor-benchmark/src"
 
@@ -148,17 +182,22 @@ def execute_command(command):
     # stream to output while command is executing
     if p.stdout is not None:
         for line in p.stdout:
-            print(line, end="")
+            if not HIDE_OUTPUT:
+                print(line, end="")
             result.append(line)
 
-    if p.returncode != 0:
-        raise CalledProcessError(p.returncode, p.args)
-
     # return the full output in the end for postprocessing
     full_result = "\n".join(result)
+    if p.returncode != 0:
+        if HIDE_OUTPUT:
+            print(full_result)
+        raise CalledProcessError(p.returncode, p.args)
+
     if " ERROR " in full_result:
         print("ERROR log line in execution")
+        if HIDE_OUTPUT:
+            print(full_result)
         exit(1)
     return full_result
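# --- A self-contained sketch of the HIDE_OUTPUT pattern added above (the
# helper name run_quiet is hypothetical): suppress per-line streaming while
# the command runs, but dump the full captured log when the command fails,
# so CI logs stay quiet on success and complete on failure.
import subprocess

def run_quiet(cmd, hide_output):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
    captured = []
    for line in p.stdout:
        if not hide_output:
            print(line, end="")  # stream only when output is not hidden
        captured.append(line)
    p.wait()
    full = "".join(captured)
    if p.returncode != 0:
        if hide_output:
            print(full)  # surface the buffered log only on failure
        raise subprocess.CalledProcessError(p.returncode, cmd)
    return full
# ---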
 
@@ -188,28 +227,47 @@ def get_only(values):
     return values[0]
 
 
-def extract_run_results(output: str, execution_only: bool) -> RunResults:
+def extract_run_results(
+    output: str, execution_only: bool, create_db: bool = False
+) -> RunResults:
     if execution_only:
         tps = float(re.findall(r"Overall execution TPS: (\d+\.?\d*) txn/s", output)[-1])
         gps = float(re.findall(r"Overall execution GPS: (\d+\.?\d*) gas/s", output)[-1])
         gpt = float(
             re.findall(r"Overall execution GPT: (\d+\.?\d*) gas/txn", output)[-1]
         )
-
+    elif create_db:
+        tps = float(
+            get_only(
+                re.findall(
+                    r"Overall TPS: create_db: account creation: (\d+\.?\d*) txn/s",
+                    output,
+                )
+            )
+        )
+        gps = 0
+        gpt = 0
     else:
         tps = float(get_only(re.findall(r"Overall TPS: (\d+\.?\d*) txn/s", output)))
         gps = float(get_only(re.findall(r"Overall GPS: (\d+\.?\d*) gas/s", output)))
         gpt = float(get_only(re.findall(r"Overall GPT: (\d+\.?\d*) gas/txn", output)))
 
-    fraction_in_execution = float(
-        re.findall(r"Overall fraction of total: (\d+\.?\d*) in execution", output)[-1]
-    )
-    fraction_of_execution_in_vm = float(
-        re.findall(r"Overall fraction of execution (\d+\.?\d*) in VM", output)[-1]
-    )
-    fraction_in_commit = float(
-        re.findall(r"Overall fraction of total: (\d+\.?\d*) in commit", output)[-1]
-    )
+    if create_db:
+        fraction_in_execution = 0
+        fraction_of_execution_in_vm = 0
+        fraction_in_commit = 0
+    else:
+        fraction_in_execution = float(
+            re.findall(r"Overall fraction of total: (\d+\.?\d*) in execution", output)[
+                -1
+            ]
+        )
+        fraction_of_execution_in_vm = float(
+            re.findall(r"Overall fraction of execution (\d+\.?\d*) in VM", output)[-1]
+        )
+        fraction_in_commit = float(
+            re.findall(r"Overall fraction of total: (\d+\.?\d*) in commit", output)[-1]
+        )
 
     return RunResults(
         tps,
@@ -282,11 +340,24 @@ def print_table(
 warnings = []
 
 with tempfile.TemporaryDirectory() as tmpdirname:
-    create_db_command = f"cargo run {BUILD_FLAG} -- --block-size {MAX_BLOCK_SIZE} --execution-threads {NUMBER_OF_EXECUTION_THREADS} {DB_CONFIG_FLAGS} create-db --data-dir {tmpdirname}/db --num-accounts {NUM_ACCOUNTS}"
+    print(f"Warmup - creating DB with {NUM_ACCOUNTS} accounts")
+    create_db_command = f"cargo run {BUILD_FLAG} -- --block-size {MAX_BLOCK_SIZE} --execution-threads {NUMBER_OF_EXECUTION_THREADS} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} create-db --data-dir {tmpdirname}/db --num-accounts {NUM_ACCOUNTS}"
     output = execute_command(create_db_command)
 
     results = []
 
+    results.append(
+        RunGroupInstance(
+            key=RunGroupKey("warmup"),
+            single_node_result=extract_run_results(
+                output, execution_only=False, create_db=True
+            ),
+            number_of_threads_results={},
+            block_size=MAX_BLOCK_SIZE,
+            expected_tps=0,
+        )
+    )
+
     for (
         test_index,
         test,
@@ -313,10 +384,16 @@ def print_table(
         elif test.key.executor_type == "native":
             executor_type_str = "--use-native-executor --transactions-per-sender 1"
         elif test.key.executor_type == "sharded":
-            executor_type_str = f"--async-partitioning --num-executor-shards {NUMBER_OF_EXECUTION_THREADS} {sharding_traffic_flags}"
+            executor_type_str = f"--num-executor-shards {NUMBER_OF_EXECUTION_THREADS} {sharding_traffic_flags}"
         else:
            raise Exception(f"executor type not supported {test.key.executor_type}")
-        common_command_suffix = f"{executor_type_str} --generate-then-execute --block-size {cur_block_size} {DB_CONFIG_FLAGS} run-executor {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {ADDITIONAL_DST_POOL_ACCOUNTS} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp"
+        txn_emitter_prefix_str = "" if NUM_BLOCKS > 200 else " --generate-then-execute"
+
+        ADDITIONAL_DST_POOL_ACCOUNTS = (
+            2 * MAX_BLOCK_SIZE * (1 if test.key.smaller_working_set else NUM_BLOCKS)
+        )
+
+        common_command_suffix = f"{executor_type_str} {txn_emitter_prefix_str} --block-size {cur_block_size} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} run-executor {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {ADDITIONAL_DST_POOL_ACCOUNTS} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp"
 
         number_of_threads_results = {}
 
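# --- A minimal sketch (helper name hypothetical) of the destination-pool
# sizing introduced above, with the previewnet defaults plugged in: workloads
# flagged smaller_working_set keep reusing one block's worth of destination
# accounts instead of scaling the pool with the number of blocks.
def dst_pool_accounts(max_block_size, num_blocks, smaller_working_set):
    return 2 * max_block_size * (1 if smaller_working_set else num_blocks)

assert dst_pool_accounts(25_000, 15, smaller_working_set=True) == 50_000
assert dst_pool_accounts(25_000, 15, smaller_working_set=False) == 750_000
# ---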
@@ -363,43 +440,66 @@ def print_table(
             )
         )
 
-        print_table(
-            results, by_levels=True, single_field=("t/s", lambda r: int(round(r.tps)))
-        )
-        print_table(
-            results, by_levels=True, single_field=("g/s", lambda r: int(round(r.gps)))
-        )
-        print_table(
-            results,
-            by_levels=True,
-            single_field=("exe/total", lambda r: round(r.fraction_in_execution, 3)),
-        )
-        print_table(
-            results,
-            by_levels=True,
-            single_field=("vm/exe", lambda r: round(r.fraction_of_execution_in_vm, 3)),
-        )
-        print_table(results, by_levels=False, single_field=None)
+        if not HIDE_OUTPUT:
+            print_table(
+                results,
+                by_levels=True,
+                single_field=("t/s", lambda r: int(round(r.tps))),
+            )
+            print_table(
+                results,
+                by_levels=True,
+                single_field=("g/s", lambda r: int(round(r.gps))),
+            )
+            print_table(
+                results,
+                by_levels=True,
+                single_field=("exe/total", lambda r: round(r.fraction_in_execution, 3)),
+            )
+            print_table(
+                results,
+                by_levels=True,
+                single_field=(
+                    "vm/exe",
+                    lambda r: round(r.fraction_of_execution_in_vm, 3),
+                ),
+            )
+            print_table(results, by_levels=False, single_field=None)
 
-        if single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT:
+        if (
+            NOISE_LOWER_LIMIT is not None
+            and single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT
+        ):
             text = f"regression detected {single_node_result.tps} < {test.expected_tps * NOISE_LOWER_LIMIT} = {test.expected_tps} * {NOISE_LOWER_LIMIT}, {test.key} didn't meet TPS requirements"
             if not test.waived:
                 errors.append(text)
             else:
                 warnings.append(text)
-        elif single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT_WARN:
+        elif (
+            NOISE_LOWER_LIMIT_WARN is not None
+            and single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT_WARN
+        ):
             text = f"potential (but within normal noise) regression detected {single_node_result.tps} < {test.expected_tps * NOISE_LOWER_LIMIT_WARN} = {test.expected_tps} * {NOISE_LOWER_LIMIT_WARN}, {test.key} didn't meet TPS requirements"
             warnings.append(text)
-        elif single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT:
+        elif (
+            NOISE_UPPER_LIMIT is not None
+            and single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT
+        ):
             text = f"perf improvement detected {single_node_result.tps} > {test.expected_tps * NOISE_UPPER_LIMIT} = {test.expected_tps} * {NOISE_UPPER_LIMIT}, {test.key} exceeded TPS requirements, increase TPS requirements to match new baseline"
             if not test.waived:
                 errors.append(text)
             else:
                 warnings.append(text)
-        elif single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT_WARN:
+        elif (
+            NOISE_UPPER_LIMIT_WARN is not None
+            and single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT_WARN
+        ):
             text = f"potential (but within normal noise) perf improvement detected {single_node_result.tps} > {test.expected_tps * NOISE_UPPER_LIMIT_WARN} = {test.expected_tps} * {NOISE_UPPER_LIMIT_WARN}, {test.key} exceeded TPS requirements, increase TPS requirements to match new baseline"
             warnings.append(text)
 
+if HIDE_OUTPUT:
+    print_table(results, by_levels=False, single_field=None)
+
 if warnings:
     print("Warnings: ")
     print("\n".join(warnings))
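# --- A minimal sketch of driving the previewnet flow end to end, using only
# environment variables this script actually reads (values and the invocation
# path are illustrative assumptions):
import os
import subprocess

env = dict(
    os.environ,
    FLOW="PREVIEWNET",          # or "PREVIEWNET_LARGE_DB" for the 100M-account DB
    NUM_BLOCKS_PER_TEST="300",  # above 200 blocks, --generate-then-execute is dropped
    ENABLE_PRUNER="1",          # turn on the DB pruner flags defined above
    HIDE_OUTPUT="1",            # buffer command output; dump it only on failure
)
subprocess.run(["python3", "testsuite/single_node_performance.py"], env=env, check=True)
# ---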