diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index b3c735dbaa9e..2ca44d47a597 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -437,9 +437,16 @@ func (dsp *DistSQLPlanner) Run( localState.Collection = planCtx.planner.Descriptors() } + // noMutations indicates whether we know for sure that the plan doesn't have + // any mutations. If we don't have the access to the planner (which can be + // the case not on the main query execution path, i.e. BulkIO, CDC, etc), + // then we are ignorant of the details of the execution plan, so we choose + // to be on the safe side and mark 'noMutations' as 'false'. + noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation) + if planCtx.isLocal { localState.IsLocal = true - if planCtx.parallelizeScansIfLocal { + if noMutations && planCtx.parallelizeScansIfLocal { // Even though we have a single flow on the gateway node, we might // have decided to parallelize the scans. If that's the case, we // will need to use the Leaf txn. @@ -448,24 +455,33 @@ func (dsp *DistSQLPlanner) Run( } } } - // Even if planCtx.isLocal is false (which is the case when we think it's - // worth distributing the query), we need to go through the processors to - // figure out whether any of them have concurrency. - // - // At the moment of writing, this is only relevant whenever the Streamer API - // might be used by some of the processors. The Streamer internally can have - // concurrency, so it expects to be given a LeafTxn. In order for that - // LeafTxn to be created later, during the flow setup, we need to populate - // leafInputState below, so we tell the localState that there is - // concurrency. - if row.CanUseStreamer(ctx, dsp.st) { - for _, proc := range plan.Processors { - if jr := proc.Spec.Core.JoinReader; jr != nil { - if jr.IsIndexJoin() { - // Index joins are executed via the Streamer API that has - // concurrency. - localState.HasConcurrency = true - break + if noMutations { + // Even if planCtx.isLocal is false (which is the case when we think + // it's worth distributing the query), we need to go through the + // processors to figure out whether any of them have concurrency. + // + // However, the concurrency requires the usage of LeafTxns which is only + // acceptable if we don't have any mutations in the plan. + // TODO(yuzefovich): we could be smarter here and allow the usage of the + // RootTxn by the mutations while still using the Streamer (that gets a + // LeafTxn) iff the plan is such that there is no concurrency between + // the root and the leaf txns. + // + // At the moment of writing, this is only relevant whenever the Streamer + // API might be used by some of the processors. The Streamer internally + // can have concurrency, so it expects to be given a LeafTxn. In order + // for that LeafTxn to be created later, during the flow setup, we need + // to populate leafInputState below, so we tell the localState that + // there is concurrency. + if row.CanUseStreamer(ctx, dsp.st) { + for _, proc := range plan.Processors { + if jr := proc.Spec.Core.JoinReader; jr != nil { + if jr.IsIndexJoin() { + // Index joins are executed via the Streamer API that has + // concurrency. + localState.HasConcurrency = true + break + } } } }