Skip to content

Commit

Permalink
Schedule SkipImmediately
Browse files Browse the repository at this point in the history
Signed-off-by: Tiger Kaovilai <[email protected]>
  • Loading branch information
kaovilai committed Dec 1, 2023
1 parent b33d446 commit 70e1b0c
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 18 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7169-kaovilai
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `--skip-immediately` flag to server, install, and schedule commands.
16 changes: 16 additions & 0 deletions config/crd/v1/bases/velero.io_schedules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ spec:
description: Schedule is a Cron expression defining when to run the
Backup.
type: string
skipImmediately:
default: false
description: 'SkipImmediately specifies whether to skip backup if
schedule is due immediately from `schedule.status.lastBackup` timestamp
when schedule is unpaused or if schedule is new. If true, backup
will be skipped immediately when schedule is unpaused if it is due
based on .Status.LastBackupTimestamp or schedule is new, and will
run at next schedule time. If false, backup will not be skipped
immediately when schedule is unpaused, but will run at next schedule
time. If empty, will follow server configuration (default: false).'
type: boolean
template:
description: Template is the definition of the Backup to be run on
the provided schedule
Expand Down Expand Up @@ -549,6 +560,11 @@ spec:
format: date-time
nullable: true
type: string
lastSkipped:
description: LastSkipped is the last time a Schedule was skipped
format: date-time
nullable: true
type: string
phase:
description: Phase is the current phase of the Schedule
enum:
Expand Down
2 changes: 1 addition & 1 deletion config/crd/v1/crds/crds.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions pkg/apis/velero/v1/schedule_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ type ScheduleSpec struct {
// Paused specifies whether the schedule is paused or not
// +optional
Paused bool `json:"paused,omitempty"`

// SkipImmediately specifies whether to skip backup if schedule is due immediately from `schedule.status.lastBackup` timestamp when schedule is unpaused or if schedule is new.
// If true, backup will be skipped immediately when schedule is unpaused if it is due based on .Status.LastBackupTimestamp or schedule is new, and will run at next schedule time.
// If false, backup will not be skipped immediately when schedule is unpaused, but will run at next schedule time.
// If empty, will follow server configuration (default: false).
// +optional
SkipImmediately *bool `json:"skipImmediately,omitempty"`
}

// SchedulePhase is a string representation of the lifecycle phase
Expand Down Expand Up @@ -75,6 +82,11 @@ type ScheduleStatus struct {
// +nullable
LastBackup *metav1.Time `json:"lastBackup,omitempty"`

// LastSkipped is the last time a Schedule was skipped
// +optional
// +nullable
LastSkipped *metav1.Time `json:"lastSkipped,omitempty"`

// ValidationErrors is a slice of all validation errors (if
// applicable)
// +optional
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/velero/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/builder/schedule_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,9 @@ func (b *ScheduleBuilder) Template(spec velerov1api.BackupSpec) *ScheduleBuilder
b.object.Spec.Template = spec
return b
}

