feat: take the concurrencyLimit from feature flags and keep in dependencies #4564
Conversation
Interesting, it seems reasonable to me.
I am curious if @nathanielc has thoughts on this approach.
if concurrencyQuota > int(execOptions.ConcurrencyLimit) {
	concurrencyQuota = int(execOptions.ConcurrencyLimit)
} else if concurrencyQuota == 0 {
	concurrencyQuota = 1
}
Curious about this branch here. If we had a trivial query whose execution graph just had a ReadWindowAggregate node, I guess concurrencyQuota could be 0. Do we require it to be positive even if there are no non-sources?
I might be mistaken, but my guess is there needs to be at least one goroutine to work the consecutive transport belonging to the source nodes. The source nodes themselves, I think, just deposit messages to the outgoing dataset, and that's as far as the source goroutines take it. There needs to be a dispatcher goroutine that reads those messages and writes to the CSV writer.
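A minimal sketch of that shape, with hypothetical names (this is not Flux's actual transport API): the source goroutine only deposits messages, so at least one dispatcher goroutine must exist to drain them, even for a source-only graph.

```go
package main

import "fmt"

// message is a stand-in for what a source node deposits on its
// outgoing dataset (hypothetical type, not the real Flux transport).
type message struct{ payload string }

// produce mimics a source node: it deposits messages on its outgoing
// channel and then finishes; it does not deliver them any further.
func produce(out chan<- message, n int) {
	defer close(out)
	for i := 0; i < n; i++ {
		out <- message{payload: fmt.Sprintf("row %d", i)}
	}
}

// dispatch mimics the one required dispatcher goroutine: it drains the
// deposited messages and hands them onward (here a slice stands in for
// the CSV writer).
func dispatch(in <-chan message) []string {
	var rows []string
	for m := range in {
		rows = append(rows, m.payload)
	}
	return rows
}

func main() {
	out := make(chan message)
	go produce(out, 3)
	for _, r := range dispatch(out) {
		fmt.Println(r)
	}
}
```

With a concurrency quota of 0 there would be no goroutine left to play the `dispatch` role, which is consistent with requiring the quota to be positive.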
Anecdotally, during recent refactors when I failed to mark any nodes as roots (my mistake), the concurrency quota was set to zero, which produced an error. There didn't seem to be any conditional around that check, since it failed for from/range/filter, which I think would have been rewritten as a single source.
Lines 80 to 85 in dc08c57

func validatePlan(p *plan.Spec) error {
	if p.Resources.ConcurrencyQuota == 0 {
		return errors.New(codes.Invalid, "plan must have a non-zero concurrency quota")
	}
	return nil
}
Right, makes sense.
Given Go's convention of having a useful zero value, I wonder if we should change the meaning of concurrency quota to be the number of additional goroutines after the required one. Or maybe 0 should just mean the default of 1.
Nothing needs to change here for this PR; it just seems a little weird.
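A hedged sketch of the zero-as-default idea (a hypothetical helper, not code from this PR), where the Go zero value stays useful:

```go
package main

import "fmt"

// normalizeQuota illustrates the suggestion above: treat a zero
// concurrency quota as "use the default of 1", and otherwise clamp
// the quota to the configured limit. Names are illustrative only.
func normalizeQuota(quota, limit int) int {
	if quota == 0 {
		return 1 // zero value means the default of one goroutine
	}
	if limit > 0 && quota > limit {
		return limit // never exceed the configured concurrency limit
	}
	return quota
}

func main() {
	fmt.Println(normalizeQuota(0, 8))  // zero quota defaults to 1
	fmt.Println(normalizeQuota(16, 8)) // clamped to the limit, 8
	fmt.Println(normalizeQuota(4, 8))  // in-range quota passes through
}
```

Under this reading, validatePlan's non-zero check would become unnecessary, since a zero quota is simply interpreted rather than rejected.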
Pull the concurrencyLimit from feature flags at the start of the execution process, before planning, and stash it in the ExecutionOptions, which live in the ExecutionDependencies. From there it can then be modified by planner rules. This allows parallelization rules to raise the limit if they parallelize a query. At the same time we move the defaultMemoryLimit from the planner and into the execution options. We also move the computation of memory limit and concurrency quota from the planner and into the executor. Included are test cases covering the existing and the new method of determining query concurrency.
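A minimal sketch of the flow this PR describes, using hypothetical names (the real types live in Flux's execute and plan packages): the limit is read from feature flags before planning, a planner rule may raise it, and the executor computes the final quota.

```go
package main

import "fmt"

// ExecutionOptions stands in for the options stashed in the execution
// dependencies; the fields here are illustrative, not Flux's exact ones.
type ExecutionOptions struct {
	ConcurrencyLimit int
	MemoryLimit      int64
}

// fromFeatureFlags mimics reading the limit at the start of execution,
// before planning (the flag lookup is faked with a plain map).
func fromFeatureFlags(flags map[string]int) *ExecutionOptions {
	return &ExecutionOptions{
		ConcurrencyLimit: flags["concurrencyLimit"],
		MemoryLimit:      1 << 20, // stand-in for defaultMemoryLimit
	}
}

// raiseLimit mimics a parallelization planner rule bumping the limit
// after it parallelizes part of a query.
func raiseLimit(opts *ExecutionOptions, factor int) {
	opts.ConcurrencyLimit *= factor
}

// concurrencyQuota mimics the executor computing the quota from the
// plan, clamped by the options, with zero mapped to one as in the diff.
func concurrencyQuota(nonSourceNodes int, opts *ExecutionOptions) int {
	quota := nonSourceNodes
	if quota > opts.ConcurrencyLimit {
		quota = opts.ConcurrencyLimit
	} else if quota == 0 {
		quota = 1
	}
	return quota
}

func main() {
	opts := fromFeatureFlags(map[string]int{"concurrencyLimit": 4})
	raiseLimit(opts, 2)                     // a rule parallelized the query: 4 -> 8
	fmt.Println(concurrencyQuota(10, opts)) // clamped to 8
	fmt.Println(concurrencyQuota(0, opts))  // floored at 1
}
```

The point of keeping the limit in the execution dependencies is that both the planner rules and the executor see the same mutable options, so a rule's raised limit is visible when the quota is computed.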
Looks good to me.
LGTM. I agree with Chris that the handling of that 0 case feels a little clumsy, but I don't really see what we can do about it. 🤷