Skip to content

Commit

Permalink
Refactor NAT gateway spec into async service interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Jont828 committed Nov 15, 2021
1 parent feba723 commit e8a206b
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 135 deletions.
12 changes: 8 additions & 4 deletions azure/scope/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/groups"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/natgateways"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/vnetpeerings"
"sigs.k8s.io/cluster-api-provider-azure/util/futures"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
Expand Down Expand Up @@ -230,14 +231,17 @@ func (s *ClusterScope) RouteTableSpecs() []azure.RouteTableSpec {
}

// NatGatewaySpecs returns the node nat gateway.
func (s *ClusterScope) NatGatewaySpecs() []azure.NatGatewaySpec {
natGateways := []azure.NatGatewaySpec{}
func (s *ClusterScope) NatGatewaySpecs() []azure.ResourceSpecGetter {
natGateways := []azure.ResourceSpecGetter{}

// We ignore the control plane nat gateway, as we will always use a LB to enable egress on the control plane.
for _, subnet := range s.NodeSubnets() {
if subnet.IsNatGatewayEnabled() {
natGateways = append(natGateways, azure.NatGatewaySpec{
Name: subnet.NatGateway.Name,
natGateways = append(natGateways, &natgateways.NatGatewaySpec{
Name: subnet.NatGateway.Name,
ResourceGroup: s.ResourceGroup(),
SubscriptionID: s.SubscriptionID(),
Location: s.Location(),
NatGatewayIP: infrav1.PublicIPSpec{
Name: subnet.NatGateway.NatGatewayIP.Name,
},
Expand Down
134 changes: 113 additions & 21 deletions azure/services/natgateways/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ package natgateways

import (
"context"
"encoding/json"

"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2021-02-01/network"
"github.com/Azure/go-autorest/autorest"
azureautorest "github.com/Azure/go-autorest/autorest/azure"
"github.com/pkg/errors"

infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/util/reconciler"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)

// client wraps go-sdk.
type client interface {
Get(context.Context, string, string) (network.NatGateway, error)
CreateOrUpdate(context.Context, string, string, network.NatGateway) error
Delete(context.Context, string, string) error
CreateOrUpdateAsync(context.Context, azure.ResourceSpecGetter) (interface{}, azureautorest.FutureAPI, error)
DeleteAsync(context.Context, azure.ResourceSpecGetter) (azureautorest.FutureAPI, error)
IsDone(context.Context, azureautorest.FutureAPI) (bool, error)
Result(context.Context, azureautorest.FutureAPI, string) (interface{}, error)
}

// azureClient contains the Azure go-sdk Client.
Expand All @@ -55,42 +62,127 @@ func netNatGatewaysClient(subscriptionID string, baseURI string, authorizer auto

// Get gets the specified nat gateway.
func (ac *azureClient) Get(ctx context.Context, resourceGroupName, natGatewayName string) (network.NatGateway, error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "natgateways.AzureClient.Get")
ctx, _, done := tele.StartSpanWithLogger(ctx, "natgateways.azureClient.Get")
defer done()

return ac.natgateways.Get(ctx, resourceGroupName, natGatewayName, "")
}

// CreateOrUpdate create or updates a nat gateway in a specified resource group.
func (ac *azureClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, natGatewayName string, natGateway network.NatGateway) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "natgateways.AzureClient.CreateOrUpdate")
defer done()
// CreateOrUpdateAsync creates or updates a Nat Gateway asynchronously.
// It sends a PUT request to Azure and if accepted without error, the func will return a Future which can be used to track the ongoing
// progress of the operation.
func (ac *azureClient) CreateOrUpdateAsync(ctx context.Context, spec azure.ResourceSpecGetter) (interface{}, azureautorest.FutureAPI, error) {
ctx, span := tele.Tracer().Start(ctx, "natgateways.azureClient.CreateOrUpdateAsync")
defer span.End()

var existingNatGateway interface{}

if existing, err := ac.Get(ctx, spec.ResourceGroupName(), spec.ResourceName()); err != nil && !azure.ResourceNotFound(err) {
return nil, nil, errors.Wrapf(err, "failed to get Nat Gateway %s for %s in %s", spec.ResourceName(), spec.OwnerResourceName(), spec.ResourceGroupName())
} else if err == nil {
existingNatGateway = existing
}

params, err := spec.Parameters(existingNatGateway)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to get desired parameters for Nat Gateway %s", spec.ResourceName())
}

future, err := ac.natgateways.CreateOrUpdate(ctx, resourceGroupName, natGatewayName, natGateway)
natGateway, ok := params.(network.NatGateway)
if !ok {
if params == nil {
// nothing to do here.
return existingNatGateway, nil, nil
}
return nil, nil, errors.Errorf("%T is not a network.NatGateway", params)
}

future, err := ac.natgateways.CreateOrUpdate(ctx, spec.ResourceGroupName(), spec.ResourceName(), natGateway)
if err != nil {
return err
return nil, nil, err
}

ctx, cancel := context.WithTimeout(ctx, reconciler.DefaultAzureCallTimeout)
defer cancel()

err = future.WaitForCompletionRef(ctx, ac.natgateways.Client)
if err != nil {
return err
// if an error occurs, return the future.
// this means the long-running operation didn't finish in the specified timeout.
return nil, &future, err
}
_, err = future.Result(ac.natgateways)
return err

result, err := future.Result(ac.natgateways)

// if the operation completed, return a nil future
return result, nil, err
}

// Delete deletes the specified nat gateway.
func (ac *azureClient) Delete(ctx context.Context, resourceGroupName, natGatewayName string) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "natgateways.AzureClient.Delete")
defer done()
// DeleteAsync deletes a Nat Gateway asynchronously. DeleteAsync sends a DELETE
// request to Azure and if accepted without error, the func will return a Future which can be used to track the ongoing
// progress of the operation.
func (ac *azureClient) DeleteAsync(ctx context.Context, spec azure.ResourceSpecGetter) (azureautorest.FutureAPI, error) {
ctx, span := tele.Tracer().Start(ctx, "natgateways.azureClient.Delete")
defer span.End()

future, err := ac.natgateways.Delete(ctx, resourceGroupName, natGatewayName)
future, err := ac.natgateways.Delete(ctx, spec.ResourceGroupName(), spec.ResourceName())
if err != nil {
return err
return nil, err
}

ctx, cancel := context.WithTimeout(ctx, reconciler.DefaultAzureCallTimeout)
defer cancel()

err = future.WaitForCompletionRef(ctx, ac.natgateways.Client)
if err != nil {
return err
// if an error occurs, return the future.
// this means the long-running operation didn't finish in the specified timeout.
return &future, err
}
_, err = future.Result(ac.natgateways)
return err
// if the operation completed, return a nil future.
return nil, err
}

// IsDone returns true if the long-running operation has completed.
func (ac *azureClient) IsDone(ctx context.Context, future azureautorest.FutureAPI) (bool, error) {
ctx, span := tele.Tracer().Start(ctx, "natgateways.azureClient.IsDone")
defer span.End()

done, err := future.DoneWithContext(ctx, ac.natgateways)
if err != nil {
return false, errors.Wrap(err, "failed checking if the operation was complete")
}

return done, nil
}

// Result fetches the result of a long-running operation future.
func (ac *azureClient) Result(ctx context.Context, futureData azureautorest.FutureAPI, futureType string) (interface{}, error) {
if futureData == nil {
return nil, errors.Errorf("cannot get result from nil future")
}
var result func(client network.NatGatewaysClient) (natGateway network.NatGateway, err error)

switch futureType {
case infrav1.PutFuture:
var future *network.NatGatewaysCreateOrUpdateFuture
jsonData, err := futureData.MarshalJSON()
if err != nil {
return nil, errors.Wrap(err, "failed to marshal future")
}
if err := json.Unmarshal(jsonData, &future); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal future data")
}
result = (*future).Result

case infrav1.DeleteFuture:
// Delete does not return a result Nat Gateway
return nil, nil

default:
return nil, errors.Errorf("unknown future type %q", futureType)
}

return result(ac.natgateways)
}
Loading

0 comments on commit e8a206b

Please sign in to comment.