diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 503276fe23f2..14bbc91e56f3 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -185,6 +185,7 @@ go_test( "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/sqlliveness", "//pkg/sql/tests", "//pkg/sql/types", "//pkg/storage", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index b509819120bd..b98a8aa799aa 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1086,7 +1086,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.highWaterAtStart = cf.spec.Feed.StatementTime if cf.spec.JobID != 0 { - job, err := cf.flowCtx.Cfg.JobRegistry.LoadJob(ctx, cf.spec.JobID) + job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID) if err != nil { cf.MoveToDraining(err) return diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a4f2808aeeb5..624b96a9094e 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -749,7 +749,7 @@ func (b *changefeedResumer) resumeWithRetries( } // Re-load the job in order to update our progress object, which may have // been updated by the changeFrontier processor since the flow started. 
- reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID) + reloadedJob, reloadErr := execCfg.JobRegistry.LoadClaimedJob(ctx, jobID) if reloadErr != nil { if ctx.Err() != nil { return ctx.Err() diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 5978f2ad8381..b24920fe2eb8 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -57,6 +57,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -2358,6 +2359,72 @@ func TestChangefeedRetryableError(t *testing.T) { t.Run(`webhook`, webhookTest(testFn)) } +type alwaysAliveSession string + +func (f alwaysAliveSession) ID() sqlliveness.SessionID { return sqlliveness.SessionID(f) } +func (f alwaysAliveSession) Expiration() hlc.Timestamp { return hlc.MaxTimestamp } +func (f alwaysAliveSession) RegisterCallbackForSessionExpiry(func(context.Context)) {} + +func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Set TestingKnobs to return a known session for easier + // comparison. + testSession := alwaysAliveSession("known-test-session") + adoptionInterval := 20 * time.Minute + sessionOverride := withKnobsFn(func(knobs *base.TestingKnobs) { + knobs.SQLLivenessKnobs = &sqlliveness.TestingKnobs{ + SessionOverride: func(_ context.Context) (sqlliveness.Session, error) { + return testSession, nil + }, + } + // This is a hack to avoid the job adoption loop from + // immediately re-adopting the job that is running. 
The job + // adoption loop basically just sets the claim ID, which will + // undo our deletion of the claim ID below. + knobs.JobsTestingKnobs.(*jobs.TestingKnobs).IntervalOverrides.Adopt = &adoptionInterval + }) + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + knobs := f.Server().TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + errChan := make(chan error, 1) + knobs.HandleDistChangefeedError = func(err error) error { + errChan <- err + return err + } + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`) + sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (1, 1)`) + + cf := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo") + defer closeFeed(t, cf) + + assertPayloads(t, cf, []string{ + `foo: [1]->{"after": {"a": 1, "b": 1}}`, + }) + + // Mimic the claim dying and being cleaned up by + // another node. + jobID := cf.(cdctest.EnterpriseTestFeed).JobID() + sqlDB.Exec(t, `UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`, jobID) + + // Expect that the distflow fails since it can't + // update the checkpoint. + select { + case err := <-errChan: + require.Error(t, err) + // TODO(ssd): Replace this error in the jobs system with + // an error type we can check against. + require.Contains(t, err.Error(), fmt.Sprintf("expected session '%s' but found NULL", testSession.ID().String())) + case <-time.After(5 * time.Second): + t.Fatal("expected distflow to fail but it hasn't after 5 seconds") + } + + } + RunRandomSinkTest(t, "fails as expected", testFn, feedTestNoTenants, sessionOverride) +} + // TestChangefeedDataTTL ensures that changefeeds fail with an error in the case // where the feed has fallen behind the GC TTL of the table data. 
func TestChangefeedDataTTL(t *testing.T) { diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 0f3f5be77e1c..2cc2f0988ca2 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -741,11 +741,15 @@ func (j *Job) runInTxn( // JobNotFoundError is returned from load when the job does not exist. type JobNotFoundError struct { - jobID jobspb.JobID + jobID jobspb.JobID + sessionID sqlliveness.SessionID } // Error makes JobNotFoundError an error. func (e *JobNotFoundError) Error() string { + if e.sessionID != "" { + return fmt.Sprintf("job with ID %d does not exist with claim session id %q", e.jobID, e.sessionID.String()) + } return fmt.Sprintf("job with ID %d does not exist", e.jobID) } @@ -760,10 +764,21 @@ func (j *Job) load(ctx context.Context, txn *kv.Txn) error { var createdBy *CreatedByInfo if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error { - const stmt = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1" - row, err := j.registry.ex.QueryRowEx( - ctx, "load-job-query", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, - stmt, j.ID()) + const ( + queryNoSessionID = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1" + queryWithSessionID = queryNoSessionID + " AND claim_session_id = $2" + ) + sess := sessiondata.InternalExecutorOverride{User: security.RootUserName()} + + var err error + var row tree.Datums + if j.sessionID == "" { + row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess, + queryNoSessionID, j.ID()) + } else { + row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess, + queryWithSessionID, j.ID(), j.sessionID.UnsafeBytes()) + } if err != nil { return err } diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index da517bf25b3d..d0d8adff72f4 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -676,10 +676,30 @@ func (r *Registry) CreateStartableJobWithTxn( // LoadJob loads an 
existing job with the given jobID from the system.jobs // table. +// +// WARNING: Avoid new uses of this function. The returned Job allows +// for mutation even if the instance no longer holds a valid claim on +// the job. +// +// TODO(ssd): Remove this API and replace it with a safer API. func (r *Registry) LoadJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) { return r.LoadJobWithTxn(ctx, jobID, nil) } +// LoadClaimedJob loads an existing job with the given jobID from the +// system.jobs table. The job must have already been claimed by this +// Registry. +func (r *Registry) LoadClaimedJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) { + j, err := r.getClaimedJob(jobID) + if err != nil { + return nil, err + } + if err := j.load(ctx, nil); err != nil { + return nil, err + } + return j, nil +} + // LoadJobWithTxn does the same as above, but using the transaction passed in // the txn argument. Passing a nil transaction is equivalent to calling LoadJob // in that a transaction will be automatically created. @@ -1347,6 +1367,21 @@ func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) { } } +func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) { + r.mu.Lock() + defer r.mu.Unlock() + + aj, ok := r.mu.adoptedJobs[jobID] + if !ok { + return nil, &JobNotFoundError{jobID: jobID} + } + return &Job{ + id: jobID, + sessionID: aj.sid, + registry: r, + }, nil +} + // RetryInitialDelay returns the value of retryInitialDelaySetting cluster setting, // in seconds, which is the initial delay in exponential-backoff delay calculation. 
func (r *Registry) RetryInitialDelay() float64 { diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index 6ff62dc8bcf6..e6ba4d5bcabf 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -178,6 +179,8 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF "job %d: with status '%s': expected session '%s' but found '%s'", j.ID(), statusString, j.sessionID, storedSession) } + } else { + log.VInfof(ctx, 1, "job %d: update called with no session ID", j.ID()) } if payload, err = UnmarshalPayload(row[1]); err != nil { return err