Skip to content

Commit

Permalink
Merge pull request #1838 from Jont828/async-peerings
Browse files Browse the repository at this point in the history
Make virtual network peerings reconcile/delete async
  • Loading branch information
k8s-ci-robot authored Nov 9, 2021
2 parents 60025b0 + 0579d71 commit ed4521f
Show file tree
Hide file tree
Showing 9 changed files with 556 additions and 475 deletions.
2 changes: 2 additions & 0 deletions api/v1beta1/conditions_consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ const (
ResourceGroupReadyCondition clusterv1.ConditionType = "ResourceGroupReady"
// VNetReadyCondition means the virtual network exists and is ready to be used.
VNetReadyCondition clusterv1.ConditionType = "VNetReady"
// VnetPeeringReadyCondition means the virtual network peerings exist and are ready to be used.
VnetPeeringReadyCondition clusterv1.ConditionType = "VnetPeeringReady"
// SecurityGroupsReadyCondition means the security groups exist and are ready to be used.
SecurityGroupsReadyCondition clusterv1.ConditionType = "SecurityGroupsReady"
// RouteTablesReadyCondition means the route tables exist and are ready to be used.
Expand Down
14 changes: 9 additions & 5 deletions azure/scope/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/groups"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/vnetpeerings"
"sigs.k8s.io/cluster-api-provider-azure/util/futures"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)
Expand Down Expand Up @@ -303,23 +304,24 @@ func (s *ClusterScope) GroupSpec() azure.ResourceSpecGetter {
}

