colexec: make sorts and joins actually spill to disk #45318

Merged
merged 4 commits into from
Feb 27, 2020
Conversation

asubiotto
Contributor

@asubiotto asubiotto commented Feb 24, 2020

Refer to commits for more details. The big changes here are that sorts and joins now use DiskQueues and a file descriptor limit has been put in place using a weighted semaphore.

Closes #42407
Closes #44807

@asubiotto asubiotto requested a review from yuzefovich February 24, 2020 14:05
@cockroach-teamcity
Member

This change is Reviewable

@asubiotto
Contributor Author

asubiotto commented Feb 24, 2020

This is now RFAL. The current PR performs an external sort of tpch.lineitem in 26s vs 31s for the row execution engine with a memory limit of 64MiB (this is with DISCARD ROWS, so results haven't been checked for correctness). I think there is some more performance tuning we can do to reduce the number of partitions while avoiding repartitioning, but it's great to see it outperform the row execution engine without any performance tuning!

@asubiotto asubiotto requested a review from a team February 24, 2020 18:22
Member

@yuzefovich yuzefovich left a comment


In the second commit, other tests we should tweak are TestHashJoinerAgainstProcessor and TestSortAgainstProcessor.

Reviewed 4 of 4 files at r1, 11 of 11 files at r2, 21 of 21 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colcontainer/partitionedqueue.go, line 92 at r3 (raw file):

const (
	// PartitionerStrategyDefault is a partitioner strategy in which the
	// PartitionedQueue will keep partitions all partitions open for writing.

nit: "partitions all partitions".


pkg/sql/colcontainer/partitionedqueue.go, line 149 at r3 (raw file):

func (p *PartitionedDiskQueue) closeWritePartition(idx int, releaseFD bool) error {
	if p.partitions[idx].state != partitionStateWriting {
		panic(fmt.Sprintf("illegal state change from %d to partitionStateClosedForWriting, only partitionStateWriting allowed", p.partitions[idx].state))

Should this be execerror.VectorizedInternalPanic? We probably want to add sql/colcontainer package to the linter.


pkg/sql/colcontainer/partitionedqueue_test.go, line 165 at r3 (raw file):

	queueCfg.FS = countingFS

	// TODO(asubiotto): Switch back.

Not sure what this means.


pkg/sql/colcontainer/partitionedqueue_test.go, line 236 at r3 (raw file):

			// We shouldn't have Dequeued an empty batch.
			require.True(t, batch.Length() != 0)

Should this check be moved to right after calls to Dequeue?


pkg/sql/colexec/external_sort.go, line 206 at r3 (raw file):

				// externalSorterSpillPartition will check and re-merge if not.
				// Proceed to the final merging state.
				s.inputDone = true

Indeed, you're right. I think you can go one step further: we can remove s.inputDone field.


pkg/sql/colexec/external_sort.go, line 239 at r3 (raw file):

			continue
		case externalSorterRepeatedMerging:
			// TODO(asubiotto): Shouldn't this be if s.numPartitions <

I think actually both options are equivalent, but yours is easier to read.


pkg/sql/colexec/external_sort.go, line 242 at r3 (raw file):

			//  s.maxNumberPartitions?
			if s.numPartitions < s.maxNumberPartitions {
				if s.inputDone {

This can also be removed.


pkg/sql/colexec/external_sort.go, line 251 at r3 (raw file):

			// TODO(asubiotto): Is there a way to simplify this state transition?
			numPartitionsToMerge := s.maxNumberPartitions
			if numPartitionsToMerge > s.numPartitions {

This if is actually always false, so it can be removed as well.


pkg/sql/colexec/external_sort.go, line 254 at r3 (raw file):

				numPartitionsToMerge = s.numPartitions
			}
			if numPartitionsToMerge == s.numPartitions && s.inputDone {

And this if can be removed.


pkg/sql/colexec/external_sort_test.go, line 155 at r3 (raw file):

	// Interesting disk spilling scenarios:
	// 1) The sorter is forced to spill to disk as soon as possible.
	// 2) The memory limit is set to mon.DefaultPoolAllocationSize, this will

nit: this comment needs an adjustment.


pkg/sql/colexec/routers_test.go, line 206 at r3 (raw file):

			// also get unblocked with no data other than the zero batch.
			unblockEvent: func(_ Operator, o *routerOutputOp) {
				o.addBatch(coldata.ZeroBatch, nil /* selection */)

I find such comments useful.


pkg/sql/colexec/testutils.go, line 193 at r3 (raw file):

func (s *TestingSemaphore) Release(n int) int {
	if n < 0 {
		panic("releasing a negative amount")

The linter will complain about not using execerror.


pkg/sql/colflow/vectorized_flow.go, line 194 at r1 (raw file):

// TODO(asubiotto): Remove once #45098 is resolved.
func (f *vectorizedFlow) tryRemoveAll(root string) error {
	// Unfortunately there is no way to Stat using TempFS, so simply try all

What is "Stat" here?


pkg/sql/execinfra/server_config.go, line 137 at r3 (raw file):

	TempFS fs.FS

	// VecFDSemaphore is a weighted semaphore that restricts the number of open

Why is it "weighted"?

Contributor Author

@asubiotto asubiotto left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/explain_plan.go, line 233 at r6 (raw file):

		if ctxSessionData.VectorizeMode == sessiondata.VectorizeOff {
			isVec = false
		} else if !vectorizedThresholdMet && (ctxSessionData.VectorizeMode == sessiondata.Vectorize192Auto || ctxSessionData.VectorizeMode == sessiondata.VectorizeAuto) {

Here you said on the previous PR that we should remove VectorizeAuto from this condition. I don't think we should because if we don't meet the vectorized threshold, we still don't want to run it or display true, right?


pkg/sql/colcontainer/partitionedqueue.go, line 92 at r3 (raw file):

Previously, yuzefovich wrote…

nit: "partitions all partitions".

Done.


pkg/sql/colcontainer/partitionedqueue.go, line 149 at r3 (raw file):

Previously, yuzefovich wrote…

Should this be execerror.VectorizedInternalPanic? We probably want to add sql/colcontainer package to the linter.

Done. Let me know if it looks good.


pkg/sql/colcontainer/partitionedqueue_test.go, line 165 at r3 (raw file):

Previously, yuzefovich wrote…

Not sure what this means.

A note to self to remove the explicit setting of {max,num}Repartitions that slipped through.


pkg/sql/colcontainer/partitionedqueue_test.go, line 236 at r3 (raw file):

Previously, yuzefovich wrote…

Should this check be moved to right after calls to Dequeue?

Actually not sure why that check was still there, removed.


pkg/sql/colexec/external_sort.go, line 206 at r3 (raw file):

Previously, yuzefovich wrote…

Indeed, you're right. I think you can go one step further: we can remove s.inputDone field.

Done.


pkg/sql/colexec/external_sort.go, line 242 at r3 (raw file):

Previously, yuzefovich wrote…

This can also be removed.

Should we assume that s.numPartitions == s.maxNumberPartitions when transitioning to this stage? Might be easier. This state will also always transition to a new partition after merging.


pkg/sql/colexec/external_sort.go, line 251 at r3 (raw file):

Previously, yuzefovich wrote…

This if is actually always false, so it can be removed as well.

Done.


pkg/sql/colexec/external_sort.go, line 254 at r3 (raw file):

Previously, yuzefovich wrote…

And this if can be removed.

Done.


pkg/sql/colexec/external_sort.go, line 279 at r3 (raw file):

			}
			s.firstPartitionIdx += numPartitionsToMerge
			s.numPartitions -= numPartitionsToMerge - 1

We can always set this to 1. Makes me wonder if we should try tuning the number of partitions to reduce to at some point and seeing the performance differences.
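The arithmetic here (each merge consumes numPartitionsToMerge partitions and produces one, hence `s.numPartitions -= numPartitionsToMerge - 1`) can be sketched as follows; passesToMerge is a hypothetical simplification of the real state machine, not code from this PR:

```go
package main

import "fmt"

// passesToMerge models repeated merging: each pass merges the first
// maxNumberPartitions partitions into a single new partition, so the
// count drops by maxNumberPartitions-1 per pass, until everything fits
// into one final merge.
func passesToMerge(numPartitions, maxNumberPartitions int) int {
	passes := 0
	for numPartitions > maxNumberPartitions {
		// Merge maxNumberPartitions partitions into one.
		numPartitions -= maxNumberPartitions - 1
		passes++
	}
	return passes
}

func main() {
	// With 10 partitions and a limit of 4 open at once, the count goes
	// 10 -> 7 -> 4 before the final merge.
	fmt.Println(passesToMerge(10, 4)) // 2
}
```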


pkg/sql/colexec/external_sort_test.go, line 155 at r3 (raw file):

Previously, yuzefovich wrote…

nit: this comment needs an adjustment.

Done.


pkg/sql/colexec/routers_test.go, line 206 at r3 (raw file):

Previously, yuzefovich wrote…

I find such comments useful.

I do too! But it seems like GoLand doesn't when doing a signature change.


pkg/sql/colexec/testutils.go, line 193 at r3 (raw file):

Previously, yuzefovich wrote…

The linter will complain about not using execerror.

Done.


pkg/sql/colflow/vectorized_flow.go, line 194 at r1 (raw file):

Previously, yuzefovich wrote…

What is "Stat" here?

It's short for "status": a filesystem call that returns a path's metadata, used here to determine whether it is a file or a directory.


pkg/sql/execinfra/server_config.go, line 137 at r3 (raw file):

Previously, yuzefovich wrote…

Why is it "weighted"?

So that we can acquire/release more than one resource at a time (if we only ever needed to Acquire(1), we could just use a channel). The "weight" refers to the number passed to Acquire, and it's weighted in the sense that some callers have a greater weight, i.e. acquire more resources.
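A minimal sketch of the idea, with hypothetical names (weightedSem and friends are made up for illustration; a production version is provided by golang.org/x/sync/semaphore, and this is not the actual VecFDSemaphore implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// weightedSem limits the total "weight" (e.g. open file descriptors)
// outstanding at any time. Acquire(n) blocks until n units are available.
type weightedSem struct {
	mu    sync.Mutex
	cond  *sync.Cond
	avail int
}

func newWeightedSem(capacity int) *weightedSem {
	s := &weightedSem{avail: capacity}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire blocks until n units of capacity are available, then takes them.
func (s *weightedSem) Acquire(n int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.avail < n {
		s.cond.Wait()
	}
	s.avail -= n
}

// Release returns n units of capacity and wakes any blocked acquirers.
func (s *weightedSem) Release(n int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if n < 0 {
		panic("releasing a negative amount")
	}
	s.avail += n
	s.cond.Broadcast()
}

func main() {
	sem := newWeightedSem(256) // the default FD limit introduced in this PR
	sem.Acquire(16)            // e.g. an external sort opening partition files
	sem.Acquire(2)             // e.g. a single router output
	fmt.Println(sem.avail)     // 238 units still available
	sem.Release(18)
}
```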

Member

@yuzefovich yuzefovich left a comment


Reviewed 23 of 23 files at r4, 23 of 23 files at r5, 6 of 6 files at r6.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/distsql_running.go, line 150 at r6 (raw file):

	if evalCtx.SessionData.VectorizeMode != sessiondata.VectorizeOff {
		if !vectorizeThresholdMet && evalCtx.SessionData.VectorizeMode == sessiondata.Vectorize192Auto {

We need to add the condition for VectorizeAuto here as well.


pkg/sql/explain_plan.go, line 233 at r6 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Here you said on the previous PR that we should remove VectorizeAuto from this condition. I don't think we should because if we don't meet the vectorized threshold, we still don't want to run it or display true, right?

Yeah, I think you're right. But in another place we need to add auto to the condition.


pkg/sql/colcontainer/partitionedqueue_test.go, line 301 at r5 (raw file):

			// We shouldn't have Dequeued an empty batch.
			require.True(t, batch.Length() != 0)

Probably this needs to be removed as well?


pkg/sql/colexec/execplan.go, line 790 at r6 (raw file):

			// A sorter can run in auto mode because it falls back to disk if there
			// is not enough memory available.
			result.CanRunInAutoMode = true

Hash aggregator has been merged.


pkg/sql/colexec/external_sort.go, line 279 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

We can always set this to 1. Makes me wonder if we should try tuning the number of partitions to reduce to at some point and seeing the performance differences.

Probably we should.


pkg/sql/colexec/external_sort.go, line 238 at r5 (raw file):

		case externalSorterRepeatedMerging:
			// We will merge all partitions in range [s.firstPartitionIdx,
			// s.firstPartitionIdx+numPartitionsToMerge) and will spill all the

nit: s/numPartitionsToMerge/s.numPartitions/g.


pkg/sql/colflow/vectorized_flow.go, line 945 at r6 (raw file):

		if flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondata.Vectorize192Auto &&
			pspec.Output[0].Type == execinfrapb.OutputRouterSpec_BY_HASH {
			// exec.HashRouter is not supported when vectorize=192auto since it can

nit: s/exec/colexec/g.


pkg/sql/distsql/columnar_operators_test.go, line 264 at r4 (raw file):

// currently support in the vectorized engine with Ints.
// TODO(asubiotto): Remove after we support interval serialization.
//  Hmm, should we disallow sorts/joins with Intervals when we switch it to

Yeah, that sounds reasonable.


pkg/sql/distsql/columnar_operators_test.go, line 381 at r4 (raw file):

						if spillForced {
							ensureSerializableTypes(inputTypes)
							for i, t := range inputTypes {

This for loop seems redundant.


pkg/sql/execinfra/server_config.go, line 137 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

So that we can acquire/release > 1 resource (if we only ever needed to Acquire(1), we could just use a channel). The "weight" just refers to the number used when calling Acquire, and it's weighted in the sense that some callers will have a greater weight/i.e. more resources acquired.

I see, thanks.

This is used by the DiskQueue to delete all leftover files in its directory
as well as the directory itself, since DeleteDir fails if the directory has
any children.

Vectorized flows now also implement a hacky RemoveAll on cleanup, since there
might be leftover files and directories, in which case DeleteDir fails.

Release note: None (code movement)
Contributor Author

@asubiotto asubiotto left a comment


Addressed comments for another round, but I haven't taken a proper look at the CI failures, although they are probably related to sorting with auto. Should we split enabling sorting with auto into another PR? I'm worried that this PR is starting to do too much.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/distsql_running.go, line 150 at r6 (raw file):

Previously, yuzefovich wrote…

We need to add the condition for VectorizeAuto here as well.

Done.


pkg/sql/colcontainer/partitionedqueue_test.go, line 301 at r5 (raw file):

Previously, yuzefovich wrote…

Probably this needs to be removed as well?

This one can just be moved above.


pkg/sql/colexec/execplan.go, line 790 at r6 (raw file):

Previously, yuzefovich wrote…

Hash aggregator has been merged.

I'm going to hold off turning it on in this PR to reduce the amount of churn.


pkg/sql/colexec/external_sort.go, line 238 at r5 (raw file):

Previously, yuzefovich wrote…

nit: s/numPartitionsToMerge/s.numPartitions/g.

Done.


pkg/sql/colflow/vectorized_flow.go, line 945 at r6 (raw file):

Previously, yuzefovich wrote…

nit: s/exec/colexec/g.

Done.


pkg/sql/distsql/columnar_operators_test.go, line 264 at r4 (raw file):

Previously, yuzefovich wrote…

Yeah, that sounds reasonable.

Done.


pkg/sql/distsql/columnar_operators_test.go, line 381 at r4 (raw file):

Previously, yuzefovich wrote…

This for loop seems redundant.

Done. Oops

Member

@yuzefovich yuzefovich left a comment


I looked at some of the failures, and nothing seems concerning to me. I think we should proceed with auto.

Reviewed 8 of 32 files at r7, 5 of 13 files at r8, 22 of 24 files at r9, 6 of 6 files at r10.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/colcontainer/partitionedqueue.go, line 149 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Done. Let me know if it looks good.

There are a couple of lint failures left.


pkg/sql/colexec/execplan.go, line 790 at r6 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I'm going to hold off turning it on in this PR to reduce the amount of churn.

Ok, makes sense.


pkg/sql/colexec/execplan.go, line 700 at r10 (raw file):

			for _, t := range inputTypes {
				if t == coltypes.Interval {
					return result, errors.WithIssueLink(errors.Errorf("sort on interval type not supported"), errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/45392"})

This check should be moved above, where the TODO is, and here we should panic if we see interval type. The idea is that we will wrap rowexec.Sorter for now into the vectorized flow when interval type is present.


pkg/sql/colexec/external_sort.go, line 242 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Should we assume that s.numPartitions == s.maxNumberPartitions when transitioning to this stage? Might be easier. This state will also always transition to a new partition after merging.

Yeah, I think it's safe to assume so now. The comments for externalSorterStates need to be slightly adjusted.


pkg/sql/colexec/utils_test.go, line 386 at r10 (raw file):

				// Some operators need an explicit Close if not drained completely of
				// input.
				assert.NoError(t, c.Close())

Why do we use require in one spot and assert in another?


pkg/sql/distsql/columnar_operators_test.go, line 299 at r10 (raw file):

					if rng.Float64() < randTypesProbability {
						inputTypes = generateRandomSupportedTypes(rng, nCols)
						if spillForced {

I think in all of these cases we want to ensure serializable types regardless of value of spillForced.

The Partitioner interface has also been renamed to PartitionedQueue. The
on-disk implementation of a PartitionedQueue is now used by both the external
sorter and joiner, meaning we now have actual disk spilling for these
operators! Note that there are still some items left to take care of,
stability-wise, before disk spilling is production-ready.

Release note: None (disk spilling is not enabled by default, this will be
called out in a release note once vectorize=auto has this behavior.)
Contributor Author

@asubiotto asubiotto left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/colcontainer/partitionedqueue.go, line 149 at r3 (raw file):

Previously, yuzefovich wrote…

There are a couple of lint failures left.

Done.


pkg/sql/colexec/execplan.go, line 700 at r10 (raw file):

Previously, yuzefovich wrote…

This check should be moved above, where the TODO is, and here we should panic if we see interval type. The idea is that we will wrap rowexec.Sorter for now into the vectorized flow when interval type is present.

It should be fine to return an error here too, right? Not sure we need to panic.


pkg/sql/colexec/external_sort.go, line 242 at r3 (raw file):

Previously, yuzefovich wrote…

Yeah, I think it's safe to assume so now. The comments for externalSorterStates need to be slightly adjusted.

Done. Not sure exactly what to change but I rewrote a small part, let me know what you think.


pkg/sql/colexec/utils_test.go, line 386 at r10 (raw file):

Previously, yuzefovich wrote…

Why do we use require in one spot and assert in another?

Because the tests themselves do this (one uses assert and the other uses require). I'm not sure why this test uses assert; I think we should use require. The difference is that require fails the test immediately, while assert keeps it going (albeit with an error). We can change it in another PR.


pkg/sql/distsql/columnar_operators_test.go, line 299 at r10 (raw file):

Previously, yuzefovich wrote…

I think in all of these cases we want to ensure serializable types regardless of value of spillForced.

Done.

@asubiotto asubiotto requested a review from a team as a code owner February 25, 2020 21:40
Member

@yuzefovich yuzefovich left a comment


:lgtm: let's get this in!

I see that the build is red with two failures: TestVectorizeAllocateSpaceError is probably missing the disk queue config in NewColOperatorArgs, and the merge joiner is complaining on the fakedist-vec-disk config - we should probably make the merge joiner "non-streaming" when both sides are keys for now, until spilling to disk is in place.

Reviewed 1 of 29 files at r11, 22 of 24 files at r12, 22 of 22 files at r13.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/colcontainer/partitionedqueue.go, line 151 at r13 (raw file):

// only be true if reopening a different file in a new scope, to avoid the
// overhead of re-entering the semaphore.
func (p *PartitionedDiskQueue) closeWritePartition(idx int, releaseFD bool) error {

nit: we probably should change the boolean argument to a custom enum struct.


pkg/sql/colexec/execplan.go, line 700 at r10 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

It should be fine to return an error here too, right? Not sure we need to panic.

Well, the isSupported method above must have been called at this point, so we should not be trying to create a sort op with an interval type, so this is definitely unexpected behavior.


pkg/sql/colexec/execplan.go, line 210 at r13 (raw file):

			}
		}
		for _, t := range inputTypes {

Duplicate loop.


pkg/sql/colexec/external_sort.go, line 242 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Done. Not sure exactly what to change but I rewrote a small part, let me know what you think.

LGTM, thanks!


pkg/sql/colexec/external_sort.go, line 107 at r13 (raw file):

	NonExplainable

	closed             bool

nit: for this we probably should introduce another type enum, something like externalOperatorCloseStatus.


pkg/sql/colexec/utils_test.go, line 386 at r10 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Because the tests themselves do this (one uses assert and the other uses require). I'm not sure why this test uses assert, I think we should use require, the difference being that require fails the test immediately, while assert keeps it going (albeit with an error) I guess. We can change it in another PR.

Ok, sgtm.

Contributor Author

@asubiotto asubiotto left a comment


TestVectorizeAllocateSpaceError was failing only on the penultimate commit due to a nil FDSemaphore. I'm surprised that it didn't fail on the commit where I initially added the DiskQueue, since I don't actually add the DiskQueueCfg, but it doesn't seem to spill due to the external hash joiner getting a zero-length batch from both inputs. Is this expected?

Also ran into another failure with make testlogic FILES=distinct where we get an error that projectingBatch doesn't support ColVecs. This is because we call ColVecs in RetainBatch which makes sense if we have a projection above a sorter. I added support in projectingBatch for ColVecs, let me know what you think.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/colcontainer/partitionedqueue.go, line 151 at r13 (raw file):

Previously, yuzefovich wrote…

nit: we probably should change the boolean argument to a custom enum struct.

Done.


pkg/sql/colexec/execplan.go, line 700 at r10 (raw file):

Previously, yuzefovich wrote…

Well, isSupported method above must have been called at this point, so we should not be trying to create a sort op with interval type, so this is definitely unexpected behavior.

Done.


pkg/sql/colexec/execplan.go, line 210 at r13 (raw file):

Previously, yuzefovich wrote…

Duplicate loop.

Done. thanks.


pkg/sql/colexec/external_sort.go, line 107 at r13 (raw file):

Previously, yuzefovich wrote…

nit: for this we probably should introduce another type enum, something like externalOperatorCloseStatus.

I feel like that's overkill, no? We have booleans indicating whether something is closed/done in a bunch of places. Happy to change if you feel strongly about this though.

Member

@yuzefovich yuzefovich left a comment


I think there is a bug in TestVectorizeAllocateSpaceError - we never set the length on batch. Probably current behavior is somewhat expected given this bug.

projectingBatch stuff LGTM.

Reviewed 2 of 24 files at r14, 23 of 23 files at r15.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto and @yuzefovich)


pkg/sql/colexec/external_sort.go, line 107 at r13 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I feel like that's overkill, no? We have booleans indicating whether something is closed/done in a bunch of places. Happy to change if you feel strongly about this though.

Sounds good. I don't feel strongly, but Andrei does :D

The vectorized execution engine could previously open an unlimited number of
files when performing an external sort, join, or when hash routing. A weighted
semaphore is introduced in this commit with a default maximum of 256, which
can be tweaked with the COCKROACH_VEC_MAX_OPEN_FDS environment variable.

To reduce the number of open file descriptors during a sort or a join, it is
now illegal to enqueue to a partition that has been dequeued from. This doesn't
require a change in sorter or joiner behavior and allows the write file
descriptor to be closed as soon as a partition is read from.

This commit also adds file descriptor awareness and tests to the
PartitionedQueue.

Release note: None (feature not present in past release)
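The restriction described above (enqueueing to a partition that has been dequeued from is illegal, which lets the write file descriptor be released early) can be sketched roughly like this; the state names echo the snippets quoted earlier in this review, but the struct and methods are hypothetical simplifications:

```go
package main

import "fmt"

type partitionState int

const (
	partitionStateWriting partitionState = iota
	partitionStateClosedForWriting
	partitionStateReading
)

type partition struct{ state partitionState }

// startReading closes the write side of a partition (at which point its
// write file descriptor can be released back to the semaphore) and
// transitions it to reading. The transition is one-way.
func (p *partition) startReading() {
	if p.state == partitionStateWriting {
		p.state = partitionStateClosedForWriting
	}
	p.state = partitionStateReading
}

// enqueue only succeeds while the partition is still in the writing
// state; once it has been dequeued from, writing is illegal.
func (p *partition) enqueue() error {
	if p.state != partitionStateWriting {
		return fmt.Errorf("cannot enqueue to a partition in state %d", p.state)
	}
	return nil
}

func main() {
	p := &partition{state: partitionStateWriting}
	fmt.Println(p.enqueue() == nil) // true: still writing
	p.startReading()
	fmt.Println(p.enqueue() == nil) // false: illegal once dequeued from
}
```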
@asubiotto
Contributor Author

asubiotto commented Feb 26, 2020

Getting some logic test failures with some weird aggregation results. I think I'm going to switch back to not enabling external sort + hash routing in auto yet, and instead enable it and work through the failures in another PR. Seems like the issue is actually in the external sorter + disk queues, will investigate.

Member

@yuzefovich yuzefovich left a comment


I agree, seems like an issue with the external sorter. All other failures are minor.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto and @yuzefovich)


pkg/sql/colflow/vectorized_flow.go, line 1143 at r15 (raw file):

		output = &execinfra.RowChannel{}
	}
	creator := newVectorizedFlowCreator(newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, nil, output, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, nil)

This has to be non-nil.

@asubiotto
Copy link
Contributor Author

After looking into them, the tests seem to be legitimate failures that need some more work (some seem to be due to the RocksDB in-memory engine's filesystem instability). It makes sense to merge this PR in experimental_on mode and slowly enable auto and fix these failures in a separate PR.

@yuzefovich
Member

Filed #45481 for one of the bugs.

This commit adds vectorize="192auto", which retains the behavior of running
vectorize="auto". This commit does not change the behavior of vectorize="auto"
yet; a future PR will start enabling disk-spilling operators in this mode.

Release note: None (a new value for vectorize was added but since this doesn't
change anything semantically, it will be called out in a future release note).
@asubiotto
Contributor Author

bors r=yuzefovich

@craig
Contributor

craig bot commented Feb 27, 2020

Build succeeded

@craig craig bot merged commit 9c72e72 into cockroachdb:master Feb 27, 2020
@asubiotto asubiotto deleted the prtn branch February 27, 2020 17:58
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 13, 2022
This commit fixes a long-standing issue which could cause
memory-intensive queries to deadlock on acquiring the file descriptors
quota when vectorized execution spills to disk. This bug has been
present since the introduction of disk-spilling (over two and a half
years ago, introduced in cockroachdb#45318 and partially mitigated in cockroachdb#45892), but
we haven't seen this in any user reports, only in `tpch_concurrency`
roachtest runs, so the severity seems pretty minor.

Consider the following query plan:
```
   Node 1                   Node 2

TableReader              TableReader
    |                         |
HashRouter                HashRouter
    |     \  ___________ /    |
    |      \/__________       |
    |      /           \      |
HashAggregator         HashAggregator
```
and let's imagine that each hash aggregator has to spill to disk. This
would require acquiring the file descriptors quota. Now, imagine that
because of that hash aggregators' spilling, each of the hash routers has
slow outputs causing them to spill too. As a result, this query plan can
require `A + 2 * R` number of FDs of a single node to succeed where `A`
is the quota for a single hash aggregator (equal to 16 - with the
default value of `COCKROACH_VEC_MAX_OPEN_FDS` environment variable which
is 256) and `R` is the quota for a single router output (2). This means
that we can estimate that 20 FDs from each node are needed for the query
to finish execution with 16 FDs being acquired first.

Now imagine that this query is run with concurrency of 16. We can end up
in such a situation that all hash aggregators have spilled, fully
exhausting the global node limit on each node, so whenever the hash
router outputs need to spill, they block forever since no FDs will ever
be released, until a query is canceled or a node is shutdown. In other
words, we have a deadlock.

This commit fixes this situation by introducing a retry mechanism to
exponentially backoff when trying to acquire the FD quota, until a time
out. The randomizations provided by the `retry` package should be
sufficient so that some of the queries succeed while others result in
an error.

Unfortunately, I don't see a way to prevent this deadlock from occurring
in the first place without possible increase in latency in some case.
The difficult thing is that we currently acquire FDs only once we need
them, meaning once a particular component spills to disk. We could
acquire the maximum number of FDs that a query might need up-front,
before the query execution starts, but that could lead to starvation of
the queries that ultimately won't spill to disk. This seems like a much
worse impact than receiving timeout errors on some analytical queries
when run with high concurrency. We're not an OLAP database, so this
behavior seems ok.

Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead.
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 14, 2022
This commit fixes a long-standing issue which could cause
memory-intensive queries to deadlock on acquiring the file descriptors
quota when vectorized execution spills to disk. This bug has been
present since the introduction of disk-spilling (over two and a half
years ago, introduced in cockroachdb#45318 and partially mitigated in cockroachdb#45892), but
we haven't seen this in any user reports, only in `tpch_concurrency`
roachtest runs, so the severity seems pretty minor.

Consider the following query plan:
```
   Node 1                   Node 2

TableReader              TableReader
    |                         |
HashRouter                HashRouter
    |     \  ___________ /    |
    |      \/__________       |
    |      /           \      |
HashAggregator         HashAggregator
```
and let's imagine that each hash aggregator has to spill to disk. This
would require acquiring the file descriptors quota. Now, imagine that
because of that hash aggregators' spilling, each of the hash routers has
slow outputs causing them to spill too. As a result, this query plan can
require `A + 2 * R` number of FDs of a single node to succeed where `A`
is the quota for a single hash aggregator (equal to 16 - with the
default value of `COCKROACH_VEC_MAX_OPEN_FDS` environment variable which
is 256) and `R` is the quota for a single router output (2). This means
that we can estimate that 20 FDs from each node are needed for the query
to finish execution with 16 FDs being acquired first.
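
The FD arithmetic above can be checked with a small worked sketch. The constants follow the commit message (a node-wide limit of 256 FDs from the default `COCKROACH_VEC_MAX_OPEN_FDS`, 16 FDs per spilling hash aggregator, 2 per spilling router output); the function names are illustrative, not CockroachDB's actual code:

```go
package main

import "fmt"

const (
	nodeLimit = 256 // default COCKROACH_VEC_MAX_OPEN_FDS
	aggQuota  = 16  // FDs one spilling hash aggregator acquires
	routerFDs = 2   // FDs one spilling router output acquires
)

// perQueryDemand is the number of FDs the plan above needs on one node:
// one hash aggregator plus two router outputs.
func perQueryDemand() int {
	return aggQuota + 2*routerFDs
}

func main() {
	fmt.Println("per-query FD demand:", perQueryDemand()) // 20
	// With 16 such queries, the aggregators alone acquire 16 * 16 = 256
	// FDs, exhausting the node limit before any router output can take
	// its 2 FDs: every router then blocks forever, i.e. a deadlock.
	held := 16 * aggQuota
	fmt.Println("held by aggregators:", held, "deadlocked:", held >= nodeLimit)
}
```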

Now imagine that this query is run with concurrency of 16. We can end up
in such a situation that all hash aggregators have spilled, fully
exhausting the global node limit on each node, so whenever the hash
router outputs need to spill, they block forever since no FDs will ever
be released, until a query is canceled or a node is shut down. In other
words, we have a deadlock.

This commit fixes this situation by introducing a retry mechanism that
backs off exponentially when trying to acquire the FD quota, until it
times out. The randomization provided by the `retry` package should be
sufficient so that some of the queries succeed while others result in
an error.
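
A minimal sketch of that retry loop, using a channel-based counting semaphore to stand in for the FD quota. The names, backoff constants, and error text are illustrative; CockroachDB's actual code uses its `retry` package and a weighted semaphore:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// tryAcquire attempts to take n slots from a channel-based counting
// semaphore without blocking; on failure it releases any slots it had
// already taken and reports false.
func tryAcquire(quota chan struct{}, n int) bool {
	for taken := 0; taken < n; taken++ {
		select {
		case quota <- struct{}{}:
		default:
			for ; taken > 0; taken-- {
				<-quota
			}
			return false
		}
	}
	return true
}

// acquireWithRetry retries acquisition with randomized exponential
// backoff and gives up with an error after maxRetries attempts.
func acquireWithRetry(quota chan struct{}, n, maxRetries int) error {
	backoff := 10 * time.Millisecond
	for attempt := 0; attempt < maxRetries; attempt++ {
		if tryAcquire(quota, n) {
			return nil
		}
		// Jitter the sleep so concurrent waiters don't retry in lockstep.
		time.Sleep(backoff/2 + time.Duration(rand.Int63n(int64(backoff))))
		backoff *= 2
	}
	return errors.New("timed out acquiring file descriptor quota")
}

func main() {
	quota := make(chan struct{}, 4) // tiny node-wide FD limit for the demo
	fmt.Println(acquireWithRetry(quota, 3, 3)) // <nil>: 3 of 4 slots taken
	fmt.Println(acquireWithRetry(quota, 3, 3)) // errors out: only 1 slot left
}
```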

Unfortunately, I don't see a way to prevent this deadlock from occurring
in the first place without a possible increase in latency in some cases.
The difficult thing is that we currently acquire FDs only once we need
them, meaning once a particular component spills to disk. We could
acquire the maximum number of FDs that a query might need up-front,
before the query execution starts, but that could lead to starvation of
the queries that ultimately won't spill to disk. This seems like a much
worse impact than receiving timeout errors on some analytical queries
when run with high concurrency. We're not an OLAP database, so this
behavior seems ok.

Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead.
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 14, 2022
craig bot pushed a commit that referenced this pull request Jul 14, 2022
84324: sqlsmith: make order-dependent aggregation functions deterministic r=msirek,mgartner a=michae2

**cmd: add smith command**

Add a new top-level command `smith` which dumps randomly-generated
sqlsmith queries. This is useful for testing modifications to sqlsmith.

Assists: #83024

Release note: None

**sqlsmith: make order-dependent aggregation functions deterministic**

Some aggregation functions (e.g. string_agg) have results that depend
on the order of input rows. To make sqlsmith more deterministic, add
ORDER BY clauses to these aggregation functions whenever their argument
is a column reference. (When their argument is a constant, ordering
doesn't matter.)

Fixes: #83024

Release note: None
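
The rewrite described above can be sketched as follows. `renderAgg` is a hypothetical helper, not sqlsmith's actual API, and `string_agg`'s delimiter argument is elided for brevity:

```go
package main

import "fmt"

// renderAgg renders an order-dependent aggregate call: when the argument
// is a column reference, an ORDER BY on that column makes the result
// deterministic; a constant argument yields the same result under any
// row order, so it is left alone.
func renderAgg(fn, arg string, isColumnRef bool) string {
	if isColumnRef {
		return fmt.Sprintf("%s(%s ORDER BY %s)", fn, arg, arg)
	}
	return fmt.Sprintf("%s(%s)", fn, arg)
}

func main() {
	fmt.Println(renderAgg("string_agg", "col1", true)) // string_agg(col1 ORDER BY col1)
	fmt.Println(renderAgg("string_agg", "'x'", false)) // string_agg('x')
}
```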

84398: colflow: prevent deadlocks when many queries spill to disk at same time r=yuzefovich a=yuzefovich

**colflow: prevent deadlocks when many queries spill to disk at same time**

This commit fixes a long-standing issue which could cause
memory-intensive queries to deadlock on acquiring the file descriptors
quota when vectorized execution spills to disk. This bug has been
present since the introduction of disk-spilling (over two and a half
years ago, introduced in #45318 and partially mitigated in #45892), but
we haven't seen this in any user reports, only in `tpch_concurrency`
roachtest runs, so the severity seems pretty minor.

Consider the following query plan:
```
   Node 1                   Node 2

TableReader              TableReader
    |                         |
HashRouter                HashRouter
    |     \  ___________ /    |
    |      \/__________       |
    |      /           \      |
HashAggregator         HashAggregator
```
and let's imagine that each hash aggregator has to spill to disk. This
would require acquiring the file descriptors quota. Now, imagine that
because of that hash aggregators' spilling, each of the hash routers has
slow outputs causing them to spill too. As a result, this query plan can
require `A + 2 * R` number of FDs of a single node to succeed where `A`
is the quota for a single hash aggregator (equal to 16 - with the
default value of `COCKROACH_VEC_MAX_OPEN_FDS` environment variable which
is 256) and `R` is the quota for a single router output (2). This means
that we can estimate that 20 FDs from each node are needed for the query
to finish execution with 16 FDs being acquired first.

Now imagine that this query is run with concurrency of 16. We can end up
in such a situation that all hash aggregators have spilled, fully
exhausting the global node limit on each node, so whenever the hash
router outputs need to spill, they block forever since no FDs will ever
be released, until a query is canceled or a node is shut down. In other
words, we have a deadlock.

This commit fixes this situation by introducing a retry mechanism that
backs off exponentially when trying to acquire the FD quota, until it
times out. The randomization provided by the `retry` package should be
sufficient so that some of the queries succeed while others result in
an error.

Unfortunately, I don't see a way to prevent this deadlock from occurring
in the first place without a possible increase in latency in some cases.
The difficult thing is that we currently acquire FDs only once we need
them, meaning once a particular component spills to disk. We could
acquire the maximum number of FDs that a query might need up-front,
before the query execution starts, but that could lead to starvation of
the queries that ultimately won't spill to disk. This seems like a much
worse impact than receiving timeout errors on some analytical queries
when run with high concurrency. We're not an OLAP database, so this
behavior seems ok.

Fixes: #80290.

Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead.

**roachtest: remove some debugging printouts in tpch_concurrency**

This was added to track down the deadlock fixed in the previous commit,
so we no longer need it.

Release note: None

84430: sql/schemachanger/scplan: allow plan to move to NonRevertible earlier r=ajwerner a=ajwerner

This is critical for DROP COLUMN. In `DROP COLUMN` we cannot perform the
primary index swap until the dropping column reaches `DELETE_ONLY`, which is
not revertible. The primary index swap itself is revertible. Given this fact,
we need a mechanism to be able to "promote" revertible operations (operations
which don't destroy information or cause irrevocable visible side effects) to
be grouped with or after non-revertible operations. This commit makes that
happen naturally.

Release note: None

84433: rowexec: increase the batch size for join reader ordering strategy r=yuzefovich a=yuzefovich

This commit increases the default value of
`sql.distsql.join_reader_ordering_strategy.batch_size` cluster setting
(which determines the input row batch size for the lookup joins when
ordering needs to be maintained) from 10KiB to 100KiB since the previous
number is likely to have been too conservative. We have seen support
cases (https://github.com/cockroachlabs/support/issues/1627) where
bumping up this setting was needed to get reasonable performance, and we
also change this setting to 100KiB in our TPC-E setup
(https://github.com/cockroachlabs/tpc-e/blob/d47d3ea5ce71ecb1be5e0e466a8aa7c94af63d17/tier-a/src/schema.rs#L404).

I did some historical digging, and I believe that the number 10KiB was
chosen somewhat arbitrarily with no real justification in #48058. That
PR changed how we measure the input row batches from the number of rows
to the memory footprint of the input rows. Prior to that change we had
100 rows limit, so my guess that 10KiB was somewhat dependent on that
number.

The reason we don't want this batch size to be too large is that we
buffer all looked up rows in a disk-backed container, so if too many
responses come back (because we included too many input rows in the
batch), that container has to spill to disk. To make sure we don't
regress in such scenarios, this commit teaches the join reader to lower
the batch size bytes limit if the container does spill to disk, down to
10KiB, which is treated as the minimum.

Release note: None
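
A sketch of that adaptive limit. The PR states only that the limit is lowered on spill with a 10KiB floor; the halving step here is an assumption for illustration, and the names are hypothetical:

```go
package main

import "fmt"

const (
	defaultBatchSizeBytes = 100 << 10 // 100KiB, the new default
	minBatchSizeBytes     = 10 << 10  // 10KiB, the minimum
)

// onSpill returns the lowered batch size limit after the disk-backed
// container spills: halve the limit (an assumed policy), clamped to
// the 10KiB floor.
func onSpill(cur int) int {
	if cur/2 < minBatchSizeBytes {
		return minBatchSizeBytes
	}
	return cur / 2
}

func main() {
	limit := defaultBatchSizeBytes
	for i := 0; i < 5; i++ { // five consecutive spills
		limit = onSpill(limit)
		fmt.Println(limit)
	}
	// Prints 51200, 25600, 12800, 10240, 10240: the limit bottoms out at 10KiB.
}
```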

Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 19, 2022
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 20, 2022
Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead. If a user knows that there is no deadlock and that some
analytical queries that have spilled are just taking too long, blocking
other queries from spilling, and is ok with waiting for longer, the user
can adjust the newly introduced `sql.distsql.acquire_vec_fds.max_retries`
cluster setting (using 0 to get the previous behavior of waiting
indefinitely until spilling resources open up).