diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 3e2524096..3eb805181 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -23,6 +23,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" @@ -36,6 +37,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -62,8 +64,8 @@ func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.Sy OnSuccessfulSyncToBroker: c.onLocalEndpointSliceSynced, BrokerResourceType: &discovery.EndpointSlice{}, TransformBrokerToLocal: c.onRemoteEndpointSlice, - OnSuccessfulSyncFromBroker: func(obj runtime.Object, _ syncer.Operation) bool { - c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice)) + OnSuccessfulSyncFromBroker: func(obj runtime.Object, op syncer.Operation) bool { + c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice), op) return false }, BrokerResyncPeriod: BrokerResyncePeriod, @@ -162,7 +164,7 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object, c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionTrue, "", "Service was successfully exported to the broker")) - c.enqueueForConflictCheck(endpointSlice) + c.enqueueForConflictCheck(endpointSlice, op) } } @@ -198,29 +200,21 @@ func (c *EndpointSliceController) hasNoRemainingEndpointSlices(endpointSlice *di return true } -func (c *EndpointSliceController) checkForConflicts(key, name, namespace string) (bool, error) { +func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (bool, error) { localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace) if localServiceExport == nil { return false, nil } - epsList, err := c.syncer.GetLocalClient().Resource(endpointSliceGVR).Namespace(namespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: k8slabels.SelectorFromSet(map[string]string{ - discovery.LabelManagedBy: constants.LabelValueManagedBy, - mcsv1a1.LabelServiceName: name, - }).String(), - }) - if err != nil { - return true, errors.Wrapf(err, "error during conflict check for %q", key) - } + epsList := c.syncer.ListLocalResources(&discovery.EndpointSlice{}) var prevServicePorts []mcsv1a1.ServicePort var intersectedServicePorts []mcsv1a1.ServicePort - clusterNames := make([]string, 0, len(epsList.Items)) + clusterNames := make([]string, 0, len(epsList)) conflict := false - for i := range epsList.Items { - eps := c.serviceExportClient.toEndpointSlice(&epsList.Items[i]) + for _, o := range epsList { + eps := o.(*discovery.EndpointSlice) servicePorts := c.serviceExportClient.toServicePorts(eps.Ports) if prevServicePorts == nil { @@ -248,11 +242,20 @@ func (c *EndpointSliceController) checkForConflicts(key, name, namespace string) return false, nil } -func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice) { +func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice, op syncer.Operation) { if eps.Labels[constants.LabelIsHeadless] != "false" { return } + // Since the conflict checking works off of the local cache for efficiency, wait a little bit here for the local cache to be updated + // with the latest state of the EndpointSlice. + _ = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 100*time.Millisecond, true, + func(_ context.Context) (bool, error) { + _, found, _ := c.syncer.GetLocalResource(eps.Name, eps.Namespace, eps) + return (op == syncer.Delete && !found) || (op != syncer.Delete && found), nil + }, + ) + c.conflictCheckWorkQueue.Enqueue(&discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: eps.Labels[mcsv1a1.LabelServiceName],