Skip to content

Commit

Permalink
enable async reconcilation for azure machine pool
Browse files Browse the repository at this point in the history
  • Loading branch information
devigned committed Jan 16, 2021
1 parent 7aaf656 commit a675676
Show file tree
Hide file tree
Showing 25 changed files with 1,420 additions and 1,585 deletions.
17 changes: 17 additions & 0 deletions api/v1alpha3/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func (t Tags) Equals(other Tags) bool {
return reflect.DeepEqual(t, other)
}

// HasMatchingSpecVersionHash returns true if the resource has been tagged with a matching resource spec hash value.
func (t Tags) HasMatchingSpecVersionHash(hash string) bool {
value, ok := t[SpecVersionHashTagKey()]
return ok && value == hash
}

// HasOwned returns true if the tags contains a tag that marks the resource as owned by the cluster from the perspective of this management tooling.
func (t Tags) HasOwned(cluster string) bool {
value, ok := t[ClusterTagKey(cluster)]
Expand Down Expand Up @@ -68,6 +74,12 @@ func (t Tags) Merge(other Tags) {
}
}

// AddSpecVersionHashTag adds a spec version hash to the Azure resource tags to determine if state has changed quickly
func (t Tags) AddSpecVersionHashTag(hash string) Tags {
t[SpecVersionHashTagKey()] = hash
return t
}

// ResourceLifecycle configures the lifecycle of a resource
type ResourceLifecycle string

Expand Down Expand Up @@ -126,6 +138,11 @@ const (
VMTagsLastAppliedAnnotation = "sigs.k8s.io/cluster-api-provider-azure-last-applied-tags-vm"
)

// SpecVersionHashTagKey is the key for the spec version hash used to enable quick spec difference comparison
func SpecVersionHashTagKey() string {
return fmt.Sprintf("%s%s", NameAzureProviderPrefix, "spec-version-hash")
}

