Skip to content

Commit

Permalink
oasis-node/cmd: Add consensus benchmark subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
abukosek committed Aug 11, 2020
1 parent 5083907 commit 524ed49
Show file tree
Hide file tree
Showing 4 changed files with 441 additions and 0 deletions.
1 change: 1 addition & 0 deletions .changelog/3183.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
oasis-node/cmd: Add consensus benchmark subcommand
384 changes: 384 additions & 0 deletions go/oasis-node/cmd/debug/conbench/conbench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,384 @@
package conbench

import (
"context"
"crypto"
"fmt"
"math/rand"
"sync/atomic"
"time"

"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/oasisprotocol/oasis-core/go/common/crypto/drbg"
"github.com/oasisprotocol/oasis-core/go/common/crypto/mathrand"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
memorySigner "github.com/oasisprotocol/oasis-core/go/common/crypto/signature/signers/memory"
"github.com/oasisprotocol/oasis-core/go/common/entity"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/quantity"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
"github.com/oasisprotocol/oasis-core/go/control/api"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common"
cmdFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
cmdGrpc "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/grpc"
cmdSigner "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/signer"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
)

const (
// How many test accounts to create?
// This also controls the number of parallel transfers.
CfgNumAccounts = "num_accounts"

// Number of samples.
CfgNumSamples = "num_samples"

// Use test entity for funding?
CfgUseTestEntity = "use_test_entity"
)

var (
logger = logging.GetLogger("cmd/conbench")
conbenchCmd = &cobra.Command{
Use: "conbench",
Short: "benchmark consensus layer",
RunE: doRun,
}
)

func transfer(ctx context.Context, cc consensus.ClientBackend, from signature.Signer, toAddr staking.Address, amount uint64) error {
fromAddr := staking.NewAddress(from.Public())

// Get sender's nonce.
nonce, err := cc.GetSignerNonce(ctx, &consensus.GetSignerNonceRequest{
AccountAddress: fromAddr,
Height: consensus.HeightLatest,
})
if err != nil {
return fmt.Errorf("unable to get sender's nonce: %w", err)
}

// Construct transfer transaction.
transfer := staking.Transfer{
To: toAddr,
}
if err = transfer.Amount.FromUint64(amount); err != nil {
return fmt.Errorf("unable to convert given amount from uint64: %w", err)
}

var fee transaction.Fee
tx := staking.NewTransferTx(nonce, &fee, &transfer)

gas, err := cc.EstimateGas(ctx, &consensus.EstimateGasRequest{
Signer: from.Public(),
Transaction: tx,
})
if err != nil {
return fmt.Errorf("unable to estimate gas: %w", err)
}

tx.Fee.Gas = gas
if err = tx.Fee.Amount.FromUint64(uint64(gas)); err != nil {
return fmt.Errorf("unable to convert fee amount from uint64: %w", err)
}

signedTx, err := transaction.Sign(from, tx)
if err != nil {
return fmt.Errorf("unable to sign transfer transaction: %w", err)
}

// Submit with timeout to avoid blocking forever if the client node
// is skipping CheckTx checks.
submissionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err = cc.SubmitTx(submissionCtx, signedTx); err != nil {
return err
}
return nil
}

func refund(ctx context.Context, cc consensus.ClientBackend, sc staking.Backend, from signature.Signer, toAddr staking.Address) error {
// Fetch account info.
acct, err := sc.Account(ctx, &staking.OwnerQuery{
Height: consensus.HeightLatest,
Owner: staking.NewAddress(from.Public()),
})
if err != nil {
return fmt.Errorf("unable to fetch account balance: %w", err)
}

// Since we're dealing with tiny amounts, we can afford this hack.
amount := acct.General.Balance.ToBigInt().Uint64()

// Refund.
if err = transfer(ctx, cc, from, toAddr, amount); err != nil {
return fmt.Errorf("unable to refund from account: %w", err)
}

return nil
}

func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
cmd.SilenceUsage = true

if err := common.Init(); err != nil {
common.EarlyLogAndExit(err)
}

numAccounts := viper.GetUint64(CfgNumAccounts)
numSamples := viper.GetUint64(CfgNumSamples)

if numAccounts < 1 {
return fmt.Errorf("number of accounts must be >= 1")
}
if numSamples < 3 {
return fmt.Errorf("number of samples must be >= 3")
}

