Skip to content

Commit

Permalink
Merge pull request #2814 from oasislabs/kostko/fix/txsource-rt-query-…
Browse files Browse the repository at this point in the history
…blkhash

go/txsource/queries: Wait for indexed blocks before GetBlockByHash
  • Loading branch information
kostko authored Apr 2, 2020
2 parents 70011b1 + 7fb0c93 commit ce667e7
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 48 deletions.
1 change: 1 addition & 0 deletions .changelog/2814.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/txsource/queries: Wait for indexed blocks before GetBlockByHash
125 changes: 77 additions & 48 deletions go/oasis-node/cmd/debug/txsource/workload/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
queriesEarliestHeightRatio = 0.1
// Ratio of queries that should query latest available height.
queriesLatestHeightRatio = 0.1

// queriesIterationTimeout is the combined timeout for running all the queries that are executed
// in a single iteration. The purpose of this timeout is to prevent the client being stuck and
// treating that as an error instead.
queriesIterationTimeout = 60 * time.Second
)

// QueriesFlags are the queries workload flags.
Expand Down Expand Up @@ -363,6 +368,18 @@ func (q *queries) doRuntimeQueries(ctx context.Context, rng *rand.Rand) error {
)
return fmt.Errorf("runtimeClient.GetBlock, round: %d: %w", round, err)
}
// GetBlockByHash requires that the block was actually indexed, so wait for it.
err = q.runtime.WaitBlockIndexed(ctx, &runtimeClient.WaitBlockIndexedRequest{
RuntimeID: q.runtimeID,
Round: round,
})
if err != nil {
q.logger.Error("Runtime WaitBlockIndexed failure",
"round", round,
"err", err,
)
return fmt.Errorf("runtimeClient.WaitBlockIndexed, round: %d: %w", round, err)
}
block2, err := q.runtime.GetBlockByHash(ctx, &runtimeClient.GetBlockByHashRequest{
RuntimeID: q.runtimeID,
BlockHash: block.Header.EncodedHash(),
Expand Down Expand Up @@ -410,6 +427,61 @@ func (q *queries) doRuntimeQueries(ctx context.Context, rng *rand.Rand) error {
return nil
}

func (q *queries) doQueries(ctx context.Context, rng *rand.Rand) error {
block, err := q.consensus.GetBlock(ctx, consensus.HeightLatest)
if err != nil {
return fmt.Errorf("consensus.GetBlock error: %w", err)
}

// Determine the earliest height that we can query.
earliestHeight := int64(1)
if numKept := viper.GetInt64(CfgConsensusNumKeptVersions); numKept < block.Height {
earliestHeight = block.Height - numKept
}

// Select height at which queries should be done. Earliest and latest
// heights are special cased with increased probability to be selected.
var height int64
p := rng.Float32()
switch {
case p < queriesEarliestHeightRatio:
height = earliestHeight
case p < queriesEarliestHeightRatio+queriesLatestHeightRatio:
height = block.Height
default:
// [earliestHeight, block.Height]
height = rng.Int63n(block.Height-earliestHeight+1) + earliestHeight
}

q.logger.Debug("Doing queries",
"height", height,
"height_latest", block.Height,
)

if err := q.doConsensusQueries(ctx, rng, height); err != nil {
return fmt.Errorf("consensus queries error: %w", err)
}
if err := q.doSchedulerQueries(ctx, rng, height); err != nil {
return fmt.Errorf("scheduler queries error: %w", err)
}
if err := q.doRegistryQueries(ctx, rng, height); err != nil {
return fmt.Errorf("registry queries error: %w", err)
}
if err := q.doStakingQueries(ctx, rng, height); err != nil {
return fmt.Errorf("staking queries error: %w", err)
}
if err := q.doRuntimeQueries(ctx, rng); err != nil {
return fmt.Errorf("runtime queries error: %w", err)
}

q.logger.Debug("Queries done",
"height", height,
"height_latest", block.Height,
)

return nil
}

func (q *queries) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.ClientConn, cnsc consensus.ClientBackend, fundingAccount signature.Signer) error {
ctx := context.Background()

Expand Down Expand Up @@ -446,57 +518,14 @@ func (q *queries) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.C
q.runtimeGenesisRound = resp.Header.Round

for {
block, err := q.consensus.GetBlock(ctx, consensus.HeightLatest)
if err != nil {
return fmt.Errorf("consensus.GetBlock error: %w", err)
}

// Determine the earliest height that we can query.
earliestHeight := int64(1)
if numKept := viper.GetInt64(CfgConsensusNumKeptVersions); numKept < block.Height {
earliestHeight = block.Height - numKept
}
loopCtx, cancel := context.WithTimeout(ctx, queriesIterationTimeout)

// Select height at which queries should be done. Earliest and latest
// heights are special cased with increased probability to be selected.
var height int64
p := rng.Float32()
switch {
case p < queriesEarliestHeightRatio:
height = earliestHeight
case p < queriesEarliestHeightRatio+queriesLatestHeightRatio:
height = block.Height
default:
// [earliestHeight, block.Height]
height = rng.Int63n(block.Height-earliestHeight+1) + earliestHeight
}

q.logger.Debug("Doing queries",
"height", height,
"height_latest", block.Height,
)

if err := q.doConsensusQueries(ctx, rng, height); err != nil {
return fmt.Errorf("consensus queries error: %w", err)
}
if err := q.doSchedulerQueries(ctx, rng, height); err != nil {
return fmt.Errorf("scheduler queries error: %w", err)
}
if err := q.doRegistryQueries(ctx, rng, height); err != nil {
return fmt.Errorf("registry queries error: %w", err)
}
if err := q.doStakingQueries(ctx, rng, height); err != nil {
return fmt.Errorf("staking queries error: %w", err)
}
if err := q.doRuntimeQueries(ctx, rng); err != nil {
return fmt.Errorf("runtime queries error: %w", err)
err := q.doQueries(loopCtx, rng)
cancel()
if err != nil {
return err
}

q.logger.Debug("Queries done",
"height", height,
"height_latest", block.Height,
)

select {
case <-time.After(1 * time.Second):
case <-gracefulExit.Done():
Expand Down

0 comments on commit ce667e7

Please sign in to comment.