Skip to content

Commit

Permalink
Merge pull request #81838 from yuzefovich/backport22.1-81796
Browse files Browse the repository at this point in the history
release-22.1: sql: don't use the streamer for queries with mutations
  • Loading branch information
yuzefovich authored May 25, 2022
2 parents 7fc2ca2 + a9f67e9 commit e4ca082
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,16 @@ func (dsp *DistSQLPlanner) Run(
localState.Collection = planCtx.planner.Descriptors()
}

// noMutations indicates whether we know for sure that the plan doesn't have
// any mutations. If we don't have the access to the planner (which can be
// the case not on the main query execution path, i.e. BulkIO, CDC, etc),
// then we are ignorant of the details of the execution plan, so we choose
// to be on the safe side and mark 'noMutations' as 'false'.
noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)

if planCtx.isLocal {
localState.IsLocal = true
if planCtx.parallelizeScansIfLocal {
if noMutations && planCtx.parallelizeScansIfLocal {
// Even though we have a single flow on the gateway node, we might
// have decided to parallelize the scans. If that's the case, we
// will need to use the Leaf txn.
Expand All @@ -448,24 +455,33 @@ func (dsp *DistSQLPlanner) Run(
}
}
}
// Even if planCtx.isLocal is false (which is the case when we think it's
// worth distributing the query), we need to go through the processors to
// figure out whether any of them have concurrency.
//
// At the moment of writing, this is only relevant whenever the Streamer API
// might be used by some of the processors. The Streamer internally can have
// concurrency, so it expects to be given a LeafTxn. In order for that
// LeafTxn to be created later, during the flow setup, we need to populate
// leafInputState below, so we tell the localState that there is
// concurrency.
if row.CanUseStreamer(ctx, dsp.st) {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
if jr.IsIndexJoin() {
// Index joins are executed via the Streamer API that has
// concurrency.
localState.HasConcurrency = true
break
if noMutations {
// Even if planCtx.isLocal is false (which is the case when we think
// it's worth distributing the query), we need to go through the
// processors to figure out whether any of them have concurrency.
//
// However, the concurrency requires the usage of LeafTxns which is only
// acceptable if we don't have any mutations in the plan.
// TODO(yuzefovich): we could be smarter here and allow the usage of the
// RootTxn by the mutations while still using the Streamer (that gets a
// LeafTxn) iff the plan is such that there is no concurrency between
// the root and the leaf txns.
//
// At the moment of writing, this is only relevant whenever the Streamer
// API might be used by some of the processors. The Streamer internally
// can have concurrency, so it expects to be given a LeafTxn. In order
// for that LeafTxn to be created later, during the flow setup, we need
// to populate leafInputState below, so we tell the localState that
// there is concurrency.
if row.CanUseStreamer(ctx, dsp.st) {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
if jr.IsIndexJoin() {
// Index joins are executed via the Streamer API that has
// concurrency.
localState.HasConcurrency = true
break
}
}
}
}
Expand Down

0 comments on commit e4ca082

Please sign in to comment.