Skip to content

Commit

Permalink
Merge pull request #88378 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.2-88302

release-22.2: colexecdisk: add cancel checking for disk-spilling operators
  • Loading branch information
yuzefovich authored Sep 22, 2022
2 parents c8fb167 + 194ad68 commit ae2278a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colcontainer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/sql/colexecerror",
"//pkg/sql/types",
"//pkg/storage/fs",
"//pkg/util/cancelchecker",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/uuid",
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/colcontainer/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -545,7 +546,20 @@ func (d *diskQueue) writeFooterAndFlush(ctx context.Context) (err error) {
return nil
}

// checkCancellation performs a non-blocking check of ctx's Done channel. It
// returns cancelchecker.QueryCanceledError when a receive from that channel
// is ready and nil otherwise.
//
//gcassert:inline
func checkCancellation(ctx context.Context) error {
	var err error
	select {
	case <-ctx.Done():
		err = cancelchecker.QueryCanceledError
	default:
		// Done is not ready; leave err nil so the caller proceeds.
	}
	return err
}

func (d *diskQueue) Enqueue(ctx context.Context, b coldata.Batch) error {
if err := checkCancellation(ctx); err != nil {
return err
}
if d.state == diskQueueStateDequeueing {
if d.cfg.CacheMode != DiskQueueCacheModeIntertwinedCalls {
return errors.Errorf(
Expand Down Expand Up @@ -719,6 +733,9 @@ func (d *diskQueue) maybeInitDeserializer(ctx context.Context) (bool, error) {
// Dequeue dequeues a batch from disk and deserializes it into b. Note that the
// deserialized batch is only valid until the next call to Dequeue.
func (d *diskQueue) Dequeue(ctx context.Context, b coldata.Batch) (bool, error) {
if err := checkCancellation(ctx); err != nil {
return false, err
}
if d.serializer != nil && d.numBufferedBatches > 0 {
if err := d.writeFooterAndFlush(ctx); err != nil {
return false, err
Expand Down
16 changes: 14 additions & 2 deletions pkg/sql/colexec/colexecdisk/external_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ const (
// some amount of RAM for its buffer. This is determined by
// maxNumberPartitions variable.
type externalSorter struct {
colexecop.OneInputHelper
colexecop.OneInputNode
colexecop.InitHelper
colexecop.NonExplainable
colexecop.CloserHelper

Expand All @@ -128,6 +129,8 @@ type externalSorter struct {
// the dequeued batches and the output batch.
mergeMemoryLimit int64

cancelChecker colexecutils.CancelChecker

state externalSorterState
inputTypes []*types.T
ordering execinfrapb.Ordering
Expand Down Expand Up @@ -290,7 +293,7 @@ func NewExternalSorter(
partitionedDiskQueueSemaphore = nil
}
es := &externalSorter{
OneInputHelper: colexecop.MakeOneInputHelper(inMemSorter),
OneInputNode: colexecop.NewOneInputNode(inMemSorter),
mergeUnlimitedAllocator: mergeUnlimitedAllocator,
outputUnlimitedAllocator: outputUnlimitedAllocator,
mergeMemoryLimit: mergeMemoryLimit,
Expand Down Expand Up @@ -341,8 +344,17 @@ func (s *externalSorter) resetPartitionsInfo(i int) {
s.partitionsInfo.maxBatchMemSize[i] = 0
}

// Init initializes the sorter via the embedded InitHelper and, if that call
// returns true, initializes the input operator and the cancel checker with
// s.Ctx.
func (s *externalSorter) Init(ctx context.Context) {
	// NOTE(review): presumably InitHelper.Init returns false when the
	// operator has already been initialized and stores ctx in s.Ctx - confirm
	// against colexecop.InitHelper.
	shouldInit := s.InitHelper.Init(ctx)
	if !shouldInit {
		return
	}
	// The input is initialized before the cancel checker; both receive s.Ctx
	// rather than the ctx argument.
	s.Input.Init(s.Ctx)
	s.cancelChecker.Init(s.Ctx)
}

func (s *externalSorter) Next() coldata.Batch {
for {
s.cancelChecker.CheckEveryCall()
switch s.state {
case externalSorterNewPartition:
b := s.Input.Next()
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/colexecdisk/hash_based_partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type hashBasedPartitioner struct {
fdSemaphore semaphore.Semaphore
acquiredFDs int
}
cancelChecker colexecutils.CancelChecker

partitioners []*colcontainer.PartitionedDiskQueue
partitionedInputs []*partitionerToOperator
Expand Down Expand Up @@ -321,6 +322,7 @@ func (op *hashBasedPartitioner) Init(ctx context.Context) {
for i := range op.inputs {
op.inputs[i].Init(op.Ctx)
}
op.cancelChecker.Init(op.Ctx)
op.partitionsToProcessUsingMain = make(map[int]*hbpPartitionInfo)
// If we are initializing the hash-based partitioner, it means that we had
// to fallback from the in-memory one since the inputs had more tuples that
Expand Down Expand Up @@ -405,6 +407,7 @@ func (op *hashBasedPartitioner) Next() coldata.Batch {
var batches [2]coldata.Batch
StateChanged:
for {
op.cancelChecker.CheckEveryCall()
switch op.state {
case hbpInitialPartitioning:
allZero := true
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,7 @@ func TestLint(t *testing.T) {
var buf strings.Builder
if err := gcassert.GCAssert(&buf,
"../../col/coldata",
"../../sql/colcontainer",
"../../sql/colconv",
"../../sql/colexec",
"../../sql/colexec/colexecagg",
Expand Down

0 comments on commit ae2278a

Please sign in to comment.