From 50605e1474252b083ffa1defd6cd1c207806189a Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 25 Feb 2020 15:59:23 +0100 Subject: [PATCH] go/oasis-node/txsource: add parallel workload --- .changelog/2724.feature.md | 1 + .../cmd/debug/txsource/workload/parallel.go | 121 ++++++++++++++++++ .../cmd/debug/txsource/workload/workload.go | 1 + go/oasis-test-runner/scenario/e2e/txsource.go | 2 + 4 files changed, 125 insertions(+) create mode 100644 .changelog/2724.feature.md create mode 100644 go/oasis-node/cmd/debug/txsource/workload/parallel.go diff --git a/.changelog/2724.feature.md b/.changelog/2724.feature.md new file mode 100644 index 00000000000..78e75b9fdd7 --- /dev/null +++ b/.changelog/2724.feature.md @@ -0,0 +1 @@ +go/oasis-node/txsource: add parallel workload diff --git a/go/oasis-node/cmd/debug/txsource/workload/parallel.go b/go/oasis-node/cmd/debug/txsource/workload/parallel.go new file mode 100644 index 00000000000..fb70777d6b0 --- /dev/null +++ b/go/oasis-node/cmd/debug/txsource/workload/parallel.go @@ -0,0 +1,121 @@ +package workload + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/oasislabs/oasis-core/go/common/crypto/signature" + memorySigner "github.com/oasislabs/oasis-core/go/common/crypto/signature/signers/memory" + "github.com/oasislabs/oasis-core/go/common/logging" + consensus "github.com/oasislabs/oasis-core/go/consensus/api" + "github.com/oasislabs/oasis-core/go/consensus/api/transaction" + runtimeClient "github.com/oasislabs/oasis-core/go/runtime/client/api" + staking "github.com/oasislabs/oasis-core/go/staking/api" +) + +const ( + NameParallel = "parallel" + + parallelSendWaitTimeoutInterval = 30 * time.Second + parallelSendTimeoutInterval = 60 * time.Second + parallelConcurency = 200 + parallelTxGasAmount = 10 +) + +var parallelLogger = logging.GetLogger("cmd/txsource/workload/parallel") + +type parallel struct{} + +func (parallel) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.ClientConn, cnsc consensus.ClientBackend, rtc runtimeClient.RuntimeClient) error { + ctx := context.Background() + + accounts := make([]signature.Signer, parallelConcurency) + var err error + fac := memorySigner.NewFactory() + for i := range accounts { + // NOTE: no balances are needed for now + accounts[i], err = fac.Generate(signature.SignerEntity, rng) + if err != nil { + return fmt.Errorf("memory signer factory Generate account %d: %w", i, err) + } + } + + // A single global nonce is enough as we wait for all submissions to + // complete before proceeding with a new batch. + var nonce uint64 + fee := transaction.Fee{ + Gas: parallelTxGasAmount, + } + + for { + errCh := make(chan error, parallelConcurency) + var wg sync.WaitGroup + wg.Add(parallelConcurency) + + for i := 0; i < parallelConcurency; i++ { + go func(txSigner signature.Signer, nonce uint64) { + defer wg.Done() + + // Transfer tx. + transfer := staking.Transfer{ + To: txSigner.Public(), + } + tx := staking.NewTransferTx(nonce, &fee, &transfer) + + signedTx, err := transaction.Sign(txSigner, tx) + if err != nil { + parallelLogger.Error("transaction.Sign error", "err", err) + errCh <- fmt.Errorf("transaction.Sign: %w", err) + return + } + + parallelLogger.Debug("submitting self transfer", + "account", txSigner.Public(), + ) + if err = cnsc.SubmitTx(ctx, signedTx); err != nil { + parallelLogger.Error("SubmitTx error", "err", err) + errCh <- fmt.Errorf("cnsc.SubmitTx: %w", err) + return + } + + }(accounts[i], nonce) + } + + // Wait for transactions. + waitC := make(chan struct{}) + go func() { + defer close(waitC) + wg.Wait() + nonce++ + }() + + select { + case <-time.After(parallelSendWaitTimeoutInterval): + parallelLogger.Error("transactions not completed within timeout") + return fmt.Errorf("workload parallel: transactions not completed within timeout") + + case err := <-errCh: + parallelLogger.Error("error subimit transaction", + "err", err, + ) + return fmt.Errorf("workload parallel: error submiting transaction: %w", err) + + case <-waitC: + parallelLogger.Debug("all transfers successful", + "concurency", parallelConcurency, + ) + } + + select { + case <-time.After(parallelSendTimeoutInterval): + case <-gracefulExit.Done(): + parallelLogger.Debug("time's up") + return nil + } + } +} diff --git a/go/oasis-node/cmd/debug/txsource/workload/workload.go b/go/oasis-node/cmd/debug/txsource/workload/workload.go index aeb640ec8fe..3a556f56c74 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/workload.go +++ b/go/oasis-node/cmd/debug/txsource/workload/workload.go @@ -30,4 +30,5 @@ var ByName = map[string]Workload{ NameTransfer: transfer{}, NameOversized: oversized{}, NameRegistration: ®istration{}, + NameParallel: parallel{}, } diff --git a/go/oasis-test-runner/scenario/e2e/txsource.go b/go/oasis-test-runner/scenario/e2e/txsource.go index 8dc233d10f1..2446b770a6d 100644 --- a/go/oasis-test-runner/scenario/e2e/txsource.go +++ b/go/oasis-test-runner/scenario/e2e/txsource.go @@ -38,6 +38,7 @@ var TxSourceMultiShort scenario.Scenario = &txSourceImpl{ workload.NameTransfer, workload.NameOversized, workload.NameRegistration, + workload.NameParallel, }, timeLimit: timeLimitShort, livenessCheckInterval: livenessCheckInterval, @@ -50,6 +51,7 @@ var TxSourceMulti scenario.Scenario = &txSourceImpl{ workload.NameTransfer, workload.NameOversized, workload.NameRegistration, + workload.NameParallel, }, timeLimit: timeLimitLong, nodeRestartInterval: nodeRestartIntervalLong,