Merge #69204
69204: jobs, backupccl: DROP SCHEDULE clears DependentID and releases PTS r=miretskiy a=adityamaru

jobs, backupccl: DROP SCHEDULE clears DependentID and releases PTS
This change teaches `DROP SCHEDULE` to clear the dependent ID
linkage on the schedule that depends on the one being dropped.

It also releases the protected timestamp (pts) record, if any, held by
the schedule being dropped. Such a record would only have been written
if pts chaining (`schedules.backup.gc_protection.enabled`) was enabled.
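
For illustration, a sketch of the linkage (the label, collection URI, and
schedule IDs below are invented):

    CREATE SCHEDULE datatest FOR BACKUP INTO 'nodelocal://1/backup'
        RECURRING '@hourly' FULL BACKUP '@daily';
    -- Suppose this creates incremental schedule 123 and full schedule 456,
    -- each holding the other's ID as its DependentID.
    DROP SCHEDULE 456;
    -- Clears the DependentID on schedule 123 and releases the pts record
    -- (if any) held by schedule 456.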

Release note: None

backupccl: fix SHOW CREATE SCHEDULE options
The previous commit taught `DROP SCHEDULE` to wipe the dependent
ID linkage. `SHOW CREATE SCHEDULE` on an incremental schedule
whose dependent full schedule has been dropped will now
print a `CREATE SCHEDULE` statement with the full backup
recurrence cleared. This means that if the user were to
copy-paste the command, we would choose a full backup recurrence
based on the incremental recurrence.

This commit also fixes the schedule options printed in the output
of `SHOW CREATE SCHEDULE`. Previously, these were printed as the
string representations of the enum values, which are not valid
option values, so the output was not round-trippable.
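
For example, a schedule stored with `OnError = RETRY_SOON` and
`Wait = NO_WAIT` previously rendered its options as (statement abbreviated):

    ... WITH SCHEDULE OPTIONS on_execution_failure = 'RETRY_SOON', on_previous_running = 'NO_WAIT'

which `CREATE SCHEDULE` rejects. It now prints the values the parser accepts:

    ... WITH SCHEDULE OPTIONS on_execution_failure = 'RETRY', on_previous_running = 'START'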

Informs: #69129

Release note: None



Co-authored-by: Aditya Maru <[email protected]>
craig[bot] and adityamaru committed Aug 22, 2021
2 parents 5d1148c + 2e809e4 commit b711b24
Showing 8 changed files with 270 additions and 44 deletions.
32 changes: 32 additions & 0 deletions pkg/ccl/backupccl/create_scheduled_backup.go
@@ -114,6 +114,38 @@ func parseWaitBehavior(wait string, details *jobspb.ScheduleDetails) error {
	return nil
}

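// parseOnPreviousRunningOption maps a ScheduleDetails wait behavior enum back
// to the option value accepted by CREATE SCHEDULE (the inverse of
// parseWaitBehavior above), so that SHOW CREATE SCHEDULE output round-trips.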
func parseOnPreviousRunningOption(
	onPreviousRunning jobspb.ScheduleDetails_WaitBehavior,
) (string, error) {
	var onPreviousRunningOption string
	switch onPreviousRunning {
	case jobspb.ScheduleDetails_WAIT:
		onPreviousRunningOption = "WAIT"
	case jobspb.ScheduleDetails_NO_WAIT:
		onPreviousRunningOption = "START"
	case jobspb.ScheduleDetails_SKIP:
		onPreviousRunningOption = "SKIP"
	default:
		return onPreviousRunningOption, errors.Newf("%s is an invalid onPreviousRunning option", onPreviousRunning.String())
	}
	return onPreviousRunningOption, nil
}

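// parseOnErrorOption maps a ScheduleDetails error handling enum back to the
// option value accepted by CREATE SCHEDULE: RETRY_SCHED prints as RESCHEDULE,
// RETRY_SOON as RETRY, and PAUSE_SCHED as PAUSE.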
func parseOnErrorOption(onError jobspb.ScheduleDetails_ErrorHandlingBehavior) (string, error) {
	var onErrorOption string
	switch onError {
	case jobspb.ScheduleDetails_RETRY_SCHED:
		onErrorOption = "RESCHEDULE"
	case jobspb.ScheduleDetails_RETRY_SOON:
		onErrorOption = "RETRY"
	case jobspb.ScheduleDetails_PAUSE_SCHED:
		onErrorOption = "PAUSE"
	default:
		return onErrorOption, errors.Newf("%s is an invalid onError option", onError.String())
	}
	return onErrorOption, nil
}

func makeScheduleDetails(opts map[string]string) (jobspb.ScheduleDetails, error) {
	var details jobspb.ScheduleDetails
	if v, ok := opts[optOnExecFailure]; ok {
8 changes: 6 additions & 2 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
@@ -968,18 +968,22 @@ func constructExpectedScheduledBackupNode(
	require.NoError(t, err)
	firstRun, err := tree.MakeDTimestampTZ(sj.ScheduledRunTime(), time.Microsecond)
	require.NoError(t, err)
	wait, err := parseOnPreviousRunningOption(sj.ScheduleDetails().Wait)
	require.NoError(t, err)
	onError, err := parseOnErrorOption(sj.ScheduleDetails().OnError)
	require.NoError(t, err)
	scheduleOptions := tree.KVOptions{
		tree.KVOption{
			Key:   optFirstRun,
			Value: firstRun,
		},
		tree.KVOption{
			Key:   optOnExecFailure,
-			Value: tree.NewDString(sj.ScheduleDetails().OnError.String()),
+			Value: tree.NewDString(onError),
		},
		tree.KVOption{
			Key:   optOnPreviousRunning,
-			Value: tree.NewDString(sj.ScheduleDetails().Wait.String()),
+			Value: tree.NewDString(wait),
		},
	}
	sb := &tree.ScheduledBackup{
81 changes: 79 additions & 2 deletions pkg/ccl/backupccl/schedule_exec.go
@@ -214,6 +214,16 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement(
			fullBackup.Recurrence = tree.NewDString(recurrence)
			recurrence = dependentSchedule.ScheduleExpr()
		}
	} else {
		// If sj does not have a dependent schedule and is an incremental backup
		// schedule, this is only possible if the dependent full schedule has been
		// dropped.
		// In this case we set the recurrence to sj's ScheduleExpr() but we leave
		// the full backup recurrence empty so that it is decided by the scheduler.
		if backupNode.AppendToLatest {
			fullBackup.AlwaysFull = false
			fullBackup.Recurrence = nil
		}
	}

	// Pick first_run to be the sooner of the scheduled run time on sj and its
@@ -241,18 +251,26 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement(
return "", err
}

wait, err := parseOnPreviousRunningOption(sj.ScheduleDetails().Wait)
if err != nil {
return "", err
}
onError, err := parseOnErrorOption(sj.ScheduleDetails().OnError)
if err != nil {
return "", err
}
scheduleOptions := tree.KVOptions{
tree.KVOption{
Key: optFirstRun,
Value: firstRun,
},
tree.KVOption{
Key: optOnExecFailure,
Value: tree.NewDString(sj.ScheduleDetails().OnError.String()),
Value: tree.NewDString(onError),
},
tree.KVOption{
Key: optOnPreviousRunning,
Value: tree.NewDString(sj.ScheduleDetails().Wait.String()),
Value: tree.NewDString(wait),
},
}

@@ -354,6 +372,65 @@ func extractBackupStatement(sj *jobs.ScheduledJob) (*annotatedBackupStatement, error) {
	return nil, errors.Newf("unexpect node type %T", node)
}

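// scheduledBackupExecutor opts in to `DROP SCHEDULE` cleanup by implementing
// the jobs.ScheduledJobController interface; the assertion below enforces
// this at compile time.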
var _ jobs.ScheduledJobController = &scheduledBackupExecutor{}

func unlinkDependentSchedule(
	ctx context.Context,
	scheduleControllerEnv scheduledjobs.ScheduleControllerEnv,
	env scheduledjobs.JobSchedulerEnv,
	txn *kv.Txn,
	args *ScheduledBackupExecutionArgs,
) error {
	if args.DependentScheduleID == 0 {
		return nil
	}

	// Load the dependent schedule.
	dependentSj, dependentArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn,
		scheduleControllerEnv.InternalExecutor().(*sql.InternalExecutor), args.DependentScheduleID)
	if err != nil {
		if jobs.HasScheduledJobNotFoundError(err) {
			log.Warningf(ctx, "failed to resolve dependent schedule %d", args.DependentScheduleID)
			return nil
		}
		return errors.Wrapf(err, "failed to resolve dependent schedule %d", args.DependentScheduleID)
	}

	// Clear the DependentID field since we are dropping the record associated
	// with it.
	dependentArgs.DependentScheduleID = 0
	any, err := pbtypes.MarshalAny(dependentArgs)
	if err != nil {
		return err
	}
	dependentSj.SetExecutionDetails(dependentSj.ExecutorType(), jobspb.ExecutionArguments{Args: any})
	return dependentSj.Update(ctx, scheduleControllerEnv.InternalExecutor(), txn)
}

// OnDrop implements the ScheduledJobController interface.
// The method is responsible for releasing the pts record stored on the schedule
// if schedules.backup.gc_protection.enabled = true.
// It is also responsible for unlinking the dependent schedule by clearing the
// DependentID.
func (e *scheduledBackupExecutor) OnDrop(
	ctx context.Context,
	scheduleControllerEnv scheduledjobs.ScheduleControllerEnv,
	env scheduledjobs.JobSchedulerEnv,
	sj *jobs.ScheduledJob,
	txn *kv.Txn,
) error {
	args := &ScheduledBackupExecutionArgs{}
	if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, args); err != nil {
		return errors.Wrap(err, "un-marshaling args")
	}

	if err := unlinkDependentSchedule(ctx, scheduleControllerEnv, env, txn, args); err != nil {
		return errors.Wrap(err, "failed to unlink dependent schedule")
	}
	return releaseProtectedTimestamp(ctx, txn, scheduleControllerEnv.PTSProvider(),
		args.ProtectedTimestampRecord)
}

func init() {
	jobs.RegisterScheduledJobExecutorFactory(
		tree.ScheduledBackupExecutor.InternalName(),
134 changes: 95 additions & 39 deletions pkg/ccl/backupccl/schedule_pts_chaining_test.go
@@ -54,6 +54,28 @@ func (th *testHelper) createSchedules(t *testing.T, name string) (int64, int64, func()) {
	}
}

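// checkPTSRecord loads the pts record with the given ID and asserts that it
// is owned by the given schedule and protects the given timestamp.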
func checkPTSRecord(
	ctx context.Context,
	t *testing.T,
	th *testHelper,
	id uuid.UUID,
	schedule *jobs.ScheduledJob,
	timestamp hlc.Timestamp,
) {
	var ptsRecord *ptpb.Record
	var err error
	require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider.
			GetRecord(context.Background(), txn, id)
		require.NoError(t, err)
		return nil
	}))
	encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10))
	require.Equal(t, encodedScheduleID, ptsRecord.Meta)
	require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType)
	require.Equal(t, timestamp, ptsRecord.Timestamp)
}

func TestScheduleBackupChainsProtectedTimestampRecords(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
@@ -93,21 +115,6 @@ INSERT INTO t values (1), (10), (100);
	}

	ctx := context.Background()
-	checkPTSRecord := func(t *testing.T, id uuid.UUID, schedule *jobs.ScheduledJob,
-		timestamp hlc.Timestamp) {
-		var ptsRecord *ptpb.Record
-		var err error
-		require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-			ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider.
-				GetRecord(context.Background(), txn, id)
-			require.NoError(t, err)
-			return nil
-		}))
-		encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10))
-		require.Equal(t, encodedScheduleID, ptsRecord.Meta)
-		require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType)
-		require.Equal(t, timestamp, ptsRecord.Timestamp)
-	}

	fullID, incID, cleanupSchedules := th.createSchedules(t, "foo")
	defer cleanupSchedules()
@@ -130,14 +137,14 @@ INSERT INTO t values (1), (10), (100);
	// Check that there is a PTS record on the incremental schedule.
	ptsOnIncID := incArgs.ProtectedTimestampRecord
	require.NotNil(t, ptsOnIncID)
-	checkPTSRecord(t, *ptsOnIncID, incSchedule,
+	checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule,
		hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()})

	// Force inc backup to execute.
	runSchedule(t, incSchedule)

	// Check that the pts record was updated to the inc backups' EndTime.
-	checkPTSRecord(t, *ptsOnIncID, incSchedule,
+	checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule,
		hlc.Timestamp{WallTime: backupAsOfTimes[1].Round(time.Microsecond).UnixNano()})

	// Pause the incSchedule so that it doesn't run when we forward the env time
@@ -167,7 +174,7 @@ INSERT INTO t values (1), (10), (100);
		require.True(t, errors.Is(err, protectedts.ErrNotExists))
		return nil
	}))
-	checkPTSRecord(t, *incArgs.ProtectedTimestampRecord, incSchedule,
+	checkPTSRecord(ctx, t, th, *incArgs.ProtectedTimestampRecord, incSchedule,
		hlc.Timestamp{WallTime: backupAsOfTimes[2].Round(time.Microsecond).UnixNano()})
}

@@ -210,21 +217,6 @@ INSERT INTO t values (1), (10), (100);
	}

	ctx := context.Background()
-	checkPTSRecord := func(t *testing.T, id uuid.UUID, schedule *jobs.ScheduledJob,
-		timestamp hlc.Timestamp) {
-		var ptsRecord *ptpb.Record
-		var err error
-		require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-			ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider.
-				GetRecord(context.Background(), txn, id)
-			require.NoError(t, err)
-			return nil
-		}))
-		encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10))
-		require.Equal(t, encodedScheduleID, ptsRecord.Meta)
-		require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType)
-		require.Equal(t, timestamp, ptsRecord.Timestamp)
-	}

t.Run("inc schedule is dropped", func(t *testing.T) {
fullID, incID, cleanupSchedules := th.createSchedules(t, "foo")
Expand Down Expand Up @@ -255,30 +247,94 @@ INSERT INTO t values (1), (10), (100);
		runSchedule(t, fullSchedule)

		// Check that there is no PTS record on the full schedule.
-		require.Nil(t, fullArgs.ProtectedTimestampRecord)
-
-		// Check that there is a PTS record on the incremental schedule.
		incSchedule := th.loadSchedule(t, incID)
		_, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil,
			th.server.InternalExecutor().(*sql.InternalExecutor), incID)
		require.NoError(t, err)
+		require.Nil(t, fullArgs.ProtectedTimestampRecord)
+
+		// Check that there is a PTS record on the incremental schedule.
		ptsOnIncID := incArgs.ProtectedTimestampRecord
		require.NotNil(t, ptsOnIncID)
-		checkPTSRecord(t, *ptsOnIncID, incSchedule,
+		checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule,
			hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()})

		th.sqlDB.Exec(t, `DROP SCHEDULE $1`, fullID)

		// Run the inc schedule.
		runSchedule(t, incSchedule)
-		checkPTSRecord(t, *ptsOnIncID, incSchedule,
+		checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule,
			hlc.Timestamp{WallTime: backupAsOfTimes[1].Round(time.Microsecond).UnixNano()})
		clearSuccessfulJobForSchedule(t, incSchedule)

		// Run it again.
		incSchedule = th.loadSchedule(t, incID)
		runSchedule(t, incSchedule)
-		checkPTSRecord(t, *ptsOnIncID, incSchedule,
+		checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule,
			hlc.Timestamp{WallTime: backupAsOfTimes[2].Round(time.Microsecond).UnixNano()})
	})
}

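// TestDropScheduleReleasePTSRecord verifies that dropping an incremental
// schedule releases its pts record and clears the DependentScheduleID on the
// full schedule it was linked to.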
func TestDropScheduleReleasePTSRecord(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	th, cleanup := newTestHelper(t)
	defer cleanup()

	th.sqlDB.Exec(t, `SET CLUSTER SETTING schedules.backup.gc_protection.enabled = true`)
	th.sqlDB.Exec(t, `
CREATE DATABASE db;
USE db;
CREATE TABLE t(a int);
INSERT INTO t values (1), (10), (100);
`)

	backupAsOfTimes := make([]time.Time, 0)
	th.cfg.TestingKnobs.(*jobs.TestingKnobs).OverrideAsOfClause = func(clause *tree.AsOfClause) {
		backupAsOfTime := th.cfg.DB.Clock().PhysicalTime()
		expr, err := tree.MakeDTimestampTZ(backupAsOfTime, time.Microsecond)
		require.NoError(t, err)
		clause.Expr = expr
		backupAsOfTimes = append(backupAsOfTimes, backupAsOfTime)
	}

	runSchedule := func(t *testing.T, schedule *jobs.ScheduledJob) {
		th.env.SetTime(schedule.NextRun().Add(time.Second))
		require.NoError(t, th.executeSchedules())
		th.waitForSuccessfulScheduledJob(t, schedule.ScheduleID())
	}

	ctx := context.Background()

	fullID, incID, cleanupSchedules := th.createSchedules(t, "foo")
	defer cleanupSchedules()
	defer func() { backupAsOfTimes = backupAsOfTimes[:0] }()

	fullSchedule := th.loadSchedule(t, fullID)
	// Force full backup to execute (this unpauses incremental, which should be a no-op).
	runSchedule(t, fullSchedule)

	incSchedule := th.loadSchedule(t, incID)
	_, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil,
		th.server.InternalExecutor().(*sql.InternalExecutor), incID)
	require.NoError(t, err)
	ptsOnIncID := incArgs.ProtectedTimestampRecord
	require.NotNil(t, ptsOnIncID)
	checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule,
		hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()})

	th.sqlDB.Exec(t, `DROP SCHEDULE $1`, incID)

	// Ensure that the pts record on the incremental schedule has been released
	// by the DROP.
	var numRows int
	th.sqlDB.QueryRow(t, `SELECT num_records FROM system.protected_ts_meta`).Scan(&numRows)
	require.Zero(t, numRows)

	// Also ensure that the full schedule doesn't have DependentID set anymore.
	_, fullArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil,
		th.server.InternalExecutor().(*sql.InternalExecutor), fullID)
	require.NoError(t, err)
	require.Zero(t, fullArgs.DependentScheduleID)
}
9 changes: 9 additions & 0 deletions pkg/jobs/scheduled_job_executor.go
@@ -58,6 +58,15 @@ type ScheduledJobExecutor interface {
		schedule *ScheduledJob, ex sqlutil.InternalExecutor) (string, error)
}

// ScheduledJobController is an interface describing hooks that will execute
// when controlling a scheduled job.
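// Executors opt in by implementing this interface alongside
// ScheduledJobExecutor; a schedule whose executor does not implement it is
// presumably dropped without these hooks running.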
type ScheduledJobController interface {
	// OnDrop runs before the passed in `schedule` is dropped as part of a `DROP
	// SCHEDULE` query.
	OnDrop(ctx context.Context, scheduleControllerEnv scheduledjobs.ScheduleControllerEnv,
		env scheduledjobs.JobSchedulerEnv, schedule *ScheduledJob, txn *kv.Txn) error
}

// ScheduledJobExecutorFactory is a callback to create a ScheduledJobExecutor.
type ScheduledJobExecutorFactory = func() (ScheduledJobExecutor, error)

1 change: 1 addition & 0 deletions pkg/scheduledjobs/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
    deps = [
        "//pkg/base",
        "//pkg/kv",
        "//pkg/kv/kvserver/protectedts",
        "//pkg/security",
        "//pkg/settings/cluster",
        "//pkg/sql/sqlutil",
