Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync controller: replace revision file with full diff each interval #1892

Merged
merged 6 commits into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/1892-skriss
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
backup sync controller: stop using `metadata/revision` file, do a full diff of bucket contents vs. cluster contents each sync interval
15 changes: 12 additions & 3 deletions pkg/apis/velero/v1/backup_storage_location.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,28 @@ const (
)

// TODO(2.0): remove the AccessMode field from BackupStorageLocationStatus.
// TODO(2.0): remove the LastSyncedRevision field from BackupStorageLocationStatus.

// BackupStorageLocationStatus describes the current status of a Velero BackupStorageLocation.
type BackupStorageLocationStatus struct {
// Phase is the current state of the BackupStorageLocation.
// +optional
Phase BackupStorageLocationPhase `json:"phase,omitempty"`

// +optional
LastSyncedRevision types.UID `json:"lastSyncedRevision,omitempty"`

// LastSyncedTime is the last time the contents of the location were synced into
// the cluster.
// +optional
// +nullable
LastSyncedTime metav1.Time `json:"lastSyncedTime,omitempty"`

// LastSyncedRevision is the value of the `metadata/revision` file in the backup
// storage location the last time the BSL's contents were synced into the cluster.
//
// Deprecated: this field is no longer updated or used for detecting changes to
// the location's contents and will be removed entirely in v2.0.
// +optional
LastSyncedRevision types.UID `json:"lastSyncedRevision,omitempty"`

// AccessMode is an unused field.
//
// Deprecated: there is now an AccessMode field on the Spec and this field
Expand Down
113 changes: 36 additions & 77 deletions pkg/controller/backup_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
kuberrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -66,10 +65,10 @@ func NewBackupSyncController(
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
logger logrus.FieldLogger,
) Interface {
if syncPeriod < time.Minute {
logger.Infof("Provided backup sync period %v is too short. Setting to 1 minute", syncPeriod)
if syncPeriod <= 0 {
nrb marked this conversation as resolved.
Show resolved Hide resolved
syncPeriod = time.Minute
}
logger.Infof("Backup sync period is %v", syncPeriod)

c := &backupSyncController{
genericController: newGenericController("backup-sync", logger),
Expand Down Expand Up @@ -98,33 +97,6 @@ func NewBackupSyncController(
return c
}

// shouldSync reports whether the contents of the given backup storage location
// need to be synced into the cluster. When a sync is needed because the store's
// revision is readable, that revision string is returned so the caller can
// record it after a successful sync; otherwise the second return value is "".
func shouldSync(location *velerov1api.BackupStorageLocation, now time.Time, backupStore persistence.BackupStore, log logrus.FieldLogger) (bool, string) {
	log = log.WithFields(map[string]interface{}{
		"lastSyncedRevision": location.Status.LastSyncedRevision,
		"lastSyncedTime":     location.Status.LastSyncedTime.Time.Format(time.RFC1123Z),
	})

	storeRevision, err := backupStore.GetRevision()
	if err != nil {
		// A missing/unreadable revision file is expected for locations that
		// pre-date the revision file; err on the side of syncing.
		log.WithError(err).Debugf("Unable to get backup store's revision file, syncing (this is not an error if a v0.10+ backup has not yet been taken into this location)")
		return true, ""
	}
	log = log.WithField("revision", storeRevision)

	switch {
	case location.Status.LastSyncedTime.Add(time.Hour).Before(now):
		// Periodic full resync: more than an hour since the last sync.
		log.Debugf("Backup location hasn't been synced in more than %s, syncing", time.Hour)
		return true, storeRevision
	case string(location.Status.LastSyncedRevision) != storeRevision:
		// The store's revision changed since we last synced it.
		log.Debugf("Backup location hasn't been synced since its last modification, syncing")
		return true, storeRevision
	default:
		log.Debugf("Backup location's contents haven't changed since last sync, not syncing")
		return false, ""
	}
}

// orderedBackupLocations returns a new slice with the default backup location first (if it exists),
// followed by the rest of the locations in no particular order.
func orderedBackupLocations(locations []*velerov1api.BackupStorageLocation, defaultLocationName string) []*velerov1api.BackupStorageLocation {
Expand Down Expand Up @@ -162,44 +134,50 @@ func (c *backupSyncController) run() {

for _, location := range locations {
log := c.logger.WithField("backupLocation", location.Name)
log.Debug("Checking backup location for backups to sync into cluster")

backupStore, err := c.newBackupStore(location, pluginManager, log)
if err != nil {
log.WithError(err).Error("Error getting backup store for this location")
continue
}

ok, revision := shouldSync(location, time.Now().UTC(), backupStore, log)
if !ok {
continue
}
log.Info("Syncing contents of backup store into cluster")

// get a list of all the backups that are stored in the backup storage location
res, err := backupStore.ListBackups()
if err != nil {
log.WithError(err).Error("Error listing backups in backup store")
continue
}
backupStoreBackups := sets.NewString(res...)
log.WithField("backupCount", len(backupStoreBackups)).Info("Got backups from backup store")
log.WithField("backupCount", len(backupStoreBackups)).Debug("Got backups from backup store")

for backupName := range backupStoreBackups {
log = log.WithField("backup", backupName)
log.Debug("Checking this backup to see if it needs to be synced into the cluster")
// get a list of all the backups that exist as custom resources in the cluster
clusterBackups, err := c.backupLister.Backups(c.namespace).List(labels.Everything())
skriss marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting backups from cluster, proceeding with sync into cluster")
} else {
log.WithField("backupCount", len(clusterBackups)).Debug("Got backups from cluster")
}

// use the controller's namespace when getting the backup because that's where we
// are syncing backups to, regardless of the namespace of the cloud backup.
backup, err := c.backupClient.Backups(c.namespace).Get(backupName, metav1.GetOptions{})
if err == nil {
log.Debug("Backup already exists in cluster")
continue
}
// get a list of backups that *are* in the backup storage location and *aren't* in the cluster
clusterBackupsSet := sets.NewString()
for _, b := range clusterBackups {
clusterBackupsSet.Insert(b.Name)
}
backupsToSync := backupStoreBackups.Difference(clusterBackupsSet)

if !kuberrs.IsNotFound(err) {
log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster")
}
if count := backupsToSync.Len(); count > 0 {
log.Infof("Found %v backups in the backup location that do not exist in the cluster and need to be synced", count)
} else {
log.Debug("No backups found in the backup location that need to be synced into the cluster")
}

// sync each backup
for backupName := range backupsToSync {
log = log.WithField("backup", backupName)
log.Info("Attempting to sync backup into cluster")

backup, err = backupStore.GetBackupMetadata(backupName)
backup, err := backupStore.GetBackupMetadata(backupName)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting backup metadata from backup store")
continue
Expand All @@ -216,7 +194,8 @@ func (c *backupSyncController) run() {
backup.Labels = make(map[string]string)
}
backup.Labels[velerov1api.StorageLocationLabel] = label.GetValidName(backup.Spec.StorageLocation)
// process the regular velero backup

// attempt to create backup custom resource via API
backup, err = c.backupClient.Backups(backup.Namespace).Create(backup)
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
Expand All @@ -226,7 +205,7 @@ func (c *backupSyncController) run() {
log.WithError(errors.WithStack(err)).Error("Error syncing backup into cluster")
continue
default:
log.Debug("Synced backup into cluster")
log.Info("Successfully synced backup into cluster")
}

// process the pod volume backups from object store, if any
Expand All @@ -237,7 +216,7 @@ func (c *backupSyncController) run() {
}

for _, podVolumeBackup := range podVolumeBackups {
log = log.WithField("podVolumeBackup", podVolumeBackup.Name)
log := log.WithField("podVolumeBackup", podVolumeBackup.Name)
log.Debug("Checking this pod volume backup to see if it needs to be synced into the cluster")

for i, ownerRef := range podVolumeBackup.OwnerReferences {
Expand Down Expand Up @@ -270,11 +249,10 @@ func (c *backupSyncController) run() {

c.deleteOrphanedBackups(location.Name, backupStoreBackups, log)

// update the location's status's last-synced fields
// update the location's last-synced time field
patch := map[string]interface{}{
"status": map[string]interface{}{
"lastSyncedTime": time.Now().UTC(),
"lastSyncedRevision": revision,
"lastSyncedTime": time.Now().UTC(),
},
}

Expand All @@ -289,31 +267,12 @@ func (c *backupSyncController) run() {
types.MergePatchType,
patchBytes,
); err != nil {
log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time and revision")
log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time")
continue
}
}
}

// patchStorageLocation merge-patches the given backup custom resource so that
// its spec.storageLocation field is set to the provided location name.
func patchStorageLocation(backup *velerov1api.Backup, client velerov1client.BackupInterface, location string) error {
	spec := map[string]interface{}{
		"storageLocation": location,
	}

	patchBytes, err := json.Marshal(map[string]interface{}{"spec": spec})
	if err != nil {
		return errors.WithStack(err)
	}

	_, err = client.Patch(backup.Name, types.MergePatchType, patchBytes)
	if err != nil {
		return errors.WithStack(err)
	}

	return nil
}

// deleteOrphanedBackups deletes backup objects (CRDs) from Kubernetes that have the specified location
// and a phase of Completed, but no corresponding backup in object storage.
func (c *backupSyncController) deleteOrphanedBackups(locationName string, backupStoreBackups sets.String, log logrus.FieldLogger) {
Expand Down
94 changes: 0 additions & 94 deletions pkg/controller/backup_sync_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
core "k8s.io/client-go/testing"
Expand Down Expand Up @@ -370,8 +367,6 @@ func TestBackupSyncControllerRun(t *testing.T) {
backupStore, ok := backupStores[location.Name]
require.True(t, ok, "no mock backup store for location %s", location.Name)

backupStore.On("GetRevision").Return("foo", nil)

var backupNames []string
for _, bucket := range test.cloudBuckets[location.Spec.ObjectStorage.Bucket] {
backupNames = append(backupNames, bucket.backup.Name)
Expand Down Expand Up @@ -706,95 +701,6 @@ func TestStorageLabelsInDeleteOrphanedBackups(t *testing.T) {
}
}

// TestShouldSync verifies shouldSync's decision logic across combinations of
// the BSL's last-synced metadata and the backup store's current revision:
// a sync is required if the location has never been synced, hasn't been synced
// in over an hour, or the store's revision differs from the last-synced one.
func TestShouldSync(t *testing.T) {
	c := clock.NewFakeClock(time.Now())

	tests := []struct {
		name                string
		location            *velerov1api.BackupStorageLocation
		backupStoreRevision string // mocked GetRevision result; "" means GetRevision errors
		now                 time.Time
		expectSync          bool
		expectedRevision    string // revision shouldSync should report when syncing
	}{
		{
			name:                "BSL with no last-synced metadata should sync",
			location:            &velerov1api.BackupStorageLocation{},
			backupStoreRevision: "foo",
			now:                 c.Now(),
			expectSync:          true,
			expectedRevision:    "foo",
		},
		{
			name: "BSL with unchanged revision last synced more than an hour ago should sync",
			location: &velerov1api.BackupStorageLocation{
				Status: velerov1api.BackupStorageLocationStatus{
					LastSyncedRevision: types.UID("foo"),
					LastSyncedTime:     metav1.Time{Time: c.Now().Add(-61 * time.Minute)},
				},
			},
			backupStoreRevision: "foo",
			now:                 c.Now(),
			expectSync:          true,
			expectedRevision:    "foo",
		},
		{
			name: "BSL with unchanged revision last synced less than an hour ago should not sync",
			location: &velerov1api.BackupStorageLocation{
				Status: velerov1api.BackupStorageLocationStatus{
					LastSyncedRevision: types.UID("foo"),
					LastSyncedTime:     metav1.Time{Time: c.Now().Add(-59 * time.Minute)},
				},
			},
			backupStoreRevision: "foo",
			now:                 c.Now(),
			expectSync:          false,
		},
		{
			name: "BSL with different revision than backup store last synced less than an hour ago should sync",
			location: &velerov1api.BackupStorageLocation{
				Status: velerov1api.BackupStorageLocationStatus{
					LastSyncedRevision: types.UID("foo"),
					LastSyncedTime:     metav1.Time{Time: c.Now().Add(-time.Minute)},
				},
			},
			backupStoreRevision: "bar",
			now:                 c.Now(),
			expectSync:          true,
			expectedRevision:    "bar",
		},
		{
			name: "BSL with different revision than backup store last synced more than an hour ago should sync",
			location: &velerov1api.BackupStorageLocation{
				Status: velerov1api.BackupStorageLocationStatus{
					LastSyncedRevision: types.UID("foo"),
					LastSyncedTime:     metav1.Time{Time: c.Now().Add(-61 * time.Minute)},
				},
			},
			backupStoreRevision: "bar",
			now:                 c.Now(),
			expectSync:          true,
			expectedRevision:    "bar",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			backupStore := new(persistencemocks.BackupStore)
			if test.backupStoreRevision != "" {
				backupStore.On("GetRevision").Return(test.backupStoreRevision, nil)
			} else {
				backupStore.On("GetRevision").Return("", errors.New("object revision not found"))
			}

			shouldSync, rev := shouldSync(test.location, test.now, backupStore, velerotest.NewLogger())
			assert.Equal(t, test.expectSync, shouldSync)
			assert.Equal(t, test.expectedRevision, rev)
		})
	}
}

func getDeleteActions(actions []core.Action) []core.Action {
var deleteActions []core.Action
for _, action := range actions {
Expand Down
Loading