Skip to content

Commit

Permalink
bugfixes for BIAv2 controller work
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Seago <[email protected]>
  • Loading branch information
sseago committed Mar 1, 2023
1 parent 2c42396 commit 206df46
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 183 deletions.
14 changes: 14 additions & 0 deletions design/biav2-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message ExecuteResponse {
bytes item = 1;
repeated generated.ResourceIdentifier additionalItems = 2;
string operationID = 3;
repeated generated.ResourceIdentifier itemsToUpdate = 4;
}
```
The BackupItemAction service gets two new rpc methods:
Expand Down Expand Up @@ -78,6 +79,19 @@ message OperationProgress {
}
```

In addition to the two new rpc methods added to the BackupItemAction interface, there is also a new `Name()` method. This one is only actually used internally by Velero to get the name that the plugin was registered with, but it still must be defined in a plugin which implements BackupItemActionV2 in order to implement the interface. It doesn't really matter what it returns, though, as this particular method is not delegated to the plugin via RPC calls. The new (and modified) interface methods for `BackupItemAction` are as follows:
```
type BackupItemAction interface {
...
Name() string
...
Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, []velero.ResourceIdentifier, error)
Progress(operationID string, backup *api.Backup) (velero.OperationProgress, error)
Cancel(operationID string, backup *api.Backup) error
...
}
```

A new PluginKind, `BackupItemActionV2`, will be created, and the backup process will be modified to use this plugin kind.
See [Plugin Versioning](plugin-versioning.md) for more details on implementation plans, including v1 adapters, etc.

