diff --git a/pkg/ccl/backupccl/create_scheduled_backup.go b/pkg/ccl/backupccl/create_scheduled_backup.go index edcf263ebfd9..e8aa1275c82a 100644 --- a/pkg/ccl/backupccl/create_scheduled_backup.go +++ b/pkg/ccl/backupccl/create_scheduled_backup.go @@ -114,6 +114,38 @@ func parseWaitBehavior(wait string, details *jobspb.ScheduleDetails) error { return nil } +func parseOnPreviousRunningOption( + onPreviousRunning jobspb.ScheduleDetails_WaitBehavior, +) (string, error) { + var onPreviousRunningOption string + switch onPreviousRunning { + case jobspb.ScheduleDetails_WAIT: + onPreviousRunningOption = "WAIT" + case jobspb.ScheduleDetails_NO_WAIT: + onPreviousRunningOption = "START" + case jobspb.ScheduleDetails_SKIP: + onPreviousRunningOption = "SKIP" + default: + return onPreviousRunningOption, errors.Newf("%s is an invalid onPreviousRunning option", onPreviousRunning.String()) + } + return onPreviousRunningOption, nil +} + +func parseOnErrorOption(onError jobspb.ScheduleDetails_ErrorHandlingBehavior) (string, error) { + var onErrorOption string + switch onError { + case jobspb.ScheduleDetails_RETRY_SCHED: + onErrorOption = "RESCHEDULE" + case jobspb.ScheduleDetails_RETRY_SOON: + onErrorOption = "RETRY" + case jobspb.ScheduleDetails_PAUSE_SCHED: + onErrorOption = "PAUSE" + default: + return onErrorOption, errors.Newf("%s is an invalid onError option", onError.String()) + } + return onErrorOption, nil +} + func makeScheduleDetails(opts map[string]string) (jobspb.ScheduleDetails, error) { var details jobspb.ScheduleDetails if v, ok := opts[optOnExecFailure]; ok { diff --git a/pkg/ccl/backupccl/create_scheduled_backup_test.go b/pkg/ccl/backupccl/create_scheduled_backup_test.go index 09be85a748cb..0903ba25d18b 100644 --- a/pkg/ccl/backupccl/create_scheduled_backup_test.go +++ b/pkg/ccl/backupccl/create_scheduled_backup_test.go @@ -968,6 +968,10 @@ func constructExpectedScheduledBackupNode( require.NoError(t, err) firstRun, err := tree.MakeDTimestampTZ(sj.ScheduledRunTime(), 
time.Microsecond) require.NoError(t, err) + wait, err := parseOnPreviousRunningOption(sj.ScheduleDetails().Wait) + require.NoError(t, err) + onError, err := parseOnErrorOption(sj.ScheduleDetails().OnError) + require.NoError(t, err) scheduleOptions := tree.KVOptions{ tree.KVOption{ Key: optFirstRun, @@ -975,11 +979,11 @@ func constructExpectedScheduledBackupNode( }, tree.KVOption{ Key: optOnExecFailure, - Value: tree.NewDString(sj.ScheduleDetails().OnError.String()), + Value: tree.NewDString(onError), }, tree.KVOption{ Key: optOnPreviousRunning, - Value: tree.NewDString(sj.ScheduleDetails().Wait.String()), + Value: tree.NewDString(wait), }, } sb := &tree.ScheduledBackup{ diff --git a/pkg/ccl/backupccl/schedule_exec.go b/pkg/ccl/backupccl/schedule_exec.go index 944c744f5d82..1dd0422b897e 100644 --- a/pkg/ccl/backupccl/schedule_exec.go +++ b/pkg/ccl/backupccl/schedule_exec.go @@ -214,6 +214,16 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement( fullBackup.Recurrence = tree.NewDString(recurrence) recurrence = dependentSchedule.ScheduleExpr() } + } else { + // If sj does not have a dependent schedule and is an incremental backup + // schedule, this is only possible if the dependent full schedule has been + // dropped. + // In this case we set the recurrence to sj's ScheduleExpr() but we leave + // the full backup recurrence empty so that it is decided by the scheduler. 
+ if backupNode.AppendToLatest { + fullBackup.AlwaysFull = false + fullBackup.Recurrence = nil + } } // Pick first_run to be the sooner of the scheduled run time on sj and its @@ -241,6 +251,14 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement( return "", err } + wait, err := parseOnPreviousRunningOption(sj.ScheduleDetails().Wait) + if err != nil { + return "", err + } + onError, err := parseOnErrorOption(sj.ScheduleDetails().OnError) + if err != nil { + return "", err + } scheduleOptions := tree.KVOptions{ tree.KVOption{ Key: optFirstRun, @@ -248,11 +266,11 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement( }, tree.KVOption{ Key: optOnExecFailure, - Value: tree.NewDString(sj.ScheduleDetails().OnError.String()), + Value: tree.NewDString(onError), }, tree.KVOption{ Key: optOnPreviousRunning, - Value: tree.NewDString(sj.ScheduleDetails().Wait.String()), + Value: tree.NewDString(wait), }, } @@ -354,6 +372,65 @@ func extractBackupStatement(sj *jobs.ScheduledJob) (*annotatedBackupStatement, e return nil, errors.Newf("unexpect node type %T", node) } +var _ jobs.ScheduledJobController = &scheduledBackupExecutor{} + +func unlinkDependentSchedule( + ctx context.Context, + scheduleControllerEnv scheduledjobs.ScheduleControllerEnv, + env scheduledjobs.JobSchedulerEnv, + txn *kv.Txn, + args *ScheduledBackupExecutionArgs, +) error { + if args.DependentScheduleID == 0 { + return nil + } + + // Load the dependent schedule. 
+ dependentSj, dependentArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn, + scheduleControllerEnv.InternalExecutor().(*sql.InternalExecutor), args.DependentScheduleID) + if err != nil { + if jobs.HasScheduledJobNotFoundError(err) { + log.Warningf(ctx, "failed to resolve dependent schedule %d", args.DependentScheduleID) + return nil + } + return errors.Wrapf(err, "failed to resolve dependent schedule %d", args.DependentScheduleID) + } + + // Clear the DependentScheduleID field since we are dropping the record associated + // with it. + dependentArgs.DependentScheduleID = 0 + any, err := pbtypes.MarshalAny(dependentArgs) + if err != nil { + return err + } + dependentSj.SetExecutionDetails(dependentSj.ExecutorType(), jobspb.ExecutionArguments{Args: any}) + return dependentSj.Update(ctx, scheduleControllerEnv.InternalExecutor(), txn) +} + +// OnDrop implements the ScheduledJobController interface. +// The method is responsible for releasing the pts record stored on the schedule +// if schedules.backup.gc_protection.enabled = true. +// It is also responsible for unlinking the dependent schedule by clearing the +// DependentScheduleID. 
+func (e *scheduledBackupExecutor) OnDrop( + ctx context.Context, + scheduleControllerEnv scheduledjobs.ScheduleControllerEnv, + env scheduledjobs.JobSchedulerEnv, + sj *jobs.ScheduledJob, + txn *kv.Txn, +) error { + args := &ScheduledBackupExecutionArgs{} + if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, args); err != nil { + return errors.Wrap(err, "un-marshaling args") + } + + if err := unlinkDependentSchedule(ctx, scheduleControllerEnv, env, txn, args); err != nil { + return errors.Wrap(err, "failed to unlink dependent schedule") + } + return releaseProtectedTimestamp(ctx, txn, scheduleControllerEnv.PTSProvider(), + args.ProtectedTimestampRecord) +} + func init() { jobs.RegisterScheduledJobExecutorFactory( tree.ScheduledBackupExecutor.InternalName(), diff --git a/pkg/ccl/backupccl/schedule_pts_chaining_test.go b/pkg/ccl/backupccl/schedule_pts_chaining_test.go index aa051f47fcf4..052dfb28e76e 100644 --- a/pkg/ccl/backupccl/schedule_pts_chaining_test.go +++ b/pkg/ccl/backupccl/schedule_pts_chaining_test.go @@ -54,6 +54,28 @@ func (th *testHelper) createSchedules(t *testing.T, name string) (int64, int64, } } +func checkPTSRecord( + ctx context.Context, + t *testing.T, + th *testHelper, + id uuid.UUID, + schedule *jobs.ScheduledJob, + timestamp hlc.Timestamp, +) { + var ptsRecord *ptpb.Record + var err error + require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider. 
+ GetRecord(context.Background(), txn, id) + require.NoError(t, err) + return nil + })) + encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10)) + require.Equal(t, encodedScheduleID, ptsRecord.Meta) + require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType) + require.Equal(t, timestamp, ptsRecord.Timestamp) +} + func TestScheduleBackupChainsProtectedTimestampRecords(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -93,21 +115,6 @@ INSERT INTO t values (1), (10), (100); } ctx := context.Background() - checkPTSRecord := func(t *testing.T, id uuid.UUID, schedule *jobs.ScheduledJob, - timestamp hlc.Timestamp) { - var ptsRecord *ptpb.Record - var err error - require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider. - GetRecord(context.Background(), txn, id) - require.NoError(t, err) - return nil - })) - encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10)) - require.Equal(t, encodedScheduleID, ptsRecord.Meta) - require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType) - require.Equal(t, timestamp, ptsRecord.Timestamp) - } fullID, incID, cleanupSchedules := th.createSchedules(t, "foo") defer cleanupSchedules() @@ -130,14 +137,14 @@ INSERT INTO t values (1), (10), (100); // Check that there is a PTS record on the incremental schedule. ptsOnIncID := incArgs.ProtectedTimestampRecord require.NotNil(t, ptsOnIncID) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()}) // Force inc backup to execute. runSchedule(t, incSchedule) // Check that the pts record was updated to the inc backups' EndTime. 
- checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[1].Round(time.Microsecond).UnixNano()}) // Pause the incSchedule so that it doesn't run when we forward the env time @@ -167,7 +174,7 @@ INSERT INTO t values (1), (10), (100); require.True(t, errors.Is(err, protectedts.ErrNotExists)) return nil })) - checkPTSRecord(t, *incArgs.ProtectedTimestampRecord, incSchedule, + checkPTSRecord(ctx, t, th, *incArgs.ProtectedTimestampRecord, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[2].Round(time.Microsecond).UnixNano()}) } @@ -210,21 +217,6 @@ INSERT INTO t values (1), (10), (100); } ctx := context.Background() - checkPTSRecord := func(t *testing.T, id uuid.UUID, schedule *jobs.ScheduledJob, - timestamp hlc.Timestamp) { - var ptsRecord *ptpb.Record - var err error - require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider. - GetRecord(context.Background(), txn, id) - require.NoError(t, err) - return nil - })) - encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10)) - require.Equal(t, encodedScheduleID, ptsRecord.Meta) - require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType) - require.Equal(t, timestamp, ptsRecord.Timestamp) - } t.Run("inc schedule is dropped", func(t *testing.T) { fullID, incID, cleanupSchedules := th.createSchedules(t, "foo") @@ -255,30 +247,94 @@ INSERT INTO t values (1), (10), (100); runSchedule(t, fullSchedule) // Check that there is no PTS record on the full schedule. + require.Nil(t, fullArgs.ProtectedTimestampRecord) + + // Check that there is a PTS record on the incremental schedule. 
incSchedule := th.loadSchedule(t, incID) _, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil, th.server.InternalExecutor().(*sql.InternalExecutor), incID) require.NoError(t, err) - require.Nil(t, fullArgs.ProtectedTimestampRecord) - - // Check that there is a PTS record on the incremental schedule. ptsOnIncID := incArgs.ProtectedTimestampRecord require.NotNil(t, ptsOnIncID) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()}) th.sqlDB.Exec(t, `DROP SCHEDULE $1`, fullID) // Run the inc schedule. runSchedule(t, incSchedule) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[1].Round(time.Microsecond).UnixNano()}) clearSuccessfulJobForSchedule(t, incSchedule) // Run it again. incSchedule = th.loadSchedule(t, incID) runSchedule(t, incSchedule) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[2].Round(time.Microsecond).UnixNano()}) }) } + +func TestDropScheduleReleasePTSRecord(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + th, cleanup := newTestHelper(t) + defer cleanup() + + th.sqlDB.Exec(t, `SET CLUSTER SETTING schedules.backup.gc_protection.enabled = true`) + th.sqlDB.Exec(t, ` +CREATE DATABASE db; +USE db; +CREATE TABLE t(a int); +INSERT INTO t values (1), (10), (100); +`) + + backupAsOfTimes := make([]time.Time, 0) + th.cfg.TestingKnobs.(*jobs.TestingKnobs).OverrideAsOfClause = func(clause *tree.AsOfClause) { + backupAsOfTime := th.cfg.DB.Clock().PhysicalTime() + expr, err := tree.MakeDTimestampTZ(backupAsOfTime, time.Microsecond) + require.NoError(t, err) + clause.Expr = expr + backupAsOfTimes = append(backupAsOfTimes, backupAsOfTime) + } + + runSchedule := func(t *testing.T, schedule 
*jobs.ScheduledJob) { + th.env.SetTime(schedule.NextRun().Add(time.Second)) + require.NoError(t, th.executeSchedules()) + th.waitForSuccessfulScheduledJob(t, schedule.ScheduleID()) + } + + ctx := context.Background() + + fullID, incID, cleanupSchedules := th.createSchedules(t, "foo") + defer cleanupSchedules() + defer func() { backupAsOfTimes = backupAsOfTimes[:0] }() + + fullSchedule := th.loadSchedule(t, fullID) + // Force full backup to execute (this unpauses incremental, which should be a no-op). + runSchedule(t, fullSchedule) + + incSchedule := th.loadSchedule(t, incID) + _, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil, + th.server.InternalExecutor().(*sql.InternalExecutor), incID) + require.NoError(t, err) + ptsOnIncID := incArgs.ProtectedTimestampRecord + require.NotNil(t, ptsOnIncID) + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, + hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()}) + + th.sqlDB.Exec(t, `DROP SCHEDULE $1`, incID) + + // Ensure that the pts record on the incremental schedule has been released + // by the DROP. + var numRows int + th.sqlDB.QueryRow(t, `SELECT num_records FROM system.protected_ts_meta`).Scan(&numRows) + require.Zero(t, numRows) + + // Also ensure that the full schedule doesn't have DependentScheduleID set anymore. 
+ _, fullArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil, + th.server.InternalExecutor().(*sql.InternalExecutor), fullID) + require.NoError(t, err) + require.Zero(t, fullArgs.DependentScheduleID) +} diff --git a/pkg/jobs/scheduled_job_executor.go b/pkg/jobs/scheduled_job_executor.go index 26461b24a81d..6626a51b0441 100644 --- a/pkg/jobs/scheduled_job_executor.go +++ b/pkg/jobs/scheduled_job_executor.go @@ -58,6 +58,15 @@ type ScheduledJobExecutor interface { schedule *ScheduledJob, ex sqlutil.InternalExecutor) (string, error) } +// ScheduledJobController is an interface describing hooks that will execute +// when controlling a scheduled job. +type ScheduledJobController interface { + // OnDrop runs before the passed in `schedule` is dropped as part of a `DROP + // SCHEDULE` query. + OnDrop(ctx context.Context, scheduleControllerEnv scheduledjobs.ScheduleControllerEnv, + env scheduledjobs.JobSchedulerEnv, schedule *ScheduledJob, txn *kv.Txn) error +} + // ScheduledJobExecutorFactory is a callback to create a ScheduledJobExecutor. 
type ScheduledJobExecutorFactory = func() (ScheduledJobExecutor, error) diff --git a/pkg/scheduledjobs/BUILD.bazel b/pkg/scheduledjobs/BUILD.bazel index e57b3b88024c..87164e407736 100644 --- a/pkg/scheduledjobs/BUILD.bazel +++ b/pkg/scheduledjobs/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/base", "//pkg/kv", + "//pkg/kv/kvserver/protectedts", "//pkg/security", "//pkg/settings/cluster", "//pkg/sql/sqlutil", diff --git a/pkg/scheduledjobs/env.go b/pkg/scheduledjobs/env.go index 9643cec16f74..b4c88986ed36 100644 --- a/pkg/scheduledjobs/env.go +++ b/pkg/scheduledjobs/env.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -72,3 +73,35 @@ func (e *prodJobSchedulerEnvImpl) Now() time.Time { func (e *prodJobSchedulerEnvImpl) NowExpr() string { return "current_timestamp()" } + +// ScheduleControllerEnv is an environment for controlling (DROP, PAUSE) +// scheduled jobs. +type ScheduleControllerEnv interface { + InternalExecutor() sqlutil.InternalExecutor + PTSProvider() protectedts.Provider +} + +// ProdScheduleControllerEnvImpl is the production implementation of +// ScheduleControllerEnv. +type ProdScheduleControllerEnvImpl struct { + pts protectedts.Provider + ie sqlutil.InternalExecutor +} + +// MakeProdScheduleControllerEnv returns a ProdScheduleControllerEnvImpl +// instance. +func MakeProdScheduleControllerEnv( + pts protectedts.Provider, ie sqlutil.InternalExecutor, +) *ProdScheduleControllerEnvImpl { + return &ProdScheduleControllerEnvImpl{pts: pts, ie: ie} +} + +// InternalExecutor implements the ScheduleControllerEnv interface. 
+func (c *ProdScheduleControllerEnvImpl) InternalExecutor() sqlutil.InternalExecutor { + return c.ie +} + +// PTSProvider implements the ScheduleControllerEnv interface. +func (c *ProdScheduleControllerEnvImpl) PTSProvider() protectedts.Provider { + return c.pts +} diff --git a/pkg/sql/control_schedules.go b/pkg/sql/control_schedules.go index b14731e9c4c3..022b0d11f5d1 100644 --- a/pkg/sql/control_schedules.go +++ b/pkg/sql/control_schedules.go @@ -68,7 +68,7 @@ func loadSchedule(params runParams, scheduleID tree.Datum) (*jobs.ScheduledJob, "load-schedule", params.EvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, fmt.Sprintf( - "SELECT schedule_id, schedule_expr FROM %s WHERE schedule_id = $1", + "SELECT schedule_id, schedule_expr, executor_type, execution_args FROM %s WHERE schedule_id = $1", env.ScheduledJobsTableName(), ), scheduleID) @@ -143,6 +143,20 @@ func (n *controlSchedulesNode) startExec(params runParams) error { err = updateSchedule(params, schedule) } case tree.DropSchedule: + var ex jobs.ScheduledJobExecutor + ex, err = jobs.GetScheduledJobExecutor(schedule.ExecutorType()) + if err != nil { + return errors.Wrap(err, "failed to get scheduled job executor during drop") + } + if controller, ok := ex.(jobs.ScheduledJobController); ok { + scheduleControllerEnv := scheduledjobs.MakeProdScheduleControllerEnv( + params.ExecCfg().ProtectedTimestampProvider, params.ExecCfg().InternalExecutor) + if err := controller.OnDrop(params.ctx, scheduleControllerEnv, + scheduledjobs.ProdJobSchedulerEnv, schedule, + params.extendedEvalCtx.Txn); err != nil { + return errors.Wrap(err, "failed to run OnDrop") + } + } err = deleteSchedule(params, schedule.ScheduleID()) default: err = errors.AssertionFailedf("unhandled command %s", n.command)