ctx := context.Background()

// Connect to node.
logger.Debug("dialing node", "addr", viper.GetString(cmdGrpc.CfgAddress))
conn, err := cmdGrpc.NewClient(cmd)
if err != nil {
return fmt.Errorf("unable to connect to node: %w", err)
}
defer conn.Close()

cc := consensus.NewConsensusClient(conn)
sc := staking.NewStakingClient(conn)
ncc := api.NewNodeControllerClient(conn)

// Set chain context from genesis document obtained from the node.
genDoc, err := cc.GetGenesisDocument(ctx)
if err != nil {
return fmt.Errorf("unable to obtain genesis document from node: %w", err)
}
genDoc.SetChainContext()

// Create new DRBG.
src, err := drbg.New(crypto.SHA512, []byte("consensus benchmark random seeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeed"), nil, []byte("consensus benchmark"))
if err != nil {
return fmt.Errorf("unable to create deterministic random generator: %w", err)
}
rng := rand.New(mathrand.New(src))

// Wait for the node to sync.
logger.Debug("waiting for node sync")
if err = ncc.WaitSync(context.Background()); err != nil {
return fmt.Errorf("node controller client WaitSync: %w", err)
}
logger.Debug("node synced")

// Create multiple accounts.
account := make([]struct {
signer signature.Signer
addr staking.Address
}, numAccounts)
msf := memorySigner.NewFactory()
for a := range account {
signer, grr := msf.Generate(signature.SignerEntity, rng)
if grr != nil {
return fmt.Errorf("unable to generate account %d: %w", a, grr)
}
account[a].signer = signer
account[a].addr = staking.NewAddress(signer.Public())
}

var fundingSigner signature.Signer

if viper.GetBool(CfgUseTestEntity) {
// Use test entity for funding.
_, fundingSigner, _ = entity.TestEntity()
} else {
// Use given signer for funding.
signerDir, grr := cmdSigner.CLIDirOrPwd()
if grr != nil {
return fmt.Errorf("failed to retrieve signer dir: %w", grr)
}
signerFactory, grr := cmdSigner.NewFactory(cmdSigner.Backend(), signerDir, signature.SignerEntity)
if grr != nil {
return fmt.Errorf("failed to create signer factory: %w", grr)
}
fundingSigner, grr = signerFactory.Load(signature.SignerEntity)
if grr != nil {
return fmt.Errorf("failed to load signer: %w", grr)
}
}

fundingAddr := staking.NewAddress(fundingSigner.Public())

// Check if funding account has enough funds.
logger.Debug("checking if funding account has enough funds")
fundingAcctInfo, err := sc.Account(ctx, &staking.OwnerQuery{
Height: consensus.HeightLatest,
Owner: fundingAddr,
})
if err != nil {
return fmt.Errorf("unable to fetch funding account balance: %w", err)
}
requiredFunds := quantity.NewFromUint64(numAccounts * numSamples)
availableFunds := fundingAcctInfo.General.Balance
if availableFunds.Cmp(requiredFunds) < 0 {
return fmt.Errorf("funding account has insufficient funds (%s required, %s available)", requiredFunds.String(), availableFunds.String())
}
logger.Debug("funding account has enough funds",
"required", requiredFunds.String(),
"available", availableFunds.String(),
)

// Fund all accounts from the funding account.
logger.Info("funding test accounts",
"num_accounts", numAccounts,
)
for a := range account {
// Each account gets numSamples tokens.
if errr := transfer(ctx, cc, fundingSigner, account[a].addr, numSamples); errr != nil {
// An error has happened while funding, make sure to refund the
// funding account from the accounts funded until this point.
for i := 0; i < a; i++ {
if grr := refund(ctx, cc, sc, account[i].signer, fundingAddr); grr != nil {
// Tough luck.
logger.Error("unable to refund from account",
"account_address", account[i].addr,
"err", grr,
)
continue
}
}
return fmt.Errorf("unable to fund account %d: %w", a, errr)
}
}

logger.Info("starting benchmark")

// Submit time is the time required to submit the transaction and wait for
// it to be included in a block.
// Latency is the submit time + the time required for the transaction to
// appear as an event.
// TODO: Maybe measure latency as just the latter, without submit time?

