-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
jobs: reduce round trips for control queries #119355
jobs: reduce round trips for control queries #119355
Conversation
09cf01e
to
e2b60f1
Compare
if err != nil { | ||
return err | ||
} | ||
job := Job{registry: r, id: jobID} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instantiating a job struct directly like this could be unsafe if a subsequent method on the job struct assumes the job struct contains some state. It also seems like we do this elsewhere. Yet another job todo would be to create some sort of statelessJob
interface on which we can safely call these methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely agree having some different types here would be good. I think I'll defer that for now to avoid increasing the blast radius of this change.
pkg/jobs/update.go
Outdated
return fmt.Errorf("job with status %s cannot be requested to be paused", md.Status) | ||
} | ||
if fn != nil { | ||
if err := ju.PauseRequestFuncRunner(ctx, fn, md); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
according to the jobUpdater
doctstring, it is merely responsible for accumulating metadata changes to be persisted to disk, but with this change, the jobUpdater
now executes the onPauseRequestFunc
, which I think can do much more than update in memory metadata.
This is making it harder for me to distinguish what an updater
does and what a jobUpdater
does. Could you add some comments to updater
which explains what it does and its relation to jobUpdater
? Further, could you add some commentary to jobUpdater
definition that explains what jobUpdater.PauseRequestFunc
and jobUpdater.PauseRequestFuncRunner
do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrm. Now i feel like we should just delete this updater/JobUpdater.PauseRequestedWithFunc
logic. The only user of it is changefeeds, which use it to update their job status. I think they can update their job status in a much less complicated way. Further, no one uses the PauseRequesterInterface
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could also land this refactor, and chat with cdc about removing PauseRequestedWithFunc
afterwards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking a second look at this. Given that as far as I can tell no jobs actually implement OnPauseRequest, we can completely remove that method. That allows us to simplify the implementation of PauseRequestedWithFunc such that the caller just gets passed the job updater which I think will fix the semantic problems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do that in this PR or we will never do it.
@@ -66,30 +67,26 @@ func (n *controlJobsNode) startExec(params runParams) error { | |||
return errors.AssertionFailedf("%q: expected *DInt, found %T", jobIDDatum, jobIDDatum) | |||
} | |||
|
|||
job, err := reg.LoadJobWithTxn(params.ctx, jobspb.JobID(jobID), params.p.InternalSQLTxn()) | |||
if err != nil { | |||
if err := reg.UpdateJobWithTxn(params.ctx, jobspb.JobID(jobID), params.p.InternalSQLTxn(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice cleanup!
e2b60f1
to
c078f30
Compare
c078f30
to
b70cb4e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice cleanup!
@@ -1309,15 +1309,12 @@ func (r *Registry) cancelRequested(ctx context.Context, txn isql.Txn, id jobspb. | |||
func (r *Registry) PauseRequested( | |||
ctx context.Context, txn isql.Txn, id jobspb.JobID, reason string, | |||
) error { | |||
job, resumer, err := r.getJobFn(ctx, txn, id) | |||
job, _, err := r.getJobFn(ctx, txn, id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you could use LoadJobWIthTxn
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:D Yeah, have a commit locally that removes getJobFn completely. I'll push that after merging this.
TFTR! bors r=msbutler |
Build failed (retrying...): |
Build failed: |
bors cancel |
Forgot to push some test fixups. |
This adds PAUSE, RESUME, and CANCEL JOB to the rttanalysis tests for jobs. Epic: none Release note: None
Previously, we were loading the job twice during control operations since we were both calling registry.LoadJobWithTxn and Update (which also loads the job). Removing this double load required re-arranging some code so that could be run in the context of the job updater. Epic: None Release note: None
We were marking Progress and updated despite not actually writing anything to progress. Resulting in a little bit of unncessary work. Epic: None Release note: None
b70cb4e
to
09e5cd2
Compare
bors r=msbutler |
Build succeeded: |
118907: build: update patch with bug fix to runtime grunning r=sumeerbhola,rickystewart a=aadityasondhi In our previous version of this patch, we missed two entry points and one exit point into the grunning state of a Goroutine. This led to `grunning.Time()` being non-monotonic. This new patch adds those missing integration points. Fixes #95529. Release note: None 119356: jobs: remove double-load from metrics poller r=msbutler a=stevendanna All but the last commit is #119355 Previously, this code loaded the job twice, once via LoadJobWithTxn and again via the call to Unpaused. Here, we re-arrange the code so that it only loads it once. Epic: none Release note: None 119480: opt: reduce allocations for filter implication with disjunctions r=mgartner a=mgartner This commit reduces allocations in `Implicator.FiltersImplyPredicate` by using stack-allocated arrays when there are five or less adjacent disjunctions in the filter or partial index predicate. This should cover the most common cases of filters and predicates with disjunctions. Epic: None Release note: None 119519: ttljob: skip TestSpanToQueryBoundsCompositeKeys under stress r=fqazi a=annrpom This patch skips TestSpanToQueryBoundsCompositeKeys under stress/stressrace as it often hits OOM issues under these conditions. Epic: none Release note: None Co-authored-by: Aaditya Sondhi <[email protected]> Co-authored-by: Steven Danna <[email protected]> Co-authored-by: Marcus Gartner <[email protected]> Co-authored-by: Annie Pompa <[email protected]>
The APIs provided by the job system makes it very easy for calling code
to inadvertently do a good deal of extra work.
This small PR addresses one such example. The control operations were
all loading the job record twice.
In the newly added rttanalysis test for these operations, a bit of code-re-arrangement gets
a nice reduction in the number of KV batches sent for these queries:
Before:
After:
Epic: None