From 366c2ffd2b85f0b04f83f2c4b9f64dd9a6adf64c Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 25 Feb 2020 15:59:23 +0100 Subject: [PATCH] go/oasis-node/txsource: add parallel workload --- .changelog/2724.feature.md | 1 + .../cmd/debug/txsource/workload/parallel.go | 119 ++++++++++++++++++ .../cmd/debug/txsource/workload/workload.go | 1 + go/oasis-test-runner/scenario/e2e/txsource.go | 2 + 4 files changed, 123 insertions(+) create mode 100644 .changelog/2724.feature.md create mode 100644 go/oasis-node/cmd/debug/txsource/workload/parallel.go diff --git a/.changelog/2724.feature.md b/.changelog/2724.feature.md new file mode 100644 index 00000000000..78e75b9fdd7 --- /dev/null +++ b/.changelog/2724.feature.md @@ -0,0 +1 @@ +go/oasis-node/txsource: add parallel workload diff --git a/go/oasis-node/cmd/debug/txsource/workload/parallel.go b/go/oasis-node/cmd/debug/txsource/workload/parallel.go new file mode 100644 index 00000000000..678a2cd8a3e --- /dev/null +++ b/go/oasis-node/cmd/debug/txsource/workload/parallel.go @@ -0,0 +1,119 @@ +package workload + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/oasislabs/oasis-core/go/common/crypto/signature" + memorySigner "github.com/oasislabs/oasis-core/go/common/crypto/signature/signers/memory" + "github.com/oasislabs/oasis-core/go/common/logging" + consensus "github.com/oasislabs/oasis-core/go/consensus/api" + "github.com/oasislabs/oasis-core/go/consensus/api/transaction" + runtimeClient "github.com/oasislabs/oasis-core/go/runtime/client/api" + staking "github.com/oasislabs/oasis-core/go/staking/api" +) + +const ( + NameParallel = "parallel" + + parallelSendWaitTimeoutInterval = 10 * time.Second + parallelSendTimeoutInterval = 60 * time.Second + parallelConcurency = 200 + parallelTxGasAmount = 10 +) + +var parallelLogger = logging.GetLogger("cmd/txsource/workload/parallel") + +type parallel struct{} + +func (parallel) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.ClientConn, cnsc consensus.ClientBackend, rtc runtimeClient.RuntimeClient) error { + ctx := context.Background() + + accounts := make([]signature.Signer, parallelConcurency) + var err error + fac := memorySigner.NewFactory() + for i := range accounts { + accounts[i], err = fac.Generate(signature.SignerEntity, rng) + if err != nil { + return fmt.Errorf("memory signer factory Generate account %d: %w", i, err) + } + } + + var nonce uint64 + fee := transaction.Fee{ + Gas: parallelTxGasAmount, + } + + for { + errCh := make(chan error, parallelConcurency) + var wg sync.WaitGroup + wg.Add(parallelConcurency) + + for i := 0; i < parallelConcurency; i++ { + + go func(txSigner signature.Signer, nonce uint64) { + defer wg.Done() + + // Transfer tx. + transfer := staking.Transfer{ + To: txSigner.Public(), + } + tx := staking.NewTransferTx(nonce, &fee, &transfer) + + signedTx, err := transaction.Sign(txSigner, tx) + if err != nil { + parallelLogger.Error("transaction.Sign error", "err", err) + errCh <- fmt.Errorf("transaction.Sign: %w", err) + return + } + + parallelLogger.Debug("submitting self transfer", + "account", txSigner.Public(), + ) + if err = cnsc.SubmitTx(ctx, signedTx); err != nil { + parallelLogger.Error("SubmitTx error", "err", err) + errCh <- fmt.Errorf("cnsc.SubmitTx: %w", err) + return + } + + }(accounts[i], nonce) + } + + // Wait for transactions. + waitC := make(chan struct{}) + go func() { + defer close(waitC) + wg.Wait() + nonce++ + }() + + select { + case <-time.After(parallelSendWaitTimeoutInterval): + parallelLogger.Error("transactions not completed within timeout") + return fmt.Errorf("workload parallel: transactions not completed within timeout") + + case err := <-errCh: + parallelLogger.Error("error subimit transaction", + "err", err, + ) + return fmt.Errorf("workload parallel: error submiting transaction: %w", err) + + case <-waitC: + parallelLogger.Debug("all transfers successfull", + "concurency", parallelConcurency, + ) + } + + select { + case <-time.After(parallelSendTimeoutInterval): + case <-gracefulExit.Done(): + parallelLogger.Debug("time's up") + return nil + } + } +} diff --git a/go/oasis-node/cmd/debug/txsource/workload/workload.go b/go/oasis-node/cmd/debug/txsource/workload/workload.go index 91f376dcb35..181539292ac 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/workload.go +++ b/go/oasis-node/cmd/debug/txsource/workload/workload.go @@ -29,4 +29,5 @@ type Workload interface { var ByName = map[string]Workload{ NameTransfer: transfer{}, NameOversized: oversized{}, + NameParallel: parallel{}, } diff --git a/go/oasis-test-runner/scenario/e2e/txsource.go b/go/oasis-test-runner/scenario/e2e/txsource.go index f5ea91adfbc..ce1234bcf34 100644 --- a/go/oasis-test-runner/scenario/e2e/txsource.go +++ b/go/oasis-test-runner/scenario/e2e/txsource.go @@ -37,6 +37,7 @@ var TxSourceMultiShort scenario.Scenario = &txSourceImpl{ workloads: []string{ workload.NameTransfer, workload.NameOversized, + workload.NameParallel, }, timeLimit: timeLimitShort, livenessCheckInterval: livenessCheckInterval, @@ -48,6 +49,7 @@ var TxSourceMulti scenario.Scenario = &txSourceImpl{ workloads: []string{ workload.NameTransfer, workload.NameOversized, + workload.NameParallel, }, timeLimit: timeLimitLong, nodeRestartInterval: nodeRestartIntervalLong,