jobs,changefeedccl: only load adopted jobs #70612
Still a draft as I'm writing some tests for this. I'm pretty tempted to change some of the jobs API further, but I want to put some more thought into them before doing that.
@@ -708,7 +711,7 @@ func (b *changefeedResumer) resumeWithRetries(
 	// a dummy channel.
 	startedCh := make(chan tree.Datums, 1)

-	if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil {
+	if err = distChangefeedFlow(ctx, jobExec, jobID, jobSessionID, details, progress, startedCh); err == nil {
I wonder if we need to do something special if the dist flow returns an error due to an incorrect session ID. Are we sure that this job cannot transition to failed because of that? Even if that's the case, I don't like this spooky action at a distance.
Regardless, I think we need a comment here around the semantics of errors due to incorrect job session id.
@@ -178,6 +179,8 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF
 				"job %d: with status '%s': expected session '%s' but found '%s'",
 				j.ID(), statusString, j.sessionID, storedSession)
 		}
+	} else {
+		log.VInfof(ctx, 1, "job %s: update called with no session ID", j.sessionID.String())
Feels like we should even log a warning? Or do we have too many call sites that don't load the session ID?
Yup, I want to make this a warning, just testing out locally how noisy this will be.
It appears that some of the stats jobs may also be updating without a session id.
87f4d1a to 567cb04
Reviewed 1 of 10 files at r1, 4 of 9 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @stevendanna)
pkg/jobs/jobs.go, line 242 at r2 (raw file):
// SessionID returns the session ID of the job.
func (j *Job) SessionID() sqlliveness.SessionID {
do we still need this exported method?
pkg/jobs/registry.go, line 690 at r2 (raw file):
defer r.mu.Unlock()
adoptedJob, ok := r.mu.adoptedJobs[jobID]
probably want to unlock registry prior to running load below.
Still a draft as I'm writing some tests for this. I'm pretty tempted to change some of the jobs API further, but I want to put some more thought into them before doing that.
I'm a big +1 on that, namely I'd make it so that you can only call one of the mutating methods from a job for which you have the claim. That'd be swell.
That being said, let's do that after we get a backportable fix up.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @stevendanna)
pkg/jobs/registry.go, line 686 at r2 (raw file):
// system.jobs table. The job must have already been adopted by this
// Registry.
func (r *Registry) LoadAdoptedJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) {
nit: let's use the Claimed language here? So, LoadClaimedJob?
82e9eda to 47641be
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @stevendanna)
pkg/jobs/jobs.go, line 242 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
do we still need this exported method?
Nope, removed.
pkg/jobs/registry.go, line 688 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Furthermore, because we had to change protocol message specifications, it appears that this change would actually be less backportable than a change that didn't change any protocol messages.
I believe that the protobuf changes should be strictly compatible; but we will need to review that carefully.
An alternative, which I feel would be backportable, would be to have something like:
LoadAdoptedJob (or perhaps LoadLocallyExecutingJob). Same deal, but return an error if the job is not adopted by the local registry, or set the session ID to an invalid value so that updates fail.
I like this. I'll look into it, as I think it should still be rather small and not require threading more data through the proto.
Done.
pkg/jobs/registry.go, line 690 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
probably want to unlock registry prior to running load below.
Done.
I'm a big +1 on that, namely I'd make it so that you can only call one of the mutating methods from a job for which you have the claim. That'd be swell.
There aren't too many call sites of LoadJob, so I don't think this will be too bad to make progress on. I'll still defer it so as to keep this change backportable, but I think we should do this while it is fresh in our minds.
Reviewable is no longer showing me Andrew's comments, but I changed the new function to LoadClaimedJob.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @stevendanna)
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/ccl/changefeedccl/changefeed_stmt.go, line 714 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I wonder if we need to do something special if dist flow returns error due to incorrect session id. Are we sure that this job cannot transition to failed because of that? Even if that's the case, I'm not liking this spooky action at distance.
Regardless, I think we need a comment here around the semantics of errors due to incorrect job session id.
It would be nice to remove the special cases for sessionID being unset, but the *Job that the registry uses when transitioning jobs should have the sessionID set, so any state transitions will fail:
E210924 14:01:39.089999 125912 jobs/adopt.go:420 ⋮ [n1] 2359 job ‹696060947264077825›: adoption completed with error job ‹696060947264077825›: could not mark as reverting: job ‹696060947264077825›: with status '‹'running'›': expected session '‹0a3971131ae74345a9c738ba079a4f2b›' but found NULL: job ‹696060947264077825›: with status '‹'running'›': expected session '‹0a3971131ae74345a9c738ba079a4f2b›' but found NULL
which is what we want, since the job is likely to be claimed by another node, so any state transition from us is erroneous.
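To make the behavior above concrete, here is a minimal, self-contained sketch of a session-checked update. It is not the pkg/jobs implementation: `jobRow`, the in-memory `table`, `errSessionMismatch`, and the simplified `Job` and `SessionID` types are all assumptions made for illustration. It shows a handle that carries a session ID failing once the stored claim no longer matches, while a handle without a session ID skips the check.

```go
package main

import (
	"errors"
	"fmt"
)

// SessionID is a simplified stand-in for sqlliveness.SessionID.
// The empty string models "no session set".
type SessionID string

// jobRow is a hypothetical in-memory stand-in for one row of system.jobs.
type jobRow struct {
	status       string
	claimSession SessionID // empty models a NULL claim
}

// Job is a simplified handle, not the real *jobs.Job.
type Job struct {
	id        int64
	sessionID SessionID
}

// errSessionMismatch is a hypothetical sentinel used only in this sketch.
var errSessionMismatch = errors.New("session mismatch")

// update applies fn to the stored row only if the handle's session matches
// the stored claim. A handle with no session ID skips the check entirely,
// which is the gap discussed in this PR.
func (j *Job) update(table map[int64]*jobRow, fn func(*jobRow)) error {
	row, ok := table[j.id]
	if !ok {
		return fmt.Errorf("job %d: not found", j.id)
	}
	if j.sessionID != "" && row.claimSession != j.sessionID {
		found := "NULL"
		if row.claimSession != "" {
			found = "'" + string(row.claimSession) + "'"
		}
		return fmt.Errorf("job %d: with status '%s': expected session '%s' but found %s: %w",
			j.id, row.status, j.sessionID, found, errSessionMismatch)
	}
	fn(row)
	return nil
}

func main() {
	table := map[int64]*jobRow{1: {status: "running", claimSession: "s1"}}
	claimed := &Job{id: 1, sessionID: "s1"}
	unclaimed := &Job{id: 1} // loaded without a session ID

	// The claim is lost, for example because the session expired.
	table[1].claimSession = ""

	// The handle with a session ID can no longer transition the job.
	err := claimed.update(table, func(r *jobRow) { r.status = "reverting" })
	fmt.Println(errors.Is(err, errSessionMismatch), err)

	// The handle without a session ID still updates the row unchecked.
	err = unclaimed.update(table, func(r *jobRow) { r.status = "reverting" })
	fmt.Println(err, table[1].status)
}
```

The second call is the problematic path: nothing stops a handle without a session ID from mutating a job that another node may since have claimed.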
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @miretskiy, and @stevendanna)
pkg/jobs/registry.go, line 679 at r3 (raw file):
// LoadJob loads an existing job with the given jobID from the system.jobs
// table.
func (r *Registry) LoadJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) {
I know it's "old" code... I wonder if we should slap a warning on this method, plus a TODO to fix the API?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @stevendanna)
pkg/ccl/changefeedccl/changefeed_stmt.go, line 714 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
It would be nice to remove the special cases for sessionID being unset, but the *Job that the registry uses when transitioning jobs should have the sessionID set, so any state transitions will fail:
E210924 14:01:39.089999 125912 jobs/adopt.go:420 ⋮ [n1] 2359 job ‹696060947264077825›: adoption completed with error job ‹696060947264077825›: could not mark as reverting: job ‹696060947264077825›: with status '‹'running'›': expected session '‹0a3971131ae74345a9c738ba079a4f2b›' but found NULL: job ‹696060947264077825›: with status '‹'running'›': expected session '‹0a3971131ae74345a9c738ba079a4f2b›' but found NULL
which is what we want, since the job is likely to be claimed by another node, so any state transition from us is erroneous.
It'd be nice if the jobs layer would notice that that was the problem and log something less scary at the INFO severity.
Many of these nil session ID paths were kept for the migration. That migration is over. I think we can tighten this API surface area substantially.
pkg/jobs/registry.go, line 690 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Done.
nit: just make a helper for this to allow for deferring the locking.
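A rough sketch of that helper pattern, under stated assumptions: the `Registry` below is a stripped-down stand-in (a map from job ID to a session string), and `adoptedSession` and `loadClaimed` are hypothetical names, not the functions in pkg/jobs. The point is only that the helper can use `defer` for the unlock because it returns before the slow load runs.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry is a simplified stand-in for the jobs registry.
type Registry struct {
	mu struct {
		sync.Mutex
		adoptedJobs map[int64]string // job ID -> session ID (simplified)
	}
}

// adoptedSession is the small helper: the lock is held only for the map
// lookup, and defer releases it on every return path.
func (r *Registry) adoptedSession(jobID int64) (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	session, ok := r.mu.adoptedJobs[jobID]
	return session, ok
}

// loadClaimed looks up the session under the lock, then runs the
// potentially slow load with the lock already released.
func (r *Registry) loadClaimed(
	jobID int64, load func(jobID int64, session string) error,
) error {
	session, ok := r.adoptedSession(jobID)
	if !ok {
		return fmt.Errorf("job %d: not adopted by this registry", jobID)
	}
	return load(jobID, session)
}

func main() {
	r := &Registry{}
	r.mu.adoptedJobs = map[int64]string{7: "session-a"}

	err := r.loadClaimed(7, func(jobID int64, session string) error {
		// TryLock succeeds only if loadClaimed is not holding the mutex.
		if !r.mu.TryLock() {
			return fmt.Errorf("registry lock still held during load")
		}
		r.mu.Unlock()
		fmt.Printf("loading job %d with session %s\n", jobID, session)
		return nil
	})
	fmt.Println("claimed load error:", err)
	fmt.Println("unclaimed load error:", r.loadClaimed(8, nil))
}
```

`sync.Mutex.TryLock` requires Go 1.18 or later; it is used here only to demonstrate that the registry lock is free while the load callback runs.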
pkg/jobs/registry.go, line 679 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I know it's "old" code... I wonder if we should slap a warning on this method plus a todo to fix api?
Or remove it fully in a follow-up PR.
pkg/jobs/update.go, line 183 at r3 (raw file):
	}
} else {
	log.VInfof(ctx, 1, "job %s: update called with no session ID", j.sessionID.String())
What would be cool is to get to a point where the only place that these calls happen are inside the registry and then we can use some boolean somewhere on the struct to decide whether we're allowed. Not for this PR.
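One way to picture that idea, purely as a sketch: the `claimed` field, `errNotClaimed`, and the `Load` and `SetProgress` methods below are hypothetical and do not reflect the current jobs API. Only the registry sets the flag, and every mutating method checks it before touching state.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotClaimed is a hypothetical error for this sketch.
var errNotClaimed = errors.New("job handle does not hold the claim")

// Job is a simplified handle. The claimed flag is meant to be set only by
// the registry when it hands out the handle.
type Job struct {
	id       int64
	claimed  bool
	progress float64
}

// SetProgress is a mutating method, so it refuses to run on a handle that
// does not hold the claim.
func (j *Job) SetProgress(p float64) error {
	if !j.claimed {
		return fmt.Errorf("job %d: %w", j.id, errNotClaimed)
	}
	j.progress = p
	return nil
}

// Progress is read-only and is allowed on any handle.
func (j *Job) Progress() float64 { return j.progress }

// Registry is a simplified stand-in that remembers which jobs it adopted.
type Registry struct {
	adopted map[int64]bool
}

// Load returns a handle whose claimed flag reflects whether this registry
// adopted the job.
func (r *Registry) Load(id int64) *Job {
	return &Job{id: id, claimed: r.adopted[id]}
}

func main() {
	r := &Registry{adopted: map[int64]bool{1: true}}

	mine := r.Load(1)
	fmt.Println(mine.SetProgress(0.5), mine.Progress())

	theirs := r.Load(2)
	err := theirs.SetProgress(0.5)
	fmt.Println(errors.Is(err, errNotClaimed), theirs.Progress())
}
```

With this shape, a read-only consumer can still load any job, but a mutation attempted through an unclaimed handle fails locally instead of reaching the jobs table.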
I did a brief survey of the other call sites of LoadJob that look problematic: #70682
47641be to 2cd7203
2cd7203 to a23ace8
The changeFrontier updates the job multiple times during its run. It does this via a *jobs.Job object that it explicitly loads at startup using a JobID that is passed to it via its distflow spec.

However, when loading the job in this way, the sessionID that was associated with the initial adoption of the job is not set. This means that this change frontier can continue updating the job without error even long after the session from the original adoption has expired. This can potentially lead to a single changefeed job running twice.

This change addresses that by introducing a new method, LoadClaimedJob, to the Registry, which consults the Registry's map of adopted jobs to find the sessionID and ensures that the sessionID is included in the query to load the job.

Note that for this to be correct it _requires_ that the changeFrontier is only ever scheduled on the sql instance that adopts the job.

This API should probably change further to make it much harder to interact with jobs that haven't been adopted by your registry. However, the goal of this change is to be backportable.

Release note (enterprise change): Fixes a bug that could have led to duplicate instances of a single changefeed job running for prolonged periods of time.
a23ace8 to ec530a3
bors r=miretskiy
Build succeeded:
The changeFrontier updates the job multiple times during its run. It
does this via a *jobs.Job object that it explicitly loads at startup
using a JobID that is passed to it via its distflow spec.
However, when loading the job in this way, the sessionID that was
associated with the initial adoption of the job is not set. This means
that this change frontier can continue updating the job without error
even long after the session from the original adoption is
expired. This can potentially lead to a single changefeed job running
twice.
This change addresses that by introducing a new method, LoadClaimedJob,
to the Registry, which consults the Registry's map of adopted jobs to
find the sessionID and ensures that the sessionID is included in the
query to load the job.
Note that for this to be correct it requires that the changeFrontier
is only ever scheduled on the sql instance that adopts the job.
This API should probably change further to make it much harder to
interact with jobs that haven't been adopted by your registry.
However, the goal of this change is to be backportable.
Release note (enterprise change): Fixes a bug that could have led to
duplicate instances of a single changefeed job running for prolonged
periods of time.
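
The load path described in this commit message can be sketched as follows. This is a simplified illustration, not the merged code: the real methods take a context and a jobspb.JobID and query system.jobs, whereas here the `table` map, the `row` type, and the string-based `SessionID` are assumptions standing in for that machinery.

```go
package main

import "fmt"

// SessionID is a simplified stand-in for sqlliveness.SessionID.
type SessionID string

// row is a hypothetical in-memory stand-in for one row of system.jobs.
type row struct {
	claimSession SessionID
	payload      string
}

// Job is a simplified handle, not the real *jobs.Job.
type Job struct {
	ID      int64
	Session SessionID // empty when loaded without a claim
	Payload string
}

// Registry is a simplified stand-in for the jobs registry.
type Registry struct {
	adoptedJobs map[int64]SessionID // jobs adopted by this registry
	table       map[int64]row       // stand-in for system.jobs
}

// LoadJob mirrors the older path: the handle comes back with no session
// ID, so later updates cannot be tied to a claim.
func (r *Registry) LoadJob(id int64) (*Job, error) {
	stored, ok := r.table[id]
	if !ok {
		return nil, fmt.Errorf("job %d: not found", id)
	}
	return &Job{ID: id, Payload: stored.payload}, nil
}

// LoadClaimedJob consults the map of adopted jobs for the session ID and
// only returns the job if the stored claim matches that session.
func (r *Registry) LoadClaimedJob(id int64) (*Job, error) {
	session, ok := r.adoptedJobs[id]
	if !ok {
		return nil, fmt.Errorf("job %d: not claimed by this registry", id)
	}
	stored, ok := r.table[id]
	if !ok {
		return nil, fmt.Errorf("job %d: not found", id)
	}
	if stored.claimSession != session {
		return nil, fmt.Errorf("job %d: expected session %q but found %q",
			id, session, stored.claimSession)
	}
	return &Job{ID: id, Session: session, Payload: stored.payload}, nil
}

func main() {
	r := &Registry{
		adoptedJobs: map[int64]SessionID{1: "s1"},
		table: map[int64]row{
			1: {claimSession: "s1", payload: "changefeed"},
			2: {claimSession: "s2", payload: "other"},
		},
	}

	j, err := r.LoadClaimedJob(1)
	fmt.Println(j.Session, err)

	_, err = r.LoadClaimedJob(2) // claimed by a different registry
	fmt.Println(err)

	unbound, _ := r.LoadJob(2) // succeeds, but carries no session ID
	fmt.Println(unbound.Session == "")
}
```

As the commit message notes, this only helps if the caller runs on the same SQL instance that adopted the job, because the session ID comes from the local registry's map.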