Skip to content

Commit

Permalink
Fix asoctl aborting import when an error occurs (#3212)
Browse files Browse the repository at this point in the history
* Split resource import from discovery of child resources
* Update ARM resource for modified interface
* Update resource importer
* Remove testing code
* Code gardening
  • Loading branch information
theunrepentantgeek authored Aug 29, 2023
1 parent 41622d9 commit 55e6ec8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 61 deletions.
115 changes: 61 additions & 54 deletions v2/cmd/asoctl/internal/importing/importable_arm_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
"github.com/Azure/azure-service-operator/v2/internal/controllers"
"github.com/Azure/azure-service-operator/v2/internal/genericarmclient"
"github.com/Azure/azure-service-operator/v2/internal/reflecthelpers"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime/extensions"
"github.com/pkg/errors"
"github.com/vbauerster/mpb/v8"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/Azure/azure-service-operator/v2/pkg/genruntime/extensions"

"github.com/Azure/azure-service-operator/v2/internal/genericarmclient"

"github.com/Azure/azure-service-operator/v2/internal/reflecthelpers"

"github.com/Azure/azure-service-operator/v2/pkg/genruntime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
)

type importableARMResource struct {
Expand Down Expand Up @@ -68,7 +66,7 @@ func NewImportableARMResource(
}

// GroupKind returns the GroupKind of the resource being imported.
// (may be empty if the GK can't be determined)
// Returned value may be empty if the GK can't be determined.
func (i *importableARMResource) GroupKind() schema.GroupKind {
gk, _ := FindGroupKindForResourceType(i.armID.ResourceType.String())
return gk
Expand All @@ -92,13 +90,50 @@ func (i *importableARMResource) Resource() genruntime.MetaObject {

// Import imports this single resource.
// ctx is the context to use for the import.
// Returns a slice of child resources needing to be imported (if any), and/or an error.
// Both are returned to allow returning partial results in the case of a partial failure.
func (i *importableARMResource) Import(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error) {
var ref genruntime.ResourceReference
ref, err := i.importResource(ctx, i.armID)
// bar is a progress bar to update.
// Returns any errors that occur.
func (i *importableARMResource) Import(ctx context.Context, bar *mpb.Bar) error {
// Create an importable blank object into which we capture the current state of the resource
importable, err := i.createImportableObjectFromID(i.owner, i.armID)
if err != nil {
// Error doesn't need additional context
return err
}

loader := i.createImportFunction(importable)
result, err := loader(ctx, importable, i.owner)
if err != nil {
return nil, err
return err
}

if because, skipped := result.Skipped(); skipped {
gk := importable.GetObjectKind().GroupVersionKind().GroupKind()
return NewImportSkippedError(gk, i.armID.Name, because)
}

i.resource = importable
bar.SetCurrent(1)

return nil
}

// FindChildren returns any child resources that need to be imported.
// ctx allows for cancellation of the import.
// Returns any additional resources that also need to be imported, as well as any errors that occur.
// Partial success is allowed, but the caller should be notified of any errors.
func (i *importableARMResource) FindChildren(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error) {
if i.resource == nil {
// Nothing to do
return nil, nil
}

gvk := i.resource.GetObjectKind().GroupVersionKind()

ref := genruntime.ResourceReference{
Group: gvk.Group,
Kind: gvk.Kind,
Name: i.resource.GetName(),
ARMID: i.armID.String(),
}

var result []ImportableResource
Expand All @@ -120,56 +155,28 @@ func (i *importableARMResource) Import(ctx context.Context, bar *mpb.Bar) ([]Imp
total := int64(len(childTypes) + 1)
bar.SetTotal(total, false)

bar.SetCurrent(1)

// While we're looking for subresources, we need to treat any errors that occur as independent.
// Some potential subresource types can have limited accessibility (e.g. the subscriber may not
// be onboarded to a preview API), so we don't want to fail the entire import if we can't import
// a single candidate subresource type.
var errs []error
for _, subType := range childTypes {
subResources, err := i.importChildResources(ctx, ref, subType)
if err != nil {
gk, _ := FindGroupKindForResourceType(subType) // If this was going to error, it would have already
return nil, errors.Wrapf(err, "importing %s/%s for resource %s", gk.Group, gk.Kind, i.armID)
errs = append(errs, errors.Wrapf(err, "importing %s/%s", gk.Group, gk.Kind))
continue
}

result = append(result, subResources...)
bar.Increment()
}

return result, nil
}

// importResource imports the actual resource, returning a reference to the resource
func (i *importableARMResource) importResource(
ctx context.Context,
id *arm.ResourceID,
) (genruntime.ResourceReference, error) {
// Create an importable blank object into which we capture the current state of the resource
importable, err := i.createImportableObjectFromID(i.owner, id)
if err != nil {
// Error doesn't need additional context
return genruntime.ResourceReference{}, err
}

loader := i.createImportFunction(importable)
result, err := loader(ctx, importable, i.owner)
if err != nil {
return genruntime.ResourceReference{}, err
}

if because, skipped := result.Skipped(); skipped {
gk := importable.GetObjectKind().GroupVersionKind().GroupKind()
return genruntime.ResourceReference{}, NewImportSkippedError(gk, id.Name, because)
}

gvk := importable.GetObjectKind().GroupVersionKind()
i.resource = importable

ref := genruntime.ResourceReference{
Group: gvk.Group,
Kind: gvk.Kind,
Name: importable.GetName(),
ARMID: i.armID.String(),
}

return ref, nil
return result,
errors.Wrapf(
kerrors.NewAggregate(errs),
"importing childresources of %s",
i.armID)
}

func (i *importableARMResource) createImportFunction(
Expand Down
11 changes: 9 additions & 2 deletions v2/cmd/asoctl/internal/importing/importable_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ type ImportableResource interface {

// Import does the actual import, updating the Spec on the wrapped resource.
// ctx allows for cancellation of the import.
// If there are any additional resources that also need to be imported, they should be returned.
Import(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error)
// bar is a progress bar to update.
// Returns any errors that occur.
Import(ctx context.Context, bar *mpb.Bar) error

// FindChildren returns any child resources that need to be imported.
// ctx allows for cancellation of the import.
// Returns any additional resources that also need to be imported, as well as any errors that occur.
// Partial success is allowed, but the caller should be notified of any errors.
FindChildren(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error)
}

// importableResource is a core of common data and support methods for implementing ImportableResource
Expand Down
30 changes: 25 additions & 5 deletions v2/cmd/asoctl/internal/importing/resource_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,10 @@ func (ri *ResourceImporter) ImportResource(ctx context.Context, rsrc ImportableR
bar.SetTotal(bar.Current(), true)
}()

pending, err := rsrc.Import(ctx, bar)
err := rsrc.Import(ctx, bar)
if err != nil {
// Something went wrong importing this resource.
// If it was simply skipped, we keep track of it but don't need to return an error
var skipped *ImportSkippedError
if errors.As(err, &skipped) {
ri.log.V(1).Info(
Expand All @@ -209,15 +211,25 @@ func (ri *ResourceImporter) ImportResource(ctx context.Context, rsrc ImportableR
return errors.Errorf("failed during import of %s", rsrc.Name())
}

// Successfully imported the resource,
// so now we look for any child resources that need importing too

ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())

ri.report.AddSuccessfulImport(rsrc)
ri.Complete(rsrc)

ri.Complete(rsrc, pending)
return nil
pending, err := rsrc.FindChildren(ctx, bar)

// We always queue pending resources, even if there was an error looking
// for them, because we don't want an error in one part of a resource tree
// to impair importing other parts
ri.AddPending(pending)

return err
}

// DequeueResource returns the next resource to import.
Expand All @@ -240,13 +252,21 @@ func (ri *ResourceImporter) DequeueResource() (ImportableResource, bool) {
return importer, true
}

func (ri *ResourceImporter) Complete(importer ImportableResource, pending []ImportableResource) {
func (ri *ResourceImporter) Complete(importer ImportableResource) {
// Lock while we're modifying the maps
ri.lock.Lock()
defer ri.lock.Unlock()

// Add it to our map and our queue
// Add it to our map of completed importers
ri.imported[importer.Id()] = importer
}

func (ri *ResourceImporter) AddPending(pending []ImportableResource) {
// Lock while we're modifying the maps
ri.lock.Lock()
defer ri.lock.Unlock()

// Add to our queue of pending imports
for _, p := range pending {
ri.addImpl(p)
}
Expand Down

0 comments on commit 55e6ec8

Please sign in to comment.