Skip to content

Commit

Permalink
go/oasis-node/txsource: add parallel workload
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Feb 26, 2020
1 parent 66d5a62 commit 366c2ff
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
1 change: 1 addition & 0 deletions .changelog/2724.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/oasis-node/txsource: add parallel workload
119 changes: 119 additions & 0 deletions go/oasis-node/cmd/debug/txsource/workload/parallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package workload

import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"google.golang.org/grpc"

"github.com/oasislabs/oasis-core/go/common/crypto/signature"
memorySigner "github.com/oasislabs/oasis-core/go/common/crypto/signature/signers/memory"
"github.com/oasislabs/oasis-core/go/common/logging"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/consensus/api/transaction"
runtimeClient "github.com/oasislabs/oasis-core/go/runtime/client/api"
staking "github.com/oasislabs/oasis-core/go/staking/api"
)

const (
NameParallel = "parallel"

parallelSendWaitTimeoutInterval = 10 * time.Second
parallelSendTimeoutInterval = 60 * time.Second
parallelConcurency = 200
parallelTxGasAmount = 10
)

var parallelLogger = logging.GetLogger("cmd/txsource/workload/parallel")

type parallel struct{}

func (parallel) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.ClientConn, cnsc consensus.ClientBackend, rtc runtimeClient.RuntimeClient) error {
ctx := context.Background()

accounts := make([]signature.Signer, parallelConcurency)
var err error
fac := memorySigner.NewFactory()
for i := range accounts {
accounts[i], err = fac.Generate(signature.SignerEntity, rng)
if err != nil {
return fmt.Errorf("memory signer factory Generate account %d: %w", i, err)
}
}

var nonce uint64
fee := transaction.Fee{
Gas: parallelTxGasAmount,
}

for {
errCh := make(chan error, parallelConcurency)
var wg sync.WaitGroup
wg.Add(parallelConcurency)

for i := 0; i < parallelConcurency; i++ {

go func(txSigner signature.Signer, nonce uint64) {
defer wg.Done()

// Transfer tx.
transfer := staking.Transfer{
To: txSigner.Public(),
}
tx := staking.NewTransferTx(nonce, &fee, &transfer)

signedTx, err := transaction.Sign(txSigner, tx)
if err != nil {
parallelLogger.Error("transaction.Sign error", "err", err)
errCh <- fmt.Errorf("transaction.Sign: %w", err)
return
}

parallelLogger.Debug("submitting self transfer",
"account", txSigner.Public(),
)
if err = cnsc.SubmitTx(ctx, signedTx); err != nil {
parallelLogger.Error("SubmitTx error", "err", err)
errCh <- fmt.Errorf("cnsc.SubmitTx: %w", err)
return
}

}(accounts[i], nonce)
}

// Wait for transactions.
waitC := make(chan struct{})
go func() {
defer close(waitC)
wg.Wait()
nonce++
}()

select {
case <-time.After(parallelSendWaitTimeoutInterval):
parallelLogger.Error("transactions not completed within timeout")
return fmt.Errorf("workload parallel: transactions not completed within timeout")

case err := <-errCh:
parallelLogger.Error("error subimit transaction",
"err", err,
)
return fmt.Errorf("workload parallel: error submiting transaction: %w", err)

case <-waitC:
parallelLogger.Debug("all transfers successfull",
"concurency", parallelConcurency,
)
}

select {
case <-time.After(parallelSendTimeoutInterval):
case <-gracefulExit.Done():
parallelLogger.Debug("time's up")
return nil
}
}
}
1 change: 1 addition & 0 deletions go/oasis-node/cmd/debug/txsource/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type Workload interface {
var ByName = map[string]Workload{
NameTransfer: transfer{},
NameOversized: oversized{},
NameParallel: parallel{},
}
2 changes: 2 additions & 0 deletions go/oasis-test-runner/scenario/e2e/txsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var TxSourceMultiShort scenario.Scenario = &txSourceImpl{
workloads: []string{
workload.NameTransfer,
workload.NameOversized,
workload.NameParallel,
},
timeLimit: timeLimitShort,
livenessCheckInterval: livenessCheckInterval,
Expand All @@ -48,6 +49,7 @@ var TxSourceMulti scenario.Scenario = &txSourceImpl{
workloads: []string{
workload.NameTransfer,
workload.NameOversized,
workload.NameParallel,
},
timeLimit: timeLimitLong,
nodeRestartInterval: nodeRestartIntervalLong,
Expand Down

0 comments on commit 366c2ff

Please sign in to comment.