Skip to content

Commit

Permalink
bulkio: Make incremental scheduled backup wait for full backup.
Browse files Browse the repository at this point in the history
Add ability to record schedule groups: set of related schedules.
Use this functionality to makean incremental schedule wait
until the full one completes before it begins its execution.

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Aug 19, 2020
1 parent 43f737f commit f04cc3c
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 254 deletions.
276 changes: 153 additions & 123 deletions pkg/ccl/backupccl/backup.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ message ScheduledBackupExecutionArgs {
}
BackupType backup_type = 1;
string backup_statement = 2;
int64 unpause_on_success = 3;
}

// RestoreProgress is the information that the RestoreData processor sends back
Expand Down
50 changes: 36 additions & 14 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand All @@ -32,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
Expand Down Expand Up @@ -571,24 +572,45 @@ func (b *backupResumer) Resume(
}
}

b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg().InternalExecutor)

b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg())
return nil
}

func (b *backupResumer) maybeNotifyScheduledJobCompletion(
ctx context.Context, jobStatus jobs.Status, ex sqlutil.InternalExecutor,
ctx context.Context, jobStatus jobs.Status, exec *sql.ExecutorConfig,
) {
if b.job.CreatedBy() == nil || b.job.CreatedBy().Name != jobs.CreatedByScheduledJobs {
return
}
info := b.job.CreatedBy()
env := scheduledjobs.ProdJobSchedulerEnv

if err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Do not rely on b.job containing created_by_id. Query it directly.
datums, err := exec.InternalExecutor.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
sqlbase.InternalExecutorSessionDataOverride{User: security.NodeUser},
fmt.Sprintf(
"SELECT created_by_id FROM %s WHERE id=$1 AND created_by_type=$2",
env.SystemJobsTableName()),
*b.job.ID(), jobs.CreatedByScheduledJobs)

if err != nil {
return errors.Wrap(err, "schedule info lookup")
}
if datums == nil {
// Not a scheduled backup.
return nil
}

if err := jobs.NotifyJobTermination(
ctx, nil /* env */, *b.job.ID(), jobStatus, info.ID, ex, nil); err != nil {
log.Warningf(ctx,
"failed to notify schedule %d of completion of job %d; err=%s",
info.ID, *b.job.ID(), err)
scheduleID := int64(tree.MustBeDInt(datums[0]))
if err := jobs.NotifyJobTermination(
ctx, env, *b.job.ID(), jobStatus, scheduleID, exec.InternalExecutor, txn); err != nil {
log.Warningf(ctx,
"failed to notify schedule %d of completion of job %d; err=%s",
scheduleID, *b.job.ID(), err)
}
return nil
}); err != nil {
log.Errorf(ctx, "maybeNotifySchedule error: %v", err)
}
}

Expand All @@ -615,7 +637,7 @@ func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) err
defer b.maybeNotifyScheduledJobCompletion(
ctx,
jobs.StatusFailed,
phs.(sql.PlanHookState).ExecCfg().InternalExecutor,
phs.(sql.PlanHookState).ExecCfg(),
)

telemetry.Count("backup.total.failed")
Expand Down
122 changes: 62 additions & 60 deletions pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -152,19 +151,6 @@ func computeScheduleRecurrence(
return &scheduleRecurrence{cron, frequency}, nil
}

var humanDurations = map[time.Duration]string{
time.Hour: "hour",
24 * time.Hour: "day",
7 * 24 * time.Hour: "week",
}

func (r *scheduleRecurrence) Humanize() string {
if d, ok := humanDurations[r.frequency]; ok {
return "every " + d
}
return "every " + r.frequency.String()
}

var forceFullBackup *scheduleRecurrence

