Skip to content

Commit

Permalink
Make tags reconcile async
Browse files Browse the repository at this point in the history
  • Loading branch information
Jont828 committed Mar 18, 2022
1 parent e4d992a commit bc7c796
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 85 deletions.
2 changes: 2 additions & 0 deletions api/v1beta1/conditions_consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ const (
RoleAssignmentReadyCondition clusterv1.ConditionType = "RoleAssignmentReady"
// DisksReadyCondition means the disks exist and are ready to be used.
DisksReadyCondition clusterv1.ConditionType = "DisksReady"
// TagsReadyCondition means the tags are up to date and are ready to be used.
TagsReadyCondition clusterv1.ConditionType = "DisksReady"
// NetworkInterfaceReadyCondition means the network interfaces exist and are ready to be used.
NetworkInterfaceReadyCondition clusterv1.ConditionType = "NetworkInterfacesReady"

Expand Down
1 change: 1 addition & 0 deletions azure/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type ClusterDescriber interface {
ClusterName() string
Location() string
AdditionalTags() infrav1.Tags
AzureClusterAnnotations() map[string]string
AvailabilitySetEnabled() bool
CloudProviderConfigOverrides() *infrav1.CloudProviderConfigOverrides
FailureDomains() []string
Expand Down
20 changes: 14 additions & 6 deletions azure/scope/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/cluster-api-provider-azure/azure/services/routetables"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/securitygroups"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/subnets"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/tags"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/virtualnetworks"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/vnetpeerings"
"sigs.k8s.io/cluster-api-provider-azure/util/futures"
Expand Down Expand Up @@ -891,6 +892,12 @@ func (s *ClusterScope) UpdatePatchStatus(condition clusterv1.ConditionType, serv
}
}

// AzureClusterAnnotations returns the annotations on an AzureCluster as a map[string]string.
func (s *ClusterScope) AzureClusterAnnotations() map[string]string {
// TODO: does this need to be a deep copy?
return s.AzureCluster.GetAnnotations()
}

// AnnotationJSON returns a map[string]interface from a JSON annotation.
func (s *ClusterScope) AnnotationJSON(annotation string) (map[string]interface{}, error) {
out := map[string]interface{}{}
Expand Down Expand Up @@ -927,12 +934,13 @@ func (s *ClusterScope) SetAnnotation(key, value string) {
}

// TagsSpecs returns the tag specs for the AzureCluster.
func (s *ClusterScope) TagsSpecs() []azure.TagsSpec {
return []azure.TagsSpec{
{
Scope: azure.ResourceGroupID(s.SubscriptionID(), s.ResourceGroup()),
Tags: s.AdditionalTags(),
Annotation: azure.RGTagsLastAppliedAnnotation,
func (s *ClusterScope) TagsSpecs() []azure.ResourceSpecGetter {
clusterAnnotations := s.AzureClusterAnnotations()
return []azure.ResourceSpecGetter{
&tags.TagsSpec{
Scope: azure.ResourceGroupID(s.SubscriptionID(), s.ResourceGroup()),
Tags: s.AdditionalTags(),
JSONAnnotation: clusterAnnotations[azure.RGTagsLastAppliedAnnotation],
},
}
}
14 changes: 8 additions & 6 deletions azure/scope/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"sigs.k8s.io/cluster-api-provider-azure/azure/services/inboundnatrules"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/networkinterfaces"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/resourceskus"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/tags"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/virtualmachines"
"sigs.k8s.io/cluster-api-provider-azure/util/futures"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
Expand Down Expand Up @@ -172,12 +173,13 @@ func (m *MachineScope) VMSpec() azure.ResourceSpecGetter {
}

// TagsSpecs returns the tags for the AzureMachine.
func (m *MachineScope) TagsSpecs() []azure.TagsSpec {
return []azure.TagsSpec{
{
Scope: azure.VMID(m.SubscriptionID(), m.ResourceGroup(), m.Name()),
Tags: m.AdditionalTags(),
Annotation: azure.VMTagsLastAppliedAnnotation,
func (m *MachineScope) TagsSpecs() []azure.ResourceSpecGetter {
clusterAnnotations := m.ClusterScoper.AzureClusterAnnotations()
return []azure.ResourceSpecGetter{
&tags.TagsSpec{
Scope: azure.VMID(m.SubscriptionID(), m.ResourceGroup(), m.Name()),
Tags: m.AdditionalTags(),
JSONAnnotation: clusterAnnotations[azure.VMTagsLastAppliedAnnotation],
},
}
}
Expand Down
30 changes: 28 additions & 2 deletions azure/services/tags/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2019-10-01/resources"
"github.com/Azure/go-autorest/autorest"
"github.com/pkg/errors"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)
Expand Down Expand Up @@ -51,9 +52,34 @@ func newTagsClient(subscriptionID string, baseURI string, authorizer autorest.Au
return tagsClient
}

// Get gets a set of tags at the scope.
func (ac *azureClient) Get(ctx context.Context, spec azure.ResourceSpecGetter) (result interface{}, err error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "tags.azureClient.Get")
defer done()

return ac.tags.GetAtScope(ctx, spec.ResourceName())
}

// CreateOrUpdateAsync wraps the UpdateAtScope() call for tags.
// Creating a set of tags is not a long running operation, so we don't ever return a future.
func (ac *azureClient) CreateOrUpdateAsync(ctx context.Context, spec azure.ResourceSpecGetter, parameters interface{}) (result interface{}, future azureautorest.FutureAPI, err error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "tags.azureClient.CreateOrUpdate")
defer done()

// TODO: UpdateAtScope() returns a resources.TagsResource while parameters are a resources.TagsPatchResource, is this a problem?
tags, ok := parameters.(resources.TagsPatchResource)
if !ok {
return nil, nil, errors.Errorf("%T is not a resources.TagsPatchResource", parameters)
}

// TODO: do we ever want to call CreateOrUpdate on tags?
result, err = ac.tags.UpdateAtScope(ctx, spec.ResourceName(), tags)
return result, nil, err
}

// GetAtScope sends the get at scope request.
func (ac *azureClient) GetAtScope(ctx context.Context, scope string) (resources.TagsResource, error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "tags.AzureClient.GetAtScope")
ctx, _, done := tele.StartSpanWithLogger(ctx, "tags.azureClient.GetAtScope")
defer done()

return ac.tags.GetAtScope(ctx, scope)
Expand All @@ -62,7 +88,7 @@ func (ac *azureClient) GetAtScope(ctx context.Context, scope string) (resources.
// UpdateAtScope this operation allows replacing, merging or selectively deleting tags on the specified resource or
// subscription.
func (ac *azureClient) UpdateAtScope(ctx context.Context, scope string, parameters resources.TagsPatchResource) (resources.TagsResource, error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "tags.AzureClient.UpdateAtScope")
ctx, _, done := tele.StartSpanWithLogger(ctx, "tags.azureClient.UpdateAtScope")
defer done()

return ac.tags.UpdateAtScope(ctx, scope, parameters)
Expand Down
107 changes: 36 additions & 71 deletions azure/services/tags/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2019-10-01/resources"
"github.com/Azure/go-autorest/autorest/to"
"github.com/pkg/errors"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/azure/converters"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/async"
"sigs.k8s.io/cluster-api-provider-azure/util/reconciler"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)

Expand All @@ -32,23 +35,25 @@ const serviceName = "tags"
// TagScope defines the scope interface for a tags service.
type TagScope interface {
azure.Authorizer
azure.AsyncStatusUpdater
ClusterName() string
TagsSpecs() []azure.TagsSpec
TagsSpecs() []azure.ResourceSpecGetter
AnnotationJSON(string) (map[string]interface{}, error)
UpdateAnnotationJSON(string, map[string]interface{}) error
}

// Service provides operations on Azure resources.
type Service struct {
Scope TagScope
client
async.Reconciler
}

// New creates a new service.
func New(scope TagScope) *Service {
client := newClient(scope)
return &Service{
Scope: scope,
client: newClient(scope),
Scope: scope,
Reconciler: async.New(scope, client, nil),
}
}

Expand All @@ -62,6 +67,33 @@ func (s *Service) Reconcile(ctx context.Context) error {
ctx, log, done := tele.StartSpanWithLogger(ctx, "tags.Service.Reconcile")
defer done()

ctx, cancel := context.WithTimeout(ctx, reconciler.DefaultAzureServiceReconcileTimeout)
defer cancel()

specs := s.Scope.TagsSpecs()
if len(specs) == 0 {
return nil
}

// We go through the list of TagsSpecs to reconcile each one, independently of the result of the previous one.
// If multiple errors occur, we return the most pressing one.
// Order of precedence (highest -> lowest) is: error that is not an operationNotDoneError (i.e. error creating) -> operationNotDoneError (i.e. creating in progress) -> no error (i.e. created)
var resultErr error
for _, tagsSpec := range specs {
if result, err := s.CreateResource(ctx, tagsSpec, serviceName); err != nil {
if !azure.IsOperationNotDoneError(err) || resultErr == nil {
resultErr = err
}
}
// if err = s.Scope.UpdateAnnotationJSON(tagsSpec.Annotation, newAnnotation); err != nil {
// return err
// }

}

s.Scope.UpdatePutStatus(infrav1.VnetPeeringReadyCondition, serviceName, resultErr)
return resultErr

for _, tagsSpec := range s.Scope.TagsSpecs() {
existingTags, err := s.client.GetAtScope(ctx, tagsSpec.Scope)
if err != nil {
Expand Down Expand Up @@ -128,73 +160,6 @@ func (s *Service) Delete(ctx context.Context) error {
return nil
}

// tagsChanged determines which tags to delete and which to add.
func tagsChanged(lastAppliedTags map[string]interface{}, desiredTags map[string]string, currentTags map[string]*string) (bool, map[string]string, map[string]string, map[string]interface{}) {
// Bool tracking if we found any changed state.
changed := false

// Tracking for created/updated
createdOrUpdated := map[string]string{}

// Tracking for tags that were deleted.
deleted := map[string]string{}

// The new annotation that we need to set if anything is created/updated.
newAnnotation := map[string]interface{}{}

// Loop over lastAppliedTags, checking if entries are in desiredTags.
// If an entry is present in lastAppliedTags but not in desiredTags, it has been deleted
// since last time. We flag this in the deleted map.
for t, v := range lastAppliedTags {
_, ok := desiredTags[t]

// Entry isn't in desiredTags, it has been deleted.
if !ok {
// Cast v to a string here. This should be fine, tags are always
// strings.
deleted[t] = v.(string)
changed = true
}
}

// Loop over desiredTags, checking for entries in currentTags.
//
// If an entry is in desiredTags, but not currentTags, it has been created since
// last time, or some external entity deleted it.
//
// If an entry is in both desiredTags and currentTags, we compare their values, if
// the value in desiredTags differs from that in currentTags, the tag has been
// updated since last time or some external entity modified it.
for t, v := range desiredTags {
av, ok := currentTags[t]

// Entries in the desiredTags always need to be noted in the newAnnotation. We
// know they're going to be created or updated.
newAnnotation[t] = v

// Entry isn't in desiredTags, it's new.
if !ok {
createdOrUpdated[t] = v
newAnnotation[t] = v
changed = true
continue
}

// Entry is in desiredTags, has the value changed?
if v != *av {
createdOrUpdated[t] = v
changed = true
}

// Entry existed in both desiredTags and desiredTags, and their values were
// equal. Nothing to do.
}

// We made it through the loop, and everything that was in desiredTags, was also
// in dst. Nothing changed.
return changed, createdOrUpdated, deleted, newAnnotation
}

// IsManaged returns always returns true as CAPZ does not support BYO tags.
func (s *Service) IsManaged(ctx context.Context) (bool, error) {
return true, nil
Expand Down

0 comments on commit bc7c796

Please sign in to comment.