From a1b858e641cafedfb588aa5860cfc6caace79109 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 20 Aug 2021 15:31:02 -0400 Subject: [PATCH 1/2] jobs, backupccl: DROP SCHEDULE clears DependentID and releases PTS This change teaches `DROP SCHEDULE` to clear the dependent ID linkage from the dependent schedule. It also releases the protected timestamp (if any) on the schedule being dropped. This pts would have been written if chaining of pts was enabled. Release note: None --- pkg/ccl/backupccl/schedule_exec.go | 59 ++++++++ .../backupccl/schedule_pts_chaining_test.go | 134 +++++++++++++----- pkg/jobs/scheduled_job_executor.go | 9 ++ pkg/scheduledjobs/BUILD.bazel | 1 + pkg/scheduledjobs/env.go | 33 +++++ pkg/sql/control_schedules.go | 16 ++- 6 files changed, 212 insertions(+), 40 deletions(-) diff --git a/pkg/ccl/backupccl/schedule_exec.go b/pkg/ccl/backupccl/schedule_exec.go index 944c744f5d82..090500d14acc 100644 --- a/pkg/ccl/backupccl/schedule_exec.go +++ b/pkg/ccl/backupccl/schedule_exec.go @@ -354,6 +354,65 @@ func extractBackupStatement(sj *jobs.ScheduledJob) (*annotatedBackupStatement, e return nil, errors.Newf("unexpect node type %T", node) } +var _ jobs.ScheduledJobController = &scheduledBackupExecutor{} + +func unlinkDependentSchedule( + ctx context.Context, + scheduleControllerEnv scheduledjobs.ScheduleControllerEnv, + env scheduledjobs.JobSchedulerEnv, + txn *kv.Txn, + args *ScheduledBackupExecutionArgs, +) error { + if args.DependentScheduleID == 0 { + return nil + } + + // Load the dependent schedule. + dependentSj, dependentArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn, + scheduleControllerEnv.InternalExecutor().(*sql.InternalExecutor), args.DependentScheduleID) + if err != nil { + if jobs.HasScheduledJobNotFoundError(err) { + log.Warningf(ctx, "failed to resolve dependent schedule %d", args.DependentScheduleID) + return nil + } + return errors.Wrapf(err, "failed to resolve dependent schedule %d", args.DependentScheduleID) + } + + // Clear the DependentID field since we are dropping the record associated + // with it. + dependentArgs.DependentScheduleID = 0 + any, err := pbtypes.MarshalAny(dependentArgs) + if err != nil { + return err + } + dependentSj.SetExecutionDetails(dependentSj.ExecutorType(), jobspb.ExecutionArguments{Args: any}) + return dependentSj.Update(ctx, scheduleControllerEnv.InternalExecutor(), txn) +} + +// OnDrop implements the ScheduledJobController interface. +// The method is responsible for releasing the pts record stored on the schedule +// if schedules.backup.gc_protection.enabled = true. +// It is also responsible for unlinking the dependent schedule by clearing the +// DependentID. +func (e *scheduledBackupExecutor) OnDrop( + ctx context.Context, + scheduleControllerEnv scheduledjobs.ScheduleControllerEnv, + env scheduledjobs.JobSchedulerEnv, + sj *jobs.ScheduledJob, + txn *kv.Txn, +) error { + args := &ScheduledBackupExecutionArgs{} + if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, args); err != nil { + return errors.Wrap(err, "un-marshaling args") + } + + if err := unlinkDependentSchedule(ctx, scheduleControllerEnv, env, txn, args); err != nil { + return errors.Wrap(err, "failed to unlink dependent schedule") + } + return releaseProtectedTimestamp(ctx, txn, scheduleControllerEnv.PTSProvider(), + args.ProtectedTimestampRecord) +} + func init() { jobs.RegisterScheduledJobExecutorFactory( tree.ScheduledBackupExecutor.InternalName(), diff --git a/pkg/ccl/backupccl/schedule_pts_chaining_test.go b/pkg/ccl/backupccl/schedule_pts_chaining_test.go index aa051f47fcf4..052dfb28e76e 100644 --- a/pkg/ccl/backupccl/schedule_pts_chaining_test.go +++ b/pkg/ccl/backupccl/schedule_pts_chaining_test.go @@ -54,6 +54,28 @@ func (th *testHelper) createSchedules(t *testing.T, name string) (int64, int64, } } +func checkPTSRecord( + ctx context.Context, + t *testing.T, + th *testHelper, + id uuid.UUID, + schedule *jobs.ScheduledJob, + timestamp hlc.Timestamp, +) { + var ptsRecord *ptpb.Record + var err error + require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider. + GetRecord(context.Background(), txn, id) + require.NoError(t, err) + return nil + })) + encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10)) + require.Equal(t, encodedScheduleID, ptsRecord.Meta) + require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType) + require.Equal(t, timestamp, ptsRecord.Timestamp) +} + func TestScheduleBackupChainsProtectedTimestampRecords(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -93,21 +115,6 @@ INSERT INTO t values (1), (10), (100); } ctx := context.Background() - checkPTSRecord := func(t *testing.T, id uuid.UUID, schedule *jobs.ScheduledJob, - timestamp hlc.Timestamp) { - var ptsRecord *ptpb.Record - var err error - require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider. - GetRecord(context.Background(), txn, id) - require.NoError(t, err) - return nil - })) - encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10)) - require.Equal(t, encodedScheduleID, ptsRecord.Meta) - require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType) - require.Equal(t, timestamp, ptsRecord.Timestamp) - } fullID, incID, cleanupSchedules := th.createSchedules(t, "foo") defer cleanupSchedules() @@ -130,14 +137,14 @@ INSERT INTO t values (1), (10), (100); // Check that there is a PTS record on the incremental schedule. ptsOnIncID := incArgs.ProtectedTimestampRecord require.NotNil(t, ptsOnIncID) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()}) // Force inc backup to execute. runSchedule(t, incSchedule) // Check that the pts record was updated to the inc backups' EndTime. - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[1].Round(time.Microsecond).UnixNano()}) // Pause the incSchedule so that it doesn't run when we forward the env time @@ -167,7 +174,7 @@ INSERT INTO t values (1), (10), (100); require.True(t, errors.Is(err, protectedts.ErrNotExists)) return nil })) - checkPTSRecord(t, *incArgs.ProtectedTimestampRecord, incSchedule, + checkPTSRecord(ctx, t, th, *incArgs.ProtectedTimestampRecord, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[2].Round(time.Microsecond).UnixNano()}) } @@ -210,21 +217,6 @@ INSERT INTO t values (1), (10), (100); } ctx := context.Background() - checkPTSRecord := func(t *testing.T, id uuid.UUID, schedule *jobs.ScheduledJob, - timestamp hlc.Timestamp) { - var ptsRecord *ptpb.Record - var err error - require.NoError(t, th.server.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ptsRecord, err = th.server.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider. - GetRecord(context.Background(), txn, id) - require.NoError(t, err) - return nil - })) - encodedScheduleID := []byte(strconv.FormatInt(schedule.ScheduleID(), 10)) - require.Equal(t, encodedScheduleID, ptsRecord.Meta) - require.Equal(t, jobsprotectedts.GetMetaType(jobsprotectedts.Schedules), ptsRecord.MetaType) - require.Equal(t, timestamp, ptsRecord.Timestamp) - } t.Run("inc schedule is dropped", func(t *testing.T) { fullID, incID, cleanupSchedules := th.createSchedules(t, "foo") @@ -255,30 +247,94 @@ INSERT INTO t values (1), (10), (100); runSchedule(t, fullSchedule) // Check that there is no PTS record on the full schedule. + require.Nil(t, fullArgs.ProtectedTimestampRecord) + + // Check that there is a PTS record on the incremental schedule. incSchedule := th.loadSchedule(t, incID) _, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil, th.server.InternalExecutor().(*sql.InternalExecutor), incID) require.NoError(t, err) - require.Nil(t, fullArgs.ProtectedTimestampRecord) - - // Check that there is a PTS record on the incremental schedule. ptsOnIncID := incArgs.ProtectedTimestampRecord require.NotNil(t, ptsOnIncID) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()}) th.sqlDB.Exec(t, `DROP SCHEDULE $1`, fullID) // Run the inc schedule. runSchedule(t, incSchedule) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[1].Round(time.Microsecond).UnixNano()}) clearSuccessfulJobForSchedule(t, incSchedule) // Run it again. incSchedule = th.loadSchedule(t, incID) runSchedule(t, incSchedule) - checkPTSRecord(t, *ptsOnIncID, incSchedule, + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, hlc.Timestamp{WallTime: backupAsOfTimes[2].Round(time.Microsecond).UnixNano()}) }) } + +func TestDropScheduleReleasePTSRecord(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + th, cleanup := newTestHelper(t) + defer cleanup() + + th.sqlDB.Exec(t, `SET CLUSTER SETTING schedules.backup.gc_protection.enabled = true`) + th.sqlDB.Exec(t, ` +CREATE DATABASE db; +USE db; +CREATE TABLE t(a int); +INSERT INTO t values (1), (10), (100); +`) + + backupAsOfTimes := make([]time.Time, 0) + th.cfg.TestingKnobs.(*jobs.TestingKnobs).OverrideAsOfClause = func(clause *tree.AsOfClause) { + backupAsOfTime := th.cfg.DB.Clock().PhysicalTime() + expr, err := tree.MakeDTimestampTZ(backupAsOfTime, time.Microsecond) + require.NoError(t, err) + clause.Expr = expr + backupAsOfTimes = append(backupAsOfTimes, backupAsOfTime) + } + + runSchedule := func(t *testing.T, schedule *jobs.ScheduledJob) { + th.env.SetTime(schedule.NextRun().Add(time.Second)) + require.NoError(t, th.executeSchedules()) + th.waitForSuccessfulScheduledJob(t, schedule.ScheduleID()) + } + + ctx := context.Background() + + fullID, incID, cleanupSchedules := th.createSchedules(t, "foo") + defer cleanupSchedules() + defer func() { backupAsOfTimes = backupAsOfTimes[:0] }() + + fullSchedule := th.loadSchedule(t, fullID) + // Force full backup to execute (this unpauses incremental, which should be a no-op). + runSchedule(t, fullSchedule) + + incSchedule := th.loadSchedule(t, incID) + _, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil, + th.server.InternalExecutor().(*sql.InternalExecutor), incID) + require.NoError(t, err) + ptsOnIncID := incArgs.ProtectedTimestampRecord + require.NotNil(t, ptsOnIncID) + checkPTSRecord(ctx, t, th, *ptsOnIncID, incSchedule, + hlc.Timestamp{WallTime: backupAsOfTimes[0].Round(time.Microsecond).UnixNano()}) + + th.sqlDB.Exec(t, `DROP SCHEDULE $1`, incID) + + // Ensure that the pts record on the incremental schedule has been released + // by the DROP. + var numRows int + th.sqlDB.QueryRow(t, `SELECT num_records FROM system.protected_ts_meta`).Scan(&numRows) + require.Zero(t, numRows) + + // Also ensure that the full schedule doesn't have DependentID set anymore. + _, fullArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, th.env, nil, + th.server.InternalExecutor().(*sql.InternalExecutor), fullID) + require.NoError(t, err) + require.Zero(t, fullArgs.DependentScheduleID) +} diff --git a/pkg/jobs/scheduled_job_executor.go b/pkg/jobs/scheduled_job_executor.go index 26461b24a81d..6626a51b0441 100644 --- a/pkg/jobs/scheduled_job_executor.go +++ b/pkg/jobs/scheduled_job_executor.go @@ -58,6 +58,15 @@ type ScheduledJobExecutor interface { schedule *ScheduledJob, ex sqlutil.InternalExecutor) (string, error) } +// ScheduledJobController is an interface describing hooks that will execute +// when controlling a scheduled job. +type ScheduledJobController interface { + // OnDrop runs before the passed in `schedule` is dropped as part of a `DROP + // SCHEDULE` query. + OnDrop(ctx context.Context, scheduleControllerEnv scheduledjobs.ScheduleControllerEnv, + env scheduledjobs.JobSchedulerEnv, schedule *ScheduledJob, txn *kv.Txn) error +} + // ScheduledJobExecutorFactory is a callback to create a ScheduledJobExecutor. type ScheduledJobExecutorFactory = func() (ScheduledJobExecutor, error) diff --git a/pkg/scheduledjobs/BUILD.bazel b/pkg/scheduledjobs/BUILD.bazel index e57b3b88024c..87164e407736 100644 --- a/pkg/scheduledjobs/BUILD.bazel +++ b/pkg/scheduledjobs/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/base", "//pkg/kv", + "//pkg/kv/kvserver/protectedts", "//pkg/security", "//pkg/settings/cluster", "//pkg/sql/sqlutil", diff --git a/pkg/scheduledjobs/env.go b/pkg/scheduledjobs/env.go index 9643cec16f74..b4c88986ed36 100644 --- a/pkg/scheduledjobs/env.go +++ b/pkg/scheduledjobs/env.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -72,3 +73,35 @@ func (e *prodJobSchedulerEnvImpl) Now() time.Time { func (e *prodJobSchedulerEnvImpl) NowExpr() string { return "current_timestamp()" } + +// ScheduleControllerEnv is an environment for controlling (DROP, PAUSE) +// scheduled jobs. +type ScheduleControllerEnv interface { + InternalExecutor() sqlutil.InternalExecutor + PTSProvider() protectedts.Provider +} + +// ProdScheduleControllerEnvImpl is the production implementation of +// ScheduleControllerEnv. +type ProdScheduleControllerEnvImpl struct { + pts protectedts.Provider + ie sqlutil.InternalExecutor +} + +// MakeProdScheduleControllerEnv returns a ProdScheduleControllerEnvImpl +// instance. +func MakeProdScheduleControllerEnv( + pts protectedts.Provider, ie sqlutil.InternalExecutor, +) *ProdScheduleControllerEnvImpl { + return &ProdScheduleControllerEnvImpl{pts: pts, ie: ie} +} + +// InternalExecutor implements the ScheduleControllerEnv interface. +func (c *ProdScheduleControllerEnvImpl) InternalExecutor() sqlutil.InternalExecutor { + return c.ie +} + +// PTSProvider implements the ScheduleControllerEnv interface. +func (c *ProdScheduleControllerEnvImpl) PTSProvider() protectedts.Provider { + return c.pts +} diff --git a/pkg/sql/control_schedules.go b/pkg/sql/control_schedules.go index b14731e9c4c3..022b0d11f5d1 100644 --- a/pkg/sql/control_schedules.go +++ b/pkg/sql/control_schedules.go @@ -68,7 +68,7 @@ func loadSchedule(params runParams, scheduleID tree.Datum) (*jobs.ScheduledJob, "load-schedule", params.EvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, fmt.Sprintf( - "SELECT schedule_id, schedule_expr FROM %s WHERE schedule_id = $1", + "SELECT schedule_id, schedule_expr, executor_type, execution_args FROM %s WHERE schedule_id = $1", env.ScheduledJobsTableName(), ), scheduleID) @@ -143,6 +143,20 @@ func (n *controlSchedulesNode) startExec(params runParams) error { err = updateSchedule(params, schedule) } case tree.DropSchedule: + var ex jobs.ScheduledJobExecutor + ex, err = jobs.GetScheduledJobExecutor(schedule.ExecutorType()) + if err != nil { + return errors.Wrap(err, "failed to get scheduled job executor during drop") + } + if controller, ok := ex.(jobs.ScheduledJobController); ok { + scheduleControllerEnv := scheduledjobs.MakeProdScheduleControllerEnv( + params.ExecCfg().ProtectedTimestampProvider, params.ExecCfg().InternalExecutor) + if err := controller.OnDrop(params.ctx, scheduleControllerEnv, + scheduledjobs.ProdJobSchedulerEnv, schedule, + params.extendedEvalCtx.Txn); err != nil { + return errors.Wrap(err, "failed to run OnDrop") + } + } err = deleteSchedule(params, schedule.ScheduleID()) default: err = errors.AssertionFailedf("unhandled command %s", n.command) From 2e809e44769508b583753e27544f8d585fc0ea98 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 20 Aug 2021 17:35:54 -0400 Subject: [PATCH 2/2] backupccl: fix `SHOW CREATE SCHEDULE` options In the previous commit `DROP SCHEDULE` wipes the dependent ID linkage. `SHOW CREATE SCHEDULE` on an incremental schedule whose dependent full schedule has been dropped will now print a `CREATE SCHEDULE` statement with the full backup recurrence cleared. This means that if the user were to copy paste the command, we would choose a full backup recurrence based on the incremental recurrence. This commit also fixes the schedule options that are printed as the output of `SHOW CREATE SCHEDULE`. Previously, these would be printed as the string representations of the enum values, but this was not round-trippable. Informs: https://github.com/cockroachdb/cockroach/issues/69129 Release note: None --- pkg/ccl/backupccl/create_scheduled_backup.go | 32 +++++++++++++++++++ .../backupccl/create_scheduled_backup_test.go | 8 +++-- pkg/ccl/backupccl/schedule_exec.go | 22 +++++++++++-- 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/pkg/ccl/backupccl/create_scheduled_backup.go b/pkg/ccl/backupccl/create_scheduled_backup.go index edcf263ebfd9..e8aa1275c82a 100644 --- a/pkg/ccl/backupccl/create_scheduled_backup.go +++ b/pkg/ccl/backupccl/create_scheduled_backup.go @@ -114,6 +114,38 @@ func parseWaitBehavior(wait string, details *jobspb.ScheduleDetails) error { return nil } +func parseOnPreviousRunningOption( + onPreviousRunning jobspb.ScheduleDetails_WaitBehavior, +) (string, error) { + var onPreviousRunningOption string + switch onPreviousRunning { + case jobspb.ScheduleDetails_WAIT: + onPreviousRunningOption = "WAIT" + case jobspb.ScheduleDetails_NO_WAIT: + onPreviousRunningOption = "START" + case jobspb.ScheduleDetails_SKIP: + onPreviousRunningOption = "SKIP" + default: + return onPreviousRunningOption, errors.Newf("%s is an invalid onPreviousRunning option", onPreviousRunning.String()) + } + return onPreviousRunningOption, nil +} + +func parseOnErrorOption(onError jobspb.ScheduleDetails_ErrorHandlingBehavior) (string, error) { + var onErrorOption string + switch onError { + case jobspb.ScheduleDetails_RETRY_SCHED: + onErrorOption = "RESCHEDULE" + case jobspb.ScheduleDetails_RETRY_SOON: + onErrorOption = "RETRY" + case jobspb.ScheduleDetails_PAUSE_SCHED: + onErrorOption = "PAUSE" + default: + return onErrorOption, errors.Newf("%s is an invalid onError option", onError.String()) + } + return onErrorOption, nil +} + func makeScheduleDetails(opts map[string]string) (jobspb.ScheduleDetails, error) { var details jobspb.ScheduleDetails if v, ok := opts[optOnExecFailure]; ok { diff --git a/pkg/ccl/backupccl/create_scheduled_backup_test.go b/pkg/ccl/backupccl/create_scheduled_backup_test.go index 09be85a748cb..0903ba25d18b 100644 --- a/pkg/ccl/backupccl/create_scheduled_backup_test.go +++ b/pkg/ccl/backupccl/create_scheduled_backup_test.go @@ -968,6 +968,10 @@ func constructExpectedScheduledBackupNode( require.NoError(t, err) firstRun, err := tree.MakeDTimestampTZ(sj.ScheduledRunTime(), time.Microsecond) require.NoError(t, err) + wait, err := parseOnPreviousRunningOption(sj.ScheduleDetails().Wait) + require.NoError(t, err) + onError, err := parseOnErrorOption(sj.ScheduleDetails().OnError) + require.NoError(t, err) scheduleOptions := tree.KVOptions{ tree.KVOption{ Key: optFirstRun, @@ -975,11 +979,11 @@ func constructExpectedScheduledBackupNode( }, tree.KVOption{ Key: optOnExecFailure, - Value: tree.NewDString(sj.ScheduleDetails().OnError.String()), + Value: tree.NewDString(onError), }, tree.KVOption{ Key: optOnPreviousRunning, - Value: tree.NewDString(sj.ScheduleDetails().Wait.String()), + Value: tree.NewDString(wait), }, } sb := &tree.ScheduledBackup{ diff --git a/pkg/ccl/backupccl/schedule_exec.go b/pkg/ccl/backupccl/schedule_exec.go index 090500d14acc..1dd0422b897e 100644 --- a/pkg/ccl/backupccl/schedule_exec.go +++ b/pkg/ccl/backupccl/schedule_exec.go @@ -214,6 +214,16 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement( fullBackup.Recurrence = tree.NewDString(recurrence) recurrence = dependentSchedule.ScheduleExpr() } + } else { + // If sj does not have a dependent schedule and is an incremental backup + // schedule, this is only possible if the dependent full schedule has been + // dropped. + // In this case we set the recurrence to sj's ScheduleExpr() but we leave + // the full backup recurrence empty so that it is decided by the scheduler. + if backupNode.AppendToLatest { + fullBackup.AlwaysFull = false + fullBackup.Recurrence = nil + } } // Pick first_run to be the sooner of the scheduled run time on sj and its @@ -241,6 +251,14 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement( return "", err } + wait, err := parseOnPreviousRunningOption(sj.ScheduleDetails().Wait) + if err != nil { + return "", err + } + onError, err := parseOnErrorOption(sj.ScheduleDetails().OnError) + if err != nil { + return "", err + } scheduleOptions := tree.KVOptions{ tree.KVOption{ Key: optFirstRun, @@ -248,11 +266,11 @@ func (e *scheduledBackupExecutor) GetCreateScheduleStatement( }, tree.KVOption{ Key: optOnExecFailure, - Value: tree.NewDString(sj.ScheduleDetails().OnError.String()), + Value: tree.NewDString(onError), }, tree.KVOption{ Key: optOnPreviousRunning, - Value: tree.NewDString(sj.ScheduleDetails().Wait.String()), + Value: tree.NewDString(wait), }, }