func pickFullRecurrenceFromIncremental(inc *scheduleRecurrence) *scheduleRecurrence {
Expand Down Expand Up @@ -319,75 +305,85 @@ func doCreateBackupSchedules(

ex := p.ExecCfg().InternalExecutor
return p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Create FULL backup schedule.
fullFirstRun := firstRun
if eval.isEnterpriseUser && fullFirstRun == nil && fullRecurrencePicked {
// The enterprise user did not indicate preference when to run full backups,
// and we picked the schedule ourselves.
// Run full backup immediately so that we do not wind up waiting for a long
// time before the first full backup runs. Without full backup, we can't
// execute incrementals.
now := env.Now()
fullFirstRun = &now
}

if err := createBackupSchedule(
ctx, env, p.User(), fullScheduleName, fullRecurrence,
fullFirstRun, details, backupNode, resultsCh, ex, txn,
); err != nil {
return err
}
unpauseOnSuccessID := jobs.InvalidScheduleID

// If needed, create incremental.
if incRecurrence != nil {
backupNode.AppendToLatest = true
inc, err := makeBackupSchedule(
env, p.User(), fullScheduleName+": INCREMENTAL",
incRecurrence, details, unpauseOnSuccessID, backupNode)

if err := createBackupSchedule(
ctx, env, p.User(), fullScheduleName+": INCREMENTAL", incRecurrence,
firstRun, details, backupNode, resultsCh, ex, txn,
); err != nil {
if err != nil {
return err
}
// Incremental is paused until FULL completes.
inc.Pause("Waiting for initial backup to complete")

if err := inc.Create(ctx, ex, txn); err != nil {
return err
}
if err := emitSchedule(inc, tree.AsString(backupNode), resultsCh); err != nil {
return err
}
unpauseOnSuccessID = inc.ScheduleID()
}

return nil
})
// Create FULL backup schedule.
backupNode.AppendToLatest = false
fullBackupStmt := tree.AsString(backupNode)
full, err := makeBackupSchedule(
env, p.User(), fullScheduleName,
fullRecurrence, details, unpauseOnSuccessID, backupNode)
if err != nil {
return err
}

if firstRun != nil {
full.SetNextRun(*firstRun)
} else if eval.isEnterpriseUser && fullRecurrencePicked {
// The enterprise user did not indicate preference when to run full backups,
// and we picked the schedule ourselves.
// Run full backup immediately so that we do not wind up waiting for a long
// time before the first full backup runs. Without full backup, we can't
// execute incremental.
full.SetNextRun(env.Now())
}

// Create the schedule (we need its ID to create incremental below).
if err := full.Create(ctx, ex, txn); err != nil {
return err
}
return emitSchedule(full, fullBackupStmt, resultsCh)
})
}

func createBackupSchedule(
ctx context.Context,
func makeBackupSchedule(
env scheduledjobs.JobSchedulerEnv,
owner string,
name string,
recurrence *scheduleRecurrence,
firstRun *time.Time,
details jobspb.ScheduleDetails,
unpauseOnSuccess int64,
backupNode *tree.Backup,
resultsCh chan<- tree.Datums,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
) (*jobs.ScheduledJob, error) {
sj := jobs.NewScheduledJob(env)
sj.SetScheduleName(name)
sj.SetOwner(owner)

// Prepare arguments for scheduled backup execution.
args := &ScheduledBackupExecutionArgs{}
args := &ScheduledBackupExecutionArgs{UnpauseOnSuccess: unpauseOnSuccess}
if backupNode.AppendToLatest {
args.BackupType = ScheduledBackupExecutionArgs_INCREMENTAL
} else {
args.BackupType = ScheduledBackupExecutionArgs_FULL
}

if err := sj.SetSchedule(recurrence.cron); err != nil {
return err
return nil, err
}

sj.SetScheduleDetails(details)
if firstRun != nil {
sj.SetNextRun(*firstRun)
}

// TODO(yevgeniy): Validate backup schedule:
// * Verify targets exist. Provide a way for user to override this via option.
Expand All @@ -398,21 +394,25 @@ func createBackupSchedule(
args.BackupStatement = tree.AsString(backupNode)
any, err := pbtypes.MarshalAny(args)
if err != nil {
return err
return nil, err
}
sj.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: any},
)

// Create the schedule.
if err := sj.Create(ctx, ex, txn); err != nil {
return err
}
return sj, nil
}

func emitSchedule(sj *jobs.ScheduledJob, backupStmt string, resultsCh chan<- tree.Datums) error {
var nextRun tree.Datum
status := "ACTIVE"
if sj.IsPaused() {
nextRun = tree.DNull
status = "PAUSED"
if reason := sj.LastChangeReason(); reason != "" {
status += ": " + reason
}
} else {
next, err := tree.MakeDTimestampTZ(sj.NextRun(), time.Microsecond)
if err != nil {
Expand All @@ -423,10 +423,11 @@ func createBackupSchedule(

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(sj.ScheduleID())),
tree.NewDString(name),
tree.NewDString(sj.ScheduleName()),
tree.NewDString(status),
nextRun,
tree.NewDString(recurrence.Humanize()),
tree.NewDString(tree.AsString(backupNode)),
tree.NewDString(sj.ScheduleExpr()),
tree.NewDString(backupStmt),
}
return nil
}
Expand Down Expand Up @@ -507,8 +508,9 @@ func makeScheduledBackupEval(
var scheduledBackupHeader = sqlbase.ResultColumns{
{Name: "schedule_id", Typ: types.Int},
{Name: "name", Typ: types.String},
{Name: "next_run", Typ: types.TimestampTZ},
{Name: "frequency", Typ: types.String},
{Name: "status", Typ: types.String},
{Name: "first_run", Typ: types.TimestampTZ},
{Name: "schedule", Typ: types.String},
{Name: "backup_stmt", Typ: types.String},
}

Expand Down
Loading

0 comments on commit f04cc3c

Please sign in to comment.