Skip to content

Commit

Permalink
sql: fix pausable portal execution of apply joins with subqueries
Browse files Browse the repository at this point in the history
Previously, when executing a query with an apply join with "inner"
subqueries via pausable portals model we would crash because we'd
incorrectly apply the "pausable portal" execution model to those "inner"
subqueries (even though they need to be executed completely). We already
disabled this execution model for the main "inner" query, but it should
be disabled for the whole apply join iteration.

I decided to not include a release note given that pausable portals are
disabled by default and apply joins with inner subqueries seem like an
edge case.

Release note: None
  • Loading branch information
yuzefovich committed Jul 12, 2023
1 parent a6dbdee commit 411ebc7
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 23 deletions.
44 changes: 21 additions & 23 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ func runPlanInsidePlan(
)
defer recv.Release()

plannerCopy := *params.p
plannerCopy.curPlan.planComponents = *plan
// "Pausable portal" execution model is only applicable to the outer
// statement since we actually need to execute all inner plans to completion
// before we can produce any "outer" rows to be returned to the client, so
// we make sure to unset pausablePortal field on the planner.
plannerCopy.pausablePortal = nil
evalCtxFactory := func() *extendedEvalContext {
evalCtx := params.p.ExtendedEvalContextCopy()
evalCtx.Planner = &plannerCopy
evalCtx.StreamManagerFactory = &plannerCopy
return evalCtx
}

if len(plan.subqueryPlans) != 0 {
// We currently don't support cases when both the "inner" and the
// "outer" plans have subqueries due to limitations of how we're
Expand All @@ -287,26 +301,15 @@ func runPlanInsidePlan(
if len(params.p.curPlan.subqueryPlans) != 0 {
return unimplemented.NewWithIssue(66447, `apply joins with subqueries in the "inner" and "outer" contexts are not supported`)
}
// Right now curPlan.subqueryPlans are the subqueries from the "outer"
// plan (and we know there are none given the check above). If parts of
// the "inner" plan refer to the subqueries, we know that they must
// refer to the "inner" subqueries. To allow for that to happen we have
// to manually replace the subqueries on the planner's curPlan and
// restore the original state before exiting.
oldSubqueries := params.p.curPlan.subqueryPlans
params.p.curPlan.subqueryPlans = plan.subqueryPlans
defer func() {
params.p.curPlan.subqueryPlans = oldSubqueries
}()
// Create a separate memory account for the results of the subqueries.
// Note that we intentionally defer the closure of the account until we
// return from this method (after the main query is executed).
subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
defer subqueryResultMemAcc.Close(ctx)
if !execCfg.DistSQLPlanner.PlanAndRunSubqueries(
ctx,
params.p,
params.extendedEvalCtx.copy,
&plannerCopy,
evalCtxFactory,
plan.subqueryPlans,
recv,
&subqueryResultMemAcc,
Expand All @@ -315,15 +318,12 @@ func runPlanInsidePlan(
) {
return resultWriter.Err()
}
} else {
// We don't have "inner" subqueries, so the apply join can only refer to
// the "outer" ones.
plannerCopy.curPlan.subqueryPlans = params.p.curPlan.subqueryPlans
}

// Make a copy of the EvalContext so it can be safely modified.
evalCtx := params.p.ExtendedEvalContextCopy()
plannerCopy := *params.p
// If we reach this part when re-executing a pausable portal, we won't want to
// resume the flow bound to it. The inner-plan should have its own lifecycle
// for its flow.
plannerCopy.pausablePortal = nil
distributePlan := getPlanDistribution(
ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
plannerCopy.SessionData().DistSQLMode, plan.main,
Expand All @@ -332,10 +332,8 @@ func runPlanInsidePlan(
if distributePlan.WillDistribute() {
distributeType = DistributionTypeAlways
}
evalCtx := evalCtxFactory()
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
planCtx.planner.curPlan.planComponents = *plan
planCtx.ExtendedEvalCtx.Planner = &plannerCopy
planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy
planCtx.stmtType = recv.stmtType
planCtx.mustUseLeafTxn = atomic.LoadUint32(&params.p.atomic.innerPlansMustUseLeafTxn) == 1

Expand Down
58 changes: 58 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
Original file line number Diff line number Diff line change
Expand Up @@ -1247,4 +1247,62 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"SELECT 4"}
{"Type":"ReadyForQuery","TxStatus":"T"}

send
Query {"String": "COMMIT"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"COMMIT"}
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end

subtest subquery_in_inner_apply_join_plan

send
Query {"String": "DEALLOCATE ALL;"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "DROP TABLE IF EXISTS xy;"}
Query {"String": "CREATE TABLE xy (x INT, y INT);"}
Query {"String": "INSERT INTO xy VALUES (1, 1), (2, 2);"}
Parse {"Name": "q1", "Query": "SELECT * FROM xy JOIN LATERAL (SELECT * FROM (VALUES (x), ((SELECT y FROM xy LIMIT 1))) v(a)) foo ON y = a;"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p1", "MaxRows": 1}
Sync
----

until ignore=NoticeResponse
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"INSERT 0 2"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"DataRow","Values":[{"text":"2"},{"text":"2"},{"text":"2"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end

0 comments on commit 411ebc7

Please sign in to comment.