Expand Down
211 changes: 133 additions & 78 deletions design/general-progress-monitoring.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion internal/delete/delete_item_action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func InvokeDeleteActions(ctx *Context) error {

// Process individual items from the backup
for _, item := range items {
itemPath := archive.GetItemFilePath(dir, resource, namespace, item, "")
itemPath := archive.GetItemFilePath(dir, resource, namespace, item)

// obj is the Unstructured item from the backup
obj, err := archive.Unmarshal(ctx.Filesystem, itemPath)
Expand Down
7 changes: 6 additions & 1 deletion pkg/archive/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ import (
)

// GetItemFilePath returns an item's file path once extracted from a Velero backup archive.
func GetItemFilePath(rootDir, groupResource, namespace, name, versionPath string) string {
func GetItemFilePath(rootDir, groupResource, namespace, name string) string {
return GetVersionedItemFilePath(rootDir, groupResource, namespace, name, "")
}

// GetVersionedItemFilePath returns an item's file path once extracted from a Velero backup archive, with version included.
func GetVersionedItemFilePath(rootDir, groupResource, namespace, name, versionPath string) string {
return filepath.Join(rootDir, velerov1api.ResourcesDir, groupResource, versionPath, GetScopeDir(namespace), namespace, name+".json")
}

Expand Down
19 changes: 14 additions & 5 deletions pkg/archive/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,27 @@ import (
)

func TestGetItemFilePath(t *testing.T) {
res := GetItemFilePath("root", "resource", "", "item", "")
res := GetItemFilePath("root", "resource", "", "item")
assert.Equal(t, "root/resources/resource/cluster/item.json", res)

res = GetItemFilePath("root", "resource", "namespace", "item", "")
res = GetItemFilePath("root", "resource", "namespace", "item")
assert.Equal(t, "root/resources/resource/namespaces/namespace/item.json", res)

res = GetItemFilePath("root", "resource", "namespace", "item", "v1")
res = GetItemFilePath("", "resource", "", "item")
assert.Equal(t, "resources/resource/cluster/item.json", res)

res = GetVersionedItemFilePath("root", "resource", "", "item", "")
assert.Equal(t, "root/resources/resource/cluster/item.json", res)

res = GetVersionedItemFilePath("root", "resource", "namespace", "item", "")
assert.Equal(t, "root/resources/resource/namespaces/namespace/item.json", res)

res = GetVersionedItemFilePath("root", "resource", "namespace", "item", "v1")
assert.Equal(t, "root/resources/resource/v1/namespaces/namespace/item.json", res)

res = GetItemFilePath("root", "resource", "", "item", "v1")
res = GetVersionedItemFilePath("root", "resource", "", "item", "v1")
assert.Equal(t, "root/resources/resource/v1/cluster/item.json", res)

res = GetItemFilePath("", "resource", "", "item", "")
res = GetVersionedItemFilePath("", "resource", "", "item", "")
assert.Equal(t, "resources/resource/cluster/item.json", res)
}
2 changes: 1 addition & 1 deletion pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
}

func getFileForArchive(namespace, name, groupResource, versionPath string, itemBytes []byte) FileForArchive {
filePath := archive.GetItemFilePath("", groupResource, namespace, name, versionPath)
filePath := archive.GetVersionedItemFilePath("", groupResource, namespace, name, versionPath)
hdr := &tar.Header{
Name: filePath,
Size: int64(len(itemBytes)),
Expand Down
24 changes: 8 additions & 16 deletions pkg/controller/async_backup_operations_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import (
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
Expand Down Expand Up @@ -133,7 +131,7 @@ func (m *BackupItemOperationsMap) UpdateForBackup(backupStore persistence.Backup
type asyncBackupOperationsReconciler struct {
client.Client
logger logrus.FieldLogger
clock clock.Clock
clock clocks.WithTickerAndDelayedExecution
frequency time.Duration
itemOperationsMap *BackupItemOperationsMap
newPluginManager func(logger logrus.FieldLogger) clientmgmt.Manager
Expand All @@ -152,7 +150,7 @@ func NewAsyncBackupOperationsReconciler(
abor := &asyncBackupOperationsReconciler{
Client: client,
logger: logger,
clock: clock.RealClock{},
clock: clocks.RealClock{},
frequency: frequency,
itemOperationsMap: &BackupItemOperationsMap{operations: make(map[string]*operationsForBackup)},
newPluginManager: newPluginManager,
Expand All @@ -168,17 +166,7 @@ func NewAsyncBackupOperationsReconciler(
func (c *asyncBackupOperationsReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, c.frequency, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.Backup{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
return false
},
DeleteFunc: func(de event.DeleteEvent) bool {
return false
},
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
})).
For(&velerov1api.Backup{}, builder.WithPredicates(kube.FalsePredicate{})).
Watches(s, nil).
Complete(c)
}
Expand Down Expand Up @@ -444,6 +432,10 @@ func getBackupItemOperationProgress(
operation.Status.OperationUnits = operationProgress.OperationUnits
changes = true
}
if operation.Status.Description != operationProgress.Description {
operation.Status.Description = operationProgress.Description
changes = true
}
started := metav1.NewTime(operationProgress.Started)
if operation.Status.Started == nil || *(operation.Status.Started) != started {
operation.Status.Started = &started
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/async_backup_operations_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
testclocks "k8s.io/utils/clock/testing"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -51,7 +51,7 @@ var (
bia = &biav2mocks.BackupItemAction{}
)

func mockAsyncBackupOperationsReconciler(fakeClient kbclient.Client, fakeClock *clock.FakeClock, freq time.Duration) (*asyncBackupOperationsReconciler, *BackupItemOperationsMap) {
func mockAsyncBackupOperationsReconciler(fakeClient kbclient.Client, fakeClock *testclocks.FakeClock, freq time.Duration) (*asyncBackupOperationsReconciler, *BackupItemOperationsMap) {
abor, biaMap := NewAsyncBackupOperationsReconciler(
logrus.StandardLogger(),
fakeClient,
Expand All @@ -65,7 +65,7 @@ func mockAsyncBackupOperationsReconciler(fakeClient kbclient.Client, fakeClock *
}

func TestAsyncBackupOperationsReconcile(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
fakeClock := testclocks.NewFakeClock(time.Now())
metav1Now := metav1.NewTime(fakeClock.Now())

defaultBackupLocation := builder.ForBackupStorageLocation(velerov1api.DefaultNamespace, "default").Result()
Expand Down
8 changes: 2 additions & 6 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,18 +766,14 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
case logCounter.GetCount(logrus.ErrorLevel) > 0:
if inProgressOperations {
backup.Status.Phase = velerov1api.BackupPhaseWaitingForPluginOperationsPartiallyFailed
} else if backup.Status.AsyncBackupItemOperationsAttempted > 0 {
backup.Status.Phase = velerov1api.BackupPhaseFinalizingAfterPluginOperationsPartiallyFailed
} else {
backup.Status.Phase = velerov1api.BackupPhasePartiallyFailed
backup.Status.Phase = velerov1api.BackupPhaseFinalizingAfterPluginOperationsPartiallyFailed
}
default:
if inProgressOperations {
backup.Status.Phase = velerov1api.BackupPhaseWaitingForPluginOperations
} else if backup.Status.AsyncBackupItemOperationsAttempted > 0 {
backup.Status.Phase = velerov1api.BackupPhaseFinalizingAfterPluginOperations
} else {
backup.Status.Phase = velerov1api.BackupPhaseCompleted
backup.Status.Phase = velerov1api.BackupPhaseFinalizingAfterPluginOperations
}
}
// Mark completion timestamp before serializing and uploading.
Expand Down
110 changes: 52 additions & 58 deletions pkg/controller/backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func TestProcessBackupCompletions(t *testing.T) {
backupExists bool
existenceCheckError error
}{
// Completed
// FinalizingAfterPluginOperations
{
name: "backup with no backup location gets the default",
backup: defaultBackup().Result(),
Expand Down Expand Up @@ -633,12 +633,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.True(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -669,12 +668,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.False(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -708,12 +706,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.True(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -745,12 +742,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.False(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
Expiration: &metav1.Time{now.Add(10 * time.Minute)},
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
Expiration: &metav1.Time{now.Add(10 * time.Minute)},
StartTimestamp: &timestamp,
},
},
},
Expand Down Expand Up @@ -782,12 +778,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.True(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -820,12 +815,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.False(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -858,12 +852,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.True(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -896,12 +889,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.True(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -934,12 +926,11 @@ func TestProcessBackupCompletions(t *testing.T) {
DefaultVolumesToFsBackup: boolptr.False(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseCompleted,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
CompletionTimestamp: &timestamp,
Expiration: &timestamp,
Phase: velerov1api.BackupPhaseFinalizingAfterPluginOperations,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
},
},
},
Expand Down Expand Up @@ -1087,13 +1078,16 @@ func TestProcessBackupCompletions(t *testing.T) {

// Ensure we have a CompletionTimestamp when uploading and that the backup name matches the backup in the object store.
// Failures will display the bytes in buf.
hasNameAndCompletionTimestamp := func(info persistence.BackupInfo) bool {
hasNameAndCompletionTimestampIfCompleted := func(info persistence.BackupInfo) bool {
buf := new(bytes.Buffer)
buf.ReadFrom(info.Metadata)
return info.Name == test.backup.Name &&
strings.Contains(buf.String(), `"completionTimestamp": "2006-01-02T22:04:05Z"`)
(!(strings.Contains(buf.String(), `"phase": "Completed"`) ||
strings.Contains(buf.String(), `"phase": "Failed"`) ||
strings.Contains(buf.String(), `"phase": "PartiallyFailed"`)) ||
strings.Contains(buf.String(), `"completionTimestamp": "2006-01-02T22:04:05Z"`))
}
backupStore.On("PutBackup", mock.MatchedBy(hasNameAndCompletionTimestamp)).Return(nil)
backupStore.On("PutBackup", mock.MatchedBy(hasNameAndCompletionTimestampIfCompleted)).Return(nil)

// add the test's backup to the informer/lister store
require.NotNil(t, test.backup)
Expand Down
Loading

0 comments on commit 206df46

Please sign in to comment.