From e7b7dc866ac87ae3b7598aac8b6fcbf9109f4792 Mon Sep 17 00:00:00 2001 From: Alexander Demichev Date: Tue, 18 Aug 2020 22:32:54 +0200 Subject: [PATCH] Add ability to sync aws machine and node labels --- controllers/awsmachine_controller.go | 13 ++++++++++ main.go | 25 +++++++++++++++++++ pkg/cloud/scope/machine.go | 37 ++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/controllers/awsmachine_controller.go b/controllers/awsmachine_controller.go index b1cd8d563c..aec79aa6ff 100644 --- a/controllers/awsmachine_controller.go +++ b/controllers/awsmachine_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/utils/pointer" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/controllers/noderefutil" + "sigs.k8s.io/cluster-api/controllers/remote" capierrors "sigs.k8s.io/cluster-api/errors" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/conditions" @@ -53,6 +54,7 @@ import ( type AWSMachineReconciler struct { client.Client Log logr.Logger + Tracker *remote.ClusterCacheTracker Recorder record.EventRecorder ec2ServiceFactory func(*scope.ClusterScope) services.EC2MachineInterface secretsManagerServiceFactory func(*scope.ClusterScope) services.SecretsManagerInterface @@ -483,6 +485,17 @@ func (r *AWSMachineReconciler) reconcileNormal(_ context.Context, machineScope * // TODO(vincepri): Remove this annotation when clusterctl is no longer relevant. machineScope.SetAnnotation("cluster-api-provider-aws", "true") + // Get the remote cluster client. + remoteClient, err := r.Tracker.GetClient(context.Background(), util.ObjectKey(machineScope.Cluster)) + if err != nil { + return ctrl.Result{}, err + } + + err = machineScope.SyncMachineNodeLabel(context.Background(), remoteClient) + if err != nil { + return ctrl.Result{}, err + } + switch instance.State { case infrav1.InstanceStatePending: machineScope.SetNotReady() diff --git a/main.go b/main.go index a60c03ac26..174675ad2d 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/cluster-api-provider-aws/pkg/record" "sigs.k8s.io/cluster-api-provider-aws/version" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" + "sigs.k8s.io/cluster-api/controllers/remote" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -67,6 +68,7 @@ func main() { profilerAddress string awsClusterConcurrency int awsMachineConcurrency int + clusterConcurrency int syncPeriod time.Duration webhookPort int healthAddr string @@ -119,6 +121,9 @@ func main() { "Number of AWSMachines to process simultaneously", ) + flag.IntVar(&clusterConcurrency, "cluster-concurrency", 10, + "Number of clusters to process simultaneously") + flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, @@ -179,10 +184,30 @@ func main() { record.InitFromRecorder(mgr.GetEventRecorderFor("aws-controller")) if webhookPort == 0 { + // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers + // requiring a connection to a remote cluster + tracker, err := remote.NewClusterCacheTracker( + ctrl.Log.WithName("remote").WithName("ClusterCacheTracker"), + mgr, + ) + if err != nil { + setupLog.Error(err, "unable to create cluster cache tracker") + os.Exit(1) + } + if err := (&remote.ClusterCacheReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("remote").WithName("ClusterCacheReconciler"), + Tracker: tracker, + }).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: clusterConcurrency}); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") + os.Exit(1) + } + if err = (&controllers.AWSMachineReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("AWSMachine"), Recorder: mgr.GetEventRecorderFor("awsmachine-controller"), + Tracker: tracker, }).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: awsMachineConcurrency}); err != nil { setupLog.Error(err, "unable to create controller", "controller", "AWSMachine") os.Exit(1) diff --git a/pkg/cloud/scope/machine.go b/pkg/cloud/scope/machine.go index 07540e2d8b..aa7c46a934 100644 --- a/pkg/cloud/scope/machine.go +++ b/pkg/cloud/scope/machine.go @@ -20,6 +20,7 @@ import ( "context" "encoding/base64" "fmt" + "strings" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -181,6 +182,42 @@ func (m *MachineScope) SetAnnotation(key, value string) { m.AWSMachine.Annotations[key] = value } +// SyncMachineNodeLabel sets a key value label on the AWSMachine node. +func (m *MachineScope) SyncMachineNodeLabel(ctx context.Context, remoteClient client.Client) error { + if m.Machine.Status.NodeRef == nil { + return nil + } + + node := &corev1.Node{} + err := remoteClient.Get(ctx, types.NamespacedName{Name: m.Machine.Status.NodeRef.Name}, node) + if err != nil { + return err + } + + if node.Labels == nil { + node.Labels = map[string]string{} + } + + machineLabels := m.AWSMachine.Labels + if machineLabels == nil { + return nil + } + + for key, value := range machineLabels { + // sync only labels with "cluster.x-k8s.io", so users can understand where labels come from + if strings.HasPrefix(key, "cluster.x-k8s.io") { + node.Labels[key] = value + } + } + + err = remoteClient.Update(ctx, node) + if err != nil { + return err + } + + return nil +} + // UseSecretsManager returns the computed value of whether or not // userdata should be stored using AWS Secrets Manager. func (m *MachineScope) UseSecretsManager() bool {