sql: improve inner plan leaf transaction tracking
This PR changes the planner's `innerPlansMustUseLeafTxn` from a boolean flag
into a counter. As a result, the contract of `getFinishedSetupFn` has changed:
the returned `finishedSetupFn` and `cleanup` functions should each be called
only once.

This change cleans up how we handle leaf transactions when running a
plan inside a plan, e.g., executing UDFs.

Epic: none
Informs: #111097

Release note: None
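
To illustrate why a counter composes better than a flag when plans nest (a minimal, self-contained sketch; `leafTxnTracker`, `setup`, and `cleanup` are hypothetical names for illustration, not the planner's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// leafTxnTracker sketches the counter idea: every concurrent "outer" flow that
// needs leaf transactions bumps the counter on setup and decrements it on
// cleanup. Inner plans must use a leaf txn whenever the counter is positive.
type leafTxnTracker struct {
	innerPlansMustUseLeafTxn int32
}

func (t *leafTxnTracker) setup()   { atomic.AddInt32(&t.innerPlansMustUseLeafTxn, 1) }
func (t *leafTxnTracker) cleanup() { atomic.AddInt32(&t.innerPlansMustUseLeafTxn, -1) }

func (t *leafTxnTracker) mustUseLeafTxn() bool {
	return atomic.LoadInt32(&t.innerPlansMustUseLeafTxn) > 0
}

func main() {
	var t leafTxnTracker

	// Two plans that both need leaf txns. A boolean reset by the first cleanup
	// would drop the requirement while the second plan is still running; the
	// counter keeps it alive until the last cleanup.
	t.setup()
	t.setup()
	t.cleanup()
	fmt.Println(t.mustUseLeafTxn()) // true: one plan is still running
	t.cleanup()
	fmt.Println(t.mustUseLeafTxn()) // false: all plans are done
}
```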
rharding6373 committed Oct 17, 2023
1 parent ab5fb87 commit 15a91e9
Showing 4 changed files with 22 additions and 14 deletions.
19 changes: 11 additions & 8 deletions pkg/sql/apply_join.go
@@ -326,7 +326,7 @@ func runPlanInsidePlan(
 		recv,
 		&subqueryResultMemAcc,
 		false, /* skipDistSQLDiagramGeneration */
-		atomic.LoadUint32(&params.p.atomic.innerPlansMustUseLeafTxn) == 1,
+		atomic.LoadInt32(&params.p.atomic.innerPlansMustUseLeafTxn) == 1,
 	) {
 		return resultWriter.Err()
 	}
@@ -353,13 +353,16 @@ func runPlanInsidePlan(
 	evalCtx := evalCtxFactory()
 	planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
 	planCtx.stmtType = recv.stmtType
-	planCtx.mustUseLeafTxn = atomic.LoadUint32(&params.p.atomic.innerPlansMustUseLeafTxn) == 1
-
-	finishedSetupFn, cleanup := getFinishedSetupFn(&plannerCopy)
-	defer cleanup()
-	execCfg.DistSQLPlanner.PlanAndRun(
-		ctx, evalCtx, planCtx, plannerCopy.Txn(), plan.main, recv, finishedSetupFn,
-	)
+	planCtx.mustUseLeafTxn = atomic.LoadInt32(&params.p.atomic.innerPlansMustUseLeafTxn) == 1
+
+	// Wrap PlanAndRun in a function call so that we clean up immediately.
+	func() {
+		finishedSetupFn, cleanup := getFinishedSetupFn(&plannerCopy)
+		defer cleanup()
+		execCfg.DistSQLPlanner.PlanAndRun(
+			ctx, evalCtx, planCtx, plannerCopy.Txn(), plan.main, recv, finishedSetupFn,
+		)
+	}()
 
 	// Check if there was an error interacting with the resultWriter.
 	if recv.commErr != nil {
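The `func() { ... }()` wrapper introduced above exists so that `defer cleanup()` fires right after `PlanAndRun` returns instead of at the end of the surrounding function, which matters now that `cleanup` may only be called once per `getFinishedSetupFn` pair. A rough standalone sketch of that scoping pattern (the `runStep` helper and its output are made up for illustration):

```go
package main

import "fmt"

// runStep mimics the shape of the apply_join.go change: wrapping the work in
// an immediately invoked function means the deferred cleanup runs as soon as
// that work finishes, not when the (possibly much longer) enclosing function
// returns.
func runStep(step int) {
	cleanup := func() { fmt.Printf("cleanup after step %d\n", step) }

	func() {
		defer cleanup() // fires when this anonymous function returns
		fmt.Printf("running step %d\n", step)
	}()

	// By this point cleanup has already run, so anything that follows starts
	// from a clean slate.
	fmt.Printf("continuing after step %d\n", step)
}

func main() {
	runStep(1)
	runStep(2)
}
```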
12 changes: 7 additions & 5 deletions pkg/sql/distsql_running.go
@@ -1609,16 +1609,16 @@ func (r *DistSQLReceiver) ProducerDone() {
 // function updates the passed-in planner to make sure that the "inner" plans
 // use the LeafTxns if the "outer" plan happens to have concurrency. It also
 // returns a non-nil cleanup function that must be called once all plans (the
-// "outer" as well as all "inner" ones) are done. The returned functions can be
-// called multiple times.
+// "outer" as well as all "inner" ones) are done. The returned functions should
+// only be called once.
 func getFinishedSetupFn(planner *planner) (finishedSetupFn func(flowinfra.Flow), cleanup func()) {
 	finishedSetupFn = func(localFlow flowinfra.Flow) {
 		if localFlow.GetFlowCtx().Txn.Type() == kv.LeafTxn {
-			atomic.StoreUint32(&planner.atomic.innerPlansMustUseLeafTxn, 1)
+			atomic.AddInt32(&planner.atomic.innerPlansMustUseLeafTxn, 1)
 		}
 	}
 	cleanup = func() {
-		atomic.StoreUint32(&planner.atomic.innerPlansMustUseLeafTxn, 0)
+		atomic.AddInt32(&planner.atomic.innerPlansMustUseLeafTxn, -1)
 	}
 	return finishedSetupFn, cleanup
 }
@@ -2023,7 +2023,9 @@ func (dsp *DistSQLPlanner) PlanAndRun(
 		}
 		finalizePlanWithRowCount(ctx, localPlanCtx, localPhysPlan, localPlanCtx.planner.curPlan.mainRowCount)
 		recv.expectedRowsRead = int64(localPhysPlan.TotalEstimatedScannedRows)
-		dsp.Run(ctx, localPlanCtx, txn, localPhysPlan, recv, evalCtx, finishedSetupFn)
+		// We already called finishedSetupFn in the previous call to Run, since we
+		// only got here if we got a distributed error, not an error during setup.
+		dsp.Run(ctx, localPlanCtx, txn, localPhysPlan, recv, evalCtx, nil)
 	}
 }

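The retry path at the end of `PlanAndRun` now passes nil because, under the once-only contract, `finishedSetupFn` must not bump the counter a second time for the same plan. A hedged, self-contained sketch of that shape (the `run` helper, its error, and the package-level counter are assumptions for illustration; in the real code the increment also only happens when the flow actually uses a LeafTxn):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// innerPlansMustUseLeafTxn stands in for the planner's counter.
var innerPlansMustUseLeafTxn int32

// run stands in for a single Run attempt: it invokes finishedSetupFn if one
// is provided, then either succeeds or reports a "distributed" failure.
func run(distributed bool, finishedSetupFn func()) error {
	if finishedSetupFn != nil {
		finishedSetupFn()
	}
	if distributed {
		return errors.New("distributed attempt failed")
	}
	return nil
}

func main() {
	finishedSetupFn := func() { atomic.AddInt32(&innerPlansMustUseLeafTxn, 1) }
	cleanup := func() { atomic.AddInt32(&innerPlansMustUseLeafTxn, -1) }
	defer cleanup()

	if err := run(true /* distributed */, finishedSetupFn); err != nil {
		fmt.Println("retrying locally:", err)
		// finishedSetupFn already fired during the distributed attempt, so the
		// retry gets nil; passing the same callback again would double-count.
		if err := run(false /* distributed */, nil); err != nil {
			fmt.Println("local attempt failed:", err)
		}
	}
	// Exactly one increment happened, matched by the single deferred cleanup.
	fmt.Println("counter before cleanup:", atomic.LoadInt32(&innerPlansMustUseLeafTxn))
}
```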
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/udf_fk
@@ -486,6 +486,9 @@ subtest end
 # Test a query with both an apply join and a UDF with a cascade.
 subtest apply_join
 
+statement ok
+CREATE TABLE IF NOT EXISTS parent_cascade (p INT PRIMARY KEY);
+
 statement ok
 CREATE TABLE child_cascade (
   c INT PRIMARY KEY,
2 changes: 1 addition & 1 deletion pkg/sql/planner.go
@@ -195,7 +195,7 @@ type planner struct {
 		// planNodeToRowSource adapter before the "outer" query figured out that
 		// it must use the LeafTxn. Solving that issue properly is not trivial
 		// and is tracked in #41992.
-		innerPlansMustUseLeafTxn uint32
+		innerPlansMustUseLeafTxn int32
 	}
 
 	monitor *mon.BytesMonitor
