From daf0662e583b85d602af4db8ed31b5f7602855f8 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 2 Apr 2020 11:15:59 +0200 Subject: [PATCH 1/2] go/txsource/queries: Wait for indexed blocks before GetBlockByHash --- .changelog/2814.bugfix.md | 1 + go/oasis-node/cmd/debug/txsource/workload/queries.go | 12 ++++++++++++ 2 files changed, 13 insertions(+) create mode 100644 .changelog/2814.bugfix.md diff --git a/.changelog/2814.bugfix.md b/.changelog/2814.bugfix.md new file mode 100644 index 00000000000..03a388bbaec --- /dev/null +++ b/.changelog/2814.bugfix.md @@ -0,0 +1 @@ +go/txsource/queries: Wait for indexed blocks before GetBlockByHash diff --git a/go/oasis-node/cmd/debug/txsource/workload/queries.go b/go/oasis-node/cmd/debug/txsource/workload/queries.go index e4b0607e06d..31c9e05a3ae 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/queries.go +++ b/go/oasis-node/cmd/debug/txsource/workload/queries.go @@ -363,6 +363,18 @@ func (q *queries) doRuntimeQueries(ctx context.Context, rng *rand.Rand) error { ) return fmt.Errorf("runtimeClient.GetBlock, round: %d: %w", round, err) } + // GetBlockByHash requires that the block was actually indexed, so wait for it. + err = q.runtime.WaitBlockIndexed(ctx, &runtimeClient.WaitBlockIndexedRequest{ + RuntimeID: q.runtimeID, + Round: round, + }) + if err != nil { + q.logger.Error("Runtime WaitBlockIndexed failure", + "round", round, + "err", err, + ) + return fmt.Errorf("runtimeClient.WaitBlockIndexed, round: %d: %w", round, err) + } block2, err := q.runtime.GetBlockByHash(ctx, &runtimeClient.GetBlockByHashRequest{ RuntimeID: q.runtimeID, BlockHash: block.Header.EncodedHash(), From 7fb0c9387d9eb06ddbf9e46431d8af47afb5d7d4 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 2 Apr 2020 11:51:30 +0200 Subject: [PATCH 2/2] go/txsource/queries: Introduce a per-iteration timeout The purpose of this timeout is to prevent the client being stuck and treating that as an error instead. --- .../cmd/debug/txsource/workload/queries.go | 113 ++++++++++-------- 1 file changed, 65 insertions(+), 48 deletions(-) diff --git a/go/oasis-node/cmd/debug/txsource/workload/queries.go b/go/oasis-node/cmd/debug/txsource/workload/queries.go index 31c9e05a3ae..88c3238b562 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/queries.go +++ b/go/oasis-node/cmd/debug/txsource/workload/queries.go @@ -39,6 +39,11 @@ const ( queriesEarliestHeightRatio = 0.1 // Ratio of queries that should query latest available height. queriesLatestHeightRatio = 0.1 + + // queriesIterationTimeout is the combined timeout for running all the queries that are executed + // in a single iteration. The purpose of this timeout is to prevent the client being stuck and + // treating that as an error instead. + queriesIterationTimeout = 60 * time.Second ) // QueriesFlags are the queries workload flags. @@ -422,6 +427,61 @@ func (q *queries) doRuntimeQueries(ctx context.Context, rng *rand.Rand) error { return nil } +func (q *queries) doQueries(ctx context.Context, rng *rand.Rand) error { + block, err := q.consensus.GetBlock(ctx, consensus.HeightLatest) + if err != nil { + return fmt.Errorf("consensus.GetBlock error: %w", err) + } + + // Determine the earliest height that we can query. + earliestHeight := int64(1) + if numKept := viper.GetInt64(CfgConsensusNumKeptVersions); numKept < block.Height { + earliestHeight = block.Height - numKept + } + + // Select height at which queries should be done. Earliest and latest + // heights are special cased with increased probability to be selected. + var height int64 + p := rng.Float32() + switch { + case p < queriesEarliestHeightRatio: + height = earliestHeight + case p < queriesEarliestHeightRatio+queriesLatestHeightRatio: + height = block.Height + default: + // [earliestHeight, block.Height] + height = rng.Int63n(block.Height-earliestHeight+1) + earliestHeight + } + + q.logger.Debug("Doing queries", + "height", height, + "height_latest", block.Height, + ) + + if err := q.doConsensusQueries(ctx, rng, height); err != nil { + return fmt.Errorf("consensus queries error: %w", err) + } + if err := q.doSchedulerQueries(ctx, rng, height); err != nil { + return fmt.Errorf("scheduler queries error: %w", err) + } + if err := q.doRegistryQueries(ctx, rng, height); err != nil { + return fmt.Errorf("registry queries error: %w", err) + } + if err := q.doStakingQueries(ctx, rng, height); err != nil { + return fmt.Errorf("staking queries error: %w", err) + } + if err := q.doRuntimeQueries(ctx, rng); err != nil { + return fmt.Errorf("runtime queries error: %w", err) + } + + q.logger.Debug("Queries done", + "height", height, + "height_latest", block.Height, + ) + + return nil +} + func (q *queries) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.ClientConn, cnsc consensus.ClientBackend, fundingAccount signature.Signer) error { ctx := context.Background() @@ -458,57 +518,14 @@ func (q *queries) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.C q.runtimeGenesisRound = resp.Header.Round for { - block, err := q.consensus.GetBlock(ctx, consensus.HeightLatest) - if err != nil { - return fmt.Errorf("consensus.GetBlock error: %w", err) - } - - // Determine the earliest height that we can query. - earliestHeight := int64(1) - if numKept := viper.GetInt64(CfgConsensusNumKeptVersions); numKept < block.Height { - earliestHeight = block.Height - numKept - } - - // Select height at which queries should be done. Earliest and latest - // heights are special cased with increased probability to be selected. - var height int64 - p := rng.Float32() - switch { - case p < queriesEarliestHeightRatio: - height = earliestHeight - case p < queriesEarliestHeightRatio+queriesLatestHeightRatio: - height = block.Height - default: - // [earliestHeight, block.Height] - height = rng.Int63n(block.Height-earliestHeight+1) + earliestHeight - } + loopCtx, cancel := context.WithTimeout(ctx, queriesIterationTimeout) - q.logger.Debug("Doing queries", - "height", height, - "height_latest", block.Height, - ) - - if err := q.doConsensusQueries(ctx, rng, height); err != nil { - return fmt.Errorf("consensus queries error: %w", err) - } - if err := q.doSchedulerQueries(ctx, rng, height); err != nil { - return fmt.Errorf("scheduler queries error: %w", err) - } - if err := q.doRegistryQueries(ctx, rng, height); err != nil { - return fmt.Errorf("registry queries error: %w", err) - } - if err := q.doStakingQueries(ctx, rng, height); err != nil { - return fmt.Errorf("staking queries error: %w", err) - } - if err := q.doRuntimeQueries(ctx, rng); err != nil { - return fmt.Errorf("runtime queries error: %w", err) + err := q.doQueries(loopCtx, rng) + cancel() + if err != nil { + return err } - q.logger.Debug("Queries done", - "height", height, - "height_latest", block.Height, - ) - select { case <-time.After(1 * time.Second): case <-gracefulExit.Done():