From ec530a3ac1b04629d9a203d2d2ebe610c564064b Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 23 Sep 2021 11:38:03 +0100 Subject: [PATCH] jobs,changefeedccl: only load claimed jobs The changeFrontier updates the job multiple times during its run. It does this via a *job.Job object that it explicitly loads at startup using a JobID that is passed to it via its distflow spec. However, when loading the job in this way, the sessionID that was associated with the initial adoption of the job is not set. This means that this change frontier can continue updating the job without error even long after the session from the original adoption is expired. This can potentially lead to a single changefeed job running twice. This change addresses that by introducing a new method, LoadClaimedJob to the Registry, which consults the Registry's map of adopted job to find the sessionID, and ensure that the sessionID is included in the query to load the job. Note that for this to be correct it _requires_ that the changeFrontier is only ever scheduled on the sql instance that adopts the job. This API should probably change further to make it much harder to interact with jobs that haven't been adopted by your registry. However, the goal of this change is to be backportable. Release note (enterprise change): Fixes a bug that could have led to duplicate instances of a single changefeed job running for prolonged periods of time. --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/changefeed_processors.go | 2 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 2 +- pkg/ccl/changefeedccl/changefeed_test.go | 67 +++++++++++++++++++ pkg/jobs/jobs.go | 25 +++++-- pkg/jobs/registry.go | 35 ++++++++++ pkg/jobs/update.go | 3 + 7 files changed, 128 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 503276fe23f2..14bbc91e56f3 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -185,6 +185,7 @@ go_test( "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/sqlliveness", "//pkg/sql/tests", "//pkg/sql/types", "//pkg/storage", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index b509819120bd..b98a8aa799aa 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1086,7 +1086,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.highWaterAtStart = cf.spec.Feed.StatementTime if cf.spec.JobID != 0 { - job, err := cf.flowCtx.Cfg.JobRegistry.LoadJob(ctx, cf.spec.JobID) + job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID) if err != nil { cf.MoveToDraining(err) return diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a4f2808aeeb5..624b96a9094e 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -749,7 +749,7 @@ func (b *changefeedResumer) resumeWithRetries( } // Re-load the job in order to update our progress object, which may have // been updated by the changeFrontier processor since the flow started. - reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID) + reloadedJob, reloadErr := execCfg.JobRegistry.LoadClaimedJob(ctx, jobID) if reloadErr != nil { if ctx.Err() != nil { return ctx.Err() diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 5978f2ad8381..b24920fe2eb8 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -57,6 +57,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -2358,6 +2359,72 @@ func TestChangefeedRetryableError(t *testing.T) { t.Run(`webhook`, webhookTest(testFn)) } +type alwaysAliveSession string + +func (f alwaysAliveSession) ID() sqlliveness.SessionID { return sqlliveness.SessionID(f) } +func (f alwaysAliveSession) Expiration() hlc.Timestamp { return hlc.MaxTimestamp } +func (f alwaysAliveSession) RegisterCallbackForSessionExpiry(func(context.Context)) {} + +func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Set TestingKnobs to return a known session for easier + // comparison. + testSession := alwaysAliveSession("known-test-session") + adoptionInterval := 20 * time.Minute + sessionOverride := withKnobsFn(func(knobs *base.TestingKnobs) { + knobs.SQLLivenessKnobs = &sqlliveness.TestingKnobs{ + SessionOverride: func(_ context.Context) (sqlliveness.Session, error) { + return testSession, nil + }, + } + // This is a hack to avoid the job adoption loop from + // immediately re-adopting the job that is running. The job + // adoption loop basically just sets the claim ID, which will + // undo our deletion of the claim ID below. + knobs.JobsTestingKnobs.(*jobs.TestingKnobs).IntervalOverrides.Adopt = &adoptionInterval + }) + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + knobs := f.Server().TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + errChan := make(chan error, 1) + knobs.HandleDistChangefeedError = func(err error) error { + errChan <- err + return err + } + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`) + sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (1, 1)`) + + cf := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo") + defer closeFeed(t, cf) + + assertPayloads(t, cf, []string{ + `foo: [1]->{"after": {"a": 1, "b": 1}}`, + }) + + // Mimic the claim dying and being cleaned up by + // another node. + jobID := cf.(cdctest.EnterpriseTestFeed).JobID() + sqlDB.Exec(t, `UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`, jobID) + + // Expect that the distflow fails since it can't + // update the checkpoint. + select { + case err := <-errChan: + require.Error(t, err) + // TODO(ssd): Replace this error in the jobs system with + // an error type we can check against. + require.Contains(t, err.Error(), fmt.Sprintf("expected session '%s' but found NULL", testSession.ID().String())) + case <-time.After(5 * time.Second): + t.Fatal("expected distflow to fail but it hasn't after 5 seconds") + } + + } + RunRandomSinkTest(t, "fails as expected", testFn, feedTestNoTenants, sessionOverride) +} + // TestChangefeedDataTTL ensures that changefeeds fail with an error in the case // where the feed has fallen behind the GC TTL of the table data. func TestChangefeedDataTTL(t *testing.T) { diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 0f3f5be77e1c..2cc2f0988ca2 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -741,11 +741,15 @@ func (j *Job) runInTxn( // JobNotFoundError is returned from load when the job does not exist. type JobNotFoundError struct { - jobID jobspb.JobID + jobID jobspb.JobID + sessionID sqlliveness.SessionID } // Error makes JobNotFoundError an error. func (e *JobNotFoundError) Error() string { + if e.sessionID != "" { + return fmt.Sprintf("job with ID %d does not exist with claim session id %q", e.jobID, e.sessionID.String()) + } return fmt.Sprintf("job with ID %d does not exist", e.jobID) } @@ -760,10 +764,21 @@ func (j *Job) load(ctx context.Context, txn *kv.Txn) error { var createdBy *CreatedByInfo if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error { - const stmt = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1" - row, err := j.registry.ex.QueryRowEx( - ctx, "load-job-query", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, - stmt, j.ID()) + const ( + queryNoSessionID = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1" + queryWithSessionID = queryNoSessionID + " AND claim_session_id = $2" + ) + sess := sessiondata.InternalExecutorOverride{User: security.RootUserName()} + + var err error + var row tree.Datums + if j.sessionID == "" { + row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess, + queryNoSessionID, j.ID()) + } else { + row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess, + queryWithSessionID, j.ID(), j.sessionID.UnsafeBytes()) + } if err != nil { return err } diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index da517bf25b3d..d0d8adff72f4 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -676,10 +676,30 @@ func (r *Registry) CreateStartableJobWithTxn( // LoadJob loads an existing job with the given jobID from the system.jobs // table. +// +// WARNING: Avoid new uses of this function. The returned Job allows +// for mutation even if the instance no longer holds a valid claim on +// the job. +// +// TODO(ssd): Remove this API and replace it with a safer API. func (r *Registry) LoadJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) { return r.LoadJobWithTxn(ctx, jobID, nil) } +// LoadClaimedJob loads an existing job with the given jobID from the +// system.jobs table. The job must have already been claimed by this +// Registry. +func (r *Registry) LoadClaimedJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) { + j, err := r.getClaimedJob(jobID) + if err != nil { + return nil, err + } + if err := j.load(ctx, nil); err != nil { + return nil, err + } + return j, nil +} + // LoadJobWithTxn does the same as above, but using the transaction passed in // the txn argument. Passing a nil transaction is equivalent to calling LoadJob // in that a transaction will be automatically created. @@ -1347,6 +1367,21 @@ func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) { } } +func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) { + r.mu.Lock() + defer r.mu.Unlock() + + aj, ok := r.mu.adoptedJobs[jobID] + if !ok { + return nil, &JobNotFoundError{jobID: jobID} + } + return &Job{ + id: jobID, + sessionID: aj.sid, + registry: r, + }, nil +} + // RetryInitialDelay returns the value of retryInitialDelaySetting cluster setting, // in seconds, which is the initial delay in exponential-backoff delay calculation. func (r *Registry) RetryInitialDelay() float64 { diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index 6ff62dc8bcf6..e6ba4d5bcabf 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -178,6 +179,8 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF "job %d: with status '%s': expected session '%s' but found '%s'", j.ID(), statusString, j.sessionID, storedSession) } + } else { + log.VInfof(ctx, 1, "job %s: update called with no session ID", j.sessionID.String()) } if payload, err = UnmarshalPayload(row[1]); err != nil { return err