previewnet flow in the benchmark (aptos-labs#10305)
igor-aptos authored and Zekun Wang committed Oct 13, 2023
1 parent 47c6448 commit d07d417
Showing 2 changed files with 154 additions and 54 deletions.
execution/executor-benchmark/src/lib.rs (2 changes: 1 addition & 1 deletion)

@@ -480,7 +480,7 @@ fn add_accounts_impl<V>(
let now_version = db.reader.get_latest_version().unwrap();
let delta_v = now_version - start_version;
info!(
"Overall TPS: account creation: {} txn/s",
"Overall TPS: create_db: account creation: {} txn/s",
delta_v as f32 / elapsed,
);
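This rename gives DB-creation throughput its own unambiguous log prefix, which the Python harness below matches with a dedicated regex. A minimal sketch of the matching side, using the exact pattern added to extract_run_results further down (the sample log text is assumed):

import re

# Sample of the renamed log line emitted by the Rust benchmark above.
sample = "Overall TPS: create_db: account creation: 12345.6 txn/s"

# Same pattern the harness gains below for create-db runs.
tps = float(
    re.findall(r"Overall TPS: create_db: account creation: (\d+\.?\d*) txn/s", sample)[0]
)
assert tps == 12345.6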

testsuite/single_node_performance.py (206 changes: 153 additions & 53 deletions)

@@ -22,6 +22,10 @@ class Flow(Flag):
# Tests that are run manually when using a smaller representative mode.
# (i.e. for measuring speed of the machine)
REPRESENTATIVE = auto()
# Tests used for previewnet evaluation
PREVIEWNET = auto()
# Tests used for previewnet evaluation, on a large pre-created DB
PREVIEWNET_LARGE_DB = auto()


@dataclass
@@ -34,6 +38,8 @@ class RunGroupKey:
transaction_weights_override: Optional[str] = field(default=None)
sharding_traffic_flags: Optional[str] = field(default=None)

smaller_working_set: bool = field(default=False)


@dataclass
class RunGroupConfig:
@@ -43,6 +49,25 @@ class RunGroupConfig:
waived: bool = field(default=False)


SELECTED_FLOW = Flow[os.environ.get("FLOW", default="LAND_BLOCKING")]
IS_PREVIEWNET = SELECTED_FLOW in [Flow.PREVIEWNET, Flow.PREVIEWNET_LARGE_DB]

DEFAULT_NUM_INIT_ACCOUNTS = (
"100000000" if SELECTED_FLOW == Flow.PREVIEWNET_LARGE_DB else "2000000"
)
DEFAULT_MAX_BLOCK_SIZE = "25000" if IS_PREVIEWNET else "10000"

MAX_BLOCK_SIZE = int(os.environ.get("MAX_BLOCK_SIZE", default=DEFAULT_MAX_BLOCK_SIZE))
NUM_BLOCKS = int(os.environ.get("NUM_BLOCKS_PER_TEST", default=15))
NUM_BLOCKS_DETAILED = 10
NUM_ACCOUNTS = max(
[
int(os.environ.get("NUM_INIT_ACCOUNTS", default=DEFAULT_NUM_INIT_ACCOUNTS)),
(2 + 2 * NUM_BLOCKS) * MAX_BLOCK_SIZE,
]
)
MAIN_SIGNER_ACCOUNTS = 2 * MAX_BLOCK_SIZE
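
A worked sketch of the account sizing above, under this diff's defaults: the floor term guarantees every block can touch a fresh set of accounts, and only the large-DB flow's default overrides it.

# Worked example of the account sizing (defaults from this diff assumed).
def num_accounts(flow: str, max_block_size: int = 25_000, num_blocks: int = 15) -> int:
    default_init = 100_000_000 if flow == "PREVIEWNET_LARGE_DB" else 2_000_000
    floor = (2 + 2 * num_blocks) * max_block_size  # 800_000 with these defaults
    return max(default_init, floor)

assert num_accounts("PREVIEWNET") == 2_000_000
assert num_accounts("PREVIEWNET_LARGE_DB") == 100_000_000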

# numbers are based on the machine spec used by github action
# Calibrate from https://gist.github.com/igor-aptos/7b12ca28de03894cddda8e415f37889e
# Local machine numbers will be higher.
@@ -85,30 +110,33 @@ class RunGroupConfig:

RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded", sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.REPRESENTATIVE),
RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded", sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.REPRESENTATIVE),

# setting separately for previewnet, as we run on a different number of cores.
RunGroupConfig(expected_tps=29000 if NUM_ACCOUNTS < 5000000 else 20000, key=RunGroupKey("coin-transfer", smaller_working_set=True), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
RunGroupConfig(expected_tps=23000 if NUM_ACCOUNTS < 5000000 else 15000, key=RunGroupKey("account-generation"), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
RunGroupConfig(expected_tps=130 if NUM_ACCOUNTS < 5000000 else 60, key=RunGroupKey("publish-package"), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
RunGroupConfig(expected_tps=1500 if NUM_ACCOUNTS < 5000000 else 1500, key=RunGroupKey("token-v2-ambassador-mint"), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
RunGroupConfig(expected_tps=12000 if NUM_ACCOUNTS < 5000000 else 7000, key=RunGroupKey("token-v2-ambassador-mint", module_working_set_size=100), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB),
RunGroupConfig(expected_tps=35000 if NUM_ACCOUNTS < 5000000 else 28000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded", sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB, waived=True),
RunGroupConfig(expected_tps=27000 if NUM_ACCOUNTS < 5000000 else 23000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded", sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.PREVIEWNET | Flow.PREVIEWNET_LARGE_DB, waived=True),
]
# fmt: on
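
The previewnet rows above pick their expected TPS by DB size rather than by flow, so one table serves both previewnet variants. A sketch of how one row resolves (numbers copied from the coin-transfer row above):

# With PREVIEWNET_LARGE_DB, NUM_ACCOUNTS defaults to 100M, so the lower
# targets apply; with plain PREVIEWNET (2M accounts) the higher ones do.
NUM_ACCOUNTS = 100_000_000
coin_transfer_target = 29_000 if NUM_ACCOUNTS < 5_000_000 else 20_000
assert coin_transfer_target == 20_000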

NOISE_LOWER_LIMIT = 0.8
NOISE_LOWER_LIMIT_WARN = 0.9
NOISE_LOWER_LIMIT = 0.98 if IS_PREVIEWNET else 0.8
NOISE_LOWER_LIMIT_WARN = None if IS_PREVIEWNET else 0.9
# If you want to calibrate the upper limit for perf improvement, you can
# increase this value temporarily (e.g. to 1.3) and readjust back after a day or two of runs
NOISE_UPPER_LIMIT = 1.15
NOISE_UPPER_LIMIT_WARN = 1.05
NOISE_UPPER_LIMIT = 5 if IS_PREVIEWNET else 1.15
NOISE_UPPER_LIMIT_WARN = None if IS_PREVIEWNET else 1.05
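
A worked example of what the previewnet limits imply, taking the 29000 TPS coin-transfer target from the table above:

# Sketch of the previewnet bands (values from this diff).
expected_tps = 29_000
hard_floor = expected_tps * 0.98   # 28_420.0: below this is a regression error
hard_ceiling = expected_tps * 5    # 145_000: above this means recalibrate targets
# The None warn limits disable the warning tier entirely; see the
# `is not None` guards added near the bottom of this file.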

# bump after a perf improvement, so you can easily distinguish runs
# that are on top of this commit
CODE_PERF_VERSION = "v4"

NUMBER_OF_EXECUTION_THREADS = 8
MAX_BLOCK_SIZE = int(os.environ.get("MAX_BLOCK_SIZE", default="10000"))
NUM_BLOCKS = 15
NUM_BLOCKS_DETAILED = 10
NUM_ACCOUNTS = max([2000000, 4 * NUM_BLOCKS * MAX_BLOCK_SIZE])
ADDITIONAL_DST_POOL_ACCOUNTS = 2 * NUM_BLOCKS * MAX_BLOCK_SIZE
MAIN_SIGNER_ACCOUNTS = 2 * MAX_BLOCK_SIZE

# default to using production number of execution threads for assertions
NUMBER_OF_EXECUTION_THREADS = int(
    os.environ.get("NUMBER_OF_EXECUTION_THREADS", default=8)
)

if os.environ.get("DETAILED"):
EXECUTION_ONLY_NUMBER_OF_THREADS = [1, 2, 4, 8, 16, 32, 60]
@@ -120,7 +148,6 @@ class RunGroupConfig:
else:
BUILD_FLAG = "--profile performance"

SELECTED_FLOW = Flow[os.environ.get("FLOW", default="LAND_BLOCKING")]

if os.environ.get("PROD_DB_FLAGS"):
DB_CONFIG_FLAGS = ""
@@ -129,6 +156,13 @@ class RunGroupConfig:
"--split-ledger-db --use-sharded-state-merkle-db --skip-index-and-usage"
)

if os.environ.get("ENABLE_PRUNER"):
DB_PRUNER_FLAGS = "--enable-state-pruner --enable-ledger-pruner --enable-epoch-snapshot-pruner --ledger-pruning-batch-size 10000 --state-prune-window 3000000 --epoch-snapshot-prune-window 3000000 --ledger-prune-window 3000000"
else:
DB_PRUNER_FLAGS = ""

HIDE_OUTPUT = os.environ.get("HIDE_OUTPUT")
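
The new knobs compose through the environment. A hypothetical invocation (the variable names are the ones read above; the command line itself is illustrative, not from this commit):

# Hypothetical invocation exercising the new previewnet knobs:
#
#   FLOW=PREVIEWNET_LARGE_DB ENABLE_PRUNER=1 HIDE_OUTPUT=1 \
#       NUM_BLOCKS_PER_TEST=50 python3 testsuite/single_node_performance.py
#
# HIDE_OUTPUT buffers benchmark output and only prints it on failure,
# which keeps long previewnet runs readable in CI logs.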

# Run the single node with performance optimizations enabled
target_directory = "execution/executor-benchmark/src"

@@ -148,17 +182,22 @@ def execute_command(command):
# stream to output while command is executing
if p.stdout is not None:
for line in p.stdout:
print(line, end="")
if not HIDE_OUTPUT:
print(line, end="")
result.append(line)

if p.returncode != 0:
raise CalledProcessError(p.returncode, p.args)

# return the full output in the end for postprocessing
full_result = "\n".join(result)

if p.returncode != 0:
if HIDE_OUTPUT:
print(full_result)
raise CalledProcessError(p.returncode, p.args)

if " ERROR " in full_result:
print("ERROR log line in execution")
if HIDE_OUTPUT:
print(full_result)
exit(1)

return full_result
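
execute_command now defers its failure handling until the full output has been assembled, so HIDE_OUTPUT runs still surface logs on error. A minimal self-contained sketch of the same buffer-then-report pattern (an assumed shape, not the committed function):

import shlex
import subprocess
from subprocess import CalledProcessError

def run_buffered(command: str, hide_output: bool) -> str:
    # Stream stdout line by line; buffer silently when hide_output is set.
    p = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, text=True)
    lines = []
    assert p.stdout is not None
    for line in p.stdout:
        if not hide_output:
            print(line, end="")
        lines.append(line)
    p.wait()
    full = "".join(lines)
    if p.returncode != 0:
        if hide_output:
            print(full)  # surface the buffered logs only on failure
        raise CalledProcessError(p.returncode, p.args)
    return full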
@@ -188,28 +227,47 @@ def get_only(values):
return values[0]


def extract_run_results(output: str, execution_only: bool) -> RunResults:
def extract_run_results(
output: str, execution_only: bool, create_db: bool = False
) -> RunResults:
if execution_only:
tps = float(re.findall(r"Overall execution TPS: (\d+\.?\d*) txn/s", output)[-1])
gps = float(re.findall(r"Overall execution GPS: (\d+\.?\d*) gas/s", output)[-1])
gpt = float(
re.findall(r"Overall execution GPT: (\d+\.?\d*) gas/txn", output)[-1]
)

elif create_db:
tps = float(
get_only(
re.findall(
r"Overall TPS: create_db: account creation: (\d+\.?\d*) txn/s",
output,
)
)
)
gps = 0
gpt = 0
else:
tps = float(get_only(re.findall(r"Overall TPS: (\d+\.?\d*) txn/s", output)))
gps = float(get_only(re.findall(r"Overall GPS: (\d+\.?\d*) gas/s", output)))
gpt = float(get_only(re.findall(r"Overall GPT: (\d+\.?\d*) gas/txn", output)))

fraction_in_execution = float(
re.findall(r"Overall fraction of total: (\d+\.?\d*) in execution", output)[-1]
)
fraction_of_execution_in_vm = float(
re.findall(r"Overall fraction of execution (\d+\.?\d*) in VM", output)[-1]
)
fraction_in_commit = float(
re.findall(r"Overall fraction of total: (\d+\.?\d*) in commit", output)[-1]
)
if create_db:
fraction_in_execution = 0
fraction_of_execution_in_vm = 0
fraction_in_commit = 0
else:
fraction_in_execution = float(
re.findall(r"Overall fraction of total: (\d+\.?\d*) in execution", output)[
-1
]
)
fraction_of_execution_in_vm = float(
re.findall(r"Overall fraction of execution (\d+\.?\d*) in VM", output)[-1]
)
fraction_in_commit = float(
re.findall(r"Overall fraction of total: (\d+\.?\d*) in commit", output)[-1]
)

return RunResults(
tps,
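
For create-db runs only the renamed TPS line exists in the output, so the gas and stage-fraction metrics are zeroed rather than parsed. A small usage sketch (the log text is assumed):

# Usage sketch of the new create_db branch.
log = "Overall TPS: create_db: account creation: 98765.4 txn/s"
warmup = extract_run_results(log, execution_only=False, create_db=True)
assert warmup.tps == 98765.4   # gps, gpt and all stage fractions come back as 0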
@@ -282,11 +340,24 @@ def print_table(
warnings = []

with tempfile.TemporaryDirectory() as tmpdirname:
create_db_command = f"cargo run {BUILD_FLAG} -- --block-size {MAX_BLOCK_SIZE} --execution-threads {NUMBER_OF_EXECUTION_THREADS} {DB_CONFIG_FLAGS} create-db --data-dir {tmpdirname}/db --num-accounts {NUM_ACCOUNTS}"
print(f"Warmup - creating DB with {NUM_ACCOUNTS} accounts")
create_db_command = f"cargo run {BUILD_FLAG} -- --block-size {MAX_BLOCK_SIZE} --execution-threads {NUMBER_OF_EXECUTION_THREADS} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} create-db --data-dir {tmpdirname}/db --num-accounts {NUM_ACCOUNTS}"
output = execute_command(create_db_command)

results = []

results.append(
RunGroupInstance(
key=RunGroupKey("warmup"),
single_node_result=extract_run_results(
output, execution_only=False, create_db=True
),
number_of_threads_results={},
block_size=MAX_BLOCK_SIZE,
expected_tps=0,
)
)

for (
test_index,
test,
@@ -313,10 +384,16 @@ def print_table(
elif test.key.executor_type == "native":
executor_type_str = "--use-native-executor --transactions-per-sender 1"
elif test.key.executor_type == "sharded":
executor_type_str = f"--async-partitioning --num-executor-shards {NUMBER_OF_EXECUTION_THREADS} {sharding_traffic_flags}"
executor_type_str = f"--num-executor-shards {NUMBER_OF_EXECUTION_THREADS} {sharding_traffic_flags}"
else:
raise Exception(f"executor type not supported {test.key.executor_type}")
common_command_suffix = f"{executor_type_str} --generate-then-execute --block-size {cur_block_size} {DB_CONFIG_FLAGS} run-executor {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {ADDITIONAL_DST_POOL_ACCOUNTS} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp"
txn_emitter_prefix_str = "" if NUM_BLOCKS > 200 else " --generate-then-execute"

ADDITIONAL_DST_POOL_ACCOUNTS = (
2 * MAX_BLOCK_SIZE * (1 if test.key.smaller_working_set else NUM_BLOCKS)
)

common_command_suffix = f"{executor_type_str} {txn_emitter_prefix_str} --block-size {cur_block_size} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} run-executor {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {ADDITIONAL_DST_POOL_ACCOUNTS} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp"

number_of_threads_results = {}

@@ -363,43 +440,66 @@ def print_table(
)
)

print_table(
results, by_levels=True, single_field=("t/s", lambda r: int(round(r.tps)))
)
print_table(
results, by_levels=True, single_field=("g/s", lambda r: int(round(r.gps)))
)
print_table(
results,
by_levels=True,
single_field=("exe/total", lambda r: round(r.fraction_in_execution, 3)),
)
print_table(
results,
by_levels=True,
single_field=("vm/exe", lambda r: round(r.fraction_of_execution_in_vm, 3)),
)
print_table(results, by_levels=False, single_field=None)
if not HIDE_OUTPUT:
print_table(
results,
by_levels=True,
single_field=("t/s", lambda r: int(round(r.tps))),
)
print_table(
results,
by_levels=True,
single_field=("g/s", lambda r: int(round(r.gps))),
)
print_table(
results,
by_levels=True,
single_field=("exe/total", lambda r: round(r.fraction_in_execution, 3)),
)
print_table(
results,
by_levels=True,
single_field=(
"vm/exe",
lambda r: round(r.fraction_of_execution_in_vm, 3),
),
)
print_table(results, by_levels=False, single_field=None)

if single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT:
if (
NOISE_LOWER_LIMIT is not None
and single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT
):
text = f"regression detected {single_node_result.tps} < {test.expected_tps * NOISE_LOWER_LIMIT} = {test.expected_tps} * {NOISE_LOWER_LIMIT}, {test.key} didn't meet TPS requirements"
if not test.waived:
errors.append(text)
else:
warnings.append(text)
elif single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT_WARN:
elif (
NOISE_LOWER_LIMIT_WARN is not None
and single_node_result.tps < test.expected_tps * NOISE_LOWER_LIMIT_WARN
):
text = f"potential (but within normal noise) regression detected {single_node_result.tps} < {test.expected_tps * NOISE_LOWER_LIMIT_WARN} = {test.expected_tps} * {NOISE_LOWER_LIMIT_WARN}, {test.key} didn't meet TPS requirements"
warnings.append(text)
elif single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT:
elif (
NOISE_UPPER_LIMIT is not None
and single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT
):
text = f"perf improvement detected {single_node_result.tps} > {test.expected_tps * NOISE_UPPER_LIMIT} = {test.expected_tps} * {NOISE_UPPER_LIMIT}, {test.key} exceeded TPS requirements, increase TPS requirements to match new baseline"
if not test.waived:
errors.append(text)
else:
warnings.append(text)
elif single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT_WARN:
elif (
NOISE_UPPER_LIMIT_WARN is not None
and single_node_result.tps > test.expected_tps * NOISE_UPPER_LIMIT_WARN
):
text = f"potential (but within normal noise) perf improvement detected {single_node_result.tps} > {test.expected_tps * NOISE_UPPER_LIMIT_WARN} = {test.expected_tps} * {NOISE_UPPER_LIMIT_WARN}, {test.key} exceeded TPS requirements, increase TPS requirements to match new baseline"
warnings.append(text)
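
Every tier of the TPS check is now None-guarded, which is how the previewnet flows disable the warning tiers outright. A condensed sketch of the four-way classification (a hypothetical helper mirroring the guard chain above, not the committed code):

from typing import Optional

def classify(tps: float, expected: float,
             lo: Optional[float], lo_warn: Optional[float],
             up: Optional[float], up_warn: Optional[float]) -> Optional[str]:
    # A None limit disables that tier, exactly as in the chain above.
    if lo is not None and tps < expected * lo:
        return "regression"
    if lo_warn is not None and tps < expected * lo_warn:
        return "regression-warning"
    if up is not None and tps > expected * up:
        return "improvement (recalibrate)"
    if up_warn is not None and tps > expected * up_warn:
        return "improvement-warning"
    return None

# Previewnet settings: warn tiers off, wide upper band.
assert classify(28_000, 29_000, 0.98, None, 5, None) == "regression"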

if HIDE_OUTPUT:
print_table(results, by_levels=False, single_field=None)

if warnings:
print("Warnings: ")
print("\n".join(warnings))
