From 179fc2c4ba18cd5c141c7e74a3f0bff23207d08b Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Mon, 10 Aug 2020 11:21:28 -0400 Subject: [PATCH] bulkio: Hook backup job resume to notify schedules upon completion. Notify scheduled jobs system when backup job completes. Release Notes: None --- pkg/ccl/backupccl/backup_job.go | 25 ++++++++++++++++++ pkg/ccl/backupccl/schedule_exec.go | 7 +---- pkg/jobs/executor_impl.go | 11 +++----- pkg/jobs/executor_impl_test.go | 8 ++---- pkg/jobs/job_scheduler_test.go | 7 +---- pkg/jobs/scheduled_job_executor.go | 21 ++++++--------- pkg/jobs/scheduled_job_executor_test.go | 35 +++---------------------- 7 files changed, 44 insertions(+), 70 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 64cc343db653..4371ac13d250 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/storage/cloudimpl" @@ -570,9 +571,27 @@ func (b *backupResumer) Resume( } } + b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg().InternalExecutor) + return nil } +func (b *backupResumer) maybeNotifyScheduledJobCompletion( + ctx context.Context, jobStatus jobs.Status, ex sqlutil.InternalExecutor, +) { + if b.job.CreatedBy() == nil || b.job.CreatedBy().Name != jobs.CreatedByScheduledJobs { + return + } + info := b.job.CreatedBy() + + if err := jobs.NotifyJobTermination( + ctx, nil /* env */, *b.job.ID(), jobStatus, info.ID, ex, nil); err != nil { + log.Warningf(ctx, + "failed to notify schedule %d of completion of job %d; err=%s", + info.ID, *b.job.ID(), err) + } +} + func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error { details := b.job.Details().(jobspb.BackupDetails) var backupManifest BackupManifest @@ -593,6 +612,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error { // OnFailOrCancel is part of the jobs.Resumer interface. func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { + defer b.maybeNotifyScheduledJobCompletion( + ctx, + jobs.StatusFailed, + phs.(sql.PlanHookState).ExecCfg().InternalExecutor, + ) + telemetry.Count("backup.total.failed") telemetry.CountBucketed("backup.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds())) diff --git a/pkg/ccl/backupccl/schedule_exec.go b/pkg/ccl/backupccl/schedule_exec.go index 3222d1db9357..6931b347dd67 100644 --- a/pkg/ccl/backupccl/schedule_exec.go +++ b/pkg/ccl/backupccl/schedule_exec.go @@ -114,12 +114,7 @@ func (se *scheduledBackupExecutor) ExecuteJob( // NotifyJobTermination implements jobs.ScheduledJobExecutor interface. func (se *scheduledBackupExecutor) NotifyJobTermination( - ctx context.Context, - cfg *scheduledjobs.JobExecutionConfig, - env scheduledjobs.JobSchedulerEnv, - md *jobs.JobMetadata, - sj *jobs.ScheduledJob, - txn *kv.Txn, + ctx context.Context, jobID int64, jobStatus jobs.Status, schedule *jobs.ScheduledJob, txn *kv.Txn, ) error { return errors.New("unimplemented yet") } diff --git a/pkg/jobs/executor_impl.go b/pkg/jobs/executor_impl.go index 7b87c2743911..51818c84e68b 100644 --- a/pkg/jobs/executor_impl.go +++ b/pkg/jobs/executor_impl.go @@ -69,16 +69,11 @@ func (e *inlineScheduledJobExecutor) ExecuteJob( // NotifyJobTermination implements ScheduledJobExecutor interface. func (e *inlineScheduledJobExecutor) NotifyJobTermination( - _ context.Context, - _ *scheduledjobs.JobExecutionConfig, - _ scheduledjobs.JobSchedulerEnv, - md *JobMetadata, - schedule *ScheduledJob, - _ *kv.Txn, + ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, _ *kv.Txn, ) error { // For now, only interested in failed status. - if md.Status == StatusFailed { - DefaultHandleFailedRun(schedule, md.ID, nil) + if jobStatus == StatusFailed { + DefaultHandleFailedRun(schedule, jobID, nil) } return nil } diff --git a/pkg/jobs/executor_impl_test.go b/pkg/jobs/executor_impl_test.go index 5fe047069112..77ae942859d2 100644 --- a/pkg/jobs/executor_impl_test.go +++ b/pkg/jobs/executor_impl_test.go @@ -58,12 +58,8 @@ func TestInlineExecutorFailedJobsHandling(t *testing.T) { require.NoError(t, j.Create(ctx, h.cfg.InternalExecutor, nil)) // Pretend we failed running; we expect job to be rescheduled. - md := &JobMetadata{ - ID: 123, - Status: "failed", - } - - require.NoError(t, NotifyJobTermination(ctx, h.cfg, h.env, md, j.ScheduleID(), nil)) + require.NoError(t, NotifyJobTermination( + ctx, h.env, 123, StatusFailed, j.ScheduleID(), h.cfg.InternalExecutor, nil)) // Verify nextRun updated loaded := h.loadSchedule(t, j.ScheduleID()) diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 9743dcbf7d08..b8696fe2efe8 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -231,12 +231,7 @@ func (n *recordScheduleExecutor) ExecuteJob( } func (n *recordScheduleExecutor) NotifyJobTermination( - _ context.Context, - _ *scheduledjobs.JobExecutionConfig, - _ scheduledjobs.JobSchedulerEnv, - _ *JobMetadata, - _ *ScheduledJob, - _ *kv.Txn, + ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, txn *kv.Txn, ) error { return nil } diff --git a/pkg/jobs/scheduled_job_executor.go b/pkg/jobs/scheduled_job_executor.go index 9684a99b465c..2dd39ea6ed58 100644 --- a/pkg/jobs/scheduled_job_executor.go +++ b/pkg/jobs/scheduled_job_executor.go @@ -40,9 +40,8 @@ type ScheduledJobExecutor interface { // Modifications to the ScheduledJob object will be persisted. NotifyJobTermination( ctx context.Context, - cfg *scheduledjobs.JobExecutionConfig, - env scheduledjobs.JobSchedulerEnv, - md *JobMetadata, + jobID int64, + jobStatus Status, schedule *ScheduledJob, txn *kv.Txn, ) error @@ -94,36 +93,32 @@ func DefaultHandleFailedRun(schedule *ScheduledJob, jobID int64, err error) { // with the job status changes. func NotifyJobTermination( ctx context.Context, - cfg *scheduledjobs.JobExecutionConfig, env scheduledjobs.JobSchedulerEnv, - md *JobMetadata, + jobID int64, + jobStatus Status, scheduleID int64, + ex sqlutil.InternalExecutor, txn *kv.Txn, ) error { - if !md.Status.Terminal() { - return errors.Newf( - "job completion expects terminal state, found %s instead for job %d", md.Status, md.ID) - } - if env == nil { env = scheduledjobs.ProdJobSchedulerEnv } // Get the executor for this schedule. schedule, executor, err := lookupScheduleAndExecutor( - ctx, env, cfg.InternalExecutor, scheduleID, txn) + ctx, env, ex, scheduleID, txn) if err != nil { return err } // Delegate handling of the job termination to the executor. - err = executor.NotifyJobTermination(ctx, cfg, env, md, schedule, txn) + err = executor.NotifyJobTermination(ctx, jobID, jobStatus, schedule, txn) if err != nil { return err } // Update this schedule in case executor made changes to it. - return schedule.Update(ctx, cfg.InternalExecutor, txn) + return schedule.Update(ctx, ex, txn) } func lookupScheduleAndExecutor( diff --git a/pkg/jobs/scheduled_job_executor_test.go b/pkg/jobs/scheduled_job_executor_test.go index 58963b4354aa..dfac665f5e61 100644 --- a/pkg/jobs/scheduled_job_executor_test.go +++ b/pkg/jobs/scheduled_job_executor_test.go @@ -14,7 +14,6 @@ import ( "context" "testing" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -39,14 +38,9 @@ func (s *statusTrackingExecutor) ExecuteJob( } func (s *statusTrackingExecutor) NotifyJobTermination( - _ context.Context, - _ *scheduledjobs.JobExecutionConfig, - _ scheduledjobs.JobSchedulerEnv, - md *JobMetadata, - _ *ScheduledJob, - _ *kv.Txn, + ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, txn *kv.Txn, ) error { - s.counts[md.Status]++ + s.counts[jobStatus]++ return nil } @@ -56,23 +50,6 @@ func newStatusTrackingExecutor() *statusTrackingExecutor { return &statusTrackingExecutor{counts: make(map[Status]int)} } -func TestNotifyJobTerminationExpectsTerminalState(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - for _, s := range []Status{ - StatusPending, StatusRunning, StatusPaused, StatusReverting, - StatusCancelRequested, StatusPauseRequested, - } { - md := &JobMetadata{ - ID: 123, - Status: s, - } - require.Error(t, NotifyJobTermination( - context.Background(), nil, nil, md, 321, nil)) - } -} - func TestScheduledJobExecutorRegistration(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -103,12 +80,8 @@ func TestJobTerminationNotification(t *testing.T) { // Pretend it completes multiple runs with terminal statuses. for _, s := range []Status{StatusCanceled, StatusFailed, StatusSucceeded} { - md := &JobMetadata{ - ID: 123, - Status: s, - Payload: &jobspb.Payload{}, - } - require.NoError(t, NotifyJobTermination(ctx, h.cfg, h.env, md, schedule.ScheduleID(), nil)) + require.NoError(t, NotifyJobTermination( + ctx, h.env, 123, s, schedule.ScheduleID(), h.cfg.InternalExecutor, nil)) } // Verify counts.