From 15a91e9f12860d51a2c27ba8c184257101cfd9bd Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Mon, 16 Oct 2023 11:02:04 -0700 Subject: [PATCH] sql: improve inner plan leaf transaction tracking This PR changes planner's `innerPlansMustUseLeafTxn` from being treated as a boolean to a counter instead. As a result, the contract with `getFinishedSetupFn` has changed so that the returned `finishedSetupFn` and `cleanup` functions should only be called once. This change cleans up how we handle leaf transactions when running a plan inside a plan, e.g., executing UDFs. Epic: none Informs: #111097 Release note: None --- pkg/sql/apply_join.go | 19 +++++++++++-------- pkg/sql/distsql_running.go | 12 +++++++----- pkg/sql/logictest/testdata/logic_test/udf_fk | 3 +++ pkg/sql/planner.go | 2 +- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go index aa1daeb1222c..94b7b499a21f 100644 --- a/pkg/sql/apply_join.go +++ b/pkg/sql/apply_join.go @@ -326,7 +326,7 @@ func runPlanInsidePlan( recv, &subqueryResultMemAcc, false, /* skipDistSQLDiagramGeneration */ - atomic.LoadUint32(¶ms.p.atomic.innerPlansMustUseLeafTxn) == 1, + atomic.LoadInt32(¶ms.p.atomic.innerPlansMustUseLeafTxn) == 1, ) { return resultWriter.Err() } @@ -353,13 +353,16 @@ func runPlanInsidePlan( evalCtx := evalCtxFactory() planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType) planCtx.stmtType = recv.stmtType - planCtx.mustUseLeafTxn = atomic.LoadUint32(¶ms.p.atomic.innerPlansMustUseLeafTxn) == 1 - - finishedSetupFn, cleanup := getFinishedSetupFn(&plannerCopy) - defer cleanup() - execCfg.DistSQLPlanner.PlanAndRun( - ctx, evalCtx, planCtx, plannerCopy.Txn(), plan.main, recv, finishedSetupFn, - ) + planCtx.mustUseLeafTxn = atomic.LoadInt32(¶ms.p.atomic.innerPlansMustUseLeafTxn) == 1 + + // Wrap PlanAndRun in a function call so that we clean up immediately. + func() { + finishedSetupFn, cleanup := getFinishedSetupFn(&plannerCopy) + defer cleanup() + execCfg.DistSQLPlanner.PlanAndRun( + ctx, evalCtx, planCtx, plannerCopy.Txn(), plan.main, recv, finishedSetupFn, + ) + }() // Check if there was an error interacting with the resultWriter. if recv.commErr != nil { diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 39d2f63cf152..42271a4a5b69 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1609,16 +1609,16 @@ func (r *DistSQLReceiver) ProducerDone() { // function updates the passed-in planner to make sure that the "inner" plans // use the LeafTxns if the "outer" plan happens to have concurrency. It also // returns a non-nil cleanup function that must be called once all plans (the -// "outer" as well as all "inner" ones) are done. The returned functions can be -// called multiple times. +// "outer" as well as all "inner" ones) are done. The returned functions should +// only be called once. func getFinishedSetupFn(planner *planner) (finishedSetupFn func(flowinfra.Flow), cleanup func()) { finishedSetupFn = func(localFlow flowinfra.Flow) { if localFlow.GetFlowCtx().Txn.Type() == kv.LeafTxn { - atomic.StoreUint32(&planner.atomic.innerPlansMustUseLeafTxn, 1) + atomic.AddInt32(&planner.atomic.innerPlansMustUseLeafTxn, 1) } } cleanup = func() { - atomic.StoreUint32(&planner.atomic.innerPlansMustUseLeafTxn, 0) + atomic.AddInt32(&planner.atomic.innerPlansMustUseLeafTxn, -1) } return finishedSetupFn, cleanup } @@ -2023,7 +2023,9 @@ func (dsp *DistSQLPlanner) PlanAndRun( } finalizePlanWithRowCount(ctx, localPlanCtx, localPhysPlan, localPlanCtx.planner.curPlan.mainRowCount) recv.expectedRowsRead = int64(localPhysPlan.TotalEstimatedScannedRows) - dsp.Run(ctx, localPlanCtx, txn, localPhysPlan, recv, evalCtx, finishedSetupFn) + // We already called finishedSetupFn in the previous call to Run, since we + // only got here if we got a distributed error, not an error during setup. + dsp.Run(ctx, localPlanCtx, txn, localPhysPlan, recv, evalCtx, nil) } } diff --git a/pkg/sql/logictest/testdata/logic_test/udf_fk b/pkg/sql/logictest/testdata/logic_test/udf_fk index e87c81b345c0..9b4012e11e8a 100644 --- a/pkg/sql/logictest/testdata/logic_test/udf_fk +++ b/pkg/sql/logictest/testdata/logic_test/udf_fk @@ -486,6 +486,9 @@ subtest end # Test a query with both an apply join and a UDF with a cascade. subtest apply_join +statement ok +CREATE TABLE IF NOT EXISTS parent_cascade (p INT PRIMARY KEY); + statement ok CREATE TABLE child_cascade ( c INT PRIMARY KEY, diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 6bd0b345192c..9381195e8ba2 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -195,7 +195,7 @@ type planner struct { // planNodeToRowSource adapter before the "outer" query figured out that // it must use the LeafTxn. Solving that issue properly is not trivial // and is tracked in #41992. - innerPlansMustUseLeafTxn uint32 + innerPlansMustUseLeafTxn int32 } monitor *mon.BytesMonitor