var (
totalSubmitTimeNs uint64
numSubmitSamples uint64
numSubmitErrors uint64
totalLatencyNs uint64
numLatencySamples uint64
numLatencyTimeouts uint64
)

// Perform benchmark in parallel, one goroutine per account.
doneCh := make(chan bool, numAccounts*numSamples)
for a := range account {
go func(idx uint64) {
for s := uint64(0); s < numSamples; s++ {
fromIdx := idx
toIdx := (idx + 1) % numAccounts

fromAddr := account[fromIdx].addr
toAddr := account[toIdx].addr

startT := time.Now()
if err = transfer(ctx, cc, account[fromIdx].signer, toAddr, 1); err != nil {
atomic.AddUint64(&numSubmitErrors, 1)
doneCh <- true
continue
}
atomic.AddUint64(&totalSubmitTimeNs, uint64(time.Since(startT).Nanoseconds()))
atomic.AddUint64(&numSubmitSamples, 1)

// Wait for the transfer to appear as an event.
// Do the waiting in a separate goroutine, so we can keep
// submitting transactions as fast as possible.
go func(fromAddr, toAddr staking.Address, latencyStartT time.Time) {
ch, sub, grr := sc.WatchTransfers(ctx)
if grr != nil {
logger.Error("unable to watch for transfers", "err", grr)
doneCh <- true
return
}
defer sub.Close()

TransferWaitLoop:
for {
select {
case evt := <-ch:
if evt.From.Equal(fromAddr) && evt.To.Equal(toAddr) && evt.Amount.Cmp(quantity.NewFromUint64(1)) == 0 {
// If we care about precision, then this
// should also subtract the time it takes
// to spawn this goroutine, set-up watching
// transfers, etc.
// Since the results are on the order of
// seconds, the time for the above is
// negligible and can be ignored.
atomic.AddUint64(&totalLatencyNs, uint64(time.Since(latencyStartT).Nanoseconds()))
atomic.AddUint64(&numLatencySamples, 1)
break TransferWaitLoop
}
case <-time.After(60 * time.Second):
atomic.AddUint64(&numLatencyTimeouts, 1)
break TransferWaitLoop
}
}
doneCh <- true
}(fromAddr, toAddr, startT)
}
}(uint64(a))
}

// Wait for all goroutines to finish.
for i := uint64(0); i < numAccounts*numSamples; i++ {
<-doneCh
}

avgSubmitTimeNs := float64(totalSubmitTimeNs / numSubmitSamples)
avgLatencyNs := float64(totalLatencyNs / numLatencySamples)

logger.Info("benchmark finished",
"avg_submit_time_s", avgSubmitTimeNs/1.0e9,
"transactions_per_second", 1.0e9/avgSubmitTimeNs,
"submit_samples", numSubmitSamples,
"submit_errors", numSubmitErrors,
"avg_latency_s", avgLatencyNs/1.0e9,
"latency_samples", numLatencySamples,
"latency_timeouts", numLatencyTimeouts,
)

// Refund money into original funding account.
logger.Info("refunding money")
for a := range account {
if err = refund(ctx, cc, sc, account[a].signer, fundingAddr); err != nil {
logger.Error("unable to refund from account",
"account_address", account[a].addr,
"err", err,
)
continue
}
}
logger.Info("money refunded")

return nil
}

// Register registers the conbench sub-command.
func Register(parentCmd *cobra.Command) {
parentCmd.AddCommand(conbenchCmd)
}

func init() {
fs := flag.NewFlagSet("", flag.ContinueOnError)
fs.Uint64(CfgNumAccounts, 10, "Number of accounts to create for benchmarking (also level of parallelism)")
fs.Uint64(CfgNumSamples, 30, "Number of samples per account")
fs.Bool(CfgUseTestEntity, false, "Use test entity for funding")
_ = viper.BindPFlags(fs)
conbenchCmd.Flags().AddFlagSet(fs)

conbenchCmd.Flags().AddFlagSet(cmdGrpc.ClientFlags)
conbenchCmd.Flags().AddFlagSet(cmdFlags.DebugTestEntityFlags)
conbenchCmd.Flags().AddFlagSet(cmdFlags.DebugDontBlameOasisFlag)
conbenchCmd.Flags().AddFlagSet(cmdSigner.CLIFlags)
}
Loading

0 comments on commit 524ed49

Please sign in to comment.