diff --git a/api/v1alpha3/machinehealthcheck_types.go b/api/v1alpha3/machinehealthcheck_types.go index e9f6d05b3e12..948abf6c4219 100644 --- a/api/v1alpha3/machinehealthcheck_types.go +++ b/api/v1alpha3/machinehealthcheck_types.go @@ -44,6 +44,11 @@ type MachineHealthCheckSpec struct { // "selector" are not healthy. // +optional MaxUnhealthy *intstr.IntOrString `json:"maxUnhealthy,omitempty"` + + // Machines older than this duration without a node will be considered to have + // failed and will be remediated. + // +optional + NodeStartupTimeout *metav1.Duration `json:"nodeStartupTimeout,omitempty"` } // ANCHOR_END: MachineHealthCHeckSpec @@ -73,11 +78,11 @@ type UnhealthyCondition struct { type MachineHealthCheckStatus struct { // total number of machines counted by this machine health check // +kubebuilder:validation:Minimum=0 - ExpectedMachines int32 `json:"expectedMachines"` + ExpectedMachines int32 `json:"expectedMachines,omitempty"` // total number of healthy machines counted by this machine health check // +kubebuilder:validation:Minimum=0 - CurrentHealthy int32 `json:"currentHealthy"` + CurrentHealthy int32 `json:"currentHealthy,omitempty"` } // ANCHOR_END: MachineHealthCheckStatus diff --git a/api/v1alpha3/machinehealthcheck_webhook.go b/api/v1alpha3/machinehealthcheck_webhook.go index a74831dffa82..8695891b3c6e 100644 --- a/api/v1alpha3/machinehealthcheck_webhook.go +++ b/api/v1alpha3/machinehealthcheck_webhook.go @@ -18,6 +18,7 @@ package v1alpha3 import ( "fmt" + "time" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,6 +29,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" ) +var ( + // Default time allowed for a node to start up. Can be made longer via the + // spec if required for a particular provider. + // 10 minutes should allow the instance to start and the node to join the + // cluster on most providers. + defaultNodeStartupTimeout = metav1.Duration{Duration: 10 * time.Minute} +) + func (m *MachineHealthCheck) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(m). @@ -46,6 +55,10 @@ func (m *MachineHealthCheck) Default() { defaultMaxUnhealthy := intstr.FromString("100%") m.Spec.MaxUnhealthy = &defaultMaxUnhealthy } + + if m.Spec.NodeStartupTimeout == nil { + m.Spec.NodeStartupTimeout = &defaultNodeStartupTimeout + } } // ValidateCreate implements webhook.Validator so a webhook will be registered for the type @@ -86,6 +99,13 @@ func (m *MachineHealthCheck) validate(old *MachineHealthCheck) error { ) } + if m.Spec.NodeStartupTimeout != nil && m.Spec.NodeStartupTimeout.Seconds() < 30 { + allErrs = append( + allErrs, + field.Invalid(field.NewPath("spec", "nodeStartupTimeout"), m.Spec.NodeStartupTimeout, "must be at least 30s"), + ) + } + if len(allErrs) == 0 { return nil } diff --git a/api/v1alpha3/machinehealthcheck_webhook_test.go b/api/v1alpha3/machinehealthcheck_webhook_test.go index ebfcc6c05020..e838f3a4cf12 100644 --- a/api/v1alpha3/machinehealthcheck_webhook_test.go +++ b/api/v1alpha3/machinehealthcheck_webhook_test.go @@ -18,6 +18,7 @@ package v1alpha3 import ( "testing" + "time" . 
"github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,6 +31,8 @@ func TestMachineHealthCheckDefault(t *testing.T) { mhc.Default() g.Expect(mhc.Spec.MaxUnhealthy.String()).To(Equal("100%")) + g.Expect(mhc.Spec.NodeStartupTimeout).ToNot(BeNil()) + g.Expect(*mhc.Spec.NodeStartupTimeout).To(Equal(metav1.Duration{Duration: 10 * time.Minute})) } func TestMachineHealthCheckLabelSelectorAsSelectorValidation(t *testing.T) { @@ -115,3 +118,66 @@ func TestMachineHealthCheckClusterNameImmutable(t *testing.T) { }) } } + +func TestMachineHealthCheckNodeStartupTimeout(t *testing.T) { + zero := metav1.Duration{Duration: 0} + twentyNineSeconds := metav1.Duration{Duration: 29 * time.Second} + thirtySeconds := metav1.Duration{Duration: 30 * time.Second} + oneMinute := metav1.Duration{Duration: 1 * time.Minute} + minusOneMinute := metav1.Duration{Duration: -1 * time.Minute} + + tests := []struct { + name string + timeout *metav1.Duration + expectErr bool + }{ + { + name: "when the nodeStartupTimeout is not given", + timeout: nil, + expectErr: false, + }, + { + name: "when the nodeStartupTimeout is greater than 30s", + timeout: &oneMinute, + expectErr: false, + }, + { + name: "when the nodeStartupTimeout is 30s", + timeout: &thirtySeconds, + expectErr: false, + }, + { + name: "when the nodeStartupTimeout is 29s", + timeout: &twentyNineSeconds, + expectErr: true, + }, + { + name: "when the nodeStartupTimeout is less than 0", + timeout: &minusOneMinute, + expectErr: true, + }, + { + name: "when the nodeStartupTimeout is 0", + timeout: &zero, + expectErr: true, + }, + } + + for _, tt := range tests { + g := NewWithT(t) + + mhc := &MachineHealthCheck{ + Spec: MachineHealthCheckSpec{ + NodeStartupTimeout: tt.timeout, + }, + } + + if tt.expectErr { + g.Expect(mhc.ValidateCreate()).NotTo(Succeed()) + g.Expect(mhc.ValidateUpdate(mhc)).NotTo(Succeed()) + } else { + g.Expect(mhc.ValidateCreate()).To(Succeed()) + g.Expect(mhc.ValidateUpdate(mhc)).To(Succeed()) + } + } +} diff --git a/api/v1alpha3/zz_generated.deepcopy.go b/api/v1alpha3/zz_generated.deepcopy.go index a36c4dfca348..29417392e51f 100644 --- a/api/v1alpha3/zz_generated.deepcopy.go +++ b/api/v1alpha3/zz_generated.deepcopy.go @@ -538,6 +538,11 @@ func (in *MachineHealthCheckSpec) DeepCopyInto(out *MachineHealthCheckSpec) { *out = new(intstr.IntOrString) **out = **in } + if in.NodeStartupTimeout != nil { + in, out := &in.NodeStartupTimeout, &out.NodeStartupTimeout + *out = new(metav1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MachineHealthCheckSpec. diff --git a/config/crd/bases/cluster.x-k8s.io_machinehealthchecks.yaml b/config/crd/bases/cluster.x-k8s.io_machinehealthchecks.yaml index 1723162d9cef..c5d109640699 100644 --- a/config/crd/bases/cluster.x-k8s.io_machinehealthchecks.yaml +++ b/config/crd/bases/cluster.x-k8s.io_machinehealthchecks.yaml @@ -67,6 +67,10 @@ spec: description: Any further remediation is only allowed if at most "MaxUnhealthy" machines selected by "selector" are not healthy. x-kubernetes-int-or-string: true + nodeStartupTimeout: + description: Machines older than this duration without a node will + be considered to have failed and will be remediated. 
+ type: string selector: description: Label selector to match machines whose health will be exercised @@ -158,9 +162,6 @@ spec: format: int32 minimum: 0 type: integer - required: - - currentHealthy - - expectedMachines type: object type: object served: true diff --git a/controllers/machine_helpers.go b/controllers/machine_helpers.go index c8efb7a76909..4e70abc82158 100644 --- a/controllers/machine_helpers.go +++ b/controllers/machine_helpers.go @@ -20,6 +20,8 @@ import ( "context" "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -47,3 +49,20 @@ func getActiveMachinesInCluster(ctx context.Context, c client.Client, namespace, } return machines, nil } + +// hasMatchingLabels verifies that the Label Selector matches the given Labels +func hasMatchingLabels(matchSelector metav1.LabelSelector, matchLabels map[string]string) bool { + // This should never fail; the validating webhook should catch this first + selector, err := metav1.LabelSelectorAsSelector(&matchSelector) + if err != nil { + return false + } + // If a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() { + return false + } + if !selector.Matches(labels.Set(matchLabels)) { + return false + } + return true +} diff --git a/controllers/machine_helpers_test.go b/controllers/machine_helpers_test.go index a6dbad9b7a33..ebf310914ba8 100644 --- a/controllers/machine_helpers_test.go +++ b/controllers/machine_helpers_test.go @@ -127,3 +127,74 @@ func Test_getActiveMachinesInCluster(t *testing.T) { }) } } + +func TestMachineHealthCheckHasMatchingLabels(t *testing.T) { + testCases := []struct { + name string + selector metav1.LabelSelector + labels map[string]string + expected bool + }{ + { + name: "selector matches labels", + + selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "foo": "bar", + }, + }, + + labels: map[string]string{ + "foo": "bar", + }, + + expected: true, + }, + { + name: "selector does not match labels", + + selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "foo": "bar", + }, + }, + + labels: map[string]string{ + "no": "match", + }, expected: false, }, + { + name: "selector is empty", + selector: metav1.LabelSelector{}, + labels: map[string]string{}, + expected: false, }, + { + name: "selector is invalid", + selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "foo": "bar", + }, + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Operator: "bad-operator", + }, + }, + }, + labels: map[string]string{ + "foo": "bar", + }, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + g := NewWithT(t) + + got := hasMatchingLabels(tc.selector, tc.labels) + g.Expect(got).To(Equal(tc.expected)) + }) + } +} diff --git a/controllers/machinehealthcheck_controller.go b/controllers/machinehealthcheck_controller.go index 5f7d846748ae..299609713f51 100644 --- a/controllers/machinehealthcheck_controller.go +++ b/controllers/machinehealthcheck_controller.go @@ -19,6 +19,8 @@ package controllers import ( "context" "fmt" + "sync" + "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -28,11 +30,15 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" clusterv1 
"sigs.k8s.io/cluster-api/api/v1alpha3" + "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -41,7 +47,8 @@ import ( ) const ( - mhcClusterNameIndex = "spec.clusterName" + mhcClusterNameIndex = "spec.clusterName" + machineNodeNameIndex = "status.nodeRef.name" ) // MachineHealthCheckReconciler reconciles a MachineHealthCheck object @@ -49,8 +56,11 @@ type MachineHealthCheckReconciler struct { Client client.Client Log logr.Logger - controller controller.Controller - recorder record.EventRecorder + controller controller.Controller + recorder record.EventRecorder + scheme *runtime.Scheme + clusterNodeInformers map[client.ObjectKey]cache.Informer + clusterNodeInformersLock sync.RWMutex } func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { @@ -60,6 +70,10 @@ func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, option &source.Kind{Type: &clusterv1.Cluster{}}, &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.clusterToMachineHealthCheck)}, ). + Watches( + &source.Kind{Type: &clusterv1.Machine{}}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.machineToMachineHealthCheck)}, + ). WithOptions(options). Build(r) @@ -75,8 +89,19 @@ func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, option return errors.Wrap(err, "error setting index fields") } + // Add index to Machine for listing by Node reference + if err := mgr.GetCache().IndexField(&clusterv1.Machine{}, + machineNodeNameIndex, + r.indexMachineByNodeName, + ); err != nil { + return errors.Wrap(err, "error setting index fields") + } + r.controller = controller r.recorder = mgr.GetEventRecorderFor("machinehealthcheck-controller") + r.scheme = mgr.GetScheme() + r.clusterNodeInformers = make(map[client.ObjectKey]cache.Informer) + r.clusterNodeInformersLock = sync.RWMutex{} return nil } @@ -136,13 +161,14 @@ func (r *MachineHealthCheckReconciler) Reconcile(req ctrl.Request) (_ ctrl.Resul logger.Error(err, "Failed to reconcile MachineHealthCheck") r.recorder.Eventf(m, corev1.EventTypeWarning, "ReconcileError", "%v", err) - //TODO(JoelSpeed): Determine how/when to requeue requests if errors occur within r.reconcile + // Requeue immediately if any errors occurred + return ctrl.Result{}, err } return result, nil } -func (r *MachineHealthCheckReconciler) reconcile(_ context.Context, cluster *clusterv1.Cluster, m *clusterv1.MachineHealthCheck) (ctrl.Result, error) { +func (r *MachineHealthCheckReconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster, m *clusterv1.MachineHealthCheck) (ctrl.Result, error) { // Ensure the MachineHealthCheck is owned by the Cluster it belongs to m.OwnerReferences = util.EnsureOwnerRef(m.OwnerReferences, metav1.OwnerReference{ APIVersion: clusterv1.GroupVersion.String(), @@ -151,7 +177,49 @@ func (r *MachineHealthCheckReconciler) reconcile(_ context.Context, cluster *clu UID: cluster.UID, }) - return ctrl.Result{}, fmt.Errorf("controller not yet implemented") + logger := r.Log.WithValues("machinehealthcheck", m.Name, "namespace", m.Namespace) + logger = logger.WithValues("cluster", cluster.Name) + + // Create client for target cluster + clusterClient, err := remote.NewClusterClient(ctx, 
r.Client, util.ObjectKey(cluster), r.scheme) + if err != nil { + logger.Error(err, "Error building target cluster client") + return ctrl.Result{}, err + } + + if err := r.watchClusterNodes(ctx, r.Client, cluster); err != nil { + logger.Error(err, "Error watching nodes on target cluster") + return ctrl.Result{}, err + } + + // fetch all targets + logger.V(3).Info("Finding targets") + targets, err := r.getTargetsFromMHC(clusterClient, cluster, m) + if err != nil { + logger.Error(err, "Failed to fetch targets from MachineHealthCheck") + return ctrl.Result{}, err + } + totalTargets := len(targets) + m.Status.ExpectedMachines = int32(totalTargets) + + // health check all targets and reconcile mhc status + currentHealthy, needRemediationTargets, nextCheckTimes := r.healthCheckTargets(targets, logger, m.Spec.NodeStartupTimeout.Duration) + m.Status.CurrentHealthy = int32(currentHealthy) + + // remediate + for _, t := range needRemediationTargets { + logger.V(3).Info("Target meets unhealthy criteria, triggers remediation", "target", t.string()) + // TODO(JoelSpeed): Implement remediation logic + } + + if minNextCheck := minDuration(nextCheckTimes); minNextCheck > 0 { + logger.V(3).Info("Some targets might go unhealthy. Ensuring a requeue happens", "requeueIn", minNextCheck.Truncate(time.Second).String()) + return ctrl.Result{RequeueAfter: minNextCheck}, nil + } + + logger.V(3).Info("No more targets meet unhealthy criteria") + + return ctrl.Result{}, nil } func (r *MachineHealthCheckReconciler) indexMachineHealthCheckByClusterName(object runtime.Object) []string { @@ -192,3 +260,134 @@ func (r *MachineHealthCheckReconciler) clusterToMachineHealthCheck(o handler.Map } return requests } + +// machineToMachineHealthCheck maps events from Machine objects to +// MachineHealthCheck objects that monitor the given machine +func (r *MachineHealthCheckReconciler) machineToMachineHealthCheck(o handler.MapObject) []reconcile.Request { + m, ok := o.Object.(*clusterv1.Machine) + if !ok { + r.Log.Error(errors.New("incorrect type"), "expected a Machine", "type", fmt.Sprintf("%T", o)) + return nil + } + + mhcList := &clusterv1.MachineHealthCheckList{} + if err := r.Client.List( + context.Background(), + mhcList, + &client.ListOptions{Namespace: m.Namespace}, + client.MatchingFields{mhcClusterNameIndex: m.Spec.ClusterName}, + ); err != nil { + r.Log.Error(err, "Unable to list MachineHealthChecks", "machine", m.Name, "namespace", m.Namespace) + return nil + } + + var requests []reconcile.Request + for k := range mhcList.Items { + mhc := &mhcList.Items[k] + if hasMatchingLabels(mhc.Spec.Selector, m.Labels) { + key := util.ObjectKey(mhc) + requests = append(requests, reconcile.Request{NamespacedName: key}) + } + } + return requests +} + +func (r *MachineHealthCheckReconciler) nodeToMachineHealthCheck(o handler.MapObject) []reconcile.Request { + node, ok := o.Object.(*corev1.Node) + if !ok { + r.Log.Error(errors.New("incorrect type"), "expected a Node", "type", fmt.Sprintf("%T", o)) + return nil + } + + machine, err := r.getMachineFromNode(node.Name) + if machine == nil || err != nil { + r.Log.Error(err, "Unable to retrieve machine from node", "node", node.GetName()) + return nil + } + + return r.machineToMachineHealthCheck(handler.MapObject{Object: machine}) +} + +func (r *MachineHealthCheckReconciler) getMachineFromNode(nodeName string) (*clusterv1.Machine, error) { + machineList := &clusterv1.MachineList{} + if err := r.Client.List( + context.TODO(), + machineList, + client.MatchingFields{machineNodeNameIndex: 
nodeName}, + ); err != nil { + return nil, errors.Wrap(err, "failed getting machine list") + } + if len(machineList.Items) != 1 { + return nil, errors.Errorf("expecting one machine for node %v, got: %v", nodeName, machineList.Items) + } + return &machineList.Items[0], nil +} + +func (r *MachineHealthCheckReconciler) watchClusterNodes(ctx context.Context, c client.Client, cluster *clusterv1.Cluster) error { + key := util.ObjectKey(cluster) + if _, ok := r.getClusterNodeInformer(key); ok { + // watch was already set up for this cluster + return nil + } + + return r.createClusterNodeInformer(ctx, c, key) +} + +func (r *MachineHealthCheckReconciler) getClusterNodeInformer(key client.ObjectKey) (cache.Informer, bool) { + r.clusterNodeInformersLock.RLock() + defer r.clusterNodeInformersLock.RUnlock() + + informer, ok := r.clusterNodeInformers[key] + return informer, ok +} + +func (r *MachineHealthCheckReconciler) createClusterNodeInformer(ctx context.Context, c client.Client, key client.ObjectKey) error { + r.clusterNodeInformersLock.Lock() + defer r.clusterNodeInformersLock.Unlock() + + // Double check the key still doesn't exist under write lock + if _, ok := r.clusterNodeInformers[key]; ok { + // An informer was created while waiting for the lock + return nil + } + + config, err := remote.RESTConfig(ctx, c, key) + if err != nil { + return errors.Wrap(err, "error fetching remote cluster config") + } + + k8sClient, err := kubernetes.NewForConfig(config) + if err != nil { + return errors.Wrap(err, "error constructing remote cluster client") + } + + // TODO(JoelSpeed): See if we use the resync period from the manager instead of 0 + factory := informers.NewSharedInformerFactory(k8sClient, 0) + nodeInformer := factory.Core().V1().Nodes().Informer() + go nodeInformer.Run(ctx.Done()) + + err = r.controller.Watch( + &source.Informer{Informer: nodeInformer}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.nodeToMachineHealthCheck)}, + ) + if err != nil { + return errors.Wrap(err, "error watching nodes on target cluster") + } + + r.clusterNodeInformers[key] = nodeInformer + return nil +} + +func (r *MachineHealthCheckReconciler) indexMachineByNodeName(object runtime.Object) []string { + machine, ok := object.(*clusterv1.Machine) + if !ok { + r.Log.Error(errors.New("incorrect type"), "expected a Machine", "type", fmt.Sprintf("%T", object)) + return nil + } + + if machine.Status.NodeRef != nil { + return []string{machine.Status.NodeRef.Name} + } + + return nil +} diff --git a/controllers/machinehealthcheck_controller_test.go b/controllers/machinehealthcheck_controller_test.go index 58ff06d0210d..91551a6998bc 100644 --- a/controllers/machinehealthcheck_controller_test.go +++ b/controllers/machinehealthcheck_controller_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/controllers/external" + "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -42,6 +43,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) +const defaultNamespaceName = "default" + var _ = Describe("MachineHealthCheck Reconciler", func() { var namespace *corev1.Namespace var testCluster *clusterv1.Cluster @@ -112,7 +115,7 @@ var _ = Describe("MachineHealthCheck Reconciler", func() { Eventually(func() map[string]string { mhc := &clusterv1.MachineHealthCheck{} - err := k8sClient.Get(ctx, 
client.ObjectKey{Namespace: mhcToCreate.GetNamespace(), Name: mhcToCreate.GetName()}, mhc) + err := k8sClient.Get(ctx, util.ObjectKey(mhcToCreate), mhc) if err != nil { return nil } @@ -163,7 +166,7 @@ var _ = Describe("MachineHealthCheck Reconciler", func() { Eventually(func() []metav1.OwnerReference { mhc := &clusterv1.MachineHealthCheck{} - err := k8sClient.Get(ctx, client.ObjectKey{Namespace: mhcToCreate.GetNamespace(), Name: mhcToCreate.GetName()}, mhc) + err := k8sClient.Get(ctx, util.ObjectKey(mhcToCreate), mhc) if err != nil { return []metav1.OwnerReference{} } @@ -205,7 +208,7 @@ func cleanupTestMachineHealthChecks(ctx context.Context, c client.Client) error func ownerReferenceForCluster(ctx context.Context, c *clusterv1.Cluster) metav1.OwnerReference { // Fetch the cluster to populate the UID cc := &clusterv1.Cluster{} - Expect(k8sClient.Get(ctx, client.ObjectKey{Namespace: c.GetNamespace(), Name: c.GetName()}, cc)).To(Succeed()) + Expect(k8sClient.Get(ctx, util.ObjectKey(c), cc)).To(Succeed()) return metav1.OwnerReference{ APIVersion: clusterv1.GroupVersion.String(), @@ -262,7 +265,7 @@ func TestClusterToMachineHealthCheck(t *testing.T) { // END: setup test environment - namespace := "default" + namespace := defaultNamespaceName clusterName := "test-cluster" labels := make(map[string]string) @@ -339,11 +342,8 @@ func TestClusterToMachineHealthCheck(t *testing.T) { gs.Expect(r.Client.Delete(ctx, &o)).To(Succeed()) }() // Check the cache is populated - key, err := client.ObjectKeyFromObject(&o) - gs.Expect(err).ToNot(HaveOccurred()) - getObj := func() error { - return r.Client.Get(ctx, key, &clusterv1.MachineHealthCheck{}) + return r.Client.Get(ctx, util.ObjectKey(&o), &clusterv1.MachineHealthCheck{}) } gs.Eventually(getObj, timeout).Should(Succeed()) } @@ -416,3 +416,364 @@ func newTestMachineHealthCheck(name, namespace, cluster string, labels map[strin }, } } + +func TestMachineToMachineHealthCheck(t *testing.T) { + // This test sets up a proper test env to allow testing of the cache index + // that is used as part of the machineToMachineHealthCheck map function + + // BEGIN: Set up test environment + g := NewWithT(t) + + testEnv = &envtest.Environment{ + CRDs: []runtime.Object{ + external.TestGenericBootstrapCRD, + external.TestGenericBootstrapTemplateCRD, + external.TestGenericInfrastructureCRD, + external.TestGenericInfrastructureTemplateCRD, + }, + CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")}, + } + + var err error + cfg, err := testEnv.Start() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cfg).ToNot(BeNil()) + defer func() { + g.Expect(testEnv.Stop()).To(Succeed()) + }() + + g.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed()) + + mgr, err = manager.New(cfg, manager.Options{ + Scheme: scheme.Scheme, + MetricsBindAddress: "0", + }) + g.Expect(err).ToNot(HaveOccurred()) + + r := &MachineHealthCheckReconciler{ + Log: log.Log, + Client: mgr.GetClient(), + } + g.Expect(r.SetupWithManager(mgr, controller.Options{})).To(Succeed()) + + doneMgr := make(chan struct{}) + go func() { + g.Expect(mgr.Start(doneMgr)).To(Succeed()) + }() + defer close(doneMgr) + + // END: setup test environment + + namespace := defaultNamespaceName + clusterName := "test-cluster" + nodeName := "node1" + labels := map[string]string{"cluster": "foo", "nodepool": "bar"} + + mhc1 := newTestMachineHealthCheck("mhc1", namespace, clusterName, labels) + mhc1Req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mhc1.Namespace, Name: mhc1.Name}} + mhc2 := 
newTestMachineHealthCheck("mhc2", namespace, clusterName, labels) + mhc2Req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mhc2.Namespace, Name: mhc2.Name}} + mhc3 := newTestMachineHealthCheck("mhc3", namespace, clusterName, map[string]string{"cluster": "foo", "nodepool": "other"}) + mhc4 := newTestMachineHealthCheck("mhc4", "othernamespace", clusterName, labels) + machine1 := newTestMachine("machine1", namespace, clusterName, nodeName, labels) + + testCases := []struct { + name string + toCreate []clusterv1.MachineHealthCheck + object handler.MapObject + expected []reconcile.Request + }{ + { + name: "when the object passed isn't a machine", + toCreate: []clusterv1.MachineHealthCheck{*mhc1}, + object: handler.MapObject{ + Object: &clusterv1.Cluster{}, + }, + expected: []reconcile.Request{}, + }, + { + name: "when a MachineHealthCheck matches labels for the Machine in the same namespace", + toCreate: []clusterv1.MachineHealthCheck{*mhc1}, + object: handler.MapObject{ + Object: machine1, + }, + expected: []reconcile.Request{mhc1Req}, + }, + { + name: "when 2 MachineHealthChecks match labels for the Machine in the same namespace", + toCreate: []clusterv1.MachineHealthCheck{*mhc1, *mhc2}, + object: handler.MapObject{ + Object: machine1, + }, + expected: []reconcile.Request{mhc1Req, mhc2Req}, + }, + { + name: "when a MachineHealthCheck does not match labels for the Machine in the same namespace", + toCreate: []clusterv1.MachineHealthCheck{*mhc3}, + object: handler.MapObject{ + Object: machine1, + }, + expected: []reconcile.Request{}, + }, + { + name: "when a MachineHealthCheck matches labels for the Machine in another namespace", + toCreate: []clusterv1.MachineHealthCheck{*mhc4}, + object: handler.MapObject{ + Object: machine1, + }, + expected: []reconcile.Request{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gs := NewWithT(t) + + ctx := context.Background() + for _, obj := range tc.toCreate { + o := obj + gs.Expect(r.Client.Create(ctx, &o)).To(Succeed()) + defer func() { + gs.Expect(r.Client.Delete(ctx, &o)).To(Succeed()) + }() + // Check the cache is populated + getObj := func() error { + return r.Client.Get(ctx, util.ObjectKey(&o), &clusterv1.MachineHealthCheck{}) + } + gs.Eventually(getObj, timeout).Should(Succeed()) + } + + got := r.machineToMachineHealthCheck(tc.object) + gs.Expect(got).To(ConsistOf(tc.expected)) + }) + } +} + +func TestNodeToMachineHealthCheck(t *testing.T) { + // This test sets up a proper test env to allow testing of the cache index + // that is used as part of the clusterToMachineHealthCheck map function + + // BEGIN: Set up test environment + g := NewWithT(t) + + testEnv = &envtest.Environment{ + CRDs: []runtime.Object{ + external.TestGenericBootstrapCRD, + external.TestGenericBootstrapTemplateCRD, + external.TestGenericInfrastructureCRD, + external.TestGenericInfrastructureTemplateCRD, + }, + CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")}, + } + + var err error + cfg, err := testEnv.Start() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cfg).ToNot(BeNil()) + defer func() { + g.Expect(testEnv.Stop()).To(Succeed()) + }() + + g.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed()) + + mgr, err = manager.New(cfg, manager.Options{ + Scheme: scheme.Scheme, + MetricsBindAddress: "0", + }) + g.Expect(err).ToNot(HaveOccurred()) + + r := &MachineHealthCheckReconciler{ + Log: log.Log, + Client: mgr.GetClient(), + } + g.Expect(r.SetupWithManager(mgr, 
controller.Options{})).To(Succeed()) + + doneMgr := make(chan struct{}) + go func() { + g.Expect(mgr.Start(doneMgr)).To(Succeed()) + }() + defer close(doneMgr) + + // END: setup test environment + + namespace := defaultNamespaceName + clusterName := "test-cluster" + nodeName := "node1" + labels := map[string]string{"cluster": "foo", "nodepool": "bar"} + + mhc1 := newTestMachineHealthCheck("mhc1", namespace, clusterName, labels) + mhc1Req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mhc1.Namespace, Name: mhc1.Name}} + mhc2 := newTestMachineHealthCheck("mhc2", namespace, clusterName, labels) + mhc2Req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mhc2.Namespace, Name: mhc2.Name}} + mhc3 := newTestMachineHealthCheck("mhc3", namespace, "othercluster", labels) + mhc4 := newTestMachineHealthCheck("mhc4", "othernamespace", clusterName, labels) + + machine1 := newTestMachine("machine1", namespace, clusterName, nodeName, labels) + machine2 := newTestMachine("machine2", namespace, clusterName, nodeName, labels) + + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + } + + testCases := []struct { + name string + mhcToCreate []clusterv1.MachineHealthCheck + mToCreate []clusterv1.Machine + object handler.MapObject + expected []reconcile.Request + }{ + { + name: "when the object passed isn't a Node", + mhcToCreate: []clusterv1.MachineHealthCheck{*mhc1}, + mToCreate: []clusterv1.Machine{*machine1}, + object: handler.MapObject{ + Object: &clusterv1.Machine{}, + }, + expected: []reconcile.Request{}, + }, + { + name: "when no Machine exists for the Node", + mhcToCreate: []clusterv1.MachineHealthCheck{*mhc1}, + mToCreate: []clusterv1.Machine{}, + object: handler.MapObject{ + Object: node1, + }, + expected: []reconcile.Request{}, + }, + { + name: "when two Machines exist for the Node", + mhcToCreate: []clusterv1.MachineHealthCheck{*mhc1}, + mToCreate: []clusterv1.Machine{*machine1, *machine2}, + object: handler.MapObject{ + Object: node1, + }, + expected: []reconcile.Request{}, + }, + { + name: "when no MachineHealthCheck exists for the Node in the Machine's namespace", + mhcToCreate: []clusterv1.MachineHealthCheck{*mhc4}, + mToCreate: []clusterv1.Machine{*machine1}, + object: handler.MapObject{ + Object: node1, + }, + expected: []reconcile.Request{}, + }, + { + name: "when a MachineHealthCheck exists for the Node in the Machine's namespace", + mhcToCreate: []clusterv1.MachineHealthCheck{*mhc1}, + mToCreate: []clusterv1.Machine{*machine1}, + object: handler.MapObject{ + Object: node1, + }, + expected: []reconcile.Request{mhc1Req}, + }, + { + name: "when two MachineHealthChecks exist for the Node in the Machine's namespace", + mhcToCreate: []clusterv1.MachineHealthCheck{*mhc1, *mhc2}, + mToCreate: []clusterv1.Machine{*machine1}, + object: handler.MapObject{ + Object: node1, + }, + expected: []reconcile.Request{mhc1Req, mhc2Req}, + }, + { + name: "when a MachineHealthCheck exists for the Node, but for a different cluster", + mhcToCreate: []clusterv1.MachineHealthCheck{*mhc3}, + mToCreate: []clusterv1.Machine{*machine1}, + object: handler.MapObject{ + Object: node1, + }, + expected: []reconcile.Request{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gs := NewWithT(t) + + ctx := context.Background() + for _, obj := range tc.mhcToCreate { + o := obj + gs.Expect(r.Client.Create(ctx, &o)).To(Succeed()) + defer func() { + gs.Expect(r.Client.Delete(ctx, &o)).To(Succeed()) + }() + // Check the cache 
is populated + key := util.ObjectKey(&o) + getObj := func() error { + return r.Client.Get(ctx, key, &clusterv1.MachineHealthCheck{}) + } + gs.Eventually(getObj, timeout).Should(Succeed()) + } + for _, obj := range tc.mToCreate { + o := obj + gs.Expect(r.Client.Create(ctx, &o)).To(Succeed()) + defer func() { + gs.Expect(r.Client.Delete(ctx, &o)).To(Succeed()) + }() + // Ensure the status is set (required for matching node to machine) + o.Status = obj.Status + gs.Expect(r.Client.Status().Update(ctx, &o)).To(Succeed()) + + // Check the cache is up to date with the status update + key := util.ObjectKey(&o) + checkStatus := func() clusterv1.MachineStatus { + m := &clusterv1.Machine{} + err := r.Client.Get(ctx, key, m) + if err != nil { + return clusterv1.MachineStatus{} + } + return m.Status + } + gs.Eventually(checkStatus, timeout).Should(Equal(o.Status)) + } + + got := r.nodeToMachineHealthCheck(tc.object) + gs.Expect(got).To(ConsistOf(tc.expected)) + }) + } +} + +func TestIndexMachineByNodeName(t *testing.T) { + r := &MachineHealthCheckReconciler{ + Log: log.Log, + } + + testCases := []struct { + name string + object runtime.Object + expected []string + }{ + { + name: "when the machine has no NodeRef", + object: &clusterv1.Machine{}, + expected: []string{}, + }, + { + name: "when the machine has a valid NodeRef", + object: &clusterv1.Machine{ + Status: clusterv1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: "node1", + }, + }, + }, + expected: []string{"node1"}, + }, + { + name: "when the object passed is not a Machine", + object: &corev1.Node{}, + expected: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + g := NewWithT(t) + got := r.indexMachineByNodeName(tc.object) + g.Expect(got).To(ConsistOf(tc.expected)) + }) + } +} diff --git a/controllers/machinehealthcheck_targets.go b/controllers/machinehealthcheck_targets.go new file mode 100644 index 000000000000..b474ec8c4a01 --- /dev/null +++ b/controllers/machinehealthcheck_targets.go @@ -0,0 +1,258 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // EventDetectedUnhealthy is emitted in case a node associated with a + // machine was detected unhealthy + EventDetectedUnhealthy string = "DetectedUnhealthy" +) + +// healthCheckTarget contains the information required to perform a health check +// on the node to determine if any remediation is required. 
+type healthCheckTarget struct { + Machine *clusterv1.Machine + Node *corev1.Node + MHC *clusterv1.MachineHealthCheck + nodeMissing bool +} + +func (t *healthCheckTarget) string() string { + return fmt.Sprintf("%s/%s/%s/%s", + t.MHC.GetNamespace(), + t.MHC.GetName(), + t.Machine.GetName(), + t.nodeName(), + ) +} + +// Get the node name if the target has a node +func (t *healthCheckTarget) nodeName() string { + if t.Node != nil { + return t.Node.GetName() + } + return "" +} + +// Determine whether or not a given target needs remediation. +// The target will need remediation if any of the following are true: +// - The Machine has failed for some reason +// - The Machine did not get a node before `timeoutForMachineToHaveNode` elapses +// - The Node has gone away +// - Any condition on the node is matched for the given timeout +// If the target doesn't currently need remediation, provide a duration after +// which the target should next be checked. +// The target should be requeued after this duration. +func (t *healthCheckTarget) needsRemediation(logger logr.Logger, timeoutForMachineToHaveNode time.Duration) (bool, time.Duration) { + var nextCheckTimes []time.Duration + now := time.Now() + + // machine has failed + if t.Machine.Status.FailureReason != nil { + logger.V(3).Info("Target is unhealthy", "reason", t.Machine.Status.FailureReason) + return true, time.Duration(0) + } + + // the node does not exist + if t.nodeMissing { + return true, time.Duration(0) + } + + // the node has not been set yet + if t.Node == nil { + // status not updated yet + if t.Machine.Status.LastUpdated == nil { + return false, timeoutForMachineToHaveNode + } + if t.Machine.Status.LastUpdated.Add(timeoutForMachineToHaveNode).Before(now) { + logger.V(3).Info("Target is unhealthy: machine has no node", "duration", timeoutForMachineToHaveNode.String()) + return true, time.Duration(0) + } + durationUnhealthy := now.Sub(t.Machine.Status.LastUpdated.Time) + nextCheck := timeoutForMachineToHaveNode - durationUnhealthy + time.Second + return false, nextCheck + } + + // check conditions + for _, c := range t.MHC.Spec.UnhealthyConditions { + nodeCondition := getNodeCondition(t.Node, c.Type) + + // Skip when current node condition is different from the one reported + // in the MachineHealthCheck. + if nodeCondition == nil || nodeCondition.Status != c.Status { + continue + } + + // If the condition has been in the unhealthy state for longer than the + // timeout, return true with no requeue time. + if nodeCondition.LastTransitionTime.Add(c.Timeout.Duration).Before(now) { + logger.V(3).Info("Target is unhealthy: condition is in state longer than allowed timeout", "condition", c.Type, "state", c.Status, "timeout", c.Timeout.Duration.String()) + return true, time.Duration(0) + } + + durationUnhealthy := now.Sub(nodeCondition.LastTransitionTime.Time) + nextCheck := c.Timeout.Duration - durationUnhealthy + time.Second + if nextCheck > 0 { + nextCheckTimes = append(nextCheckTimes, nextCheck) + } + } + return false, minDuration(nextCheckTimes) +} + +// getTargetsFromMHC uses the MachineHealthCheck's selector to fetch machines +// and their nodes targeted by the health check, ready for health checking. 
+func (r *MachineHealthCheckReconciler) getTargetsFromMHC(clusterClient client.Client, cluster *clusterv1.Cluster, mhc *clusterv1.MachineHealthCheck) ([]healthCheckTarget, error) { + machines, err := r.getMachinesFromMHC(mhc) + if err != nil { + return nil, errors.Wrap(err, "error getting machines from MachineHealthCheck") + } + if len(machines) == 0 { + return nil, nil + } + + targets := []healthCheckTarget{} + for k := range machines { + target := healthCheckTarget{ + MHC: mhc, + Machine: &machines[k], + } + node, err := r.getNodeFromMachine(clusterClient, target.Machine) + if err != nil { + if !apierrors.IsNotFound(err) { + return nil, errors.Wrap(err, "error getting node") + } + + // A node has been seen for this machine, but it no longer exists + target.nodeMissing = true + } + target.Node = node + targets = append(targets, target) + } + return targets, nil +} + +// getMachinesFromMHC fetches Machines matched by the MachineHealthCheck's +// label selector +func (r *MachineHealthCheckReconciler) getMachinesFromMHC(mhc *clusterv1.MachineHealthCheck) ([]clusterv1.Machine, error) { + selector, err := metav1.LabelSelectorAsSelector(&mhc.Spec.Selector) + if err != nil { + return nil, errors.Wrap(err, "failed to build selector") + } + + options := client.ListOptions{ + LabelSelector: selector, + Namespace: mhc.GetNamespace(), + } + machineList := &clusterv1.MachineList{} + if err := r.Client.List(context.Background(), machineList, &options); err != nil { + return nil, errors.Wrap(err, "failed to list machines") + } + return machineList.Items, nil } + +// getNodeFromMachine fetches the node from a local or remote cluster for a +// given machine. +func (r *MachineHealthCheckReconciler) getNodeFromMachine(clusterClient client.Client, machine *clusterv1.Machine) (*corev1.Node, error) { + if machine.Status.NodeRef == nil { + return nil, nil + } + + node := &corev1.Node{} + nodeKey := types.NamespacedName{ + Name: machine.Status.NodeRef.Name, + } + err := clusterClient.Get(context.TODO(), nodeKey, node) + return node, err +} + +// healthCheckTargets health checks a slice of targets +// and returns data to measure the average health +func (r *MachineHealthCheckReconciler) healthCheckTargets(targets []healthCheckTarget, logger logr.Logger, timeoutForMachineToHaveNode time.Duration) (int, []healthCheckTarget, []time.Duration) { + var nextCheckTimes []time.Duration + var needRemediationTargets []healthCheckTarget + var currentHealthy int + + for _, t := range targets { + logger := logger.WithValues("Target", t.string()) + logger.V(3).Info("Health checking target") + needsRemediation, nextCheck := t.needsRemediation(logger, timeoutForMachineToHaveNode) + + if needsRemediation { + needRemediationTargets = append(needRemediationTargets, t) + continue + } + + if nextCheck > 0 { + logger.V(3).Info("Target is likely to go unhealthy", "timeUntilUnhealthy", nextCheck.Truncate(time.Second).String()) + r.recorder.Eventf( + t.Machine, + corev1.EventTypeNormal, + EventDetectedUnhealthy, + "Machine %v has unhealthy node %v", + t.string(), + t.nodeName(), + ) + nextCheckTimes = append(nextCheckTimes, nextCheck) + continue + } + + if t.Machine.DeletionTimestamp.IsZero() { + currentHealthy++ + } + } + return currentHealthy, needRemediationTargets, nextCheckTimes +} + +// getNodeCondition returns node condition by type +func getNodeCondition(node *corev1.Node, conditionType corev1.NodeConditionType) *corev1.NodeCondition { + for _, cond := range node.Status.Conditions { + if cond.Type == conditionType { + return &cond 
+ } + } + return nil +} + +func minDuration(durations []time.Duration) time.Duration { + if len(durations) == 0 { + return time.Duration(0) + } + + minDuration := durations[0] + // Ignore first element as that is already minDuration + for _, nc := range durations[1:] { + if nc < minDuration { + minDuration = nc + } + } + return minDuration +} diff --git a/controllers/machinehealthcheck_targets_test.go b/controllers/machinehealthcheck_targets_test.go new file mode 100644 index 000000000000..cef69134e1dd --- /dev/null +++ b/controllers/machinehealthcheck_targets_test.go @@ -0,0 +1,385 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "testing" + "time" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +func TestGetTargetsFromMHC(t *testing.T) { + namespace := "test-mhc" + clusterName := "test-cluster" + mhcSelector := map[string]string{"cluster": clusterName, "machine-group": "foo"} + + // Create a namespace and cluster for the tests + testNS := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} + testCluster := &clusterv1.Cluster{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: clusterName}} + + // Create a test MHC + testMHC := &clusterv1.MachineHealthCheck{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-mhc", + Namespace: namespace, + }, + Spec: clusterv1.MachineHealthCheckSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: mhcSelector, + }, + ClusterName: testCluster.ObjectMeta.Name, + UnhealthyConditions: []clusterv1.UnhealthyCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionUnknown, + Timeout: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + }, + } + + baseObjects := []runtime.Object{testNS, testCluster, testMHC} + + // Initialise some test machines and nodes for use in the test cases + + testNode1 := newTestNode("node1") + testMachine1 := newTestMachine("machine1", namespace, clusterName, testNode1.Name, mhcSelector) + testNode2 := newTestNode("node2") + testMachine2 := newTestMachine("machine2", namespace, clusterName, testNode2.Name, map[string]string{"cluster": clusterName}) + testNode3 := newTestNode("node3") + testMachine3 := newTestMachine("machine3", namespace, clusterName, testNode3.Name, mhcSelector) + testNode4 := newTestNode("node4") + testMachine4 := newTestMachine("machine4", namespace, clusterName, testNode4.Name, mhcSelector) + + testCases := []struct { + desc string + toCreate []runtime.Object + expectedTargets []healthCheckTarget + }{ + { + desc: "with no matching machines", + toCreate: baseObjects, + expectedTargets: nil, + }, + { + desc: "when a machine's node is missing", + toCreate: append(baseObjects, testMachine1), + expectedTargets: 
[]healthCheckTarget{ + { + Machine: testMachine1, + MHC: testMHC, + Node: &corev1.Node{}, + nodeMissing: true, + }, + }, + }, + { + desc: "when a machine's labels do not match", + toCreate: append(baseObjects, testMachine1, testMachine2, testNode1), + expectedTargets: []healthCheckTarget{ + { + Machine: testMachine1, + MHC: testMHC, + Node: testNode1, + }, + }, + }, + { + desc: "with multiple machines, should match correct nodes", + toCreate: append(baseObjects, testNode1, testMachine1, testNode3, testMachine3, testNode4, testMachine4), + expectedTargets: []healthCheckTarget{ + { + Machine: testMachine1, + MHC: testMHC, + Node: testNode1, + }, + { + Machine: testMachine3, + MHC: testMHC, + Node: testNode3, + }, + { + Machine: testMachine4, + MHC: testMHC, + Node: testNode4, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + gs := NewGomegaWithT(t) + + gs.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed()) + k8sClient := fake.NewFakeClientWithScheme(scheme.Scheme, tc.toCreate...) + + // Create a test reconciler + reconciler := &MachineHealthCheckReconciler{ + Client: k8sClient, + Log: log.Log, + scheme: scheme.Scheme, + } + + targets, err := reconciler.getTargetsFromMHC(k8sClient, testCluster, testMHC) + gs.Expect(err).ToNot(HaveOccurred()) + + gs.Expect(targets).To(ConsistOf(tc.expectedTargets)) + }) + } +} + +func TestHealthCheckTargets(t *testing.T) { + namespace := "test-mhc" + clusterName := "test-cluster" + mhcSelector := map[string]string{"cluster": clusterName, "machine-group": "foo"} + + // Create a test MHC + testMHC := &clusterv1.MachineHealthCheck{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-mhc", + Namespace: namespace, + }, + Spec: clusterv1.MachineHealthCheckSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: mhcSelector, + }, + ClusterName: clusterName, + UnhealthyConditions: []clusterv1.UnhealthyCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionUnknown, + Timeout: metav1.Duration{Duration: 5 * time.Minute}, + }, + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + Timeout: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + }, + } + + testMachine := newTestMachine("machine1", namespace, clusterName, "node1", mhcSelector) + + // Target for when the node has not yet been seen by the Machine controller + testMachineLastUpdated400s := testMachine.DeepCopy() + nowMinus400s := metav1.NewTime(time.Now().Add(-400 * time.Second)) + testMachineLastUpdated400s.Status.LastUpdated = &nowMinus400s + + nodeNotYetStartedTarget := healthCheckTarget{ + MHC: testMHC, + Machine: testMachineLastUpdated400s, + Node: nil, + } + + // Target for when the Node has been seen, but has now gone + nodeGoneAway := healthCheckTarget{ + MHC: testMHC, + Machine: testMachine, + Node: &corev1.Node{}, + nodeMissing: true, + } + + // Target for when the node has been in an unknown state for shorter than the timeout + testNodeUnknown200 := newTestUnhealthyNode("node1", corev1.NodeReady, corev1.ConditionUnknown, 200*time.Second) + nodeUnknown200 := healthCheckTarget{ + MHC: testMHC, + Machine: testMachine, + Node: testNodeUnknown200, + nodeMissing: false, + } + + // Second Target for when the node has been in an unknown state for shorter than the timeout + testNodeUnknown100 := newTestUnhealthyNode("node1", corev1.NodeReady, corev1.ConditionUnknown, 100*time.Second) + nodeUnknown100 := healthCheckTarget{ + MHC: testMHC, + Machine: testMachine, + Node: testNodeUnknown100, + nodeMissing: false, + } + + // Target for when the 
node has been in an unknown state for longer than the timeout + testNodeUnknown400 := newTestUnhealthyNode("node1", corev1.NodeReady, corev1.ConditionUnknown, 400*time.Second) + nodeUnknown400 := healthCheckTarget{ + MHC: testMHC, + Machine: testMachine, + Node: testNodeUnknown400, + nodeMissing: false, + } + + // Target for when a node is healthy + testNodeHealthy := newTestNode("node1") + testNodeHealthy.UID = "12345" + nodeHealthy := healthCheckTarget{ + MHC: testMHC, + Machine: testMachine, + Node: testNodeHealthy, + nodeMissing: false, + } + + testCases := []struct { + desc string + targets []healthCheckTarget + expectedHealthy int + expectedNeedsRemediation []healthCheckTarget + expectedNextCheckTimes []time.Duration + }{ + { + desc: "when the node has not yet started", + targets: []healthCheckTarget{nodeNotYetStartedTarget}, + expectedHealthy: 0, + expectedNeedsRemediation: []healthCheckTarget{}, + expectedNextCheckTimes: []time.Duration{200 * time.Second}, + }, + { + desc: "when the node has gone away", + targets: []healthCheckTarget{nodeGoneAway}, + expectedHealthy: 0, + expectedNeedsRemediation: []healthCheckTarget{nodeGoneAway}, + expectedNextCheckTimes: []time.Duration{}, + }, + { + desc: "when the node has been in an unknown state for shorter than the timeout", + targets: []healthCheckTarget{nodeUnknown200}, + expectedHealthy: 0, + expectedNeedsRemediation: []healthCheckTarget{}, + expectedNextCheckTimes: []time.Duration{100 * time.Second}, + }, + { + desc: "when the node has been in an unknown state for longer than the timeout", + targets: []healthCheckTarget{nodeUnknown400}, + expectedHealthy: 0, + expectedNeedsRemediation: []healthCheckTarget{nodeUnknown400}, + expectedNextCheckTimes: []time.Duration{}, + }, + { + desc: "when the node is healthy", + targets: []healthCheckTarget{nodeHealthy}, + expectedHealthy: 1, + expectedNeedsRemediation: []healthCheckTarget{}, + expectedNextCheckTimes: []time.Duration{}, + }, + { + desc: "with a mix of healthy and unhealthy nodes", + targets: []healthCheckTarget{nodeUnknown100, nodeUnknown200, nodeUnknown400, nodeHealthy}, + expectedHealthy: 1, + expectedNeedsRemediation: []healthCheckTarget{nodeUnknown400}, + expectedNextCheckTimes: []time.Duration{200 * time.Second, 100 * time.Second}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + gs := NewGomegaWithT(t) + + gs.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed()) + k8sClient := fake.NewFakeClientWithScheme(scheme.Scheme) + + // Create a test reconciler + reconciler := &MachineHealthCheckReconciler{ + Client: k8sClient, + Log: log.Log, + scheme: scheme.Scheme, + recorder: record.NewFakeRecorder(5), + } + + timeoutForMachineToHaveNode := 10 * time.Minute + currentHealthy, needRemediationTargets, nextCheckTimes := reconciler.healthCheckTargets(tc.targets, reconciler.Log, timeoutForMachineToHaveNode) + + // Round durations down to the nearest second to account for minute differences + // in timing when running tests + roundDurations := func(in []time.Duration) []time.Duration { + out := []time.Duration{} + for _, d := range in { + out = append(out, d.Truncate(time.Second)) + } + return out + } + + gs.Expect(currentHealthy).To(Equal(tc.expectedHealthy)) + gs.Expect(needRemediationTargets).To(ConsistOf(tc.expectedNeedsRemediation)) + gs.Expect(nextCheckTimes).To(WithTransform(roundDurations, ConsistOf(tc.expectedNextCheckTimes))) + }) + } +} + +func newTestMachine(name, namespace, clusterName, nodeName string, labels map[string]string) *clusterv1.Machine 
{ + bootstrap := "bootstrap" + return &clusterv1.Machine{ + TypeMeta: metav1.TypeMeta{ + APIVersion: clusterv1.GroupVersion.String(), + Kind: "Machine", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labels, + }, + Spec: clusterv1.MachineSpec{ + ClusterName: clusterName, + Bootstrap: clusterv1.Bootstrap{ + Data: &bootstrap, + }, + }, + Status: clusterv1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: nodeName, + Namespace: namespace, + }, + }, + } +} + +func newTestNode(name string) *corev1.Node { + return &corev1.Node{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Node", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } +} + +func newTestUnhealthyNode(name string, condition corev1.NodeConditionType, status corev1.ConditionStatus, unhealthyDuration time.Duration) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "12345", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: condition, + Status: status, + LastTransitionTime: metav1.NewTime(time.Now().Add(-unhealthyDuration)), + }, + }, + }, + } +}
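Reviewer note: the defaulting and validation added in machinehealthcheck_webhook.go can be exercised without a running webhook server, since Default() and ValidateCreate() are plain methods on the type (the webhook tests above do exactly this). A minimal sketch, assuming the packages from this change are on the module path; the 29s value is illustrative:

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
)

func main() {
	mhc := &clusterv1.MachineHealthCheck{}

	// Default() backfills nodeStartupTimeout with the 10 minute default.
	mhc.Default()
	fmt.Println(mhc.Spec.NodeStartupTimeout.Duration) // 10m0s

	// validate() rejects anything under 30 seconds.
	tooShort := metav1.Duration{Duration: 29 * time.Second}
	mhc.Spec.NodeStartupTimeout = &tooShort
	fmt.Println(mhc.ValidateCreate()) // spec.nodeStartupTimeout: Invalid value ... must be at least 30s
}
```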
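Reviewer note: hasMatchingLabels deliberately inverts the usual Kubernetes convention, where an empty selector matches everything; here an empty (or invalid) selector matches nothing, so a blank MachineHealthCheck cannot claim every Machine in its namespace. A standalone sketch of that behaviour; the helper body is copied from controllers/machine_helpers.go, since it is unexported:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// Copy of the unexported helper from controllers/machine_helpers.go.
func hasMatchingLabels(matchSelector metav1.LabelSelector, matchLabels map[string]string) bool {
	selector, err := metav1.LabelSelectorAsSelector(&matchSelector)
	if err != nil {
		// An invalid selector matches nothing.
		return false
	}
	if selector.Empty() {
		// An empty selector also matches nothing, not everything.
		return false
	}
	return selector.Matches(labels.Set(matchLabels))
}

func main() {
	machineLabels := map[string]string{"cluster": "foo", "nodepool": "bar"}

	fmt.Println(hasMatchingLabels(metav1.LabelSelector{
		MatchLabels: map[string]string{"nodepool": "bar"},
	}, machineLabels)) // true

	fmt.Println(hasMatchingLabels(metav1.LabelSelector{}, machineLabels)) // false
}
```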
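Reviewer note: the requeue arithmetic in needsRemediation is easier to follow with concrete numbers. For a condition with a 5m timeout that transitioned to the unhealthy state 200s ago, the target is not yet unhealthy, so the controller asks to be requeued after `timeout - durationUnhealthy + 1s`; minDuration then picks the soonest of those deadlines across all targets. A sketch with hypothetical values mirroring the fixtures in machinehealthcheck_targets_test.go; minDuration is copied from machinehealthcheck_targets.go:

```go
package main

import (
	"fmt"
	"time"
)

// Copy of the unexported helper from controllers/machinehealthcheck_targets.go:
// returns the smallest duration in the slice, or 0 for an empty slice.
func minDuration(durations []time.Duration) time.Duration {
	if len(durations) == 0 {
		return time.Duration(0)
	}
	min := durations[0]
	for _, d := range durations[1:] {
		if d < min {
			min = d
		}
	}
	return min
}

func main() {
	conditionTimeout := 5 * time.Minute    // from the MHC's unhealthyConditions
	durationUnhealthy := 200 * time.Second // hypothetical: condition transitioned 200s ago

	// Mirrors needsRemediation: requeue one second after the timeout would expire.
	nextCheck := conditionTimeout - durationUnhealthy + time.Second
	fmt.Println(nextCheck) // 1m41s

	// The reconciler requeues at the soonest deadline across all targets.
	fmt.Println(minDuration([]time.Duration{nextCheck, 4 * time.Minute})) // 1m41s
}
```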