// SkipImmediately sets the Schedule's SkipImmediately.
func (b *ScheduleBuilder) SkipImmediately(skip *bool) *ScheduleBuilder {
b.object.Spec.SkipImmediately = skip
return b
}
3 changes: 3 additions & 0 deletions pkg/cmd/cli/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ example: "@every 2h30m".`,

type CreateOptions struct {
BackupOptions *backup.CreateOptions
SkipOptions *SkipOptions
Schedule string
UseOwnerReferencesInBackup bool
Paused bool
Expand All @@ -95,6 +96,7 @@ func NewCreateOptions() *CreateOptions {

func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
o.BackupOptions.BindFlags(flags)
o.SkipOptions.BindFlags(flags)
flags.StringVar(&o.Schedule, "schedule", o.Schedule, "A cron expression specifying a recurring schedule for this backup to run")
flags.BoolVar(&o.UseOwnerReferencesInBackup, "use-owner-references-in-backup", o.UseOwnerReferencesInBackup, "Specifies whether to use OwnerReferences on backups created by this Schedule. Notice: if set to true, when schedule is deleted, backups will be deleted too.")
flags.BoolVar(&o.Paused, "paused", o.Paused, "Specifies whether the newly created schedule is paused or not.")
Expand Down Expand Up @@ -160,6 +162,7 @@ func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
Schedule: o.Schedule,
UseOwnerReferencesInBackup: &o.UseOwnerReferencesInBackup,
Paused: o.Paused,
SkipImmediately: o.SkipOptions.SkipImmediately.Value,
},
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/cmd/cli/schedule/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
Expand All @@ -36,6 +37,7 @@ import (
// NewPauseCommand creates the command for pause
func NewPauseCommand(f client.Factory, use string) *cobra.Command {
o := cli.NewSelectOptions("pause", "schedule")
pauseOpts := NewPauseOptions()

c := &cobra.Command{
Use: use,
Expand All @@ -54,16 +56,31 @@ func NewPauseCommand(f client.Factory, use string) *cobra.Command {
Run: func(c *cobra.Command, args []string) {
cmd.CheckError(o.Complete(args))
cmd.CheckError(o.Validate())
cmd.CheckError(runPause(f, o, true))
cmd.CheckError(runPause(f, o, true, pauseOpts.SkipOptions.SkipImmediately.Value))
},
}

o.BindFlags(c.Flags())
pauseOpts.BindFlags(c.Flags())

return c
}

func runPause(f client.Factory, o *cli.SelectOptions, paused bool) error {
type PauseOptions struct {
SkipOptions *SkipOptions
}

func NewPauseOptions() *PauseOptions {
return &PauseOptions{
SkipOptions: NewSkipOptions(),
}
}

func (o *PauseOptions) BindFlags(flags *pflag.FlagSet) {
o.SkipOptions.BindFlags(flags)
}

func runPause(f client.Factory, o *cli.SelectOptions, paused bool, skipImmediately *bool) error {
crClient, err := f.KubebuilderClient()
if err != nil {
return err
Expand Down Expand Up @@ -120,6 +137,7 @@ func runPause(f client.Factory, o *cli.SelectOptions, paused bool) error {
continue
}
schedule.Spec.Paused = paused
schedule.Spec.SkipImmediately = skipImmediately
if err := crClient.Update(context.TODO(), schedule); err != nil {
return errors.Wrapf(err, "failed to update schedule %s", schedule.Name)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/cmd/cli/schedule/skip_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package schedule

import (
"github.com/spf13/pflag"
"github.com/vmware-tanzu/velero/pkg/cmd/util/flag"
)

type SkipOptions struct {
SkipImmediately flag.OptionalBool
}

func NewSkipOptions() *SkipOptions {
return &SkipOptions{}
}

func (o *SkipOptions) BindFlags(flags *pflag.FlagSet) {
f := flags.VarPF(&o.SkipImmediately, "skip-immediately", "", "Skip the next scheduled backup immediately")
f.NoOptDefVal = "" // default to nil so server options can take precedence
}
5 changes: 3 additions & 2 deletions pkg/cmd/cli/schedule/unpause.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// NewUnpauseCommand creates the command for unpause
func NewUnpauseCommand(f client.Factory, use string) *cobra.Command {
o := cli.NewSelectOptions("pause", "schedule")

pauseOpts := NewPauseOptions()
c := &cobra.Command{
Use: use,
Short: "Unpause schedules",
Expand All @@ -45,11 +45,12 @@ func NewUnpauseCommand(f client.Factory, use string) *cobra.Command {
Run: func(c *cobra.Command, args []string) {
cmd.CheckError(o.Complete(args))
cmd.CheckError(o.Validate())
cmd.CheckError(runPause(f, o, false))
cmd.CheckError(runPause(f, o, false, pauseOpts.SkipOptions.SkipImmediately.Value))
},
}

o.BindFlags(c.Flags())
pauseOpts.BindFlags(c.Flags())

return c
}
5 changes: 4 additions & 1 deletion pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type serverConfig struct {
maxConcurrentK8SConnections int
defaultSnapshotMoveData bool
disableInformerCache bool
scheduleSkipImmediately bool
}

func NewCommand(f client.Factory) *cobra.Command {
Expand Down Expand Up @@ -163,6 +164,7 @@ func NewCommand(f client.Factory) *cobra.Command {
maxConcurrentK8SConnections: defaultMaxConcurrentK8SConnections,
defaultSnapshotMoveData: false,
disableInformerCache: defaultDisableInformerCache,
scheduleSkipImmediately: false,
}
)

Expand Down Expand Up @@ -235,6 +237,7 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().IntVar(&config.maxConcurrentK8SConnections, "max-concurrent-k8s-connections", config.maxConcurrentK8SConnections, "Max concurrent connections number that Velero can create with kube-apiserver. Default is 30.")
command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.")
command.Flags().BoolVar(&config.disableInformerCache, "disable-informer-cache", config.disableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable).")
command.Flags().BoolVar(&config.scheduleSkipImmediately, "schedule-skip-immediately", config.scheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).")

return command
}
Expand Down Expand Up @@ -915,7 +918,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}

if _, ok := enabledRuntimeControllers[controller.Schedule]; ok {
if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil {
if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics, s.config.scheduleSkipImmediately).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
}
}
Expand Down
33 changes: 25 additions & 8 deletions pkg/controller/schedule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@ type scheduleReconciler struct {
logger logrus.FieldLogger
clock clocks.WithTickerAndDelayedExecution
metrics *metrics.ServerMetrics
skipImmediately bool
}

func NewScheduleReconciler(
namespace string,
logger logrus.FieldLogger,
client client.Client,
metrics *metrics.ServerMetrics,
skipImmediately bool,
) *scheduleReconciler {
return &scheduleReconciler{
Client: client,
namespace: namespace,
logger: logger,
clock: clocks.RealClock{},
metrics: metrics,
skipImmediately: skipImmediately,
}
}

Expand Down Expand Up @@ -99,11 +102,18 @@ func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
return ctrl.Result{}, errors.Wrapf(err, "error getting schedule %s", req.String())
}

c.metrics.InitSchedule(schedule.Name)

original := schedule.DeepCopy()

if schedule.Spec.SkipImmediately == nil {
schedule.Spec.SkipImmediately = &c.skipImmediately
}
if schedule.Spec.SkipImmediately != nil && *schedule.Spec.SkipImmediately {
*schedule.Spec.SkipImmediately = false
schedule.Status.LastSkipped = &metav1.Time{Time: c.clock.Now()}
}

// validation - even if the item is Enabled, we can't trust it
// so re-validate
currentPhase := schedule.Status.Phase
Expand All @@ -117,7 +127,9 @@ func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

// update status if it's changed
if currentPhase != schedule.Status.Phase {
if currentPhase != schedule.Status.Phase ||
original.Status.LastSkipped != schedule.Status.LastSkipped ||
original.Status.LastBackup != schedule.Status.LastBackup {
if err := c.Patch(ctx, schedule, client.MergeFrom(original)); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error updating phase of schedule %s to %s", req.String(), schedule.Status.Phase)
}
Expand All @@ -132,13 +144,15 @@ func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// If there are backup created by this schedule still in New or InProgress state,
// skip current backup creation to avoid running overlap backups.
// As the schedule must be validated before checking whether it's due, we cannot put the checking log in Predicate
if c.ifDue(schedule, cronSchedule) && !c.checkIfBackupInNewOrProgress(schedule) {
due, nextRunTime := c.ifDue(schedule, cronSchedule)
durationTillNextRun := nextRunTime.Sub(c.clock.Now())
if due && !c.checkIfBackupInNewOrProgress(schedule) {
if err := c.submitBackup(ctx, schedule); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error submit backup for schedule %s", req.String())
return ctrl.Result{RequeueAfter: durationTillNextRun}, errors.Wrapf(err, "error submit backup for schedule %s", req.String())
}
}

return ctrl.Result{}, nil
return ctrl.Result{RequeueAfter: durationTillNextRun}, nil
}

func parseCronSchedule(itm *velerov1.Schedule, logger logrus.FieldLogger) (cron.Schedule, []string) {
Expand Down Expand Up @@ -208,16 +222,16 @@ func (c *scheduleReconciler) checkIfBackupInNewOrProgress(schedule *velerov1.Sch
}

// ifDue check whether schedule is due to create a new backup.
func (c *scheduleReconciler) ifDue(schedule *velerov1.Schedule, cronSchedule cron.Schedule) bool {
func (c *scheduleReconciler) ifDue(schedule *velerov1.Schedule, cronSchedule cron.Schedule) (bool, time.Time) {
isDue, nextRunTime := getNextRunTime(schedule, cronSchedule, c.clock.Now())
log := c.logger.WithField("schedule", kube.NamespaceAndName(schedule))

if !isDue {
log.WithField("nextRunTime", nextRunTime).Debug("Schedule is not due, skipping")
return false
return false, nextRunTime
}

return true
return true, nextRunTime
}

// submitBackup create a backup from schedule.
Expand Down Expand Up @@ -249,6 +263,9 @@ func getNextRunTime(schedule *velerov1.Schedule, cronSchedule cron.Schedule, asO
} else {
lastBackupTime = schedule.CreationTimestamp.Time
}
if schedule.Status.LastSkipped != nil && schedule.Status.LastSkipped.After(lastBackupTime) {
lastBackupTime = schedule.Status.LastSkipped.Time
}

nextRunTime := cronSchedule.Next(lastBackupTime)

Expand Down
Loading

0 comments on commit 70e1b0c

Please sign in to comment.