Skip to content

Commit

Permalink
bulkio: Hook backup job resume to notify schedules upon completion.
Browse files Browse the repository at this point in the history
Notify the scheduled jobs system when a backup job completes.

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Aug 10, 2020
1 parent c285413 commit 179fc2c
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 70 deletions.
25 changes: 25 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
Expand Down Expand Up @@ -570,9 +571,27 @@ func (b *backupResumer) Resume(
}
}

b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg().InternalExecutor)

return nil
}

// maybeNotifyScheduledJobCompletion reports jobStatus for this backup job to
// the schedule that created it. It is a no-op unless the job carries a creator
// record whose name is jobs.CreatedByScheduledJobs. A failure to deliver the
// notification is only logged as a warning; nothing is returned to the caller.
func (b *backupResumer) maybeNotifyScheduledJobCompletion(
	ctx context.Context, jobStatus jobs.Status, ex sqlutil.InternalExecutor,
) {
	creator := b.job.CreatedBy()
	if creator == nil {
		return
	}
	if creator.Name != jobs.CreatedByScheduledJobs {
		return
	}

	// The creator ID is used as the schedule ID for the notification.
	err := jobs.NotifyJobTermination(
		ctx, nil /* env */, *b.job.ID(), jobStatus, creator.ID, ex, nil /* txn */)
	if err == nil {
		return
	}
	log.Warningf(ctx,
		"failed to notify schedule %d of completion of job %d; err=%s",
		creator.ID, *b.job.ID(), err)
}

