-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathapply_join.go
403 lines (368 loc) · 13.8 KB
/
apply_join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"strconv"
"sync/atomic"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
// applyJoinNode implements apply join: the execution component of correlated
// subqueries. Note that only correlated subqueries that the optimizer's
// transformations couldn't decorrelate get planned using apply joins.
//
// The node reads rows from the left planDataSource and, for each row,
// re-plans the right side of the join after replacing its outer columns with
// the corresponding values from the current left row. The new right plan is
// then executed and its rows are joined with the left row according to normal
// join semantics. This node doesn't support right or full outer joins, right
// semi/anti joins, or set operations (newApplyJoinNode rejects them).
type applyJoinNode struct {
	joinType descpb.JoinType

	// The data source with no outer columns.
	input planDataSource

	// pred represents the join predicate.
	pred *joinPredicate

	// columns contains the metadata for the results of this node.
	columns colinfo.ResultColumns

	// rightTypes is the schema of the rows produced by the right side of the
	// join, as built in the optimization phase. Later on, every re-planning of
	// the right side will emit these same columns.
	rightTypes []*types.T

	// planRightSideFn re-plans the right side of the join for a given left
	// row.
	planRightSideFn exec.ApplyJoinPlanRightSideFn

	// iterationCount tracks the number of times planRightSideFn has been
	// invoked (in other words, the number of left rows for which we have
	// performed the apply join so far). It is also used to name the tracing
	// span of each right-side iteration.
	iterationCount int

	run struct {
		// emptyRight is a cached, all-NULL slice that's used for left outer
		// joins when the current left row finds no match on the right side.
		emptyRight tree.Datums
		// leftRow is the current left row being processed.
		leftRow tree.Datums
		// leftRowFoundAMatch is set to true when the current left row matched
		// at least one right row, so that left outer, semi and anti joins know
		// whether to output a row for it.
		leftRowFoundAMatch bool
		// rightRows will be populated with the result of the right side of the
		// join each time it's run.
		rightRows rowContainerHelper
		// rightRowsIterator, if non-nil, is the iterator into rightRows for the
		// current left row.
		rightRowsIterator *rowContainerIterator
		// out is the full result row, populated on each call to Next.
		out tree.Datums
		// done is true if the left side has been exhausted.
		done bool
	}
}
// newApplyJoinNode builds an applyJoinNode that joins every row of left with
// the output of a right side freshly planned by planRightSideFn. Join types
// this node cannot execute (right/full outer, right semi/anti, and the
// EXCEPT ALL / INTERSECT ALL set operations) are reported as assertion
// failures. The node's output columns are taken from pred.cols.
func newApplyJoinNode(
	joinType descpb.JoinType,
	left planDataSource,
	rightCols colinfo.ResultColumns,
	pred *joinPredicate,
	planRightSideFn exec.ApplyJoinPlanRightSideFn,
) (planNode, error) {
	safeJoinType := redact.Safe(joinType)
	switch joinType {
	case descpb.RightOuterJoin, descpb.FullOuterJoin:
		return nil, errors.AssertionFailedf("unsupported right outer apply join: %d", safeJoinType)
	case descpb.ExceptAllJoin, descpb.IntersectAllJoin:
		return nil, errors.AssertionFailedf("unsupported apply set op: %d", safeJoinType)
	case descpb.RightSemiJoin, descpb.RightAntiJoin:
		return nil, errors.AssertionFailedf("unsupported right semi/anti apply join: %d", safeJoinType)
	}

	node := &applyJoinNode{
		joinType:        joinType,
		input:           left,
		pred:            pred,
		rightTypes:      getTypesFromResultColumns(rightCols),
		planRightSideFn: planRightSideFn,
		columns:         pred.cols,
	}
	return node, nil
}
// startExec sets up the per-execution state of the node. It allocates an
// all-NULL right row (left outer joins only), allocates the output row buffer
// sized to the node's columns, and initializes the container that holds the
// right side's rows on each iteration.
func (a *applyJoinNode) startExec(params runParams) error {
	if a.joinType == descpb.LeftOuterJoin {
		// Unmatched left rows are padded with one NULL per right-side column.
		nulls := make(tree.Datums, len(a.rightTypes))
		for idx := range nulls {
			nulls[idx] = tree.DNull
		}
		a.run.emptyRight = nulls
	}

	a.run.out = make(tree.Datums, len(a.columns))
	a.run.rightRows.Init(params.ctx, a.rightTypes, params.extendedEvalCtx, "apply-join" /* opName */)
	return nil
}
// Next advances the apply join and reports whether a new output row is
// available in a.run.out. It returns false once the left input is exhausted.
//
// The loop is a small state machine over a.run.rightRowsIterator and
// a.run.leftRow:
//   - While a right-side iterator is open, it scans the remaining right rows
//     for the current left row. It emits a combined row for each predicate
//     match, except for semi/anti joins, which stop at the first match.
//   - Once the right rows are exhausted, or a semi/anti match was found, it
//     emits at most one row for the current left row. That row is the left
//     row for a matched semi join or an unmatched anti join, or the left row
//     padded with a.run.emptyRight for an unmatched left outer join.
//   - It then reads the next left row and re-plans and runs the right side
//     for it.
func (a *applyJoinNode) Next(params runParams) (bool, error) {
	if a.run.done {
		return false, nil
	}
	for {
		if a.run.rightRowsIterator != nil {
			// We have right rows set up - check the next one for a match.
			for {
				// Note that if a.rightTypes has zero length, non-nil rrow is
				// returned the correct number of times.
				rrow, err := a.run.rightRowsIterator.Next()
				if err != nil {
					return false, err
				}
				if rrow == nil {
					// We have exhausted all rows from the right side.
					break
				}
				// Compute join.
				predMatched, err := a.pred.eval(params.ctx, params.EvalContext(), a.run.leftRow, rrow)
				if err != nil {
					return false, err
				}
				if !predMatched {
					// Didn't match? Try with the next right-side row.
					continue
				}
				a.run.leftRowFoundAMatch = true
				if a.joinType == descpb.LeftAntiJoin ||
					a.joinType == descpb.LeftSemiJoin {
					// We found a match, but we're doing an anti or semi join,
					// so we're done with this left row.
					break
				}
				// We're doing an ordinary join, so prep the row and emit it.
				// The iterator stays open, so the next call to Next resumes
				// scanning the remaining right rows for this same left row.
				a.pred.prepareRow(a.run.out, a.run.leftRow, rrow)
				return true, nil
			}
			// We're either out of right side rows or we broke out of the loop
			// before consuming all right rows because we found a match for an
			// anti or semi join. Clear the right rows to prepare them for the
			// next left row.
			if err := a.clearRightRows(params); err != nil {
				return false, err
			}
		}
		// We're out of right side rows. Reset the match state for next time.
		foundAMatch := a.run.leftRowFoundAMatch
		a.run.leftRowFoundAMatch = false
		if a.run.leftRow != nil {
			// If we have a left row already, we have to check to see if we need to
			// emit rows for semi, outer, or anti joins.
			if foundAMatch {
				if a.joinType == descpb.LeftSemiJoin {
					// We found a match, and we're doing a semi-join, so we're done
					// with this left row after we output it.
					a.pred.prepareRow(a.run.out, a.run.leftRow, nil)
					// Clearing leftRow prevents this row from being emitted again
					// on the next call.
					a.run.leftRow = nil
					return true, nil
				}
			} else {
				// We found no match. Output LEFT OUTER or ANTI match if necessary.
				switch a.joinType {
				case descpb.LeftOuterJoin:
					a.pred.prepareRow(a.run.out, a.run.leftRow, a.run.emptyRight)
					a.run.leftRow = nil
					return true, nil
				case descpb.LeftAntiJoin:
					a.pred.prepareRow(a.run.out, a.run.leftRow, nil)
					a.run.leftRow = nil
					return true, nil
				}
			}
		}
		// We need a new row on the left.
		ok, err := a.input.plan.Next(params)
		if err != nil {
			return false, err
		}
		if !ok {
			// No more rows on the left. Goodbye!
			a.run.done = true
			return false, nil
		}
		// Extract the values of the outer columns of the other side of the apply
		// from the latest input row.
		leftRow := a.input.plan.Values()
		a.run.leftRow = leftRow
		a.iterationCount++
		// At this point, it's time to do the major lift of apply join: re-planning
		// the right side of the join using the optimizer, with all outer columns
		// in the right side replaced by the bindings that were defined by the most
		// recently read left row.
		if err := a.runNextRightSideIteration(params, leftRow); err != nil {
			return false, err
		}
		// We've got fresh right rows. Continue along in the loop, which will deal
		// with joining the right plan's output with our left row.
	}
}
// clearRightRows empties rightRows and then closes and drops rightRowsIterator,
// so both can be reused for the next left row. It assumes an iterator is
// currently open. If clearing the container fails, the error is returned and
// the iterator is left untouched.
func (a *applyJoinNode) clearRightRows(params runParams) error {
	clearErr := a.run.rightRows.Clear(params.ctx)
	if clearErr == nil {
		a.run.rightRowsIterator.Close()
		a.run.rightRowsIterator = nil
	}
	return clearErr
}
// runNextRightSideIteration re-plans the right-hand side of the apply join for
// leftRow, runs the resulting plan to completion inside a dedicated tracing
// span, and collects its output in a.run.rightRows. On success it opens
// a.run.rightRowsIterator over those rows. Any error means the right side
// could not be planned or executed, and the whole apply join should be
// abandoned.
func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree.Datums) error {
	spanName := "apply-join-iteration-" + strconv.Itoa(a.iterationCount)
	iterCtx, span := tracing.ChildSpan(params.ctx, spanName)
	defer span.Finish()

	planned, err := a.planRightSideFn(iterCtx, newExecFactory(iterCtx, params.p), leftRow)
	if err != nil {
		return err
	}
	rightPlan := planned.(*planComponents)

	err = runPlanInsidePlan(
		iterCtx, params, rightPlan, NewRowResultWriter(&a.run.rightRows), nil, /* deferredRoutineSender */
	)
	if err != nil {
		return err
	}

	a.run.rightRowsIterator = newRowContainerIterator(iterCtx, a.run.rightRows)
	return nil
}
// runPlanInsidePlan is used to run a plan and gather the results in the
// resultWriter, as part of the execution of an "outer" plan.
func runPlanInsidePlan(
ctx context.Context,
params runParams,
plan *planComponents,
resultWriter rowResultWriter,
deferredRoutineSender eval.DeferredRoutineSender,
) error {
defer plan.close(ctx)
execCfg := params.ExecCfg()
recv := MakeDistSQLReceiver(
ctx, resultWriter, tree.Rows,
execCfg.RangeDescriptorCache,
params.p.Txn(),
execCfg.Clock,
params.p.extendedEvalCtx.Tracing,
)
defer recv.Release()
plannerCopy := *params.p
plannerCopy.curPlan.planComponents = *plan
// "Pausable portal" execution model is only applicable to the outer
// statement since we actually need to execute all inner plans to completion
// before we can produce any "outer" rows to be returned to the client, so
// we make sure to unset pausablePortal field on the planner.
plannerCopy.pausablePortal = nil
evalCtxFactory := func() *extendedEvalContext {
plannerCopy.extendedEvalCtx = *params.p.ExtendedEvalContextCopy()
evalCtx := &plannerCopy.extendedEvalCtx
evalCtx.Planner = &plannerCopy
evalCtx.StreamManagerFactory = &plannerCopy
if deferredRoutineSender != nil {
evalCtx.RoutineSender = deferredRoutineSender
}
return evalCtx
}
if len(plan.subqueryPlans) != 0 {
// We currently don't support cases when both the "inner" and the
// "outer" plans have subqueries due to limitations of how we're
// propagating the results of the subqueries.
// TODO(mgartner): We should be able to lift this restriction for
// apply-joins, similarly to how subqueries within UDFs are planned - as
// routines instead of subqueries.
if len(params.p.curPlan.subqueryPlans) != 0 {
return unimplemented.NewWithIssue(66447, `apply joins with subqueries in the "inner" and "outer" contexts are not supported`)
}
// Create a separate memory account for the results of the subqueries.
// Note that we intentionally defer the closure of the account until we
// return from this method (after the main query is executed).
subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
defer subqueryResultMemAcc.Close(ctx)
if !execCfg.DistSQLPlanner.PlanAndRunSubqueries(
ctx,
&plannerCopy,
evalCtxFactory,
plan.subqueryPlans,
recv,
&subqueryResultMemAcc,
false, /* skipDistSQLDiagramGeneration */
atomic.LoadInt32(¶ms.p.atomic.innerPlansMustUseLeafTxn) == 1,
) {
return resultWriter.Err()
}
} else {
// We don't have "inner" subqueries, so the apply join can only refer to
// the "outer" ones.
plannerCopy.curPlan.subqueryPlans = params.p.curPlan.subqueryPlans
// During cleanup, nil out the inner subquery plans before closing the plan
// components. Otherwise, we may inadvertently close nodes that are needed
// when executing the outer query.
defer func() {
plan.subqueryPlans = nil
}()
}
distributePlan := getPlanDistribution(
ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
plannerCopy.SessionData().DistSQLMode, plan.main,
)
distributeType := DistributionType(DistributionTypeNone)
if distributePlan.WillDistribute() {
distributeType = DistributionTypeAlways
}
evalCtx := evalCtxFactory()
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
planCtx.stmtType = recv.stmtType
planCtx.mustUseLeafTxn = atomic.LoadInt32(¶ms.p.atomic.innerPlansMustUseLeafTxn) == 1
// Wrap PlanAndRun in a function call so that we clean up immediately.
func() {
finishedSetupFn, cleanup := getFinishedSetupFn(&plannerCopy)
defer cleanup()
execCfg.DistSQLPlanner.PlanAndRun(
ctx, evalCtx, planCtx, plannerCopy.Txn(), plan.main, recv, finishedSetupFn,
)
}()
// Check if there was an error interacting with the resultWriter.
if recv.commErr != nil {
return recv.commErr
}
if resultWriter.Err() != nil {
return resultWriter.Err()
}
evalCtxFactory2 := func(usedConcurrently bool) *extendedEvalContext {
return evalCtxFactory()
}
plannerCopy.autoCommit = false
execCfg.DistSQLPlanner.PlanAndRunCascadesAndChecks(
ctx, &plannerCopy, evalCtxFactory2, &plannerCopy.curPlan.planComponents, recv,
)
// We might have appended some cascades or checks to the plannerCopy, so we
// need to update the plan for cleanup purposes before proceeding.
*plan = plannerCopy.curPlan.planComponents
if recv.commErr != nil {
return recv.commErr
}
return resultWriter.Err()
}
// Values returns the row most recently produced by Next. The slice belongs to
// the node and is rewritten in place by later calls to Next.
func (a *applyJoinNode) Values() tree.Datums {
	row := a.run.out
	return row
}
// Close releases everything the node holds, in this order:
//  1. the left input plan;
//  2. the right-row container;
//  3. the iterator over that container, if one is still open.
//
// The iterator is cleared so that a later Close does not close it again.
func (a *applyJoinNode) Close(ctx context.Context) {
	a.input.plan.Close(ctx)
	a.run.rightRows.Close(ctx)
	if iter := a.run.rightRowsIterator; iter != nil {
		iter.Close()
		a.run.rightRowsIterator = nil
	}
}