// ClusterTagKey generates the key for resources associated with a cluster.
func ClusterTagKey(name string) string {
return fmt.Sprintf("%s%s", NameAzureProviderOwned, name)
Expand Down
17 changes: 17 additions & 0 deletions api/v1alpha3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ const (
Node string = "node"
)

// Future contains the data needed for an Azure long running operation to continue across reconcile loops
type Future struct {
// Type describes the type of future, update, create, delete, etc
Type string `json:"type"`

// ResourceGroup is the Azure resource group for the resource
// +optional
ResourceGroup string `json:"resourceGroup,omitempty"`

// Name is the name of the Azure resource
// +optional
Name string `json:"name,omitempty"`

// FutureData is the base64 url encoded json Azure AutoRest Future
FutureData string `json:"futureData,omitempty"`
}

// NetworkSpec specifies what the Azure networking resources should look like.
type NetworkSpec struct {
// Vnet is the configuration for the Azure virtual network.
Expand Down
15 changes: 15 additions & 0 deletions api/v1alpha3/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions cloud/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"time"

"github.com/Azure/go-autorest/autorest"

infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha3"
)

// ErrNotOwned is returned when a resource can't be deleted because it isn't owned.
Expand All @@ -33,6 +35,12 @@ func ResourceNotFound(err error) bool {
return errors.As(err, &derr) && derr.StatusCode == 404
}

// ResourceConflict parses the error to check if it's a resource conflict error (409)
func ResourceConflict(err error) bool {
derr := autorest.DetailedError{}
return errors.As(err, &derr) && derr.StatusCode == 409
}

// VMDeletedError is returned when a virtual machine is deleted outside of capz
type VMDeletedError struct {
ProviderID string
Expand Down Expand Up @@ -105,3 +113,26 @@ func WithTransientError(err error, requeueAfter time.Duration) ReconcileError {
func WithTerminalError(err error) ReconcileError {
return ReconcileError{error: err, errorType: TerminalErrorType}
}

// OperationNotDoneError is used to represent a long running operation that is not yet complete
type OperationNotDoneError struct {
Future *infrav1.Future
}

// NewOperationNotDoneError returns a new OperationNotDoneError wrapping a Future
func NewOperationNotDoneError(future *infrav1.Future) *OperationNotDoneError {
return &OperationNotDoneError{
Future: future,
}
}

// Error returns the error represented as a string
func (onde OperationNotDoneError) Error() string {
return fmt.Sprintf("operation type %s on Azure resource %s/%s is not done", onde.Future.Type, onde.Future.ResourceGroup, onde.Future.Name)
}

// Is returns true if the target is an OperationNotDoneError
func (onde OperationNotDoneError) Is(target error) bool {
_, ok := target.(OperationNotDoneError)
return ok
}
2 changes: 1 addition & 1 deletion cloud/scope/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *AzureClients) SubscriptionID() string {
// ClientID).
func (c *AzureClients) HashKey() string {
hasher := sha256.New()
hasher.Write([]byte(c.TenantID() + c.CloudEnvironment() + c.SubscriptionID() + c.ClientID()))
_, _ = hasher.Write([]byte(c.TenantID() + c.CloudEnvironment() + c.SubscriptionID() + c.ClientID()))
return base64.URLEncoding.EncodeToString(hasher.Sum(nil))
}

Expand Down
8 changes: 4 additions & 4 deletions cloud/scope/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ import (
"strconv"
"strings"

"k8s.io/utils/net"

"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/to"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/klog/klogr"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha3"
azure "sigs.k8s.io/cluster-api-provider-azure/cloud"
"k8s.io/utils/net"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"

infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha3"
azure "sigs.k8s.io/cluster-api-provider-azure/cloud"
)

// ClusterScopeParams defines the input parameters used to create a new Scope.
Expand Down
13 changes: 7 additions & 6 deletions cloud/scope/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/klogr"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha3"
azure "sigs.k8s.io/cluster-api-provider-azure/cloud"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"

infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha3"
azure "sigs.k8s.io/cluster-api-provider-azure/cloud"
)

// MachineScopeParams defines the input parameters used to create a new MachineScope.
Expand Down Expand Up @@ -194,13 +195,13 @@ func (m *MachineScope) NICNames() []string {

// DiskSpecs returns the disk specs.
func (m *MachineScope) DiskSpecs() []azure.DiskSpec {
spec := azure.DiskSpec{
disks := make([]azure.DiskSpec, 1+len(m.AzureMachine.Spec.DataDisks))
disks[0] = azure.DiskSpec{
Name: azure.GenerateOSDiskName(m.Name()),
}
disks := []azure.DiskSpec{spec}

for _, dd := range m.AzureMachine.Spec.DataDisks {
disks = append(disks, azure.DiskSpec{Name: azure.GenerateDataDiskName(m.Name(), dd.NameSuffix)})
for i, dd := range m.AzureMachine.Spec.DataDisks {
disks[i+1] = azure.DiskSpec{Name: azure.GenerateDataDiskName(m.Name(), dd.NameSuffix)}
}
return disks
}
Expand Down
60 changes: 48 additions & 12 deletions cloud/scope/machinepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/klogr"
"k8s.io/utils/pointer"
azure "sigs.k8s.io/cluster-api-provider-azure/cloud"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
capierrors "sigs.k8s.io/cluster-api/errors"
capiv1exp "sigs.k8s.io/cluster-api/exp/api/v1alpha3"
utilkubeconfig "sigs.k8s.io/cluster-api/util/kubeconfig"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"

infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha3"
azure "sigs.k8s.io/cluster-api-provider-azure/cloud"
infrav1exp "sigs.k8s.io/cluster-api-provider-azure/exp/api/v1alpha3"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)

type (
Expand Down Expand Up @@ -163,6 +163,9 @@ func (m *MachinePoolScope) NeedsK8sVersionUpdate() bool {
// the AzureMachinePool. This calculates the number of ready replicas, the current version the kubelet
// is running on the node, the provider IDs for the instances and the providerIDList for the AzureMachinePool spec.
func (m *MachinePoolScope) UpdateInstanceStatuses(ctx context.Context, instances []infrav1exp.VMSSVM) error {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.UpdateInstanceStatuses")
defer span.End()

providerIDs := make([]string, len(instances))
for i, instance := range instances {
providerIDs[i] = fmt.Sprintf("azure://%s", instance.ID)
Expand Down Expand Up @@ -207,6 +210,18 @@ func (m *MachinePoolScope) SaveK8sVersion() {
m.AzureMachinePool.Status.Version = *m.MachinePool.Spec.Template.Spec.Version
}

// SetLongRunningOperationState will set the future on the AzureMachinePool status to allow the resource to continue
// in the next reconciliation.
func (m *MachinePoolScope) SetLongRunningOperationState(future *infrav1.Future) {
m.AzureMachinePool.Status.LongRunningOperationState = future
}

// GetLongRunningOperationState will get the future on the AzureMachinePool status to allow the resource to continue
// in the next reconciliation.
func (m *MachinePoolScope) GetLongRunningOperationState() *infrav1.Future {
return m.AzureMachinePool.Status.LongRunningOperationState
}

// SetProvisioningState sets the AzureMachinePool provisioning state.
func (m *MachinePoolScope) SetProvisioningState(v infrav1.VMState) {
switch {
Expand Down Expand Up @@ -266,31 +281,34 @@ func (m *MachinePoolScope) SetAnnotation(key, value string) {

// PatchObject persists the machine spec and status.
func (m *MachinePoolScope) PatchObject(ctx context.Context) error {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.PatchObject")
defer span.End()

return m.patchHelper.Patch(ctx, m.AzureMachinePool)
}

// AzureMachineTemplate gets the Azure machine template in this scope.
func (m *MachinePoolScope) AzureMachineTemplate(ctx context.Context) (*infrav1.AzureMachineTemplate, error) {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.AzureMachineTemplate")
defer span.End()

ref := m.MachinePool.Spec.Template.Spec.InfrastructureRef
return getAzureMachineTemplate(ctx, m.client, ref.Name, ref.Namespace)
}

// Close the MachineScope by updating the machine spec, machine status.
func (m *MachinePoolScope) Close(ctx context.Context) error {
return m.patchHelper.Patch(ctx, m.AzureMachinePool)
}
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.Close")
defer span.End()

func getAzureMachineTemplate(ctx context.Context, c client.Client, name, namespace string) (*infrav1.AzureMachineTemplate, error) {
m := &infrav1.AzureMachineTemplate{}
key := client.ObjectKey{Name: name, Namespace: namespace}
if err := c.Get(ctx, key, m); err != nil {
return nil, err
}
return m, nil
return m.patchHelper.Patch(ctx, m.AzureMachinePool)
}

// GetBootstrapData returns the bootstrap data from the secret in the Machine's bootstrap.dataSecretName.
func (m *MachinePoolScope) GetBootstrapData(ctx context.Context) (string, error) {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.GetBootstrapData")
defer span.End()

dataSecretName := m.MachinePool.Spec.Template.Spec.Bootstrap.DataSecretName
if dataSecretName == nil {
return "", errors.New("error retrieving bootstrap data: linked Machine Spec's bootstrap.dataSecretName is nil")
Expand Down Expand Up @@ -338,7 +356,22 @@ func (m *MachinePoolScope) RoleAssignmentSpecs() []azure.RoleAssignmentSpec {
return []azure.RoleAssignmentSpec{}
}

func getAzureMachineTemplate(ctx context.Context, c client.Client, name, namespace string) (*infrav1.AzureMachineTemplate, error) {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.getAzureMachineTemplate")
defer span.End()

m := &infrav1.AzureMachineTemplate{}
key := client.ObjectKey{Name: name, Namespace: namespace}
if err := c.Get(ctx, key, m); err != nil {
return nil, err
}
return m, nil
}

func (m *MachinePoolScope) getNodeStatusByProviderID(ctx context.Context, providerIDList []string) (map[string]*NodeStatus, error) {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.getNodeStatusByProviderID")
defer span.End()

nodeStatusMap := map[string]*NodeStatus{}
for _, id := range providerIDList {
nodeStatusMap[id] = &NodeStatus{}
Expand Down Expand Up @@ -371,6 +404,9 @@ func (m *MachinePoolScope) getNodeStatusByProviderID(ctx context.Context, provid
}

func (m *MachinePoolScope) getWorkloadClient(ctx context.Context) (client.Client, error) {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolScope.getWorkloadClient")
defer span.End()

obj := client.ObjectKey{
Namespace: m.MachinePool.Namespace,
Name: m.ClusterName(),
Expand Down
6 changes: 3 additions & 3 deletions cloud/services/resourceskus/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ var (
clientCache Cacher
)

// NewCache instantiates a cache and initializes its contents.
func NewCache(auth azure.Authorizer, location string) *Cache {
// newCache instantiates a cache and initializes its contents.
func newCache(auth azure.Authorizer, location string) *Cache {
return &Cache{
client: NewClient(auth),
location: location,
Expand All @@ -88,7 +88,7 @@ func GetCache(auth azure.Authorizer, location string) (*Cache, error) {
return c.(*Cache), nil
}

c = NewCache(auth, location)
c = newCache(auth, location)
clientCache.Add(key, c)
return c.(*Cache), nil
}
Expand Down
Loading

0 comments on commit a675676

Please sign in to comment.