diff --git a/.changelog/3183.internal.md b/.changelog/3183.internal.md new file mode 100644 index 00000000000..143b6220b6d --- /dev/null +++ b/.changelog/3183.internal.md @@ -0,0 +1 @@ +oasis-node/cmd: Add consensus benchmark subcommand diff --git a/go/oasis-node/cmd/debug/conbench/conbench.go b/go/oasis-node/cmd/debug/conbench/conbench.go new file mode 100644 index 00000000000..d859df6cae9 --- /dev/null +++ b/go/oasis-node/cmd/debug/conbench/conbench.go @@ -0,0 +1,460 @@ +package conbench + +import ( + "context" + "crypto" + "fmt" + "math/rand" + "sort" + "sync/atomic" + "time" + + "github.com/spf13/cobra" + flag "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/drbg" + "github.com/oasisprotocol/oasis-core/go/common/crypto/mathrand" + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" + memorySigner "github.com/oasisprotocol/oasis-core/go/common/crypto/signature/signers/memory" + "github.com/oasisprotocol/oasis-core/go/common/entity" + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/quantity" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" + "github.com/oasisprotocol/oasis-core/go/control/api" + "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common" + cmdFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" + cmdGrpc "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/grpc" + cmdSigner "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/signer" + staking "github.com/oasisprotocol/oasis-core/go/staking/api" +) + +const ( + // Number of test accounts to create. + // This also controls the number of parallel transfers. + CfgNumAccounts = "num_accounts" + + // Number of samples (transfers) per account. + CfgNumSamples = "num_samples" + + // Timeout for SubmitTx. + CfgSubmitTxTimeout = "submit_timeout" + + // Use test entity for funding? + CfgUseTestEntity = "use_test_entity" + + // Placeholder value for cachedNonce and cachedGas in localAccount struct + // when they haven't been initialized yet. + notYetCached = uint64(18446744073709551615) +) + +var ( + logger = logging.GetLogger("cmd/conbench") + conbenchCmd = &cobra.Command{ + Use: "conbench", + Short: "benchmark consensus layer", + Long: "Runs a consensus layer benchmark.", + RunE: doRun, + } +) + +type localAccount struct { + signer signature.Signer + addr staking.Address + cachedNonce uint64 + cachedGas uint64 +} + +func transfer(ctx context.Context, cc consensus.ClientBackend, from *localAccount, toAddr staking.Address, amount uint64, noCache bool) error { + var err error + + // Get sender's nonce if not yet cached (or if we're ignoring cache). + nonce := from.cachedNonce + if nonce == notYetCached || noCache { + nonce, err = cc.GetSignerNonce(ctx, &consensus.GetSignerNonceRequest{ + AccountAddress: from.addr, + Height: consensus.HeightLatest, + }) + if err != nil { + return fmt.Errorf("unable to get sender's nonce: %w", err) + } + atomic.StoreUint64(&from.cachedNonce, nonce) + } + + // Construct transfer transaction. + transfer := staking.Transfer{ + To: toAddr, + } + if err = transfer.Amount.FromUint64(amount); err != nil { + return fmt.Errorf("unable to convert given amount from uint64: %w", err) + } + + var fee transaction.Fee + tx := staking.NewTransferTx(nonce, &fee, &transfer) + + // Estimate gas if not yet cached (or if we're ignoring cache). + gas := from.cachedGas + if gas == notYetCached || noCache { + estGas, grr := cc.EstimateGas(ctx, &consensus.EstimateGasRequest{ + Signer: from.signer.Public(), + Transaction: tx, + }) + if grr != nil { + return fmt.Errorf("unable to estimate gas: %w", grr) + } + gas = uint64(estGas) + atomic.StoreUint64(&from.cachedGas, gas) + } + + tx.Fee.Gas = transaction.Gas(gas) + if err = tx.Fee.Amount.FromUint64(gas); err != nil { + return fmt.Errorf("unable to convert fee amount from uint64: %w", err) + } + + signedTx, err := transaction.Sign(from.signer, tx) + if err != nil { + return fmt.Errorf("unable to sign transfer transaction: %w", err) + } + + // Increment cached nonce. + atomic.AddUint64(&from.cachedNonce, 1) + + // Submit with timeout to avoid blocking forever if the client node + // is skipping CheckTx checks. The timeout should be set large enough + // for the network to handle the submission. + timeout := viper.GetDuration(CfgSubmitTxTimeout) + submissionCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + if err = cc.SubmitTx(submissionCtx, signedTx); err != nil { + return err + } + return nil +} + +func refund(ctx context.Context, cc consensus.ClientBackend, sc staking.Backend, from *localAccount, toAddr staking.Address) error { + // Fetch account info. + acct, err := sc.Account(ctx, &staking.OwnerQuery{ + Height: consensus.HeightLatest, + Owner: from.addr, + }) + if err != nil { + return fmt.Errorf("unable to fetch account balance: %w", err) + } + + // Since we're dealing with tiny amounts, we can afford this hack. + amount := acct.General.Balance.ToBigInt().Uint64() + + if amount == 0 { + // Nothing to refund. + return nil + } + + // We don't want refunds to fail, so disable caching. + if err = transfer(ctx, cc, from, toAddr, amount, true); err != nil { + return fmt.Errorf("unable to refund from account: %w", err) + } + + return nil +} + +func refundMultiple(ctx context.Context, cc consensus.ClientBackend, sc staking.Backend, account []localAccount, toAddr staking.Address) { + // Do the refunds in parallel. + doneCh := make(chan bool, len(account)) + for a := range account { + go func(a int) { + if err := refund(ctx, cc, sc, &account[a], toAddr); err != nil { + // Tough luck. + logger.Error("unable to refund from account", + "account_address", account[a].addr, + "err", err, + ) + } + doneCh <- true + }(a) + } + + // Wait for all goroutines to finish. + for range account { + <-doneCh + } +} + +func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo + cmd.SilenceUsage = true + + if err := common.Init(); err != nil { + common.EarlyLogAndExit(err) + } + + numAccounts := viper.GetUint64(CfgNumAccounts) + numSamples := viper.GetUint64(CfgNumSamples) + + if numAccounts < 1 { + return fmt.Errorf("number of accounts must be >= 1") + } + if numSamples < 3 { + return fmt.Errorf("number of samples must be >= 3") + } + + ctx := context.Background() + + // Connect to node. + logger.Debug("dialing node", "addr", viper.GetString(cmdGrpc.CfgAddress)) + conn, err := cmdGrpc.NewClient(cmd) + if err != nil { + return fmt.Errorf("unable to connect to node: %w", err) + } + defer conn.Close() + + cc := consensus.NewConsensusClient(conn) + sc := staking.NewStakingClient(conn) + ncc := api.NewNodeControllerClient(conn) + + // Set chain context from genesis document obtained from the node. + genDoc, err := cc.GetGenesisDocument(ctx) + if err != nil { + return fmt.Errorf("unable to obtain genesis document from node: %w", err) + } + genDoc.SetChainContext() + + // Create new DRBG. + src, err := drbg.New(crypto.SHA512, []byte("consensus benchmark random seeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeed"), nil, []byte("consensus benchmark")) + if err != nil { + return fmt.Errorf("unable to create deterministic random generator: %w", err) + } + rng := rand.New(mathrand.New(src)) + + // Wait for the node to sync. + logger.Debug("waiting for node sync") + if err = ncc.WaitSync(context.Background()); err != nil { + return fmt.Errorf("unable to wait for node sync: %w", err) + } + logger.Debug("node synced") + + // Create multiple accounts. + account := make([]localAccount, numAccounts) + msf := memorySigner.NewFactory() + for a := range account { + signer, grr := msf.Generate(signature.SignerEntity, rng) + if grr != nil { + return fmt.Errorf("unable to generate account %d: %w", a, grr) + } + account[a].signer = signer + account[a].addr = staking.NewAddress(signer.Public()) + account[a].cachedNonce = notYetCached + account[a].cachedGas = notYetCached + } + + var fundingSigner signature.Signer + + if viper.GetBool(CfgUseTestEntity) { + // Use test entity for funding. + _, fundingSigner, _ = entity.TestEntity() + } else { + // Use given signer for funding. + signerDir, grr := cmdSigner.CLIDirOrPwd() + if grr != nil { + return fmt.Errorf("failed to retrieve signer dir: %w", grr) + } + signerFactory, grr := cmdSigner.NewFactory(cmdSigner.Backend(), signerDir, signature.SignerEntity) + if grr != nil { + return fmt.Errorf("failed to create signer factory: %w", grr) + } + fundingSigner, grr = signerFactory.Load(signature.SignerEntity) + if grr != nil { + return fmt.Errorf("failed to load signer: %w", grr) + } + } + + fundingAddr := staking.NewAddress(fundingSigner.Public()) + fundingAcct := localAccount{ + signer: fundingSigner, + addr: fundingAddr, + cachedNonce: notYetCached, + cachedGas: notYetCached, + } + + // Check if funding account has enough funds. + // TODO: Fees? + logger.Debug("checking if funding account has enough funds") + fundingAcctInfo, err := sc.Account(ctx, &staking.OwnerQuery{ + Height: consensus.HeightLatest, + Owner: fundingAddr, + }) + if err != nil { + return fmt.Errorf("unable to fetch funding account balance: %w", err) + } + // TODO: Fees? + requiredFunds := quantity.NewFromUint64(numAccounts * numSamples) + availableFunds := fundingAcctInfo.General.Balance + if availableFunds.Cmp(requiredFunds) < 0 { + return fmt.Errorf("funding account has insufficient funds (%s required, %s available)", requiredFunds.String(), availableFunds.String()) + } + logger.Debug("funding account has enough funds", + "required", requiredFunds.String(), + "available", availableFunds.String(), + ) + + // Fund all accounts from the funding account. + logger.Info("funding test accounts", + "num_accounts", numAccounts, + ) + for a := range account { + // Each account gets numSamples tokens. + // TODO: Fees? + if errr := transfer(ctx, cc, &fundingAcct, account[a].addr, numSamples, true); errr != nil { + // An error has happened while funding, make sure to refund the + // funding account from the accounts funded until this point. + refundMultiple(ctx, cc, sc, account[0:a], fundingAddr) + return fmt.Errorf("unable to fund account %d: %w", a, errr) + } + } + + logger.Info("starting benchmark", "num_accounts", numAccounts) + startStatus, err := cc.GetStatus(ctx) + if err != nil { + refundMultiple(ctx, cc, sc, account, fundingAddr) + return fmt.Errorf("unable to get status: %w", err) + } + benchmarkStartHeight := startStatus.LatestHeight + benchmarkStartT := time.Now() + + // Submit time is the time required to submit the transaction and + // wait for it to be included in a block. + var ( + totalSubmitTimeNs uint64 + numSubmitSamples uint64 + numSubmitErrors uint64 + ) + + // Perform benchmark in parallel, one goroutine per account. + doneCh := make(chan bool, numAccounts*numSamples) + for a := range account { + go func(idx uint64) { + var noCache bool + for s := uint64(0); s < numSamples; s++ { + fromIdx := idx + toIdx := (idx + s + 1) % numAccounts + toAddr := account[toIdx].addr + + startT := time.Now() + if err = transfer(ctx, cc, &account[fromIdx], toAddr, 1, noCache); err != nil { + atomic.AddUint64(&numSubmitErrors, 1) + // Disable cache for the next sample, just in case + // we messed up the nonce or if the gas cost changed. + noCache = true + doneCh <- true + continue + } + atomic.AddUint64(&totalSubmitTimeNs, uint64(time.Since(startT).Nanoseconds())) + atomic.AddUint64(&numSubmitSamples, 1) + doneCh <- true + noCache = false + } + }(uint64(a)) + } + + // Wait for all goroutines to finish. + for i := uint64(0); i < numAccounts*numSamples; i++ { + <-doneCh + } + + benchmarkDuration := time.Since(benchmarkStartT) + stopStatus, err := cc.GetStatus(ctx) + if err != nil { + refundMultiple(ctx, cc, sc, account, fundingAddr) + return fmt.Errorf("unable to get status: %w", err) + } + benchmarkStopHeight := stopStatus.LatestHeight + + // Go through all transactions from benchmarkStartHeight to + // benchmarkStopHeight and calculate the average number of + // transactions per second and other stats. + // Note that we count all transactions, not just the ones made + // by this benchmark. + var totalTxs uint64 + var maxTxs uint64 + minTxs := uint64(18446744073709551615) + txsPerBlock := make([]uint64, 0) + for height := benchmarkStartHeight; height <= benchmarkStopHeight; height++ { + txs, grr := cc.GetTransactions(ctx, height) + if grr != nil { + logger.Error("GetTransactions failed", "err", grr, "height", height) + continue + } + lenTxs := uint64(len(txs)) + totalTxs += lenTxs + txsPerBlock = append(txsPerBlock, lenTxs) + if lenTxs > maxTxs { + maxTxs = lenTxs + } + if lenTxs < minTxs { + minTxs = lenTxs + } + } + + tps := float64(totalTxs) / benchmarkDuration.Seconds() + + // Calculate median number of transactions. + sort.Slice(txsPerBlock, func(i, j int) bool { return txsPerBlock[i] < txsPerBlock[j] }) + medianTxs := txsPerBlock[len(txsPerBlock)/2] + + avgSubmitTimeNs := float64(totalSubmitTimeNs / numSubmitSamples) + + logger.Info("benchmark finished", + // Number of accounts involved in benchmark (level of parallelism). + "num_accounts", numAccounts, + // Average time (in seconds) required to submit a transaction and wait + // for it to be included in a block. + "avg_submit_time_s", avgSubmitTimeNs/1.0e9, + // Transactions per second (this includes all transactions that + // appeared on the network during the time of the benchmark). + "transactions_per_second", tps, + // Number of successful SubmitTx calls (i.e. transfer transactions). + "submit_samples", numSubmitSamples, + // Number of unsuccessful SubmitTx calls. + "submit_errors", numSubmitErrors, + // Duration of the entire benchmark (in seconds). + "bench_duration_s", benchmarkDuration.Seconds(), + // Number of blocks seen on the network during the benchmark. + "num_blocks", len(txsPerBlock), + // Minimum number of transactions per block (during the benchmark). + "min_txs_per_block", minTxs, + // Maximum number of transactions per block (during the benchmark). + "max_txs_per_block", maxTxs, + // Average number of transactions per block (during the benchmark). + "avg_txs_per_block", float64(totalTxs)/float64(len(txsPerBlock)), + // Median number of transactions per block (during the benchmark). + "median_txs_per_block", medianTxs, + // Total number of transactions observed during the benchmark. + "total_txs", totalTxs, + ) + + // Refund money into original funding account. + logger.Info("refunding money") + refundMultiple(ctx, cc, sc, account, fundingAddr) + logger.Info("money refunded") + + return nil +} + +// Register registers the conbench sub-command. +func Register(parentCmd *cobra.Command) { + parentCmd.AddCommand(conbenchCmd) +} + +func init() { + fs := flag.NewFlagSet("", flag.ContinueOnError) + fs.Uint64(CfgNumAccounts, 10, "Number of accounts to create for benchmarking (also level of parallelism)") + fs.Uint64(CfgNumSamples, 30, "Number of samples (transfers) per account") + fs.Duration(CfgSubmitTxTimeout, 10*time.Second, "Timeout for SubmitTx (set this based on network parameters)") + fs.Bool(CfgUseTestEntity, false, "Use test entity for funding (only for testing)") + _ = viper.BindPFlags(fs) + conbenchCmd.Flags().AddFlagSet(fs) + + conbenchCmd.Flags().AddFlagSet(cmdGrpc.ClientFlags) + conbenchCmd.Flags().AddFlagSet(cmdFlags.DebugTestEntityFlags) + conbenchCmd.Flags().AddFlagSet(cmdFlags.DebugDontBlameOasisFlag) + conbenchCmd.Flags().AddFlagSet(cmdSigner.CLIFlags) +} diff --git a/go/oasis-node/cmd/debug/conbench/temp-script.sh b/go/oasis-node/cmd/debug/conbench/temp-script.sh new file mode 100755 index 00000000000..705fb9fadfb --- /dev/null +++ b/go/oasis-node/cmd/debug/conbench/temp-script.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# NOTE: This is only a temporary script to make testing easier, it will be +# removed in the final commit. +set -o nounset -o pipefail -o errexit +trap "exit 1" INT + +# Get the root directory of the repository. +ROOT="$(cd $(dirname $0)/../../../../../; pwd -P)" + +# ANSI escape codes to brighten up the output. +GRN=$'\e[32;1m' +OFF=$'\e[0m' + +# Paths to various binaries and config files that we need. +OASIS_NET_RUNNER="${ROOT}/go/oasis-net-runner/oasis-net-runner" +OASIS_NODE="${ROOT}/go/oasis-node/oasis-node" + +# Kill all dangling processes on exit. +cleanup() { + printf "${OFF}" + pkill -P $$ || true + wait || true +} +trap "cleanup" EXIT + +# The base directory for all the node and test env cruft. +TEST_BASE_DIR=$(mktemp -d -t oasis-conbench-XXXXXXXXXX) + +# The oasis-node binary must be in the path for the oasis-net-runner to find it. +export PATH="${PATH}:${ROOT}/go/oasis-node" + +printf "${GRN}### Starting the test network...${OFF}\n" +${OASIS_NET_RUNNER} \ + --basedir.no_temp_dir \ + --basedir ${TEST_BASE_DIR} & + +export OASIS_NODE_GRPC_ADDR="unix:${TEST_BASE_DIR}/net-runner/network/client-0/internal.sock" + +printf "${GRN}### Waiting for all nodes to register...${OFF}\n" +${OASIS_NODE} debug control wait-nodes \ + --address ${OASIS_NODE_GRPC_ADDR} \ + --nodes 1 \ + --wait + +printf "${GRN}### Running benchmark...${OFF}\n" +${OASIS_NODE} debug conbench \ + --address ${OASIS_NODE_GRPC_ADDR} \ + --use_test_entity \ + --num_accounts 100 \ + --num_samples 30 \ + --log.level DEBUG + +# Clean up after a successful run. +rm -rf "${TEST_BASE_DIR}" + +printf "${GRN}### Tests finished.${OFF}\n" diff --git a/go/oasis-node/cmd/debug/debug.go b/go/oasis-node/cmd/debug/debug.go index 67249a2ce3a..aff2dd9324c 100644 --- a/go/oasis-node/cmd/debug/debug.go +++ b/go/oasis-node/cmd/debug/debug.go @@ -5,6 +5,7 @@ import ( "github.com/spf13/cobra" "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/debug/byzantine" + "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/debug/conbench" "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/debug/consim" "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/debug/control" "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/debug/dumpdb" @@ -27,6 +28,7 @@ func Register(parentCmd *cobra.Command) { control.Register(debugCmd) consim.Register(debugCmd) dumpdb.Register(debugCmd) + conbench.Register(debugCmd) parentCmd.AddCommand(debugCmd) }