From 7c978464761913531809ce5c4b62b1bf78e4d580 Mon Sep 17 00:00:00 2001 From: Yang Ding Date: Tue, 8 Mar 2022 14:15:40 -0800 Subject: [PATCH] Address more comments and add logic in stale controller Signed-off-by: Yang Ding --- docs/multicluster/architecture.md | 6 +- .../acnp_resourceimport_controller.go | 89 ++++++++++-------- .../multicluster/stale_controller.go | 62 ++++++++++--- .../multicluster/stale_controller_test.go | 92 +++++++++++++++++++ .../controllers/multicluster/test_data.go | 2 + 5 files changed, 195 insertions(+), 56 deletions(-) diff --git a/docs/multicluster/architecture.md b/docs/multicluster/architecture.md index ed4726ebe75..5e61f24cd5d 100644 --- a/docs/multicluster/architecture.md +++ b/docs/multicluster/architecture.md @@ -102,7 +102,7 @@ To achieve such ACNP copy-span, admins can, in the acting leader cluster of a Mu create a ResourceExport of kind `AntreaClusterNetworkPolicy` which contains the ClusterNetworkPolicy spec they wish to be replicated. The ResourceExport should be created in the Namespace which implements the Common Area of the ClusterSet. In future releases, some additional tooling may become available to -automate the creation of such ResourceExport and make ACNP replication across cluster eaiser. +automate the creation of such ResourceExport and make ACNP replication across clusters easier. ```yaml apiVersion: multicluster.crd.antrea.io/v1alpha1 @@ -135,7 +135,7 @@ The above sample spec will create an ACNP in each member cluster which implement isolation for that cluster. Note that because the Tier that an ACNP refers to must exist before the ACNP is applied, an importing -cluster may fail to create the ACNP to be replicated, if the tier in the ResourceExport spec cannot be +cluster may fail to create the ACNP to be replicated, if the Tier in the ResourceExport spec cannot be found in that particular cluster. 
The ACNP creation status of each member cluster will be reported back to the Common Area as K8s Events, and can be checked by describing the ResourceImport of the original ResourceExport: @@ -172,6 +172,6 @@ Spec: Events: Type Reason Age From Message ---- ------ ---- ---- ------- - Normal ACNPImportSucceeded 2m11s resourceimport-controller ACNP successfully created in the importing cluster test-cluster-east + Normal ACNPImportSucceeded 2m11s resourceimport-controller ACNP successfully synced in the importing cluster test-cluster-east Warning ACNPImportFailed 2m11s resourceimport-controller ACNP Tier does not exist in the importing cluster test-cluster-west ``` diff --git a/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller.go b/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller.go index d8d09c7693a..1c1af459350 100644 --- a/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller.go +++ b/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller.go @@ -64,9 +64,10 @@ func (r *ResourceImportReconciler) handleResImpUpdateForClusterNetworkPolicy(ctx } if !acnpNotFound { if _, ok := acnp.Annotations[common.AntreaMCACNPAnnotation]; !ok { - err := errors.New("unable to import Antrea ClusterNetworkPolicy which conflicts with existing one") + msg := "unable to import Antrea ClusterNetworkPolicy which conflicts with existing one in cluster " + r.localClusterID + err := errors.New(msg) klog.ErrorS(err, "", "acnp", klog.KObj(acnp)) - return ctrl.Result{}, err + return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp) } } acnpObj := getMCAntreaClusterPolicy(resImp) @@ -78,57 +79,32 @@ func (r *ResourceImportReconciler) handleResImpUpdateForClusterNetworkPolicy(ctx // Create or update the ACNP if necessary. 
if acnpNotFound { if err = r.localClusterClient.Create(ctx, acnpObj, &client.CreateOptions{}); err != nil { - klog.ErrorS(err, "failed to create imported Antrea ClusterNetworkPolicy", "acnp", klog.KObj(acnpObj)) - return ctrl.Result{}, err + msg := "failed to create imported Antrea ClusterNetworkPolicy in cluster " + r.localClusterID + klog.ErrorS(err, msg, "acnp", klog.KObj(acnpObj)) + return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp) } } else if !apiequality.Semantic.DeepEqual(acnp.Spec, acnpObj.Spec) { acnp.Spec = acnpObj.Spec if err = r.localClusterClient.Update(ctx, acnp, &client.UpdateOptions{}); err != nil { - klog.ErrorS(err, "failed to update imported Antrea ClusterNetworkPolicy", "acnp", klog.KObj(acnpObj)) - return ctrl.Result{}, err + msg := "failed to update imported Antrea ClusterNetworkPolicy in cluster " + r.localClusterID + klog.ErrorS(err, msg, "acnp", klog.KObj(acnpObj)) + return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp) } } } else if tierNotFound && !acnpNotFound { // The ACNP Tier does not exist, and the policy cannot be realized in this particular importing member cluster. // If there is an ACNP previously created via import (which has a valid Tier by then), it should be cleaned up. 
if err = r.localClusterClient.Delete(ctx, acnpObj, &client.DeleteOptions{}); err != nil { - klog.ErrorS(err, "failed to delete imported Antrea ClusterNetworkPolicy that no longer has a valid Tier for the current cluster", "acnp", klog.KObj(acnpObj)) - return ctrl.Result{}, err + msg := "failed to delete imported Antrea ClusterNetworkPolicy that no longer has a valid Tier for cluster " + r.localClusterID + klog.ErrorS(err, msg, "acnp", klog.KObj(acnpObj)) + return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp) } } - - statusEvent := &corev1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Name: randName(acnpImportStatusPrefix + r.localClusterID + "-"), - Namespace: resImp.Namespace, - }, - InvolvedObject: corev1.ObjectReference{ - APIVersion: resourceImportAPIVersion, - Kind: resourceImportKind, - Name: resImp.Name, - Namespace: resImp.Namespace, - UID: resImp.GetUID(), - }, - FirstTimestamp: metav1.Now(), - LastTimestamp: metav1.Now(), - ReportingController: acnpEventReportingController, - ReportingInstance: acnpEventReportingInstance, - Action: "reconciled", - } if tierNotFound { - statusEvent.Type = corev1.EventTypeWarning - statusEvent.Reason = acnpImportFailed - statusEvent.Message = "ACNP Tier does not exist in the importing cluster " + r.localClusterID - } else { - statusEvent.Type = corev1.EventTypeNormal - statusEvent.Reason = acnpImportSucceeded - statusEvent.Message = "ACNP successfully created in the importing cluster " + r.localClusterID + msg := "ACNP Tier does not exist in the importing cluster " + r.localClusterID + return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp) } - if err = r.remoteCommonArea.Create(ctx, statusEvent, &client.CreateOptions{}); err != nil { - klog.ErrorS(err, "failed to create acnp import event for resourceimport", "resImp", klog.KObj(resImp)) - return ctrl.Result{}, err - } - return ctrl.Result{}, nil + return ctrl.Result{}, r.reportStatusEvent("", ctx, resImp) } func (r *ResourceImportReconciler) 
handleResImpDeleteForClusterNetworkPolicy(ctx context.Context, resImp *multiclusterv1alpha1.ResourceImport) (ctrl.Result, error) { @@ -170,6 +146,41 @@ func getMCAntreaClusterPolicy(resImp *multiclusterv1alpha1.ResourceImport) *v1al } } +func (r *ResourceImportReconciler) reportStatusEvent(errMsg string, ctx context.Context, resImp *multiclusterv1alpha1.ResourceImport) error { + statusEvent := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(acnpImportStatusPrefix + r.localClusterID + "-"), + Namespace: resImp.Namespace, + }, + InvolvedObject: corev1.ObjectReference{ + APIVersion: resourceImportAPIVersion, + Kind: resourceImportKind, + Name: resImp.Name, + Namespace: resImp.Namespace, + UID: resImp.GetUID(), + }, + FirstTimestamp: metav1.Now(), + LastTimestamp: metav1.Now(), + ReportingController: acnpEventReportingController, + ReportingInstance: acnpEventReportingInstance, + Action: "reconciled", + } + if errMsg != "" { + statusEvent.Type = corev1.EventTypeWarning + statusEvent.Reason = acnpImportFailed + statusEvent.Message = errMsg + } else { + statusEvent.Type = corev1.EventTypeNormal + statusEvent.Reason = acnpImportSucceeded + statusEvent.Message = "ACNP successfully synced in the importing cluster " + r.localClusterID + } + if err := r.remoteCommonArea.Create(ctx, statusEvent, &client.CreateOptions{}); err != nil { + klog.ErrorS(err, "failed to create acnp import event for resourceimport", "resImp", klog.KObj(resImp)) + return err + } + return nil +} + func randSeq(n int) string { b := make([]rune, n) for i := range b { diff --git a/multicluster/controllers/multicluster/stale_controller.go b/multicluster/controllers/multicluster/stale_controller.go index 705593b4cf5..da9f6793e5f 100644 --- a/multicluster/controllers/multicluster/stale_controller.go +++ b/multicluster/controllers/multicluster/stale_controller.go @@ -33,6 +33,7 @@ import ( mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" 
"antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" ) // StaleController will clean up ServiceImport and MC Service if no corresponding ResourceImport @@ -68,17 +69,29 @@ func (c *StaleController) cleanup() error { if *c.remoteCommonAreaManager == nil { return errors.New("ClusterSet has not been initialized properly, no available remote common area") } - remoteCluster, err := getRemoteCommonArea(c.remoteCommonAreaManager) if err != nil { return err } - localClusterID := string((*c.remoteCommonAreaManager).GetLocalClusterID()) if len(localClusterID) == 0 { return errors.New("localClusterID is not initialized, retry later") } + resImpList := &mcsv1alpha1.ResourceImportList{} + if err := remoteCluster.List(ctx, resImpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace()}); err != nil { + return err + } + if err := c.cleanupStaleServiceResources(remoteCluster, localClusterID, resImpList); err != nil { + return err + } + if err := c.cleanupACNPResources(resImpList); err != nil { + return err + } + return nil +} +func (c *StaleController) cleanupStaleServiceResources(remoteCluster commonarea.RemoteCommonArea, + localClusterID string, resImpList *mcsv1alpha1.ResourceImportList) error { svcImpList := &k8smcsv1alpha1.ServiceImportList{} if err := c.List(ctx, svcImpList, &client.ListOptions{}); err != nil { return err @@ -89,11 +102,6 @@ func (c *StaleController) cleanup() error { return err } - resImpList := &mcsv1alpha1.ResourceImportList{} - if err := remoteCluster.List(ctx, resImpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace()}); err != nil { - return err - } - svcImpItems := svcImpList.Items var mcsSvcItems []corev1.Service for _, svc := range svcList.Items { @@ -104,12 +112,11 @@ func (c *StaleController) cleanup() error { for _, resImp := range resImpList.Items { for k, svc := range mcsSvcItems { - if 
svc.Name == common.AntreaMCSPrefix+resImp.Spec.Name && svc.Namespace == resImp.Spec.Namespace { + if resImp.Spec.Kind == common.ServiceKind && svc.Name == common.AntreaMCSPrefix+resImp.Spec.Name && svc.Namespace == resImp.Spec.Namespace { // Set the valid Service item as empty Service, then all left non-empty items should be removed. mcsSvcItems[k] = corev1.Service{} } } - for n, svcImp := range svcImpItems { if svcImp.Name == resImp.Spec.Name && svcImp.Namespace == resImp.Spec.Namespace { svcImpItems[n] = k8smcsv1alpha1.ServiceImport{} @@ -120,18 +127,17 @@ func (c *StaleController) cleanup() error { for _, svc := range mcsSvcItems { s := svc if s.Name != "" { - klog.InfoS("clean up Service", "service", klog.KObj(&s)) + klog.InfoS("Cleaning up stale Service", "service", klog.KObj(&s)) if err := c.Client.Delete(ctx, &s, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { return err } } } - for _, svcImp := range svcImpItems { si := svcImp if si.Name != "" { - klog.InfoS("clean up ServiceImport", "serviceimport", klog.KObj(&si)) - if err = c.Client.Delete(ctx, &si, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + klog.InfoS("Cleaning up stale ServiceImport", "serviceimport", klog.KObj(&si)) + if err := c.Client.Delete(ctx, &si, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { return err } } @@ -167,7 +173,7 @@ func (c *StaleController) cleanup() error { for _, r := range resExpItems { re := r if re.Name != "" { - klog.InfoS("clean up ResourceExport", "ResourceExport", klog.KObj(&re)) + klog.InfoS("Cleaning up ResourceExport", "ResourceExport", klog.KObj(&re)) if err := remoteCluster.Delete(ctx, &re, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { return err } @@ -176,6 +182,34 @@ func (c *StaleController) cleanup() error { return nil } +func (c *StaleController) cleanupACNPResources(resImpList *mcsv1alpha1.ResourceImportList) error { + acnpList := &crdv1alpha1.ClusterNetworkPolicyList{} 
+ if err := c.List(ctx, acnpList, &client.ListOptions{}); err != nil { + return err + } + staleMCACNPItems := map[string]crdv1alpha1.ClusterNetworkPolicy{} + for _, acnp := range acnpList.Items { + if _, ok := acnp.Annotations[common.AntreaMCACNPAnnotation]; ok { + staleMCACNPItems[acnp.Name] = acnp + } + } + for _, resImp := range resImpList.Items { + if resImp.Spec.Kind == common.AntreaClusterNetworkPolicyKind { + acnpNameFromResImp := common.AntreaMCSPrefix + resImp.Spec.Name + if _, ok := staleMCACNPItems[acnpNameFromResImp]; ok { + delete(staleMCACNPItems, acnpNameFromResImp) + } + } + } + for _, acnp := range staleMCACNPItems { + klog.InfoS("Cleaning up stale ACNP", "acnp", klog.KObj(&acnp)) + if err := c.Client.Delete(ctx, &acnp, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + return err + } + } + return nil +} + // Enqueue will be called after StaleController is initialized. func (c *StaleController) Enqueue() { // The key can be anything as we only have single item. 
diff --git a/multicluster/controllers/multicluster/stale_controller_test.go b/multicluster/controllers/multicluster/stale_controller_test.go index eacc0dd66f9..2cfcc5ef7b7 100644 --- a/multicluster/controllers/multicluster/stale_controller_test.go +++ b/multicluster/controllers/multicluster/stale_controller_test.go @@ -18,6 +18,7 @@ package multicluster import ( "context" + "k8s.io/apimachinery/pkg/util/sets" "testing" corev1 "k8s.io/api/core/v1" @@ -30,9 +31,11 @@ import ( mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" ) func TestStaleController_CleanupService(t *testing.T) { + localClusterID = "cluster-a" remoteMgr := commonarea.NewRemoteCommonAreaManager("test-clusterset", common.ClusterID(localClusterID)) go remoteMgr.Start() @@ -62,6 +65,7 @@ func TestStaleController_CleanupService(t *testing.T) { Spec: mcsv1alpha1.ResourceImportSpec{ Name: "non-nginx", Namespace: "default", + Kind: common.ServiceKind, }, } tests := []struct { @@ -123,6 +127,94 @@ func TestStaleController_CleanupService(t *testing.T) { } } +func TestStaleController_CleanupACNP(t *testing.T) { + localClusterID = "cluster-a" + remoteMgr := commonarea.NewRemoteCommonAreaManager("test-clusterset", common.ClusterID(localClusterID)) + go remoteMgr.Start() + + acnpImportName := "acnp-for-isolation" + acnpResImportName := leaderNamespace + "-" + acnpImportName + acnpResImport := mcsv1alpha1.ResourceImport{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: acnpResImportName, + }, + Spec: mcsv1alpha1.ResourceImportSpec{ + Name: acnpImportName, + Kind: common.AntreaClusterNetworkPolicyKind, + ClusterNetworkPolicy: &v1alpha1.ClusterNetworkPolicySpec{ + Tier: "securityops", + Priority: 1.0, + AppliedTo: []v1alpha1.NetworkPolicyPeer{ + {NamespaceSelector: &metav1.LabelSelector{}}, + }, + }, + }, + 
} + acnp1 := v1alpha1.ClusterNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.AntreaMCSPrefix + acnpImportName, + Annotations: map[string]string{common.AntreaMCACNPAnnotation: "true"}, + }, + } + acnp2 := v1alpha1.ClusterNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.AntreaMCSPrefix + "some-deleted-resimp", + Annotations: map[string]string{common.AntreaMCACNPAnnotation: "true"}, + }, + } + acnp3 := v1alpha1.ClusterNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "non-mcs-acnp", + }, + } + tests := []struct { + name string + existingACNPList *v1alpha1.ClusterNetworkPolicyList + existResImpList *mcsv1alpha1.ResourceImportList + expectedACNPRemaining sets.String + }{ + { + name: "cleanup stale ACNP", + existingACNPList: &v1alpha1.ClusterNetworkPolicyList{ + Items: []v1alpha1.ClusterNetworkPolicy{ + acnp1, acnp2, acnp3, + }, + }, + existResImpList: &mcsv1alpha1.ResourceImportList{ + Items: []mcsv1alpha1.ResourceImport{ + acnpResImport, + }, + }, + expectedACNPRemaining: sets.NewString(common.AntreaMCSPrefix+acnpImportName, "non-mcs-acnp"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existingACNPList).Build() + fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existResImpList).Build() + _ = commonarea.NewFakeRemoteCommonArea(scheme, &remoteMgr, fakeRemoteClient, "leader-cluster", "default") + + c := NewStaleController(fakeClient, scheme, &remoteMgr) + if err := c.cleanup(); err != nil { + t.Errorf("StaleController.cleanup() should clean up all stale ACNPs but got err = %v", err) + } + ctx := context.TODO() + acnpList := &v1alpha1.ClusterNetworkPolicyList{} + if err := fakeClient.List(ctx, acnpList, &client.ListOptions{}); err != nil { + t.Errorf("Error when listing the ACNPs after cleanup") + } + acnpRemaining := sets.NewString() + for _, acnp := range acnpList.Items { + acnpRemaining.Insert(acnp.Name) + } 
+ if !acnpRemaining.Equal(tt.expectedACNPRemaining) { + t.Errorf("Unexpected stale ACNP cleanup result. Expected: %v, Actual: %v", tt.expectedACNPRemaining, acnpRemaining) + } + }) + } +} + func TestStaleController_CleanupResourceExport(t *testing.T) { localClusterID = "cluster-a" remoteMgr := commonarea.NewRemoteCommonAreaManager("test-clusterset", common.ClusterID(localClusterID)) diff --git a/multicluster/controllers/multicluster/test_data.go b/multicluster/controllers/multicluster/test_data.go index e8683da85cf..160db53e4c8 100644 --- a/multicluster/controllers/multicluster/test_data.go +++ b/multicluster/controllers/multicluster/test_data.go @@ -29,6 +29,7 @@ import ( k8smcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" ) var ( @@ -115,4 +116,5 @@ func init() { utilruntime.Must(mcsv1alpha1.AddToScheme(scheme)) utilruntime.Must(k8smcsapi.AddToScheme(scheme)) utilruntime.Must(k8sscheme.AddToScheme(scheme)) + utilruntime.Must(crdv1alpha1.AddToScheme(scheme)) }