Skip to content

Commit

Permalink
BIAv2 async operations controller work
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Seago <[email protected]>
  • Loading branch information
sseago committed Feb 14, 2023
1 parent 7139daf commit ae5bb59
Show file tree
Hide file tree
Showing 37 changed files with 1,741 additions and 171 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/5849-sseago
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
BIAv2 async operations controller work
19 changes: 19 additions & 0 deletions config/crd/v1/bases/velero.io_backups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ spec:
type: string
nullable: true
type: array
itemOperationTimeout:
description: ItemOperationTimeout specifies the time used to wait
for asynchronous BackupItemAction operations The default value is
1 hour.
type: string
labelSelector:
description: LabelSelector is a metav1.LabelSelector to filter with
when adding individual objects to the backup. If empty or nil, all
Expand Down Expand Up @@ -415,6 +420,20 @@ spec:
status:
description: BackupStatus captures the current status of a Velero backup.
properties:
asyncBackupItemOperationsAttempted:
description: AsyncBackupItemOperationsAttempted is the total number
of attempted async BackupItemAction operations for this backup.
type: integer
asyncBackupItemOperationsCompleted:
description: AsyncBackupItemOperationsCompleted is the total number
of successfully completed async BackupItemAction operations for
this backup.
type: integer
asyncBackupItemOperationsFailed:
description: AsyncBackupItemOperationsFailed is the total number of
async BackupItemAction operations for this backup which ended with
an error.
type: integer
completionTimestamp:
description: CompletionTimestamp records the time a backup was completed.
Completion time is recorded even on failed backups. Completion time
Expand Down
5 changes: 5 additions & 0 deletions config/crd/v1/bases/velero.io_schedules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ spec:
type: string
nullable: true
type: array
itemOperationTimeout:
description: ItemOperationTimeout specifies the time used to wait
for asynchronous BackupItemAction operations The default value
is 1 hour.
type: string
labelSelector:
description: LabelSelector is a metav1.LabelSelector to filter
with when adding individual objects to the backup. If empty
Expand Down
4 changes: 2 additions & 2 deletions config/crd/v1/crds/crds.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions pkg/apis/velero/v1/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ type BackupSpec struct {
// The default value is 10 minute.
// +optional
CSISnapshotTimeout metav1.Duration `json:"csiSnapshotTimeout,omitempty"`

// ItemOperationTimeout specifies the time used to wait for asynchronous BackupItemAction operations
// The default value is 1 hour.
// +optional
ItemOperationTimeout metav1.Duration `json:"itemOperationTimeout,omitempty"`
}

// BackupHooks contains custom behaviors that should be executed at different phases of the backup.
Expand Down Expand Up @@ -351,6 +356,21 @@ type BackupStatus struct {
// completed CSI VolumeSnapshots for this backup.
// +optional
CSIVolumeSnapshotsCompleted int `json:"csiVolumeSnapshotsCompleted,omitempty"`

// AsyncBackupItemOperationsAttempted is the total number of attempted
// async BackupItemAction operations for this backup.
// +optional
AsyncBackupItemOperationsAttempted int `json:"asyncBackupItemOperationsAttempted,omitempty"`

// AsyncBackupItemOperationsCompleted is the total number of successfully completed
// async BackupItemAction operations for this backup.
// +optional
AsyncBackupItemOperationsCompleted int `json:"asyncBackupItemOperationsCompleted,omitempty"`

// AsyncBackupItemOperationsFailed is the total number of async
// BackupItemAction operations for this backup which ended with an error.
// +optional
AsyncBackupItemOperationsFailed int `json:"asyncBackupItemOperationsFailed,omitempty"`
}

// BackupProgress stores information about the progress of a Backup's execution.
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/velero/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

189 changes: 184 additions & 5 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
Expand Down Expand Up @@ -1165,6 +1166,10 @@ func (a *recordResourcesAction) Cancel(operationID string, backup *velerov1.Back
return nil
}

func (a *recordResourcesAction) Name() string {
return ""
}

