Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: sql: don't use the streamer for queries with mutations #81838

Merged
merged 1 commit into from
May 25, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,16 @@ func (dsp *DistSQLPlanner) Run(
localState.Collection = planCtx.planner.Descriptors()
}

// noMutations indicates whether we know for sure that the plan doesn't have
// any mutations. If we don't have the access to the planner (which can be
// the case not on the main query execution path, i.e. BulkIO, CDC, etc),
// then we are ignorant of the details of the execution plan, so we choose
// to be on the safe side and mark 'noMutations' as 'false'.
noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)

if planCtx.isLocal {
localState.IsLocal = true
if planCtx.parallelizeScansIfLocal {
if noMutations && planCtx.parallelizeScansIfLocal {
// Even though we have a single flow on the gateway node, we might
// have decided to parallelize the scans. If that's the case, we
// will need to use the Leaf txn.
Expand All @@ -448,24 +455,33 @@ func (dsp *DistSQLPlanner) Run(
}
}
}
// Even if planCtx.isLocal is false (which is the case when we think it's
// worth distributing the query), we need to go through the processors to
// figure out whether any of them have concurrency.
//
// At the moment of writing, this is only relevant whenever the Streamer API
// might be used by some of the processors. The Streamer internally can have
// concurrency, so it expects to be given a LeafTxn. In order for that
// LeafTxn to be created later, during the flow setup, we need to populate
// leafInputState below, so we tell the localState that there is
// concurrency.
if row.CanUseStreamer(ctx, dsp.st) {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
if jr.IsIndexJoin() {
// Index joins are executed via the Streamer API that has
// concurrency.
localState.HasConcurrency = true
break
if noMutations {
// Even if planCtx.isLocal is false (which is the case when we think
// it's worth distributing the query), we need to go through the
// processors to figure out whether any of them have concurrency.
//
// However, the concurrency requires the usage of LeafTxns which is only
// acceptable if we don't have any mutations in the plan.
// TODO(yuzefovich): we could be smarter here and allow the usage of the
// RootTxn by the mutations while still using the Streamer (that gets a
// LeafTxn) iff the plan is such that there is no concurrency between
// the root and the leaf txns.
//
// At the moment of writing, this is only relevant whenever the Streamer
// API might be used by some of the processors. The Streamer internally
// can have concurrency, so it expects to be given a LeafTxn. In order
// for that LeafTxn to be created later, during the flow setup, we need
// to populate leafInputState below, so we tell the localState that
// there is concurrency.
if row.CanUseStreamer(ctx, dsp.st) {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
if jr.IsIndexJoin() {
// Index joins are executed via the Streamer API that has
// concurrency.
localState.HasConcurrency = true
break
}
}
}
}
Expand Down