Skip to content

Commit

Permalink
Add async reconciler interface and refactor async service to get exis…
Browse files Browse the repository at this point in the history
…ting resources
  • Loading branch information
Jont828 committed Dec 1, 2021
1 parent 69f3426 commit 070d46f
Show file tree
Hide file tree
Showing 24 changed files with 628 additions and 826 deletions.
65 changes: 51 additions & 14 deletions azure/services/async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ import (
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)

// Service is an implementation of the AsyncReconciler interface. It handles asynchronous creation and deletion of resources.
type Service struct {
Scope FutureScope
Creator
Deleter
}

// New creates a new async service.
func New(scope FutureScope, createClient Creator, deleteClient Deleter) *Service {
return &Service{
Scope: scope,
Creator: createClient,
Deleter: deleteClient,
}
}

// processOngoingOperation is a helper function that will process an ongoing operation to check if it is done.
// If it is not done, it will return a transient error.
func processOngoingOperation(ctx context.Context, scope FutureScope, client FutureHandler, resourceName string, serviceName string) (interface{}, error) {
Expand Down Expand Up @@ -69,61 +85,82 @@ func processOngoingOperation(ctx context.Context, scope FutureScope, client Futu
}

// CreateResource implements the logic for creating a resource Asynchronously.
func CreateResource(ctx context.Context, scope FutureScope, client Creator, spec azure.ResourceSpecGetter, serviceName string) (interface{}, error) {
func (s *Service) CreateResource(ctx context.Context, spec azure.ResourceSpecGetter, serviceName string) (interface{}, error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "async.Service.CreateResource")
defer done()

resourceName := spec.ResourceName()
rgName := spec.ResourceGroupName()

// Check if there is an ongoing long running operation.
future := scope.GetLongRunningOperationState(resourceName, serviceName)
future := s.Scope.GetLongRunningOperationState(resourceName, serviceName)
if future != nil {
return processOngoingOperation(ctx, scope, client, resourceName, serviceName)
return processOngoingOperation(ctx, s.Scope, s.Creator, resourceName, serviceName)
}

// No long running operation is active, so create the resource.
scope.V(2).Info("creating resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
result, sdkFuture, err := client.CreateOrUpdateAsync(ctx, spec)

// Get the resource if it already exists, and use it to construct the desired resource parameters.
var existingResource interface{}
if existing, err := s.Creator.Get(ctx, spec); err != nil && !azure.ResourceNotFound(err) {
return nil, errors.Wrapf(err, "failed to get existing resource %s/%s (service: %s)", rgName, resourceName, serviceName)
} else if err == nil {
existingResource = existing
s.Scope.V(2).Info("successfully got existing resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
}

// Construct parameters using the resource spec and information from the existing resource, if there is one.
parameters, err := spec.Parameters(existingResource)
if err != nil {
return nil, errors.Wrapf(err, "failed to get desired parameters for resource %s/%s (service: %s)", rgName, resourceName, serviceName)
} else if parameters == nil {
// Nothing to do, don't create or update the resource and return the existing resource.
s.Scope.V(2).Info("resource up to date", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
return existingResource, nil
}

// Create or update the resource with the desired parameters.
s.Scope.V(2).Info("creating resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
result, sdkFuture, err := s.Creator.CreateOrUpdateAsync(ctx, spec, parameters)
if sdkFuture != nil {
future, err := converters.SDKToFuture(sdkFuture, infrav1.PutFuture, serviceName, resourceName, rgName)
if err != nil {
return nil, errors.Wrapf(err, "failed to create resource %s/%s (service: %s)", rgName, resourceName, serviceName)
}
scope.SetLongRunningOperationState(future)
s.Scope.SetLongRunningOperationState(future)
return nil, azure.WithTransientError(azure.NewOperationNotDoneError(future), retryAfter(sdkFuture))
} else if err != nil {
return nil, errors.Wrapf(err, "failed to create resource %s/%s (service: %s)", rgName, resourceName, serviceName)
}

scope.V(2).Info("successfully created resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
s.Scope.V(2).Info("successfully created resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
return result, nil
}

// DeleteResource implements the logic for deleting a resource Asynchronously.
func DeleteResource(ctx context.Context, scope FutureScope, client Deleter, spec azure.ResourceSpecGetter, serviceName string) error {
func (s *Service) DeleteResource(ctx context.Context, spec azure.ResourceSpecGetter, serviceName string) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "async.Service.DeleteResource")
defer done()

resourceName := spec.ResourceName()
rgName := spec.ResourceGroupName()

// Check if there is an ongoing long running operation.
future := scope.GetLongRunningOperationState(resourceName, serviceName)
future := s.Scope.GetLongRunningOperationState(resourceName, serviceName)
if future != nil {
_, err := processOngoingOperation(ctx, scope, client, resourceName, serviceName)
_, err := processOngoingOperation(ctx, s.Scope, s.Deleter, resourceName, serviceName)
return err
}

// No long running operation is active, so delete the resource.
scope.V(2).Info("deleting resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
sdkFuture, err := client.DeleteAsync(ctx, spec)
s.Scope.V(2).Info("deleting resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
sdkFuture, err := s.Deleter.DeleteAsync(ctx, spec)
if sdkFuture != nil {
future, err := converters.SDKToFuture(sdkFuture, infrav1.DeleteFuture, serviceName, resourceName, rgName)
if err != nil {
return errors.Wrapf(err, "failed to delete resource %s/%s (service: %s)", rgName, resourceName, serviceName)
}
scope.SetLongRunningOperationState(future)
s.Scope.SetLongRunningOperationState(future)
return azure.WithTransientError(azure.NewOperationNotDoneError(future), retryAfter(sdkFuture))
} else if err != nil {
if azure.ResourceNotFound(err) {
Expand All @@ -133,7 +170,7 @@ func DeleteResource(ctx context.Context, scope FutureScope, client Deleter, spec
return errors.Wrapf(err, "failed to delete resource %s/%s (service: %s)", rgName, resourceName, serviceName)
}

scope.V(2).Info("successfully deleted resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
s.Scope.V(2).Info("successfully deleted resource", "service", serviceName, "resource", resourceName, "resourceGroup", rgName)
return nil
}

Expand Down
108 changes: 92 additions & 16 deletions azure/services/async/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2019-05-01/resources"
"github.com/Azure/go-autorest/autorest"
azureautorest "github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/mock/gomock"
. "github.com/onsi/gomega"
"k8s.io/klog/v2/klogr"
Expand Down Expand Up @@ -58,8 +57,11 @@ var (
ResourceGroup: "test-group",
Data: "ZmFrZSBiNjQgZnV0dXJlIGRhdGEK",
}
fakeError = autorest.NewErrorWithResponse("", "", &http.Response{StatusCode: 500}, "Internal Server Error")
errCtxExceeded = errors.New("ctx exceeded")
fakeExistingResource = resources.GenericResource{}
fakeResourceParameters = resources.GenericResource{}
fakeInternalError = autorest.NewErrorWithResponse("", "", &http.Response{StatusCode: 500}, "Internal Server Error")
fakeNotFoundError = autorest.NewErrorWithResponse("", "", &http.Response{StatusCode: 404}, "Not Found")
errCtxExceeded = errors.New("ctx exceeded")
)

// TestProcessOngoingOperation tests the processOngoingOperation function.
Expand Down Expand Up @@ -101,7 +103,7 @@ func TestProcessOngoingOperation(t *testing.T) {
expect: func(s *mock_async.MockFutureScopeMockRecorder, c *mock_async.MockFutureHandlerMockRecorder) {
s.V(gomock.AssignableToTypeOf(2)).AnyTimes().Return(klogr.New())
s.GetLongRunningOperationState("test-resource", "test-service").Return(&validDeleteFuture)
c.IsDone(gomockinternal.AContext(), gomock.AssignableToTypeOf(&azureautorest.Future{})).Return(false, fakeError)
c.IsDone(gomockinternal.AContext(), gomock.AssignableToTypeOf(&azureautorest.Future{})).Return(false, fakeInternalError)
},
},
{
Expand All @@ -118,15 +120,15 @@ func TestProcessOngoingOperation(t *testing.T) {
{
name: "operation is done",
expectedError: "",
expectedResult: resources.Group{Name: to.StringPtr("test-resource")},
expectedResult: &fakeExistingResource,
resourceName: "test-resource",
serviceName: "test-service",
expect: func(s *mock_async.MockFutureScopeMockRecorder, c *mock_async.MockFutureHandlerMockRecorder) {
s.V(gomock.AssignableToTypeOf(2)).AnyTimes().Return(klogr.New())
s.GetLongRunningOperationState("test-resource", "test-service").Return(&validDeleteFuture)
c.IsDone(gomockinternal.AContext(), gomock.AssignableToTypeOf(&azureautorest.Future{})).Return(true, nil)
s.DeleteLongRunningOperationState("test-resource", "test-service")
c.Result(gomockinternal.AContext(), gomock.AssignableToTypeOf(&azureautorest.Future{}), infrav1.DeleteFuture).Return(resources.Group{Name: to.StringPtr("test-resource")}, nil)
c.Result(gomockinternal.AContext(), gomock.AssignableToTypeOf(&azureautorest.Future{}), infrav1.DeleteFuture).Return(&fakeExistingResource, nil)
},
},
}
Expand Down Expand Up @@ -191,7 +193,63 @@ func TestCreateResource(t *testing.T) {
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.CreateOrUpdateAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return("test-resource", nil, nil)
c.Get(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(&fakeExistingResource, nil)
r.Parameters(&fakeExistingResource).Return(&fakeResourceParameters, nil)
c.CreateOrUpdateAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{}), &fakeResourceParameters).Return("test-resource", nil, nil)
},
},
{
name: "error occurs while running async get",
expectedError: "failed to get existing resource test-group/test-resource (service: test-service)",
serviceName: "test-service",
expect: func(s *mock_async.MockFutureScopeMockRecorder, c *mock_async.MockCreatorMockRecorder, r *mock_azure.MockResourceSpecGetterMockRecorder) {
s.V(gomock.AssignableToTypeOf(2)).AnyTimes().Return(klogr.New())
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.Get(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, fakeInternalError)
},
},
{
name: "async get returns not found",
expectedError: "",
serviceName: "test-service",
expectedResult: &fakeExistingResource,
expect: func(s *mock_async.MockFutureScopeMockRecorder, c *mock_async.MockCreatorMockRecorder, r *mock_azure.MockResourceSpecGetterMockRecorder) {
s.V(gomock.AssignableToTypeOf(2)).AnyTimes().Return(klogr.New())
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.Get(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, fakeNotFoundError)
r.Parameters(nil).Return(&fakeResourceParameters, nil)
c.CreateOrUpdateAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{}), &fakeResourceParameters).Return(&fakeExistingResource, nil, nil)
},
},
{
name: "error occurs while running async spec parameters",
expectedError: "failed to get desired parameters for resource test-group/test-resource (service: test-service)",
serviceName: "test-service",
expect: func(s *mock_async.MockFutureScopeMockRecorder, c *mock_async.MockCreatorMockRecorder, r *mock_azure.MockResourceSpecGetterMockRecorder) {
s.V(gomock.AssignableToTypeOf(2)).AnyTimes().Return(klogr.New())
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.Get(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(&fakeExistingResource, nil)
r.Parameters(&fakeExistingResource).Return(nil, fakeInternalError)
},
},
{
name: "async spec parameters returns nil",
expectedError: "",
serviceName: "test-service",
expectedResult: &fakeExistingResource,
expect: func(s *mock_async.MockFutureScopeMockRecorder, c *mock_async.MockCreatorMockRecorder, r *mock_azure.MockResourceSpecGetterMockRecorder) {
s.V(gomock.AssignableToTypeOf(2)).AnyTimes().Return(klogr.New())
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.Get(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(&fakeExistingResource, nil)
r.Parameters(&fakeExistingResource).Return(nil, nil)
},
},
{
Expand All @@ -203,7 +261,9 @@ func TestCreateResource(t *testing.T) {
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.CreateOrUpdateAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, nil, fakeError)
c.Get(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(&fakeExistingResource, nil)
r.Parameters(&fakeExistingResource).Return(&fakeResourceParameters, nil)
c.CreateOrUpdateAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{}), &fakeResourceParameters).Return(nil, nil, fakeInternalError)
},
},
{
Expand All @@ -215,7 +275,9 @@ func TestCreateResource(t *testing.T) {
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.CreateOrUpdateAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, &azureautorest.Future{}, errCtxExceeded)
c.Get(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(&fakeExistingResource, nil)
r.Parameters(&fakeExistingResource).Return(&fakeResourceParameters, nil)
c.CreateOrUpdateAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{}), &fakeResourceParameters).Return(nil, &azureautorest.Future{}, errCtxExceeded)
s.SetLongRunningOperationState(gomock.AssignableToTypeOf(&infrav1.Future{}))
},
},
Expand All @@ -230,12 +292,13 @@ func TestCreateResource(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
scopeMock := mock_async.NewMockFutureScope(mockCtrl)
clientMock := mock_async.NewMockCreator(mockCtrl)
creatorMock := mock_async.NewMockCreator(mockCtrl)
specMock := mock_azure.NewMockResourceSpecGetter(mockCtrl)

tc.expect(scopeMock.EXPECT(), clientMock.EXPECT(), specMock.EXPECT())
tc.expect(scopeMock.EXPECT(), creatorMock.EXPECT(), specMock.EXPECT())

result, err := CreateResource(context.TODO(), scopeMock, clientMock, specMock, tc.serviceName)
s := New(scopeMock, creatorMock, nil)
result, err := s.CreateResource(context.TODO(), specMock, tc.serviceName)
if tc.expectedError != "" {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tc.expectedError))
Expand Down Expand Up @@ -279,6 +342,18 @@ func TestDeleteResource(t *testing.T) {
c.DeleteAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, nil)
},
},
{
name: "delete async returns not found",
expectedError: "",
serviceName: "test-service",
expect: func(s *mock_async.MockFutureScopeMockRecorder, c *mock_async.MockDeleterMockRecorder, r *mock_azure.MockResourceSpecGetterMockRecorder) {
s.V(gomock.AssignableToTypeOf(2)).AnyTimes().Return(klogr.New())
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.DeleteAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, fakeNotFoundError)
},
},
{
name: "error occurs while running async delete",
expectedError: "failed to delete resource test-group/test-resource (service: test-service)",
Expand All @@ -288,7 +363,7 @@ func TestDeleteResource(t *testing.T) {
r.ResourceName().Return("test-resource")
r.ResourceGroupName().Return("test-group")
s.GetLongRunningOperationState("test-resource", "test-service").Return(nil)
c.DeleteAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, fakeError)
c.DeleteAsync(gomockinternal.AContext(), gomock.AssignableToTypeOf(&mock_azure.MockResourceSpecGetter{})).Return(nil, fakeInternalError)
},
},
{
Expand All @@ -315,12 +390,13 @@ func TestDeleteResource(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
scopeMock := mock_async.NewMockFutureScope(mockCtrl)
clientMock := mock_async.NewMockDeleter(mockCtrl)
deleterMock := mock_async.NewMockDeleter(mockCtrl)
specMock := mock_azure.NewMockResourceSpecGetter(mockCtrl)

tc.expect(scopeMock.EXPECT(), clientMock.EXPECT(), specMock.EXPECT())
tc.expect(scopeMock.EXPECT(), deleterMock.EXPECT(), specMock.EXPECT())

err := DeleteResource(context.TODO(), scopeMock, clientMock, specMock, tc.serviceName)
s := New(scopeMock, nil, deleterMock)
err := s.DeleteResource(context.TODO(), specMock, tc.serviceName)
if tc.expectedError != "" {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tc.expectedError))
Expand Down
9 changes: 8 additions & 1 deletion azure/services/async/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,18 @@ type FutureHandler interface {
// Creator is a client that can create or update a resource asynchronously.
type Creator interface {
FutureHandler
CreateOrUpdateAsync(ctx context.Context, spec azure.ResourceSpecGetter) (interface{}, azureautorest.FutureAPI, error)
CreateOrUpdateAsync(ctx context.Context, spec azure.ResourceSpecGetter, existingResource interface{}) (interface{}, azureautorest.FutureAPI, error)
Get(ctx context.Context, spec azure.ResourceSpecGetter) (interface{}, error)
}

// Deleter is a client that can delete a resource asynchronously.
type Deleter interface {
FutureHandler
DeleteAsync(ctx context.Context, spec azure.ResourceSpecGetter) (azureautorest.FutureAPI, error)
}

// Reconciler is a generic interface used to perform asynchronous reconciliation of Azure resources.
type Reconciler interface {
CreateResource(ctx context.Context, spec azure.ResourceSpecGetter, serviceName string) (interface{}, error)
DeleteResource(ctx context.Context, spec azure.ResourceSpecGetter, serviceName string) error
}
Loading

0 comments on commit 070d46f

Please sign in to comment.