func (a *recordResourcesAction) ForResource(resource string) *recordResourcesAction {
a.selector.IncludedResources = append(a.selector.IncludedResources, resource)
return a
Expand Down Expand Up @@ -1474,6 +1479,10 @@ func (a *appliesToErrorAction) Cancel(operationID string, backup *velerov1.Backu
panic("not implemented")
}

func (a *appliesToErrorAction) Name() string {
return ""
}

// TestBackupActionModifications runs backups with backup item actions that make modifications
// to items in their Execute(...) methods and verifies that these modifications are
// persisted to the backup tarball. Verification is done by inspecting the file contents
Expand Down Expand Up @@ -2292,6 +2301,167 @@ func TestBackupWithSnapshots(t *testing.T) {
}
}

// TestBackupWithAsyncOperations runs backups which return operationIDs and
// verifies that the itemoperations are tracked as appropriate. Verification is done by
// looking at the backup request's itemOperationsList field.
func TestBackupWithAsyncOperations(t *testing.T) {
// completedOperationAction is a *pluggableAction, whose Execute(...)
// method returns an operationID which will always be done when calling Progress.
completedOperationAction := &pluggableAction{
executeFunc: func(item runtime.Unstructured, backup *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, error) {
obj, ok := item.(*unstructured.Unstructured)
if !ok {
return nil, nil, "", errors.Errorf("unexpected type %T", item)
}

return obj, nil, obj.GetName() + "-1", nil
},
progressFunc: func(operationID string, backup *velerov1.Backup) (velero.OperationProgress, error) {
return velero.OperationProgress{
Completed: true,
Description: "Done!",
}, nil
},
}

// incompleteOperationAction is a *pluggableAction, whose Execute(...)
// method returns an operationID which will never be done when calling Progress.
incompleteOperationAction := &pluggableAction{
executeFunc: func(item runtime.Unstructured, backup *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, error) {
obj, ok := item.(*unstructured.Unstructured)
if !ok {
return nil, nil, "", errors.Errorf("unexpected type %T", item)
}

return obj, nil, obj.GetName() + "-1", nil
},
progressFunc: func(operationID string, backup *velerov1.Backup) (velero.OperationProgress, error) {
return velero.OperationProgress{
Completed: false,
Description: "Working...",
}, nil
},
}

// noOperationAction is a *pluggableAction, whose Execute(...)
// method does not return an operationID.
noOperationAction := &pluggableAction{
executeFunc: func(item runtime.Unstructured, backup *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, error) {
obj, ok := item.(*unstructured.Unstructured)
if !ok {
return nil, nil, "", errors.Errorf("unexpected type %T", item)
}

return obj, nil, "", nil
},
}

tests := []struct {
name string
req *Request
apiResources []*test.APIResource
actions []biav2.BackupItemAction
want []*itemoperation.BackupOperation
}{
{
name: "action that starts a short-running process records operation",
req: &Request{
Backup: defaultBackup().Result(),
},
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-1").Result(),
),
},
actions: []biav2.BackupItemAction{
completedOperationAction,
},
want: []*itemoperation.BackupOperation{
{
Spec: itemoperation.BackupOperationSpec{
BackupName: "backup-1",
ResourceIdentifier: velero.ResourceIdentifier{
GroupResource: kuberesource.Pods,
Namespace: "ns-1",
Name: "pod-1"},
OperationID: "pod-1-1",
},
Status: itemoperation.OperationStatus{
Phase: "InProgress",
},
},
},
},
{
name: "action that starts a long-running process records operation",
req: &Request{
Backup: defaultBackup().Result(),
},
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-2").Result(),
),
},
actions: []biav2.BackupItemAction{
incompleteOperationAction,
},
want: []*itemoperation.BackupOperation{
{
Spec: itemoperation.BackupOperationSpec{
BackupName: "backup-1",
ResourceIdentifier: velero.ResourceIdentifier{
GroupResource: kuberesource.Pods,
Namespace: "ns-1",
Name: "pod-2"},
OperationID: "pod-2-1",
},
Status: itemoperation.OperationStatus{
Phase: "InProgress",
},
},
},
},
{
name: "action that has no operation doesn't record one",
req: &Request{
Backup: defaultBackup().Result(),
},
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-3").Result(),
),
},
actions: []biav2.BackupItemAction{
noOperationAction,
},
want: []*itemoperation.BackupOperation{},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
backupFile = bytes.NewBuffer([]byte{})
)

