diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index b67cc7d00ef..f4d55ed4ea1 100644 --- a/pkg/controller/backup_sync_controller.go +++ b/pkg/controller/backup_sync_controller.go @@ -23,7 +23,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" kuberrs "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -66,10 +65,10 @@ func NewBackupSyncController( newPluginManager func(logrus.FieldLogger) clientmgmt.Manager, logger logrus.FieldLogger, ) Interface { - if syncPeriod < time.Minute { - logger.Infof("Provided backup sync period %v is too short. Setting to 1 minute", syncPeriod) + if syncPeriod <= 0 { syncPeriod = time.Minute } + logger.Infof("Backup sync period is %v", syncPeriod) c := &backupSyncController{ genericController: newGenericController("backup-sync", logger), @@ -98,33 +97,6 @@ func NewBackupSyncController( return c } -func shouldSync(location *velerov1api.BackupStorageLocation, now time.Time, backupStore persistence.BackupStore, log logrus.FieldLogger) (bool, string) { - log = log.WithFields(map[string]interface{}{ - "lastSyncedRevision": location.Status.LastSyncedRevision, - "lastSyncedTime": location.Status.LastSyncedTime.Time.Format(time.RFC1123Z), - }) - - revision, err := backupStore.GetRevision() - if err != nil { - log.WithError(err).Debugf("Unable to get backup store's revision file, syncing (this is not an error if a v0.10+ backup has not yet been taken into this location)") - return true, "" - } - log = log.WithField("revision", revision) - - if location.Status.LastSyncedTime.Add(time.Hour).Before(now) { - log.Debugf("Backup location hasn't been synced in more than %s, syncing", time.Hour) - return true, revision - } - - if string(location.Status.LastSyncedRevision) != revision { - log.Debugf("Backup location hasn't been synced since its last modification, syncing") - return true, revision - } - - log.Debugf("Backup location's contents haven't changed since last sync, not syncing") - return false, "" -} - // orderedBackupLocations returns a new slice with the default backup location first (if it exists), // followed by the rest of the locations in no particular order. func orderedBackupLocations(locations []*velerov1api.BackupStorageLocation, defaultLocationName string) []*velerov1api.BackupStorageLocation { @@ -162,6 +134,7 @@ func (c *backupSyncController) run() { for _, location := range locations { log := c.logger.WithField("backupLocation", location.Name) + log.Info("Attempting to sync contents of backup storage location into cluster") backupStore, err := c.newBackupStore(location, pluginManager, log) if err != nil { @@ -169,37 +142,36 @@ func (c *backupSyncController) run() { continue } - ok, revision := shouldSync(location, time.Now().UTC(), backupStore, log) - if !ok { - continue - } - log.Info("Syncing contents of backup store into cluster") - + // get a list of all the backups that are stored in the backup storage location res, err := backupStore.ListBackups() if err != nil { log.WithError(err).Error("Error listing backups in backup store") continue } backupStoreBackups := sets.NewString(res...) - log.WithField("backupCount", len(backupStoreBackups)).Info("Got backups from backup store") + log.WithField("backupCount", len(backupStoreBackups)).Debug("Got backups from backup store") - for backupName := range backupStoreBackups { - log = log.WithField("backup", backupName) - log.Debug("Checking this backup to see if it needs to be synced into the cluster") + // get a list of all the backups that exist as custom resources in the cluster + clusterBackups, err := c.backupLister.Backups(c.namespace).List(labels.Everything()) + if err != nil { + log.WithError(errors.WithStack(err)).Error("Error getting backups from cluster, proceeding with sync into cluster") + } else { + log.WithField("backupCount", len(clusterBackups)).Debug("Got backups from cluster") + } - // use the controller's namespace when getting the backup because that's where we - // are syncing backups to, regardless of the namespace of the cloud backup. - backup, err := c.backupClient.Backups(c.namespace).Get(backupName, metav1.GetOptions{}) - if err == nil { - log.Debug("Backup already exists in cluster") - continue - } + // get a list of backups that *are* in the backup storage location and *aren't* in the cluster + clusterBackupsSet := sets.NewString() + for _, b := range clusterBackups { + clusterBackupsSet.Insert(b.Name) + } + backupsToSync := backupStoreBackups.Difference(clusterBackupsSet) - if !kuberrs.IsNotFound(err) { - log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster") - } + // sync each backup + for backupName := range backupsToSync { + log = log.WithField("backup", backupName) + log.Info("Attempting to sync backup into cluster") - backup, err = backupStore.GetBackupMetadata(backupName) + backup, err := backupStore.GetBackupMetadata(backupName) if err != nil { log.WithError(errors.WithStack(err)).Error("Error getting backup metadata from backup store") continue @@ -216,7 +188,8 @@ func (c *backupSyncController) run() { backup.Labels = make(map[string]string) } backup.Labels[velerov1api.StorageLocationLabel] = label.GetValidName(backup.Spec.StorageLocation) - // process the regular velero backup + + // attempt to create backup custom resource via API backup, err = c.backupClient.Backups(backup.Namespace).Create(backup) switch { case err != nil && kuberrs.IsAlreadyExists(err): @@ -226,7 +199,7 @@ func (c *backupSyncController) run() { log.WithError(errors.WithStack(err)).Error("Error syncing backup into cluster") continue default: - log.Debug("Synced backup into cluster") + log.Info("Successfully synced backup into cluster") } // process the pod volume backups from object store, if any @@ -270,11 +243,10 @@ func (c *backupSyncController) run() { c.deleteOrphanedBackups(location.Name, backupStoreBackups, log) - // update the location's status's last-synced fields + // update the location's last-synced time field patch := map[string]interface{}{ "status": map[string]interface{}{ - "lastSyncedTime": time.Now().UTC(), - "lastSyncedRevision": revision, + "lastSyncedTime": time.Now().UTC(), }, } @@ -289,31 +261,12 @@ func (c *backupSyncController) run() { types.MergePatchType, patchBytes, ); err != nil { - log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time and revision") + log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time") continue } } } -func patchStorageLocation(backup *velerov1api.Backup, client velerov1client.BackupInterface, location string) error { - patch := map[string]interface{}{ - "spec": map[string]interface{}{ - "storageLocation": location, - }, - } - - patchBytes, err := json.Marshal(patch) - if err != nil { - return errors.WithStack(err) - } - - if _, err := client.Patch(backup.Name, types.MergePatchType, patchBytes); err != nil { - return errors.WithStack(err) - } - - return nil -} - // deleteOrphanedBackups deletes backup objects (CRDs) from Kubernetes that have the specified location // and a phase of Completed, but no corresponding backup in object storage. func (c *backupSyncController) deleteOrphanedBackups(locationName string, backupStoreBackups sets.String, log logrus.FieldLogger) { diff --git a/pkg/controller/backup_sync_controller_test.go b/pkg/controller/backup_sync_controller_test.go index e1bc9e06667..cceb580f18d 100644 --- a/pkg/controller/backup_sync_controller_test.go +++ b/pkg/controller/backup_sync_controller_test.go @@ -20,13 +20,10 @@ import ( "testing" "time" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" core "k8s.io/client-go/testing" @@ -706,95 +703,6 @@ func TestStorageLabelsInDeleteOrphanedBackups(t *testing.T) { } } -func TestShouldSync(t *testing.T) { - c := clock.NewFakeClock(time.Now()) - - tests := []struct { - name string - location *velerov1api.BackupStorageLocation - backupStoreRevision string - now time.Time - expectSync bool - expectedRevision string - }{ - { - name: "BSL with no last-synced metadata should sync", - location: &velerov1api.BackupStorageLocation{}, - backupStoreRevision: "foo", - now: c.Now(), - expectSync: true, - expectedRevision: "foo", - }, - { - name: "BSL with unchanged revision last synced more than an hour ago should sync", - location: &velerov1api.BackupStorageLocation{ - Status: velerov1api.BackupStorageLocationStatus{ - LastSyncedRevision: types.UID("foo"), - LastSyncedTime: metav1.Time{Time: c.Now().Add(-61 * time.Minute)}, - }, - }, - backupStoreRevision: "foo", - now: c.Now(), - expectSync: true, - expectedRevision: "foo", - }, - { - name: "BSL with unchanged revision last synced less than an hour ago should not sync", - location: &velerov1api.BackupStorageLocation{ - Status: velerov1api.BackupStorageLocationStatus{ - LastSyncedRevision: types.UID("foo"), - LastSyncedTime: metav1.Time{Time: c.Now().Add(-59 * time.Minute)}, - }, - }, - backupStoreRevision: "foo", - now: c.Now(), - expectSync: false, - }, - { - name: "BSL with different revision than backup store last synced less than an hour ago should sync", - location: &velerov1api.BackupStorageLocation{ - Status: velerov1api.BackupStorageLocationStatus{ - LastSyncedRevision: types.UID("foo"), - LastSyncedTime: metav1.Time{Time: c.Now().Add(-time.Minute)}, - }, - }, - backupStoreRevision: "bar", - now: c.Now(), - expectSync: true, - expectedRevision: "bar", - }, - { - name: "BSL with different revision than backup store last synced more than an hour ago should sync", - location: &velerov1api.BackupStorageLocation{ - Status: velerov1api.BackupStorageLocationStatus{ - LastSyncedRevision: types.UID("foo"), - LastSyncedTime: metav1.Time{Time: c.Now().Add(-61 * time.Minute)}, - }, - }, - backupStoreRevision: "bar", - now: c.Now(), - expectSync: true, - expectedRevision: "bar", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - backupStore := new(persistencemocks.BackupStore) - if test.backupStoreRevision != "" { - backupStore.On("GetRevision").Return(test.backupStoreRevision, nil) - } else { - backupStore.On("GetRevision").Return("", errors.New("object revision not found")) - } - - shouldSync, rev := shouldSync(test.location, test.now, backupStore, velerotest.NewLogger()) - assert.Equal(t, test.expectSync, shouldSync) - assert.Equal(t, test.expectedRevision, rev) - }) - } - -} - func getDeleteActions(actions []core.Action) []core.Action { var deleteActions []core.Action for _, action := range actions {