Skip to content

Commit

Permalink
fix badges
Browse files Browse the repository at this point in the history
  • Loading branch information
asonnino committed Feb 8, 2024
1 parent d0e2fd8 commit dc01ac8
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 134 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# HotStuff

[![build status](https://img.shields.io/github/workflow/status/asonnino/hotstuff/Rust/main?style=flat-square&logo=github)](https://github.com/asonnino/hotstuff/actions)
![build status](https://img.shields.io/github/actions/workflow/status/asonnino/hotstuff/rust.yml?style=flat-square&logo=GitHub&logoColor=white&link=https%3A%2F%2Fgithub.com%2Fasonnino%2Fhotstuff%2Factions)
[![rustc](https://img.shields.io/badge/rustc-1.64+-blue?style=flat-square&logo=rust)](https://www.rust-lang.org)
[![python](https://img.shields.io/badge/python-3.9-blue?style=flat-square&logo=python&logoColor=white)](https://www.python.org/downloads/release/python-390/)
[![license](https://img.shields.io/badge/license-Apache-blue.svg?style=flat-square)](LICENSE)

This repo provides a minimal implementation of the [2-chain variant of the HotStuff consensus protocol](https://arxiv.org/abs/2106.10362) used at the core of [Diem](https://www.diem.com/en-us/). The codebase has been designed to be small, efficient, and easy to benchmark and modify. It has not been designed to run in production but uses real cryptography ([dalek](https://doc.dalek.rs/ed25519_dalek)), networking ([tokio](https://docs.rs/tokio)), and storage ([rocksdb](https://docs.rs/rocksdb)).
Expand Down
158 changes: 78 additions & 80 deletions benchmark/benchmark/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@
from os.path import join
import subprocess

from benchmark.config import Committee, Key, NodeParameters, BenchParameters, ConfigError
from benchmark.config import (
Committee,
Key,
NodeParameters,
BenchParameters,
ConfigError,
)
from benchmark.utils import BenchError, Print, PathMaker, progress_bar
from benchmark.commands import CommandMaker
from benchmark.logs import LogParser, ParseError
from benchmark.instance import InstanceManager


class FabricError(Exception):
    ''' Wrapper for Fabric exception with a meaningful error message. '''
"""Wrapper for Fabric exception with a meaningfull error message."""

def __init__(self, error):
assert isinstance(error, GroupException)
Expand All @@ -38,7 +44,7 @@ def __init__(self, ctx):
)
self.connect = ctx.connect_kwargs
except (IOError, PasswordRequiredException, SSHException) as e:
raise BenchError('Failed to load SSH key', e)
raise BenchError("Failed to load SSH key", e)

def _check_stderr(self, output):
if isinstance(output, dict):
Expand All @@ -50,47 +56,43 @@ def _check_stderr(self, output):
raise ExecutionError(output.stderr)

def install(self):
Print.info('Installing rust and cloning the repo...')
Print.info("Installing rust and cloning the repo...")
cmd = [
'sudo apt-get update',
'sudo apt-get -y upgrade',
'sudo apt-get -y autoremove',

"sudo apt-get update",
"sudo apt-get -y upgrade",
"sudo apt-get -y autoremove",
# The following dependencies prevent the error: [error: linker `cc` not found].
'sudo apt-get -y install build-essential',
'sudo apt-get -y install cmake',

"sudo apt-get -y install build-essential",
"sudo apt-get -y install cmake",
# Install rust (non-interactive).
'curl --proto "=https" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y',
'source $HOME/.cargo/env',
'rustup default stable',

"source $HOME/.cargo/env",
"rustup default stable",
# This is missing from the Rocksdb installer (needed for Rocksdb).
'sudo apt-get install -y clang',

"sudo apt-get install -y clang",
# Clone the repo.
f'(git clone {self.settings.repo_url} || (cd {self.settings.repo_name} ; git pull))'
f"(git clone {self.settings.repo_url} || (cd {self.settings.repo_name} ; git pull))",
]
hosts = self.manager.hosts(flat=True)
try:
g = Group(*hosts, user='ubuntu', connect_kwargs=self.connect)
g.run(' && '.join(cmd), hide=True)
Print.heading(f'Initialized testbed of {len(hosts)} nodes')
g = Group(*hosts, user="ubuntu", connect_kwargs=self.connect)
g.run(" && ".join(cmd), hide=True)
Print.heading(f"Initialized testbed of {len(hosts)} nodes")
except (GroupException, ExecutionError) as e:
e = FabricError(e) if isinstance(e, GroupException) else e
raise BenchError('Failed to install repo on testbed', e)
raise BenchError("Failed to install repo on testbed", e)

def kill(self, hosts=[], delete_logs=False):
assert isinstance(hosts, list)
assert isinstance(delete_logs, bool)
hosts = hosts if hosts else self.manager.hosts(flat=True)
delete_logs = CommandMaker.clean_logs() if delete_logs else 'true'
cmd = [delete_logs, f'({CommandMaker.kill()} || true)']
delete_logs = CommandMaker.clean_logs() if delete_logs else "true"
cmd = [delete_logs, f"({CommandMaker.kill()} || true)"]
try:
g = Group(*hosts, user='ubuntu', connect_kwargs=self.connect)
g.run(' && '.join(cmd), hide=True)
g = Group(*hosts, user="ubuntu", connect_kwargs=self.connect)
g.run(" && ".join(cmd), hide=True)
except GroupException as e:
raise BenchError('Failed to kill nodes', FabricError(e))
raise BenchError("Failed to kill nodes", FabricError(e))

def _select_hosts(self, bench_parameters):
nodes = max(bench_parameters.nodes)
Expand All @@ -108,29 +110,25 @@ def _select_hosts(self, bench_parameters):
def _background_run(self, host, command, log_file):
name = splitext(basename(log_file))[0]
cmd = f'tmux new -d -s "{name}" "{command} |& tee {log_file}"'
c = Connection(host, user='ubuntu', connect_kwargs=self.connect)
c = Connection(host, user="ubuntu", connect_kwargs=self.connect)
output = c.run(cmd, hide=True)
self._check_stderr(output)

def _update(self, hosts):
Print.info(
f'Updating {len(hosts)} nodes (branch "{self.settings.branch}")...'
)
Print.info(f'Updating {len(hosts)} nodes (branch "{self.settings.branch}")...')
cmd = [
f'(cd {self.settings.repo_name} && git fetch -f)',
f'(cd {self.settings.repo_name} && git checkout -f {self.settings.branch})',
f'(cd {self.settings.repo_name} && git pull -f)',
'source $HOME/.cargo/env',
f'(cd {self.settings.repo_name}/node && {CommandMaker.compile()})',
CommandMaker.alias_binaries(
f'./{self.settings.repo_name}/target/release/'
)
f"(cd {self.settings.repo_name} && git fetch -f)",
f"(cd {self.settings.repo_name} && git checkout -f {self.settings.branch})",
f"(cd {self.settings.repo_name} && git pull -f)",
"source $HOME/.cargo/env",
f"(cd {self.settings.repo_name}/node && {CommandMaker.compile()})",
CommandMaker.alias_binaries(f"./{self.settings.repo_name}/target/release/"),
]
g = Group(*hosts, user='ubuntu', connect_kwargs=self.connect)
g.run(' && '.join(cmd), hide=True)
g = Group(*hosts, user="ubuntu", connect_kwargs=self.connect)
g.run(" && ".join(cmd), hide=True)

def _config(self, hosts, node_parameters):
Print.info('Generating configuration files...')
Print.info("Generating configuration files...")

# Cleanup all local configuration files.
cmd = CommandMaker.cleanup()
Expand All @@ -153,31 +151,31 @@ def _config(self, hosts, node_parameters):
keys += [Key.from_file(filename)]

names = [x.name for x in keys]
consensus_addr = [f'{x}:{self.settings.consensus_port}' for x in hosts]
front_addr = [f'{x}:{self.settings.front_port}' for x in hosts]
mempool_addr = [f'{x}:{self.settings.mempool_port}' for x in hosts]
consensus_addr = [f"{x}:{self.settings.consensus_port}" for x in hosts]
front_addr = [f"{x}:{self.settings.front_port}" for x in hosts]
mempool_addr = [f"{x}:{self.settings.mempool_port}" for x in hosts]
committee = Committee(names, consensus_addr, front_addr, mempool_addr)
committee.print(PathMaker.committee_file())

node_parameters.print(PathMaker.parameters_file())

# Cleanup all nodes.
cmd = f'{CommandMaker.cleanup()} || true'
g = Group(*hosts, user='ubuntu', connect_kwargs=self.connect)
cmd = f"{CommandMaker.cleanup()} || true"
g = Group(*hosts, user="ubuntu", connect_kwargs=self.connect)
g.run(cmd, hide=True)

# Upload configuration files.
progress = progress_bar(hosts, prefix='Uploading config files:')
progress = progress_bar(hosts, prefix="Uploading config files:")
for i, host in enumerate(progress):
c = Connection(host, user='ubuntu', connect_kwargs=self.connect)
c.put(PathMaker.committee_file(), '.')
c.put(PathMaker.key_file(i), '.')
c.put(PathMaker.parameters_file(), '.')
c = Connection(host, user="ubuntu", connect_kwargs=self.connect)
c.put(PathMaker.committee_file(), ".")
c.put(PathMaker.key_file(i), ".")
c.put(PathMaker.parameters_file(), ".")

return committee

def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=False):
Print.info('Booting testbed...')
Print.info("Booting testbed...")

# Kill any potentially unfinished run and delete logs.
self.kill(hosts=hosts, delete_logs=True)
Expand All @@ -186,17 +184,13 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
# Filter all faulty nodes from the client addresses (or they will wait
# for the faulty nodes to be online).
committee = Committee.load(PathMaker.committee_file())
addresses = [f'{x}:{self.settings.front_port}' for x in hosts]
addresses = [f"{x}:{self.settings.front_port}" for x in hosts]
rate_share = ceil(rate / committee.size()) # Take faults into account.
timeout = node_parameters.timeout_delay
client_logs = [PathMaker.client_log_file(i) for i in range(len(hosts))]
for host, addr, log_file in zip(hosts, addresses, client_logs):
cmd = CommandMaker.run_client(
addr,
bench_parameters.tx_size,
rate_share,
timeout,
nodes=addresses
addr, bench_parameters.tx_size, rate_share, timeout, nodes=addresses
)
self._background_run(host, cmd, log_file)

Expand All @@ -210,17 +204,17 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
PathMaker.committee_file(),
db,
PathMaker.parameters_file(),
debug=debug
debug=debug,
)
self._background_run(host, cmd, log_file)

# Wait for the nodes to synchronize
Print.info('Waiting for the nodes to synchronize...')
Print.info("Waiting for the nodes to synchronize...")
sleep(2 * node_parameters.timeout_delay / 1000)

# Wait for all transactions to be processed.
duration = bench_parameters.duration
for _ in progress_bar(range(20), prefix=f'Running benchmark ({duration} sec):'):
for _ in progress_bar(range(20), prefix=f"Running benchmark ({duration} sec):"):
sleep(ceil(duration / 20))
self.kill(hosts=hosts, delete_logs=False)

Expand All @@ -230,71 +224,75 @@ def _logs(self, hosts, faults):
subprocess.run([cmd], shell=True, stderr=subprocess.DEVNULL)

# Download log files.
progress = progress_bar(hosts, prefix='Downloading logs:')
progress = progress_bar(hosts, prefix="Downloading logs:")
for i, host in enumerate(progress):
c = Connection(host, user='ubuntu', connect_kwargs=self.connect)
c = Connection(host, user="ubuntu", connect_kwargs=self.connect)
c.get(PathMaker.node_log_file(i), local=PathMaker.node_log_file(i))
c.get(
PathMaker.client_log_file(i), local=PathMaker.client_log_file(i)
)
c.get(PathMaker.client_log_file(i), local=PathMaker.client_log_file(i))

# Parse logs and return the parser.
Print.info('Parsing logs and computing performance...')
Print.info("Parsing logs and computing performance...")
return LogParser.process(PathMaker.logs_path(), faults=faults)

def run(self, bench_parameters_dict, node_parameters_dict, debug=False):
assert isinstance(debug, bool)
Print.heading('Starting remote benchmark')
Print.heading("Starting remote benchmark")
try:
bench_parameters = BenchParameters(bench_parameters_dict)
node_parameters = NodeParameters(node_parameters_dict)
except ConfigError as e:
raise BenchError('Invalid nodes or bench parameters', e)
raise BenchError("Invalid nodes or bench parameters", e)

# Select which hosts to use.
selected_hosts = self._select_hosts(bench_parameters)
if not selected_hosts:
Print.warn('There are not enough instances available')
Print.warn("There are not enough instances available")
return

# Update nodes.
try:
self._update(selected_hosts)
except (GroupException, ExecutionError) as e:
e = FabricError(e) if isinstance(e, GroupException) else e
raise BenchError('Failed to update nodes', e)
raise BenchError("Failed to update nodes", e)

# Run benchmarks.
for n in bench_parameters.nodes:
for r in bench_parameters.rate:
Print.heading(f'\nRunning {n} nodes (input rate: {r:,} tx/s)')
Print.heading(f"\nRunning {n} nodes (input rate: {r:,} tx/s)")
hosts = selected_hosts[:n]

# Upload all configuration files.
try:
self._config(hosts, node_parameters)
except (subprocess.SubprocessError, GroupException) as e:
e = FabricError(e) if isinstance(e, GroupException) else e
Print.error(BenchError('Failed to configure nodes', e))
Print.error(BenchError("Failed to configure nodes", e))
continue

# Do not boot faulty nodes.
faults = bench_parameters.faults
hosts = hosts[:n-faults]
hosts = hosts[: n - faults]

# Run the benchmark.
for i in range(bench_parameters.runs):
Print.heading(f'Run {i+1}/{bench_parameters.runs}')
Print.heading(f"Run {i+1}/{bench_parameters.runs}")
try:
self._run_single(
hosts, r, bench_parameters, node_parameters, debug
)
self._logs(hosts, faults).print(PathMaker.result_file(
faults, n, r, bench_parameters.tx_size
))
except (subprocess.SubprocessError, GroupException, ParseError) as e:
self._logs(hosts, faults).print(
PathMaker.result_file(
faults, n, r, bench_parameters.tx_size
)
)
except (
subprocess.SubprocessError,
GroupException,
ParseError,
) as e:
self.kill(hosts=hosts)
if isinstance(e, GroupException):
e = FabricError(e)
Print.error(BenchError('Benchmark failed', e))
Print.error(BenchError("Benchmark failed", e))
continue
Loading

0 comments on commit dc01ac8

Please sign in to comment.