for _, resource := range tc.apiResources {
h.addItems(t, resource)
}

err := h.backupper.Backup(h.log, tc.req, backupFile, tc.actions, nil)
assert.NoError(t, err)

resultOper := *tc.req.GetItemOperationsList()
// set want Created times so it won't fail the assert.Equal test
for i, wantOper := range tc.want {
wantOper.Status.Created = resultOper[i].Status.Created
}
assert.Equal(t, tc.want, *tc.req.GetItemOperationsList())
})
}
}

// TestBackupWithInvalidHooks runs backups with invalid hook specifications and verifies
// that an error is returned.
func TestBackupWithInvalidHooks(t *testing.T) {
Expand Down Expand Up @@ -2740,11 +2910,12 @@ func TestBackupWithPodVolume(t *testing.T) {
}
}

// pluggableAction is a backup item action that can be plugged with an Execute
// function body at runtime.
// pluggableAction is a backup item action that can be plugged with Execute
// and Progress function bodies at runtime.
type pluggableAction struct {
selector velero.ResourceSelector
executeFunc func(runtime.Unstructured, *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, error)
selector velero.ResourceSelector
executeFunc func(runtime.Unstructured, *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, error)
progressFunc func(string, *velerov1.Backup) (velero.OperationProgress, error)
}

func (a *pluggableAction) Execute(item runtime.Unstructured, backup *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, error) {
Expand All @@ -2760,13 +2931,21 @@ func (a *pluggableAction) AppliesTo() (velero.ResourceSelector, error) {
}

func (a *pluggableAction) Progress(operationID string, backup *velerov1.Backup) (velero.OperationProgress, error) {
return velero.OperationProgress{}, nil
if a.progressFunc == nil {
return velero.OperationProgress{}, nil
}

return a.progressFunc(operationID, backup)
}

func (a *pluggableAction) Cancel(operationID string, backup *velerov1.Backup) error {
return nil
}

func (a *pluggableAction) Name() string {
return ""
}

type harness struct {
*test.APIServer
backupper *kubernetesBackupper
Expand Down
31 changes: 28 additions & 3 deletions pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import (
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/features"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
vsv1 "github.com/vmware-tanzu/velero/pkg/plugin/velero/volumesnapshotter/v1"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
Expand Down Expand Up @@ -322,12 +324,35 @@ func (ib *itemBackupper) executeActions(
}
log.Info("Executing custom action")

// Note: we're ignoring the operationID returned from Execute for now, it will be used
// with the async plugin action implementation
updatedItem, additionalItemIdentifiers, _, err := action.Execute(obj, ib.backupRequest.Backup)
updatedItem, additionalItemIdentifiers, operationID, err := action.Execute(obj, ib.backupRequest.Backup)
if err != nil {
return nil, errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}

// If async plugin started async operation, add it to the ItemOperations list
if operationID != "" {
resourceIdentifier := velero.ResourceIdentifier{
GroupResource: groupResource,
Namespace: namespace,
Name: name,
}
now := metav1.Now()
newOperation := itemoperation.BackupOperation{
Spec: itemoperation.BackupOperationSpec{
BackupName: ib.backupRequest.Backup.Name,
BackupUID: string(ib.backupRequest.Backup.UID),
BackupItemAction: action.Name(),
ResourceIdentifier: resourceIdentifier,
OperationID: operationID,
},
Status: itemoperation.OperationStatus{
Phase: itemoperation.OperationPhaseInProgress,
Created: &now,
},
}
itemOperList := ib.backupRequest.GetItemOperationsList()
*itemOperList = append(*itemOperList, &newOperation)
}
u := &unstructured.Unstructured{Object: updatedItem.UnstructuredContent()}
mustInclude := u.GetAnnotations()[mustIncludeAdditionalItemAnnotation] == "true"

Expand Down
Loading

0 comments on commit ae5bb59

Please sign in to comment.