From 030ec2d607596ad48ac18c356bd6909f52500ce2 Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Tue, 3 Oct 2023 19:30:56 -0400
Subject: [PATCH 1/2] Alleviate excessive periodic logging during conflict check

If a service is imported without a local service export, the conflict
checking tries to update the status of a non-existent ServiceExport,
specifically to remove the Conflict status condition. This results in a
log message:

  "ServiceExport (...) not found - unable to update status"

Due to the 2-minute resync interval for the EndpointSlice syncer, the
repeated logging can exhaust the pod's disk storage over time.

To alleviate this issue, check the local cache for a ServiceExport and,
if it doesn't exist, skip the conflict checking. If it does exist but
the Conflict status condition isn't present in the local ServiceExport,
skip trying to remove the status condition. Finally, change the log
level to TRACE for the offending message.

Signed-off-by: Tom Pantelis
---
 pkg/agent/controller/agent.go                 | 21 ++++++++++---------
 .../controller/clusterip_service_test.go      |  9 ++++++++
 pkg/agent/controller/controller_suite_test.go | 11 ++++++++++
 pkg/agent/controller/endpoint_slice.go        |  9 ++++++--
 pkg/agent/controller/service_export_client.go | 11 +++++++++-
 pkg/agent/controller/service_import.go        |  2 +-
 pkg/agent/controller/types.go                 |  3 ++-
 7 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go
index ec4badf36..ade0a91a8 100644
--- a/pkg/agent/controller/agent.go
+++ b/pkg/agent/controller/agent.go
@@ -72,16 +72,6 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN
 		return nil, errors.Wrap(err, "error converting resource")
 	}
 
-	agentController.serviceExportClient = &ServiceExportClient{
-		NamespaceableResourceInterface: syncerConf.LocalClient.Resource(*gvr),
-		converter:                      converter{scheme: syncerConf.Scheme},
-	}
-
-	agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient)
-	if err != nil {
-		return nil, err
-	}
-
 	agentController.localServiceImportFederator = federate.NewCreateOrUpdateFederator(syncerConf.LocalClient, syncerConf.RestMapper,
 		spec.Namespace, "")
 
@@ -116,6 +106,17 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN
 		return nil, errors.Wrap(err, "error creating Service syncer")
 	}
 
+	agentController.serviceExportClient = &ServiceExportClient{
+		NamespaceableResourceInterface: syncerConf.LocalClient.Resource(*gvr),
+		converter:                      converter{scheme: syncerConf.Scheme},
+		localSyncer:                    agentController.serviceExportSyncer,
+	}
+
+	agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient)
+	if err != nil {
+		return nil, err
+	}
+
 	agentController.serviceImportController, err = newServiceImportController(spec, syncerMetricNames, syncerConf,
 		agentController.endpointSliceController.syncer.GetBrokerClient(),
 		agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient)
diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go
index 98924fd89..cc2ae9277 100644
--- a/pkg/agent/controller/clusterip_service_test.go
+++ b/pkg/agent/controller/clusterip_service_test.go
@@ -66,6 +66,11 @@ func testClusterIPServiceInOneCluster() {
 			t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "AwaitingExport"),
 				newServiceExportReadyCondition(corev1.ConditionTrue, ""))
 			t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+
+			By(fmt.Sprintf("Ensure cluster %q does not try to update the status for a non-existent ServiceExport",
+				t.cluster2.clusterID))
+
+			t.cluster2.ensureNoServiceExportActions()
 		})
 	})
 
@@ -329,6 +334,10 @@ func testClusterIPServiceInTwoClusters() {
 		t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
 		t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
 		t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+
+		By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")
+
+		t.cluster1.ensureNoServiceExportActions()
 	})
 
 	Context("with differing ports", func() {
diff --git a/pkg/agent/controller/controller_suite_test.go b/pkg/agent/controller/controller_suite_test.go
index fc3209403..8a63ce9e5 100644
--- a/pkg/agent/controller/controller_suite_test.go
+++ b/pkg/agent/controller/controller_suite_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/submariner-io/admiral/pkg/resource"
 	"github.com/submariner-io/admiral/pkg/syncer/broker"
 	"github.com/submariner-io/admiral/pkg/syncer/test"
+	testutil "github.com/submariner-io/admiral/pkg/test"
 	"github.com/submariner-io/lighthouse/pkg/agent/controller"
 	"github.com/submariner-io/lighthouse/pkg/constants"
 	corev1 "k8s.io/api/core/v1"
@@ -108,6 +109,8 @@ func init() {
 	if err != nil {
 		panic(err)
 	}
+
+	controller.BrokerResyncPeriod = time.Millisecond * 100
 }
 
 func TestController(t *testing.T) {
@@ -611,6 +614,14 @@ func (c *cluster) ensureNoEndpointSlice() {
 	}, 300*time.Millisecond).Should(BeZero(), "Unexpected EndpointSlice")
 }
 
+func (c *cluster) ensureNoServiceExportActions() {
+	c.localDynClient.Fake.ClearActions()
+
+	Consistently(func() []string {
+		return testutil.GetOccurredActionVerbs(&c.localDynClient.Fake, "serviceexports", "get", "update")
+	}, 500*time.Millisecond).Should(BeEmpty())
+}
+
 func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected *mcsv1a1.ServiceImport) {
 	sortSlices := func(si *mcsv1a1.ServiceImport) {
 		sort.SliceStable(si.Spec.Ports, func(i, j int) bool {
diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go
index e026f31fe..b3732a111 100644
--- a/pkg/agent/controller/endpoint_slice.go
+++ b/pkg/agent/controller/endpoint_slice.go
@@ -66,7 +66,7 @@ func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.Sy
 				c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice))
 				return false
 			},
-			BrokerResyncPeriod: brokerResyncePeriod,
+			BrokerResyncPeriod: BrokerResyncPeriod,
 		},
 	}
 
@@ -199,6 +199,11 @@ func (c *EndpointSliceController) hasNoRemainingEndpointSlices(endpointSlice *di
 }
 
 func (c *EndpointSliceController) checkForConflicts(key, name, namespace string) (bool, error) {
+	localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace)
+	if localServiceExport == nil {
+		return false, nil
+	}
+
 	epsList, err := c.syncer.GetLocalClient().Resource(endpointSliceGVR).Namespace(namespace).List(context.Background(), metav1.ListOptions{
 		LabelSelector: k8slabels.SelectorFromSet(map[string]string{
 			discovery.LabelManagedBy: constants.LabelValueManagedBy,
 			mcsv1a1.LabelServiceName: name,
 		}).String(),
 	})
@@ -236,7 +241,7 @@ func (c *EndpointSliceController) checkForConflicts(key, name, namespace string)
 			fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+
"+ "The service will expose the intersection of all the ports: %s", fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts)))) - } else { + } else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil { c.serviceExportClient.removeStatusCondition(name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason) } diff --git a/pkg/agent/controller/service_export_client.go b/pkg/agent/controller/service_export_client.go index 00fd3f1c8..cf92132b5 100644 --- a/pkg/agent/controller/service_export_client.go +++ b/pkg/agent/controller/service_export_client.go @@ -104,7 +104,7 @@ func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpd err := retry.RetryOnConflict(retry.DefaultRetry, func() error { obj, err := c.Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - logger.Infof("ServiceExport (%s/%s) not found - unable to update status", namespace, name) + logger.V(log.TRACE).Infof("ServiceExport (%s/%s) not found - unable to update status", namespace, name) return nil } else if err != nil { return errors.Wrap(err, "error retrieving ServiceExport") @@ -127,6 +127,15 @@ func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpd } } +func (c *ServiceExportClient) getLocalInstance(name, namespace string) *mcsv1a1.ServiceExport { + obj, found, _ := c.localSyncer.GetResource(name, namespace) + if !found { + return nil + } + + return obj.(*mcsv1a1.ServiceExport) +} + func serviceExportConditionEqual(c1, c2 *mcsv1a1.ServiceExportCondition) bool { return c1.Type == c2.Type && c1.Status == c2.Status && reflect.DeepEqual(c1.Reason, c2.Reason) && reflect.DeepEqual(c1.Message, c2.Message) diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index dd4675ee9..ed4949b25 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -96,7 +96,7 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen Transform: controller.onRemoteServiceImport, OnSuccessfulSync: controller.serviceImportMigrator.onSuccessfulSyncFromBroker, Scheme: syncerConfig.Scheme, - ResyncPeriod: brokerResyncePeriod, + ResyncPeriod: BrokerResyncPeriod, SyncCounterOpts: &prometheus.GaugeOpts{ Name: syncerMetricNames.ServiceImportCounterName, Help: "Count of imported services", diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index e24a92de4..f3b2ecaad 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -39,7 +39,7 @@ const ( portConflictReason = "ConflictingPorts" ) -var brokerResyncePeriod = time.Minute * 2 +var BrokerResyncPeriod = time.Minute * 2 type converter struct { scheme *runtime.Scheme @@ -118,6 +118,7 @@ type EndpointSliceController struct { type ServiceExportClient struct { dynamic.NamespaceableResourceInterface converter + localSyncer syncer.Interface } type globalIngressIPCache struct { From 0f474aaa5427f4e4869a3b1197b05335a2d90e75 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 3 Oct 2023 19:46:45 -0400 Subject: [PATCH 2/2] Optimize conflict checking Use the local cache to list the EndpointSlices instead of the API server client. 
Signed-off-by: Tom Pantelis
---
 pkg/agent/controller/endpoint_slice.go | 37 ++++++++++++++-------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go
index b3732a111..1494915ed 100644
--- a/pkg/agent/controller/endpoint_slice.go
+++ b/pkg/agent/controller/endpoint_slice.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/submariner-io/admiral/pkg/log"
@@ -36,6 +37,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	k8slabels "k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
 )
 
@@ -62,8 +64,8 @@ func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.Sy
 			OnSuccessfulSyncToBroker:   c.onLocalEndpointSliceSynced,
 			BrokerResourceType:         &discovery.EndpointSlice{},
 			TransformBrokerToLocal:     c.onRemoteEndpointSlice,
-			OnSuccessfulSyncFromBroker: func(obj runtime.Object, _ syncer.Operation) bool {
-				c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice))
+			OnSuccessfulSyncFromBroker: func(obj runtime.Object, op syncer.Operation) bool {
+				c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice), op)
 				return false
 			},
 			BrokerResyncPeriod: BrokerResyncPeriod,
@@ -162,7 +164,7 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object,
 		c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady,
 			corev1.ConditionTrue, "", "Service was successfully exported to the broker"))
 
-		c.enqueueForConflictCheck(endpointSlice)
+		c.enqueueForConflictCheck(endpointSlice, op)
 	}
 }
 
@@ -198,29 +200,21 @@ func (c *EndpointSliceController) hasNoRemainingEndpointSlices(endpointSlice *di
 	return true
 }
 
-func (c *EndpointSliceController) checkForConflicts(key, name, namespace string) (bool, error) {
+func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (bool, error) {
 	localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace)
 	if localServiceExport == nil {
 		return false, nil
 	}
 
-	epsList, err := c.syncer.GetLocalClient().Resource(endpointSliceGVR).Namespace(namespace).List(context.Background(), metav1.ListOptions{
-		LabelSelector: k8slabels.SelectorFromSet(map[string]string{
-			discovery.LabelManagedBy: constants.LabelValueManagedBy,
-			mcsv1a1.LabelServiceName: name,
-		}).String(),
-	})
-	if err != nil {
-		return true, errors.Wrapf(err, "error during conflict check for %q", key)
-	}
+	epsList := c.syncer.ListLocalResources(&discovery.EndpointSlice{})
 
 	var prevServicePorts []mcsv1a1.ServicePort
 	var intersectedServicePorts []mcsv1a1.ServicePort
-	clusterNames := make([]string, 0, len(epsList.Items))
+	clusterNames := make([]string, 0, len(epsList))
 	conflict := false
 
-	for i := range epsList.Items {
-		eps := c.serviceExportClient.toEndpointSlice(&epsList.Items[i])
+	for _, o := range epsList {
+		eps := o.(*discovery.EndpointSlice)
 		servicePorts := c.serviceExportClient.toServicePorts(eps.Ports)
 
 		if prevServicePorts == nil {
@@ -248,11 +242,20 @@ func (c *EndpointSliceController) checkForConflicts(key, name, namespace string)
 	return false, nil
 }
 
-func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice) {
+func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice, op syncer.Operation) {
 	if eps.Labels[constants.LabelIsHeadless] != "false" {
 		return
 	}
 
+	// Since the conflict checking works off of the local cache for efficiency, wait a little bit here for the local cache to be updated
+	// with the latest state of the EndpointSlice.
+	_ = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 100*time.Millisecond, true,
+		func(_ context.Context) (bool, error) {
+			_, found, _ := c.syncer.GetLocalResource(eps.Name, eps.Namespace, eps)
+			return (op == syncer.Delete && !found) || (op != syncer.Delete && found), nil
+		},
+	)
+
 	c.conflictCheckWorkQueue.Enqueue(&discovery.EndpointSlice{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: eps.Labels[mcsv1a1.LabelServiceName],
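Taken together, the two commits gate all conflict handling on the local caches: the check bails out when there is no locally exported ServiceExport, and a status update is only issued when the Conflict condition actually needs to change. The Go sketch below condenses that flow for reference; it is illustrative only and not part of either patch. It assumes the package-internal helpers shown in the diffs (getLocalInstance, FindServiceExportStatusCondition, removeStatusCondition); the function name conflictCheckOutline and its conflict parameter (standing in for the port comparison performed in checkForConflicts) are hypothetical.

// Condensed, illustrative outline of the post-patch conflict handling
// (hypothetical helper; not part of the patches above).
func (c *EndpointSliceController) conflictCheckOutline(name, namespace string, conflict bool) {
	// No locally exported ServiceExport: skip conflict handling entirely. This is
	// what stops the repeated "ServiceExport (...) not found - unable to update
	// status" attempts on every broker resync.
	localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace)
	if localServiceExport == nil {
		return
	}

	// Only ask to remove the Conflict condition when it is actually present on
	// the locally cached ServiceExport; otherwise the update would be a no-op
	// round trip to the API server.
	if !conflict && FindServiceExportStatusCondition(localServiceExport.Status.Conditions,
		mcsv1a1.ServiceExportConflict) != nil {
		c.serviceExportClient.removeStatusCondition(name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason)
	}
}

In the actual code these guards run inside checkForConflicts, which after the second commit also lists the constituent EndpointSlices from the local cache rather than from the API server.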