// VnetPeeringSpecs returns the virtual network peering specs.
func (s *ClusterScope) VnetPeeringSpecs() []azure.VnetPeeringSpec {
peeringSpecs := make([]azure.VnetPeeringSpec, 2*len(s.Vnet().Peerings))

func (s *ClusterScope) VnetPeeringSpecs() []azure.ResourceSpecGetter {
peeringSpecs := make([]azure.ResourceSpecGetter, 2*len(s.Vnet().Peerings))
for i, peering := range s.Vnet().Peerings {
forwardPeering := azure.VnetPeeringSpec{
forwardPeering := &vnetpeerings.VnetPeeringSpec{
PeeringName: azure.GenerateVnetPeeringName(s.Vnet().Name, peering.RemoteVnetName),
SourceVnetName: s.Vnet().Name,
SourceResourceGroup: s.Vnet().ResourceGroup,
RemoteVnetName: peering.RemoteVnetName,
RemoteResourceGroup: peering.ResourceGroup,
SubscriptionID: s.SubscriptionID(),
}
reversePeering := azure.VnetPeeringSpec{
reversePeering := &vnetpeerings.VnetPeeringSpec{
PeeringName: azure.GenerateVnetPeeringName(peering.RemoteVnetName, s.Vnet().Name),
SourceVnetName: peering.RemoteVnetName,
SourceResourceGroup: peering.ResourceGroup,
RemoteVnetName: s.Vnet().Name,
RemoteResourceGroup: s.Vnet().ResourceGroup,
SubscriptionID: s.SubscriptionID(),
}
peeringSpecs[i*2] = forwardPeering
peeringSpecs[i*2+1] = reversePeering
Expand Down Expand Up @@ -597,6 +599,7 @@ func (s *ClusterScope) PatchObject(ctx context.Context) error {
conditions.WithConditions(
infrav1.ResourceGroupReadyCondition,
infrav1.NetworkInfrastructureReadyCondition,
infrav1.VnetPeeringReadyCondition,
),
)

Expand All @@ -607,6 +610,7 @@ func (s *ClusterScope) PatchObject(ctx context.Context) error {
clusterv1.ReadyCondition,
infrav1.ResourceGroupReadyCondition,
infrav1.NetworkInfrastructureReadyCondition,
infrav1.VnetPeeringReadyCondition,
}})
}

Expand Down
123 changes: 107 additions & 16 deletions azure/services/vnetpeerings/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ package vnetpeerings

import (
"context"
"encoding/json"

"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2021-02-01/network"
"github.com/Azure/go-autorest/autorest"
azureautorest "github.com/Azure/go-autorest/autorest/azure"
"github.com/pkg/errors"

infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/util/reconciler"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)

// Client wraps go-sdk.
type Client interface {
Get(context.Context, string, string, string) (network.VirtualNetworkPeering, error)
CreateOrUpdate(context.Context, string, string, string, network.VirtualNetworkPeering) error
Delete(context.Context, string, string, string) error
CreateOrUpdateAsync(context.Context, azure.ResourceSpecGetter) (interface{}, azureautorest.FutureAPI, error)
DeleteAsync(context.Context, azure.ResourceSpecGetter) (azureautorest.FutureAPI, error)
IsDone(context.Context, azureautorest.FutureAPI) (bool, error)
Result(context.Context, azureautorest.FutureAPI, string) (interface{}, error)
}

// AzureClient contains the Azure go-sdk Client.
Expand Down Expand Up @@ -61,36 +68,120 @@ func (ac *AzureClient) Get(ctx context.Context, resourceGroupName, vnetName, pee
return ac.peerings.Get(ctx, resourceGroupName, vnetName, peeringName)
}

// CreateOrUpdate creates or updates a virtual network peering in the specified virtual network.
func (ac *AzureClient) CreateOrUpdate(ctx context.Context, resourceGroupName, vnetName, peeringName string, peering network.VirtualNetworkPeering) error {
ctx, span := tele.Tracer().Start(ctx, "vnetpeerings.AzureClient.CreateOrUpdate")
// CreateOrUpdateAsync creates or updates a virtual network peering asynchronously.
// It sends a PUT request to Azure and if accepted without error, the func will return a Future which can be used to track the ongoing
// progress of the operation.
func (ac *AzureClient) CreateOrUpdateAsync(ctx context.Context, spec azure.ResourceSpecGetter) (interface{}, azureautorest.FutureAPI, error) {
ctx, span := tele.Tracer().Start(ctx, "vnetpeerings.AzureClient.CreateOrUpdateAsync")
defer span.End()

future, err := ac.peerings.CreateOrUpdate(ctx, resourceGroupName, vnetName, peeringName, peering, network.SyncRemoteAddressSpaceTrue)
var existingPeering interface{}

if existing, err := ac.Get(ctx, spec.ResourceGroupName(), spec.OwnerResourceName(), spec.ResourceName()); err != nil && !azure.ResourceNotFound(err) {
return nil, nil, errors.Wrapf(err, "failed to get virtual network peering %s for %s in %s", spec.ResourceName(), spec.OwnerResourceName(), spec.ResourceGroupName())
} else if err == nil {
existingPeering = existing
}

params, err := spec.Parameters(existingPeering)
if err != nil {
return err
return nil, nil, errors.Wrapf(err, "failed to get desired parameters for virtual network peering %s", spec.ResourceName())
}

peering, ok := params.(network.VirtualNetworkPeering)
if !ok {
if params == nil {
// nothing to do here.
return existingPeering, nil, nil
}
return nil, nil, errors.Errorf("%T is not a network.VirtualNetworkPeering", params)
}

future, err := ac.peerings.CreateOrUpdate(ctx, spec.ResourceGroupName(), spec.OwnerResourceName(), spec.ResourceName(), peering, network.SyncRemoteAddressSpaceTrue)
if err != nil {
return nil, nil, err
}

ctx, cancel := context.WithTimeout(ctx, reconciler.DefaultAzureCallTimeout)
defer cancel()

err = future.WaitForCompletionRef(ctx, ac.peerings.Client)
if err != nil {
return err
// if an error occurs, return the future.
// this means the long-running operation didn't finish in the specified timeout.
return nil, &future, err
}
_, err = future.Result(ac.peerings)
return err

result, err := future.Result(ac.peerings)
// if the operation completed, return a nil future
return result, nil, err
}

// Delete deletes the specified virtual network peering.
func (ac *AzureClient) Delete(ctx context.Context, resourceGroupName, vnetName, peeringName string) error {
// DeleteAsync deletes a virtual network peering asynchronously. DeleteAsync sends a DELETE
// request to Azure and if accepted without error, the func will return a Future which can be used to track the ongoing
// progress of the operation.
func (ac *AzureClient) DeleteAsync(ctx context.Context, spec azure.ResourceSpecGetter) (azureautorest.FutureAPI, error) {
ctx, span := tele.Tracer().Start(ctx, "vnetpeerings.AzureClient.Delete")
defer span.End()

future, err := ac.peerings.Delete(ctx, resourceGroupName, vnetName, peeringName)
future, err := ac.peerings.Delete(ctx, spec.ResourceGroupName(), spec.OwnerResourceName(), spec.ResourceName())
if err != nil {
return err
return nil, err
}

ctx, cancel := context.WithTimeout(ctx, reconciler.DefaultAzureCallTimeout)
defer cancel()

err = future.WaitForCompletionRef(ctx, ac.peerings.Client)
if err != nil {
return err
// if an error occurs, return the future.
// this means the long-running operation didn't finish in the specified timeout.
return &future, err
}
_, err = future.Result(ac.peerings)
return err
// if the operation completed, return a nil future.
return nil, err
}

// IsDone returns true if the long-running operation has completed.
func (ac *AzureClient) IsDone(ctx context.Context, future azureautorest.FutureAPI) (bool, error) {
ctx, span := tele.Tracer().Start(ctx, "vnetpeerings.AzureClient.IsDone")
defer span.End()

done, err := future.DoneWithContext(ctx, ac.peerings)
if err != nil {
return false, errors.Wrap(err, "failed checking if the operation was complete")
}

return done, nil
}

// Result fetches the result of a long-running operation future.
func (ac *AzureClient) Result(ctx context.Context, futureData azureautorest.FutureAPI, futureType string) (interface{}, error) {
if futureData == nil {
return nil, errors.Errorf("cannot get result from nil future")
}
var result func(client network.VirtualNetworkPeeringsClient) (peering network.VirtualNetworkPeering, err error)

switch futureType {
case infrav1.PutFuture:
var future *network.VirtualNetworkPeeringsCreateOrUpdateFuture
jsonData, err := futureData.MarshalJSON()
if err != nil {
return nil, errors.Wrap(err, "failed to marshal future")
}
if err := json.Unmarshal(jsonData, &future); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal future data")
}
result = (*future).Result

case infrav1.DeleteFuture:
// Delete does not return a result virtual network peering
return nil, nil

default:
return nil, errors.Errorf("unknown future type %q", futureType)
}

return result(ac.peerings)
}
67 changes: 51 additions & 16 deletions azure/services/vnetpeerings/mock_vnetpeerings/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ed4521f

Please sign in to comment.