Skip to content

Commit

Permalink
Optimize conflict checking
Browse files Browse the repository at this point in the history
Use the local cache to list the EndpointSlices instead of the API server
client.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Oct 3, 2023
1 parent 16a68a7 commit 05d9965
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
Expand All @@ -36,6 +37,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

Expand All @@ -62,8 +64,8 @@ func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.Sy
OnSuccessfulSyncToBroker: c.onLocalEndpointSliceSynced,
BrokerResourceType: &discovery.EndpointSlice{},
TransformBrokerToLocal: c.onRemoteEndpointSlice,
OnSuccessfulSyncFromBroker: func(obj runtime.Object, _ syncer.Operation) bool {
c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice))
OnSuccessfulSyncFromBroker: func(obj runtime.Object, op syncer.Operation) bool {
c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice), op)
return false
},
BrokerResyncPeriod: BrokerResyncePeriod,
Expand Down Expand Up @@ -162,7 +164,7 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object,
c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady,
corev1.ConditionTrue, "", "Service was successfully exported to the broker"))

c.enqueueForConflictCheck(endpointSlice)
c.enqueueForConflictCheck(endpointSlice, op)
}
}

Expand Down Expand Up @@ -198,29 +200,21 @@ func (c *EndpointSliceController) hasNoRemainingEndpointSlices(endpointSlice *di
return true
}

func (c *EndpointSliceController) checkForConflicts(key, name, namespace string) (bool, error) {
func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (bool, error) {
localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace)
if localServiceExport == nil {
return false, nil
}

epsList, err := c.syncer.GetLocalClient().Resource(endpointSliceGVR).Namespace(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: k8slabels.SelectorFromSet(map[string]string{
discovery.LabelManagedBy: constants.LabelValueManagedBy,
mcsv1a1.LabelServiceName: name,
}).String(),
})
if err != nil {
return true, errors.Wrapf(err, "error during conflict check for %q", key)
}
epsList := c.syncer.ListLocalResources(&discovery.EndpointSlice{})

var prevServicePorts []mcsv1a1.ServicePort
var intersectedServicePorts []mcsv1a1.ServicePort
clusterNames := make([]string, 0, len(epsList.Items))
clusterNames := make([]string, 0, len(epsList))
conflict := false

for i := range epsList.Items {
eps := c.serviceExportClient.toEndpointSlice(&epsList.Items[i])
for _, o := range epsList {
eps := o.(*discovery.EndpointSlice)

servicePorts := c.serviceExportClient.toServicePorts(eps.Ports)
if prevServicePorts == nil {
Expand Down Expand Up @@ -248,11 +242,20 @@ func (c *EndpointSliceController) checkForConflicts(key, name, namespace string)
return false, nil
}

func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice) {
func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice, op syncer.Operation) {
if eps.Labels[constants.LabelIsHeadless] != "false" {
return
}

// Since the conflict checking works off of the local cache for efficiency, wait a little bit here for the local cache to be updated
// with the latest state of the EndpointSlice.
_ = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 100*time.Millisecond, true,
func(_ context.Context) (bool, error) {
_, found, _ := c.syncer.GetLocalResource(eps.Name, eps.Namespace, eps)
return (op == syncer.Delete && !found) || (op != syncer.Delete && found), nil
},
)

c.conflictCheckWorkQueue.Enqueue(&discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: eps.Labels[mcsv1a1.LabelServiceName],
Expand Down

0 comments on commit 05d9965

Please sign in to comment.