Skip to content

Commit

Permalink
jobs,changefeedccl: only load claimed jobs
Browse files Browse the repository at this point in the history
The changeFrontier updates the job multiple times during its run. It
does this via a *job.Job object that it explicitly loads at startup
using a JobID that is passed to it via its distflow spec.

However, when loading the job in this way, the sessionID that was
associated with the initial adoption of the job is not set. This means
that this change frontier can continue updating the job without error
even long after the session from the original adoption is
expired. This can potentially lead to a single changefeed job running
twice.

This change addresses that by introducing a new method, LoadClaimedJob
to the Registry, which consults the Registry's map of adopted job to
find the sessionID, and ensure that the sessionID is included in the
query to load the job.

Note that for this to be correct it _requires_ that the changeFrontier
is only ever scheduled on the sql instance that adopts the job.

This API should probably change further to make it much harder to
interact with jobs that haven't been adopted by your registry.
However, the goal of this change is to be backportable.

Release note (enterprise change): Fixes a bug that could have led to
duplicate instances of a single changefeed job running for prolonged
periods of time.
  • Loading branch information
stevendanna committed Sep 29, 2021
1 parent ab0c462 commit ec530a3
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ go_test(
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/storage",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.Cfg.JobRegistry.LoadJob(ctx, cf.spec.JobID)
job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
if err != nil {
cf.MoveToDraining(err)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (b *changefeedResumer) resumeWithRetries(
}
// Re-load the job in order to update our progress object, which may have
// been updated by the changeFrontier processor since the flow started.
reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID)
reloadedJob, reloadErr := execCfg.JobRegistry.LoadClaimedJob(ctx, jobID)
if reloadErr != nil {
if ctx.Err() != nil {
return ctx.Err()
Expand Down
67 changes: 67 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -2358,6 +2359,72 @@ func TestChangefeedRetryableError(t *testing.T) {
t.Run(`webhook`, webhookTest(testFn))
}

type alwaysAliveSession string

func (f alwaysAliveSession) ID() sqlliveness.SessionID { return sqlliveness.SessionID(f) }
func (f alwaysAliveSession) Expiration() hlc.Timestamp { return hlc.MaxTimestamp }
func (f alwaysAliveSession) RegisterCallbackForSessionExpiry(func(context.Context)) {}

func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Set TestingKnobs to return a known session for easier
// comparison.
testSession := alwaysAliveSession("known-test-session")
adoptionInterval := 20 * time.Minute
sessionOverride := withKnobsFn(func(knobs *base.TestingKnobs) {
knobs.SQLLivenessKnobs = &sqlliveness.TestingKnobs{
SessionOverride: func(_ context.Context) (sqlliveness.Session, error) {
return testSession, nil
},
}
// This is a hack to avoid the job adoption loop from
// immediately re-adopting the job that is running. The job
// adoption loop basically just sets the claim ID, which will
// undo our deletion of the claim ID below.
knobs.JobsTestingKnobs.(*jobs.TestingKnobs).IntervalOverrides.Adopt = &adoptionInterval
})
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
knobs := f.Server().TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
errChan := make(chan error, 1)
knobs.HandleDistChangefeedError = func(err error) error {
errChan <- err
return err
}

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)
sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (1, 1)`)

cf := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo")
defer closeFeed(t, cf)

assertPayloads(t, cf, []string{
`foo: [1]->{"after": {"a": 1, "b": 1}}`,
})

// Mimic the claim dying and being cleaned up by
// another node.
jobID := cf.(cdctest.EnterpriseTestFeed).JobID()
sqlDB.Exec(t, `UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`, jobID)

// Expect that the distflow fails since it can't
// update the checkpoint.
select {
case err := <-errChan:
require.Error(t, err)
// TODO(ssd): Replace this error in the jobs system with
// an error type we can check against.
require.Contains(t, err.Error(), fmt.Sprintf("expected session '%s' but found NULL", testSession.ID().String()))
case <-time.After(5 * time.Second):
t.Fatal("expected distflow to fail but it hasn't after 5 seconds")
}

}
RunRandomSinkTest(t, "fails as expected", testFn, feedTestNoTenants, sessionOverride)
}

// TestChangefeedDataTTL ensures that changefeeds fail with an error in the case
// where the feed has fallen behind the GC TTL of the table data.
func TestChangefeedDataTTL(t *testing.T) {
Expand Down
25 changes: 20 additions & 5 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,11 +741,15 @@ func (j *Job) runInTxn(

// JobNotFoundError is returned from load when the job does not exist.
type JobNotFoundError struct {
jobID jobspb.JobID
jobID jobspb.JobID
sessionID sqlliveness.SessionID
}

// Error makes JobNotFoundError an error.
func (e *JobNotFoundError) Error() string {
if e.sessionID != "" {
return fmt.Sprintf("job with ID %d does not exist with claim session id %q", e.jobID, e.sessionID.String())
}
return fmt.Sprintf("job with ID %d does not exist", e.jobID)
}

Expand All @@ -760,10 +764,21 @@ func (j *Job) load(ctx context.Context, txn *kv.Txn) error {
var createdBy *CreatedByInfo

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
const stmt = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1"
row, err := j.registry.ex.QueryRowEx(
ctx, "load-job-query", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
stmt, j.ID())
const (
queryNoSessionID = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1"
queryWithSessionID = queryNoSessionID + " AND claim_session_id = $2"
)
sess := sessiondata.InternalExecutorOverride{User: security.RootUserName()}

var err error
var row tree.Datums
if j.sessionID == "" {
row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess,
queryNoSessionID, j.ID())
} else {
row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess,
queryWithSessionID, j.ID(), j.sessionID.UnsafeBytes())
}
if err != nil {
return err
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,10 +676,30 @@ func (r *Registry) CreateStartableJobWithTxn(

// LoadJob loads an existing job with the given jobID from the system.jobs
// table.
//
// WARNING: Avoid new uses of this function. The returned Job allows
// for mutation even if the instance no longer holds a valid claim on
// the job.
//
// TODO(ssd): Remove this API and replace it with a safer API.
func (r *Registry) LoadJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) {
return r.LoadJobWithTxn(ctx, jobID, nil)
}

// LoadClaimedJob loads an existing job with the given jobID from the
// system.jobs table. The job must have already been claimed by this
// Registry.
func (r *Registry) LoadClaimedJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) {
j, err := r.getClaimedJob(jobID)
if err != nil {
return nil, err
}
if err := j.load(ctx, nil); err != nil {
return nil, err
}
return j, nil
}

// LoadJobWithTxn does the same as above, but using the transaction passed in
// the txn argument. Passing a nil transaction is equivalent to calling LoadJob
// in that a transaction will be automatically created.
Expand Down Expand Up @@ -1347,6 +1367,21 @@ func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) {
}
}

func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) {
r.mu.Lock()
defer r.mu.Unlock()

aj, ok := r.mu.adoptedJobs[jobID]
if !ok {
return nil, &JobNotFoundError{jobID: jobID}
}
return &Job{
id: jobID,
sessionID: aj.sid,
registry: r,
}, nil
}

// RetryInitialDelay returns the value of retryInitialDelaySetting cluster setting,
// in seconds, which is the initial delay in exponential-backoff delay calculation.
func (r *Registry) RetryInitialDelay() float64 {
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -178,6 +179,8 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF
"job %d: with status '%s': expected session '%s' but found '%s'",
j.ID(), statusString, j.sessionID, storedSession)
}
} else {
log.VInfof(ctx, 1, "job %s: update called with no session ID", j.sessionID.String())
}
if payload, err = UnmarshalPayload(row[1]); err != nil {
return err
Expand Down

0 comments on commit ec530a3

Please sign in to comment.