colexec: introduce batches with dynamic capacity #52453
Conversation
Force-pushed from 9b76fed to 11c0672.
Force-pushed from bf08d14 to 3a93872.
Alright, I think this is RFAL. I ran a quick benchmark of the KV95 workload on a 3-node roachprod cluster from my laptop with
I'd take the numbers with a grain of salt (maybe they are due to variance from running the benchmark on the mac), but it looks like we might be able to get rid of
Some TPCC numbers, 3 node roachprod cluster with 100 warehouses, 1 minute of ramp and 5 minutes of load:
Great to see this, and awesome results!
Reviewed 68 of 68 files at r1, 22 of 22 files at r2, 13 of 13 files at r3.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)
pkg/col/coldata/batch.go, line 128 at r1 (raw file):
```go
func NewMemBatchNoCols(typs []*types.T, capacity int) Batch {
	if max := math.MaxUint16; capacity > max {
		panic(fmt.Sprintf(`batches cannot have length larger than %d; requested %d`, max, capacity))
```
s/length/capacity
pkg/col/coldata/vec.go, line 134 at r1 (raw file):
```go
// Capacity returns the capacity of the Golang's slice that is underlying
// this Vec. Note that if there is no "slice" (like in case of flat bytes),
// then "capacity" of such object is equal to its "length".
```
I think this last sentence is a bit vague (maybe just to me); how about: "then the capacity is equal to the number of elements"?
pkg/sql/colexec/dynamic_batch_size_helper.go, line 29 at r3 (raw file):
```go
}

// DynamicBatchSizeHelper is a utility struct that helps operators work with
```
I think this comment can use more fleshing out, i.e. how does it "help"?
pkg/sql/colexec/dynamic_batch_size_helper.go, line 40 at r3 (raw file):
```go
// grow the allocated capacity of the batch exponentially, until the batch
// reaches coldata.BatchSize().
func (d *DynamicBatchSizeHelper) ResetMaybeReallocate(
```
Why not make this a part of the allocator? Also, all operators should now be calling this method, right? Are we enforcing this in any way? What valid uses of the normal `Reset` are there?
pkg/sql/colexec/mergejoiner.go, line 606 at r1 (raw file):
```go
	bufferedGroup = &o.proberState.rBufferedGroup
}

// TODO(yuzefovich): reuse the same scratch batches when spillingQueue
```
I have an old branch that I'm hoping to revive.
pkg/sql/colexec/routers.go, line 415 at r3 (raw file):
```go
for toAppend := len(selection); toAppend > 0; {
	if o.mu.pendingBatch == nil {
		// TODO(yuzefovich): consider whether this should be a dynamic batch.
```
It's a good question. I don't think so because I consider this a fixed-size scratch buffer that we flush from. What do you think?
pkg/sql/colexec/sorttopk.go, line 60 at r3 (raw file):
```go
	// its input.
	topKSortSpooling topKSortState = iota
	// topKSortSpooling is the second state of the operator, indicating that
```
nit: s/topKSortSpooling/topKSortEmitting
pkg/sql/colmem/allocator.go, line 94 at r1 (raw file):
```go
// NewMemBatchWithMaxCapacity allocates a new in-memory coldata.Batch of
// coldata.BatchSize() capacity.
func (a *Allocator) NewMemBatchWithMaxCapacity(typs []*types.T) coldata.Batch {
```
Why not keep this as `NewMemBatch`?
pkg/sql/colmem/allocator.go, line 100 at r1 (raw file):
```go
// NewMemBatchWithFixedCapacity allocates a new in-memory coldata.Batch with
// the given capacity.
func (a *Allocator) NewMemBatchWithFixedCapacity(typs []*types.T, capacity int) coldata.Batch {
```
Why `Fixed` if it's going to be dynamic?
Force-pushed from 3a93872 to 71cd00f.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)
pkg/col/coldata/batch.go, line 128 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
s/length/capacity
Done.
pkg/col/coldata/vec.go, line 134 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think this last sentence is a bit vague (maybe just to me); how about: "then the capacity is equal to the number of elements"?
Done.
I agree, it's a bit vague for non-slice-backed types, but currently this method is only used to get the memory footprint, so it's OK if we don't define the contract perfectly. I think in the future we should be able to have pools of vectors of all types with different capacities that `colmem.Allocator` objects would draw from, but we're not there yet.
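For illustration, a minimal self-contained sketch of that contract, with a hypothetical `memVec` type standing in for a slice-backed `coldata.Vec`:

```go
package main

import "fmt"

// memVec is a hypothetical stand-in for a slice-backed coldata.Vec;
// the real method lives on coldata.Vec and covers all supported types.
type memVec struct {
	int64s []int64
}

// Capacity returns the capacity of the Go slice underlying this vector.
// For representations without a backing slice (like flat bytes), the
// capacity is defined as the number of elements.
func (v *memVec) Capacity() int {
	return cap(v.int64s)
}

func main() {
	v := &memVec{int64s: make([]int64, 4, 16)}
	fmt.Println(v.Capacity()) // 16: the slice's capacity, not its length
}
```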
pkg/sql/colexec/dynamic_batch_size_helper.go, line 29 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think this comment can use more fleshing out, i.e. how does it "help"?
Expanded the comment.
Update: removed the struct.
pkg/sql/colexec/dynamic_batch_size_helper.go, line 40 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Why not make this a part of the allocator? Also, all operators should now be calling this method, right? Are we enforcing this in any way? What valid uses of the normal `Reset` are there?
Made it a part of the allocator.
No, not all operators are expected to use this method - only those for which it makes sense to have the "dynamic size" behavior. My thinking is that all operators that instantiate batches to be returned as their output can be roughly divided into two groups:
- in the first group, the work the operator needs to perform in order to produce a single tuple of output is about the same regardless of whether that tuple is first, second, or last in the whole output stream (examples of such operators are `cFetcher` and `columnarizer`). Operators in this group want the "dynamic size" behavior.
- in the second group, the work the operator needs to perform in order to produce a single tuple is not "distributed uniformly" among all tuples (examples are the hash joiner and the hash aggregator). Such operators don't want the "dynamic size" behavior because it wouldn't be beneficial: they usually perform other non-batch-related "internal" allocations, so it wouldn't really matter if their output batch behaved dynamically.

In this PR, I first looked over all usages of the `Allocator.NewMemBatch` method to separate them into `NewMemBatchWithMaxCapacity` and `NewMemBatchWithFixedCapacity`, and the usages of the latter definitely don't need the dynamic behavior. Then, I looked at all usages of `NewMemBatchWithMaxCapacity` in the non-test files and singled out those that I think would benefit from the dynamic behavior, and I converted all such cases to the new pattern. The only operator I wasn't sure about is `routerOutputOp`, but I think that one should use a "max capacity" batch.

`ResetMaybeReallocate` effectively replaces `outputBatch.ResetInternalBatch` in the operators that want the dynamic behavior. However, since some operators still want the fixed behavior, we need to keep `coldata.Batch.ResetInternalBatch`.
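To make the mechanism concrete, here is a minimal self-contained sketch of the growth policy described above; the `batch` type and the function signature are toy stand-ins (the real method lives on `colmem.Allocator`, takes the column types, and operates on `coldata.Batch`, so the details differ):

```go
package main

import "fmt"

// maxBatchSize stands in for coldata.BatchSize().
const maxBatchSize = 1024

// batch is a toy stand-in for coldata.Batch.
type batch struct {
	capacity int
	length   int
}

// resetMaybeReallocate sketches the described policy: once the batch has
// reached the maximum size it is simply reset in place; otherwise a new
// batch is allocated with double the capacity, bounded below by
// minCapacity and above by maxBatchSize. The boolean reports whether
// a reallocation happened.
func resetMaybeReallocate(old *batch, minCapacity int) (*batch, bool) {
	if old == nil {
		return &batch{capacity: minCapacity}, true
	}
	if old.capacity >= maxBatchSize {
		old.length = 0
		return old, false
	}
	newCapacity := old.capacity * 2
	if newCapacity < minCapacity {
		newCapacity = minCapacity
	}
	if newCapacity > maxBatchSize {
		newCapacity = maxBatchSize
	}
	return &batch{capacity: newCapacity}, true
}

func main() {
	var b *batch
	for i := 0; i < 6; i++ {
		var reallocated bool
		b, reallocated = resetMaybeReallocate(b, 64 /* minCapacity */)
		// Prints capacities 64, 128, 256, 512, 1024, then 1024 with
		// reallocated=false once the maximum has been reached.
		fmt.Println(b.capacity, reallocated)
	}
}
```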
pkg/sql/colexec/mergejoiner.go, line 606 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I have an old branch that I'm hoping to revive.
Done.
pkg/sql/colexec/routers.go, line 415 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
It's a good question. I don't think so because I consider this a fixed-size scratch buffer that we flush from. What do you think?
Yeah, I think so too.
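For context, the pattern being discussed, sketched with toy types (plain ints standing in for tuples; the real code appends to `o.mu.pendingBatch` and flushes it to the router output):

```go
package main

import "fmt"

// scratchCapacity stands in for coldata.BatchSize().
const scratchCapacity = 4

// appendAndFlush sketches the router-output pattern: tuples are copied into
// a fixed-size scratch buffer that is flushed whenever it fills up, so the
// buffer never benefits from growing dynamically.
func appendAndFlush(selection []int, flush func([]int)) {
	var pending []int
	consumed := 0
	for toAppend := len(selection); toAppend > 0; {
		if pending == nil {
			pending = make([]int, 0, scratchCapacity)
		}
		n := scratchCapacity - len(pending)
		if n > toAppend {
			n = toAppend
		}
		pending = append(pending, selection[consumed:consumed+n]...)
		consumed += n
		toAppend -= n
		if len(pending) == scratchCapacity {
			flush(pending)
			pending = nil
		}
	}
	if len(pending) > 0 {
		flush(pending) // the real code flushes leftovers later, on demand
	}
}

func main() {
	appendAndFlush([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}, func(b []int) {
		fmt.Println(b) // [1 2 3 4], [5 6 7 8], [9]
	})
}
```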
pkg/sql/colmem/allocator.go, line 94 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Why not keep this as `NewMemBatch`?
I think `NewMemBatch` is a little too generic, and I want to force the user of `Allocator` to think through whether a batch with a fixed size, a batch with the maximum size, or a dynamic batch should be used. I'm worried that `NewMemBatch` would be treated as the default option without being given any thought.
pkg/sql/colmem/allocator.go, line 100 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Why `Fixed` if it's going to be dynamic?
The batch itself is not dynamic in size - currently we allocate a whole new batch with a bigger capacity.
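To illustrate the naming, a toy contrast between the two behaviors (the types are stand-ins; the real in-place reset is `coldata.Batch.ResetInternalBatch`):

```go
package main

import "fmt"

// batch is a toy stand-in for coldata.Batch.
type batch struct{ capacity, length int }

// resetInternalBatch is what a fixed-capacity batch gets: a reset in place.
func (b *batch) resetInternalBatch() { b.length = 0 }

// grow models the "dynamic" behavior: the old batch is abandoned and a
// brand-new batch with a larger capacity takes its place.
func grow(old *batch) *batch { return &batch{capacity: old.capacity * 2} }

func main() {
	fixed := &batch{capacity: 4, length: 4}
	fixed.resetInternalBatch()
	fmt.Println(fixed.capacity) // still 4: the batch itself never grows

	dynamic := &batch{capacity: 4}
	dynamic = grow(dynamic)
	fmt.Println(dynamic.capacity) // 8: a new batch, not a resized one
}
```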
Force-pushed from 71cd00f to 92f44b3.
Reviewed 35 of 35 files at r4, 22 of 22 files at r5, 12 of 12 files at r6.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)
pkg/sql/colexec/dynamic_batch_size_helper.go, line 40 at r3 (raw file):
Previously, yuzefovich wrote…
Made it a part of the allocator.
No, not all operators are expected to use this method - only those for which it makes sense to have the "dynamic size" behavior. My thinking is that all operators that instantiate batches to be returned as their output can be roughly divided into two groups:
- in the first group, the work the operator needs to perform in order to produce a single tuple of output is about the same regardless of whether that tuple is first, second, or last in the whole output stream (examples of such operators are `cFetcher` and `columnarizer`). Operators in this group want the "dynamic size" behavior.
- in the second group, the work the operator needs to perform in order to produce a single tuple is not "distributed uniformly" among all tuples (examples are the hash joiner and the hash aggregator). Such operators don't want the "dynamic size" behavior because it wouldn't be beneficial: they usually perform other non-batch-related "internal" allocations, so it wouldn't really matter if their output batch behaved dynamically.

In this PR, I first looked over all usages of the `Allocator.NewMemBatch` method to separate them into `NewMemBatchWithMaxCapacity` and `NewMemBatchWithFixedCapacity`, and the usages of the latter definitely don't need the dynamic behavior. Then, I looked at all usages of `NewMemBatchWithMaxCapacity` in the non-test files and singled out those that I think would benefit from the dynamic behavior, and I converted all such cases to the new pattern. The only operator I wasn't sure about is `routerOutputOp`, but I think that one should use a "max capacity" batch.

`ResetMaybeReallocate` effectively replaces `outputBatch.ResetInternalBatch` in the operators that want the dynamic behavior. However, since some operators still want the fixed behavior, we need to keep `coldata.Batch.ResetInternalBatch`.
Did you write this comment before the 1:1? I think it's still worth discussing whether we want to have these two separate groups or just have dynamic batch sizes everywhere. I prefer going down the route of having dynamic batch sizes everywhere because it makes programming simpler and the cost of dynamic batch sizes should be amortized. Also, it's not clear that the second group is that well defined; e.g., I think you put the hash joiner in the second group, but we brought up the case of a single-row join. We didn't finish that discussion because we had to leave for the next meeting, but we were talking about how it needed to allocate a hash table anyway.
Is there a way we could measure the performance impact of having dynamic batch sizes everywhere?
Might be good to discuss the above point at standup. cc @jordanlewis @helenmhe
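One rough way to get a feel for the amortized cost is a toy microbenchmark; this sketch deliberately uses plain Go slices rather than the actual `colmem` API, so it only models the allocation pattern, not the real operators:

```go
package batchbench

import "testing"

const maxBatchSize = 1024

// sink prevents the compiler from optimizing the allocations away.
var sink []int64

// BenchmarkFixedCapacity models an operator that allocates its output
// batch at full capacity up front, once per iteration.
func BenchmarkFixedCapacity(b *testing.B) {
	for i := 0; i < b.N; i++ {
		sink = make([]int64, maxBatchSize)
	}
}

// BenchmarkDynamicCapacity models an operator that reallocates its output
// batch with exponentially growing capacity until the maximum is reached;
// the total allocated memory is bounded by roughly 2x the final size.
func BenchmarkDynamicCapacity(b *testing.B) {
	for i := 0; i < b.N; i++ {
		for c := 1; c <= maxBatchSize; c *= 2 {
			sink = make([]int64, c)
		}
	}
}
```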
Force-pushed from 92f44b3 to c0337fa.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)
pkg/sql/colexec/dynamic_batch_size_helper.go, line 40 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Did you write this comment before the 1:1? I think it's still worth discussing whether we want to have these two separate groups or just have dynamic batch sizes everywhere. I prefer going down the route of having dynamic batch sizes everywhere because it makes programming simpler and the cost of dynamic batch sizes should be amortized. Also, it's not clear that the second group is that well defined; e.g., I think you put the hash joiner in the second group, but we brought up the case of a single-row join. We didn't finish that discussion because we had to leave for the next meeting, but we were talking about how it needed to allocate a hash table anyway.
Is there a way we could measure the performance impact of having dynamic batch sizes everywhere?
Yes, I did write this down before our 1:1.
I have gone ahead and audited all usages of the `NewMemBatchWith*Capacity` methods in non-test files and added the dynamic batch size behavior in several places. I also added a linter to prohibit calls to `NewMemBatchWithMaxCapacity` from non-test files so that engineers are forced to think about whether the dynamic batch size behavior is desired. The only operators that haven't been converted (but in theory could have been) are the aggregators (because reallocating an output batch breaks the contract of aggregate functions) and the `relative_rank` operators (because that code is already pretty hard to reason about, and introducing dynamic batch sizes would likely make things worse without giving any performance benefit).
I think it's not worth spending more time on this at this point.
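For illustration, a minimal standalone sketch of what such a lint check could look like; the actual linter lives in CockroachDB's lint infrastructure, so this file-walking version is only an approximation:

```go
package lint

import (
	"os"
	"path/filepath"
	"strings"
	"testing"
)

// TestNoMaxCapacityBatchesOutsideTests walks the source tree and fails if a
// non-test file calls NewMemBatchWithMaxCapacity, nudging authors toward
// the dynamic ResetMaybeReallocate pattern (or a deliberate exception).
func TestNoMaxCapacityBatchesOutsideTests(t *testing.T) {
	root := "." // assumption: run from the root of the package tree to lint
	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || !strings.HasSuffix(path, ".go") || strings.HasSuffix(path, "_test.go") {
			return nil
		}
		src, err := os.ReadFile(path)
		if err != nil {
			return err
		}
		if strings.Contains(string(src), "NewMemBatchWithMaxCapacity") {
			t.Errorf("%s: NewMemBatchWithMaxCapacity is prohibited outside tests", path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
}
```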
Force-pushed from 1200b04 to 53b2018.
I figured out why the tests were failing (a problem with falling over from the in-memory hash joiner to the external hash joiner). I'll wait for a CI run and merge if green.
🎉 All dependencies have been resolved!
Force-pushed from 53b2018 to b556ae9.
But are the `SIGQUIT` CI failures concerning?
Reviewed 79 of 79 files at r7, 68 of 68 files at r8, 26 of 26 files at r10, 34 of 34 files at r11.
Reviewable status: complete! 1 of 0 LGTMs obtained
I'm thinking it's logic test timeouts.
That's surprising to me (edited with updated link).
Since we vary the batch size, a single run might not be representative. I also saw the timeout failure on your PR for the context cancellation fix. I have a hypothesis that the dynamic batch size logic might be making the tests run even slower (to be confirmed), so the timeouts are more likely to occur. Another thing is that I've seen timeouts a couple of times recently, so I'm pretty sure that on master we're currently often pretty close to the 30-minute limit.
I'm not so sure. Given that this change modifies a pretty fundamental part of the code, isn't it likely that there's something else going on? Do you have a link to those timeouts? Combing through the last ~10 runs on master (https://teamcity.cockroachdb.com/viewType.html?buildTypeId=Cockroach_EssentialCi&branch_Cockroach=%3Cdefault%3E&tab=buildTypeStatusDiv), the runtime is more or less steady at ~20-22 minutes, with one run at 25 minutes. Unfortunately, I don't think it's easy to see the batch size for successful builds.
I'm not sure either. I want to wait for another CI build on this branch before jumping to any conclusions, but I thought I'd share my current guess.
I see what you mean regarding the timeouts in the context cancellation PR. It doesn't seem like that's a normal timeout. It looks like …
I'm pretty sure the failures are timeouts - the dump of goroutines on SIGQUIT shows that a bunch of them have been running for around 29-30 minutes. Possibly it's a coincidence, but I think it's more likely to be timeouts (locally, the files that the tests failed on pass when run one at a time).
Force-pushed from 276a0a2 to 4b39ee2.
There was a simple bug in … I'll wait for a CI run and merge if green.
Force-pushed from 4b39ee2 to 895125b.
TFTR! bors r+
Build succeeded.
Depends on #52728.
col, colexec: introduce the concept of capacity to coldata.Batch
This commit introduces the concept of capacity to `coldata.Batch`, which describes the maximum number of tuples the batch can store. Note that it is a lower bound, meaning that some vectors in the batch might have larger underlying capacity (e.g. when they were appended to).

Additionally, this commit makes several mechanical changes to rename the methods.

Release note: None
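Conceptually, the change boils down to an extra accessor on the batch; a trimmed-down sketch (this interface is a stand-in, not the full `coldata.Batch`):

```go
package main

import "fmt"

// Batch is a trimmed-down stand-in for the coldata.Batch interface,
// showing only the accessors relevant to the new concept.
type Batch interface {
	// Length returns the number of tuples currently present in the batch.
	Length() int
	// Capacity returns the maximum number of tuples the batch can store.
	// It is a lower bound: some vectors in the batch might have a larger
	// underlying capacity, e.g. after having been appended to.
	Capacity() int
}

type memBatch struct {
	length   int
	capacity int
}

func (b *memBatch) Length() int   { return b.length }
func (b *memBatch) Capacity() int { return b.capacity }

func main() {
	var b Batch = &memBatch{length: 3, capacity: 8}
	fmt.Println(b.Length(), b.Capacity()) // 3 8
}
```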
colexec: remove custom input/output batch size logic from a few places
Ordered aggregator, hash and merge joiners, and hash router had custom input/output batch size logic that was put in place in order to increase test coverage. This, however, is no longer required since we now randomize the `coldata.BatchSize()` value during test runs, so that custom logic is now removed.

Additionally, this commit removes several unit tests of the merge joiner which are now exact copies of each other (previously, they had different output batch sizes set).

One notable change is that this commit removes a tiny optimization from the merge joiner for when there are no output columns (meaning we have a COUNT query).

This work has been done in order to ease follow-up work on dynamic batch sizes.

Release note: None
colexec: use batches with dynamic capacity in several operators
This commit introduces a `ResetMaybeReallocate` method on `colmem.Allocator` which might allocate a new batch (it uses exponential capacity growth until `coldata.BatchSize()` and also supports a minimum capacity argument). The method can be used by operators that want the "dynamic batch size" behavior.

All usages of `NewMemBatchWithMaxCapacity` and `NewMemBatchWithFixedCapacity` in non-test files have been audited, and most of the operators have been updated to exhibit the dynamic batch size behavior (the most notable exception being the aggregators, because aggregate functions currently hold on to their output vectors, so we can't just reallocate an output batch). The usage of `NewMemBatchWithMaxCapacity` is now prohibited in non-test files by a linter in order to encourage engineers to think about whether the dynamic batch size behavior is desired.
Resolves: #49796.
Release note: None