Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
136631: colexec: add cancellation check to top K sort and merge join r=yuzefovich a=yuzefovich

This commit adds a cancellation check to the top K sorter and the merge joiner, performed on every input batch they read. I think that these two are the only buffering operators that currently don't have any cancellation checks:
- for hash joins and hash aggregation we have the check when performing the hashing
- for ordered aggregator, buffered window functions, cross join, external sort and other disk-backed operators we do the check on each input batch
- for general sort we do the check in the PDQ sort of each column.

Fixes: cockroachdb#136457.

Release note (bug fix): CockroachDB now better respects the `statement_timeout` limit on queries involving the top K sort and merge join operations.

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Dec 4, 2024
2 parents 5918e85 + 81b0200 commit 1498cc5
Show file tree
Hide file tree
Showing 15 changed files with 39 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,8 @@ type mergeJoinBase struct {
left mergeJoinInput
right mergeJoinInput

cancelChecker colexecutils.CancelChecker

// Output buffer definition.
output coldata.Batch
outputCapacity int
Expand All @@ -574,6 +576,7 @@ var _ colexecop.Closer = &mergeJoinBase{}

func (o *mergeJoinBase) Reset(ctx context.Context) {
o.TwoInputInitHelper.Reset(ctx)
o.cancelChecker.Init(ctx)
o.state = mjEntry
o.bufferedGroup.helper.Reset(ctx)
o.proberState.lBatch = nil
Expand All @@ -597,6 +600,7 @@ func (o *mergeJoinBase) Init(ctx context.Context) {
o.memoryLimit, o.diskQueueCfg, o.fdSemaphore, o.diskAcc, o.diskQueueMemAcc,
)
o.bufferedGroup.helper.init(o.Ctx)
o.cancelChecker.Init(o.Ctx)

o.builderState.lGroups = make([]group, 1)
o.builderState.rGroups = make([]group, 1)
Expand Down Expand Up @@ -739,6 +743,7 @@ func (o *mergeJoinBase) sourceFinished() bool {
// and updates the probing and buffered group states accordingly.
func (o *mergeJoinBase) continueLeftBufferedGroup() {
// Get the next batch from the left.
o.cancelChecker.CheckEveryCall()
o.proberState.lIdx, o.proberState.lBatch = 0, o.InputOne.Next()
o.proberState.lLength = o.proberState.lBatch.Length()
o.bufferedGroup.leftGroupStartIdx = 0
Expand Down Expand Up @@ -805,6 +810,7 @@ func (o *mergeJoinBase) finishRightBufferedGroup() {
// (or until we exhaust the right input).
func (o *mergeJoinBase) completeRightBufferedGroup() {
// Get the next batch from the right.
o.cancelChecker.CheckEveryCall()
o.proberState.rIdx, o.proberState.rBatch = 0, o.InputTwo.Next()
o.proberState.rLength = o.proberState.rBatch.Length()
// The right input has been fully exhausted.
Expand Down Expand Up @@ -870,6 +876,7 @@ func (o *mergeJoinBase) completeRightBufferedGroup() {
// The buffered group is still not complete which means that we have
// just appended all the tuples from batch to it, so we need to get a
// fresh batch from the input.
o.cancelChecker.CheckEveryCall()
o.proberState.rIdx, o.proberState.rBatch = 0, o.InputTwo.Next()
o.proberState.rLength = o.proberState.rBatch.Length()
if o.proberState.rLength == 0 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_intersectall.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,10 +1366,12 @@ func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next() coldata.Batch {
// If this is the first batch or we're done with the current batch,
// get the next batch.
if o.proberState.lBatch == nil || (o.proberState.lLength != 0 && o.proberState.lIdx == o.proberState.lLength) {
o.cancelChecker.CheckEveryCall()
o.proberState.lIdx, o.proberState.lBatch = 0, o.InputOne.Next()
o.proberState.lLength = o.proberState.lBatch.Length()
}
if o.proberState.rBatch == nil || (o.proberState.rLength != 0 && o.proberState.rIdx == o.proberState.rLength) {
o.cancelChecker.CheckEveryCall()
o.proberState.rIdx, o.proberState.rBatch = 0, o.InputTwo.Next()
o.proberState.rLength = o.proberState.rBatch.Length()
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/sorttopk.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/sql/colexec/sorttopk.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type topKSorter struct {
emitted int
output coldata.Batch

cancelChecker colexecutils.CancelChecker

exportedFromTopK int
exportedFromBatch int
windowedBatch coldata.Batch
Expand Down Expand Up @@ -150,6 +152,7 @@ func (t *topKSorter) Init(ctx context.Context) {
t.orderState.distincter.Init(t.Ctx)
t.orderState.group = make([]int, t.k)
}
t.cancelChecker.Init(t.Ctx)
}

func (t *topKSorter) Next() coldata.Batch {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/sorttopk_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
// execgen:template<partialOrder>
// execgen:inline
func nextBatch(t *topKSorter, partialOrder bool) {
t.cancelChecker.CheckEveryCall()
t.inputBatch = t.Input.Next()
if partialOrder {
t.orderState.distincterInput.SetBatch(t.inputBatch)
Expand Down

0 comments on commit 1498cc5

Please sign in to comment.