Skip to content

Commit

Permalink
go/txsource/queries: Introduce a per-iteration timeout
Browse files Browse the repository at this point in the history
The purpose of this timeout is to prevent the client being stuck and treating
that as an error instead.
  • Loading branch information
kostko committed Apr 2, 2020
1 parent daf0662 commit 7fb0c93
Showing 1 changed file with 65 additions and 48 deletions.
113 changes: 65 additions & 48 deletions go/oasis-node/cmd/debug/txsource/workload/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
queriesEarliestHeightRatio = 0.1
// Ratio of queries that should query latest available height.
queriesLatestHeightRatio = 0.1

// queriesIterationTimeout is the combined timeout for running all the queries that are executed
// in a single iteration. The purpose of this timeout is to prevent the client being stuck and
// treating that as an error instead.
queriesIterationTimeout = 60 * time.Second
)

// QueriesFlags are the queries workload flags.
Expand Down Expand Up @@ -422,6 +427,61 @@ func (q *queries) doRuntimeQueries(ctx context.Context, rng *rand.Rand) error {
return nil
}

func (q *queries) doQueries(ctx context.Context, rng *rand.Rand) error {
block, err := q.consensus.GetBlock(ctx, consensus.HeightLatest)
if err != nil {
return fmt.Errorf("consensus.GetBlock error: %w", err)
}

// Determine the earliest height that we can query.
earliestHeight := int64(1)
if numKept := viper.GetInt64(CfgConsensusNumKeptVersions); numKept < block.Height {
earliestHeight = block.Height - numKept
}

// Select height at which queries should be done. Earliest and latest
// heights are special cased with increased probability to be selected.
var height int64
p := rng.Float32()
switch {
case p < queriesEarliestHeightRatio:
height = earliestHeight
case p < queriesEarliestHeightRatio+queriesLatestHeightRatio:
height = block.Height
default:
// [earliestHeight, block.Height]
height = rng.Int63n(block.Height-earliestHeight+1) + earliestHeight
}

q.logger.Debug("Doing queries",
"height", height,
"height_latest", block.Height,
)

if err := q.doConsensusQueries(ctx, rng, height); err != nil {
return fmt.Errorf("consensus queries error: %w", err)
}
if err := q.doSchedulerQueries(ctx, rng, height); err != nil {
return fmt.Errorf("scheduler queries error: %w", err)
}
if err := q.doRegistryQueries(ctx, rng, height); err != nil {
return fmt.Errorf("registry queries error: %w", err)
}
if err := q.doStakingQueries(ctx, rng, height); err != nil {
return fmt.Errorf("staking queries error: %w", err)
}
if err := q.doRuntimeQueries(ctx, rng); err != nil {
return fmt.Errorf("runtime queries error: %w", err)
}

q.logger.Debug("Queries done",
"height", height,
"height_latest", block.Height,
)

return nil
}

func (q *queries) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.ClientConn, cnsc consensus.ClientBackend, fundingAccount signature.Signer) error {
ctx := context.Background()

Expand Down Expand Up @@ -458,57 +518,14 @@ func (q *queries) Run(gracefulExit context.Context, rng *rand.Rand, conn *grpc.C
q.runtimeGenesisRound = resp.Header.Round

for {
block, err := q.consensus.GetBlock(ctx, consensus.HeightLatest)
if err != nil {
return fmt.Errorf("consensus.GetBlock error: %w", err)
}

// Determine the earliest height that we can query.
earliestHeight := int64(1)
if numKept := viper.GetInt64(CfgConsensusNumKeptVersions); numKept < block.Height {
earliestHeight = block.Height - numKept
}

// Select height at which queries should be done. Earliest and latest
// heights are special cased with increased probability to be selected.
var height int64
p := rng.Float32()
switch {
case p < queriesEarliestHeightRatio:
height = earliestHeight
case p < queriesEarliestHeightRatio+queriesLatestHeightRatio:
height = block.Height
default:
// [earliestHeight, block.Height]
height = rng.Int63n(block.Height-earliestHeight+1) + earliestHeight
}
loopCtx, cancel := context.WithTimeout(ctx, queriesIterationTimeout)

q.logger.Debug("Doing queries",
"height", height,
"height_latest", block.Height,
)

if err := q.doConsensusQueries(ctx, rng, height); err != nil {
return fmt.Errorf("consensus queries error: %w", err)
}
if err := q.doSchedulerQueries(ctx, rng, height); err != nil {
return fmt.Errorf("scheduler queries error: %w", err)
}
if err := q.doRegistryQueries(ctx, rng, height); err != nil {
return fmt.Errorf("registry queries error: %w", err)
}
if err := q.doStakingQueries(ctx, rng, height); err != nil {
return fmt.Errorf("staking queries error: %w", err)
}
if err := q.doRuntimeQueries(ctx, rng); err != nil {
return fmt.Errorf("runtime queries error: %w", err)
err := q.doQueries(loopCtx, rng)
cancel()
if err != nil {
return err
}

q.logger.Debug("Queries done",
"height", height,
"height_latest", block.Height,
)

select {
case <-time.After(1 * time.Second):
case <-gracefulExit.Done():
Expand Down

0 comments on commit 7fb0c93

Please sign in to comment.