func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {
details := b.job.Details().(jobspb.BackupDetails)
var backupManifest BackupManifest
Expand All @@ -593,6 +612,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
defer b.maybeNotifyScheduledJobCompletion(
ctx,
jobs.StatusFailed,
phs.(sql.PlanHookState).ExecCfg().InternalExecutor,
)

telemetry.Count("backup.total.failed")
telemetry.CountBucketed("backup.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds()))
Expand Down
7 changes: 1 addition & 6 deletions pkg/ccl/backupccl/schedule_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,7 @@ func (se *scheduledBackupExecutor) ExecuteJob(

// NotifyJobTermination implements jobs.ScheduledJobExecutor interface.
func (se *scheduledBackupExecutor) NotifyJobTermination(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
md *jobs.JobMetadata,
sj *jobs.ScheduledJob,
txn *kv.Txn,
ctx context.Context, jobID int64, jobStatus jobs.Status, schedule *jobs.ScheduledJob, txn *kv.Txn,
) error {
return errors.New("unimplemented yet")
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/jobs/executor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,11 @@ func (e *inlineScheduledJobExecutor) ExecuteJob(

// NotifyJobTermination implements ScheduledJobExecutor interface.
func (e *inlineScheduledJobExecutor) NotifyJobTermination(
_ context.Context,
_ *scheduledjobs.JobExecutionConfig,
_ scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
schedule *ScheduledJob,
_ *kv.Txn,
ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, _ *kv.Txn,
) error {
// For now, only interested in failed status.
if md.Status == StatusFailed {
DefaultHandleFailedRun(schedule, md.ID, nil)
if jobStatus == StatusFailed {
DefaultHandleFailedRun(schedule, jobID, nil)
}
return nil
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/jobs/executor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ func TestInlineExecutorFailedJobsHandling(t *testing.T) {
require.NoError(t, j.Create(ctx, h.cfg.InternalExecutor, nil))

// Pretend we failed running; we expect job to be rescheduled.
md := &JobMetadata{
ID: 123,
Status: "failed",
}

require.NoError(t, NotifyJobTermination(ctx, h.cfg, h.env, md, j.ScheduleID(), nil))
require.NoError(t, NotifyJobTermination(
ctx, h.env, 123, StatusFailed, j.ScheduleID(), h.cfg.InternalExecutor, nil))

// Verify nextRun updated
loaded := h.loadSchedule(t, j.ScheduleID())
Expand Down
7 changes: 1 addition & 6 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,7 @@ func (n *recordScheduleExecutor) ExecuteJob(
}

func (n *recordScheduleExecutor) NotifyJobTermination(
_ context.Context,
_ *scheduledjobs.JobExecutionConfig,
_ scheduledjobs.JobSchedulerEnv,
_ *JobMetadata,
_ *ScheduledJob,
_ *kv.Txn,
ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, txn *kv.Txn,
) error {
return nil
}
Expand Down
21 changes: 8 additions & 13 deletions pkg/jobs/scheduled_job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ type ScheduledJobExecutor interface {
// Modifications to the ScheduledJob object will be persisted.
NotifyJobTermination(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
jobID int64,
jobStatus Status,
schedule *ScheduledJob,
txn *kv.Txn,
) error
Expand Down Expand Up @@ -94,36 +93,32 @@ func DefaultHandleFailedRun(schedule *ScheduledJob, jobID int64, err error) {
// with the job status changes.
func NotifyJobTermination(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
jobID int64,
jobStatus Status,
scheduleID int64,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
if !md.Status.Terminal() {
return errors.Newf(
"job completion expects terminal state, found %s instead for job %d", md.Status, md.ID)
}

if env == nil {
env = scheduledjobs.ProdJobSchedulerEnv
}

// Get the executor for this schedule.
schedule, executor, err := lookupScheduleAndExecutor(
ctx, env, cfg.InternalExecutor, scheduleID, txn)
ctx, env, ex, scheduleID, txn)
if err != nil {
return err
}

// Delegate handling of the job termination to the executor.
err = executor.NotifyJobTermination(ctx, cfg, env, md, schedule, txn)
err = executor.NotifyJobTermination(ctx, jobID, jobStatus, schedule, txn)
if err != nil {
return err
}

// Update this schedule in case executor made changes to it.
return schedule.Update(ctx, cfg.InternalExecutor, txn)
return schedule.Update(ctx, ex, txn)
}

func lookupScheduleAndExecutor(
Expand Down
35 changes: 4 additions & 31 deletions pkg/jobs/scheduled_job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -39,14 +38,9 @@ func (s *statusTrackingExecutor) ExecuteJob(
}

func (s *statusTrackingExecutor) NotifyJobTermination(
_ context.Context,
_ *scheduledjobs.JobExecutionConfig,
_ scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
_ *ScheduledJob,
_ *kv.Txn,
ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, txn *kv.Txn,
) error {
s.counts[md.Status]++
s.counts[jobStatus]++
return nil
}

Expand All @@ -56,23 +50,6 @@ func newStatusTrackingExecutor() *statusTrackingExecutor {
return &statusTrackingExecutor{counts: make(map[Status]int)}
}

// TestNotifyJobTerminationExpectsTerminalState checks that
// NotifyJobTermination returns an error for every listed non-terminal job
// status. The call is made with nil cfg, env and txn arguments.
func TestNotifyJobTerminationExpectsTerminalState(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	nonTerminal := []Status{
		StatusPending,
		StatusRunning,
		StatusPaused,
		StatusReverting,
		StatusCancelRequested,
		StatusPauseRequested,
	}

	for _, status := range nonTerminal {
		err := NotifyJobTermination(
			context.Background(), nil, nil, &JobMetadata{ID: 123, Status: status}, 321, nil)
		require.Error(t, err)
	}
}

func TestScheduledJobExecutorRegistration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -103,12 +80,8 @@ func TestJobTerminationNotification(t *testing.T) {

// Pretend it completes multiple runs with terminal statuses.
for _, s := range []Status{StatusCanceled, StatusFailed, StatusSucceeded} {
md := &JobMetadata{
ID: 123,
Status: s,
Payload: &jobspb.Payload{},
}
require.NoError(t, NotifyJobTermination(ctx, h.cfg, h.env, md, schedule.ScheduleID(), nil))
require.NoError(t, NotifyJobTermination(
ctx, h.env, 123, s, schedule.ScheduleID(), h.cfg.InternalExecutor, nil))
}

// Verify counts.
Expand Down

0 comments on commit 179fc2c

Please sign in to comment.