Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alleviate excessive periodic logging during conflict check #1400

Merged
merged 2 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,6 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN
return nil, errors.Wrap(err, "error converting resource")
}

agentController.serviceExportClient = &ServiceExportClient{
NamespaceableResourceInterface: syncerConf.LocalClient.Resource(*gvr),
converter: converter{scheme: syncerConf.Scheme},
}

agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient)
if err != nil {
return nil, err
}

agentController.localServiceImportFederator = federate.NewCreateOrUpdateFederator(syncerConf.LocalClient, syncerConf.RestMapper,
spec.Namespace, "")

Expand Down Expand Up @@ -116,6 +106,17 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN
return nil, errors.Wrap(err, "error creating Service syncer")
}

agentController.serviceExportClient = &ServiceExportClient{
NamespaceableResourceInterface: syncerConf.LocalClient.Resource(*gvr),
converter: converter{scheme: syncerConf.Scheme},
localSyncer: agentController.serviceExportSyncer,
}

agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient)
if err != nil {
return nil, err
}

agentController.serviceImportController, err = newServiceImportController(spec, syncerMetricNames, syncerConf,
agentController.endpointSliceController.syncer.GetBrokerClient(),
agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient)
Expand Down
9 changes: 9 additions & 0 deletions pkg/agent/controller/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func testClusterIPServiceInOneCluster() {
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "AwaitingExport"),
newServiceExportReadyCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)

By(fmt.Sprintf("Ensure cluster %q does not try to update the status for a non-existent ServiceExport",
t.cluster2.clusterID))

t.cluster2.ensureNoServiceExportActions()
})
})

Expand Down Expand Up @@ -329,6 +334,10 @@ func testClusterIPServiceInTwoClusters() {
t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)

By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")

t.cluster1.ensureNoServiceExportActions()
})

Context("with differing ports", func() {
Expand Down
11 changes: 11 additions & 0 deletions pkg/agent/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/syncer/broker"
"github.com/submariner-io/admiral/pkg/syncer/test"
testutil "github.com/submariner-io/admiral/pkg/test"
"github.com/submariner-io/lighthouse/pkg/agent/controller"
"github.com/submariner-io/lighthouse/pkg/constants"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -108,6 +109,8 @@ func init() {
if err != nil {
panic(err)
}

controller.BrokerResyncPeriod = time.Millisecond * 100
}

func TestController(t *testing.T) {
Expand Down Expand Up @@ -611,6 +614,14 @@ func (c *cluster) ensureNoEndpointSlice() {
}, 300*time.Millisecond).Should(BeZero(), "Unexpected EndpointSlice")
}

func (c *cluster) ensureNoServiceExportActions() {
c.localDynClient.Fake.ClearActions()

Consistently(func() []string {
return testutil.GetOccurredActionVerbs(&c.localDynClient.Fake, "serviceexports", "get", "update")
}, 500*time.Millisecond).Should(BeEmpty())
}

func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected *mcsv1a1.ServiceImport) {
sortSlices := func(si *mcsv1a1.ServiceImport) {
sort.SliceStable(si.Spec.Ports, func(i, j int) bool {
Expand Down
44 changes: 26 additions & 18 deletions pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
Expand All @@ -36,6 +37,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

Expand All @@ -62,11 +64,11 @@ func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.Sy
OnSuccessfulSyncToBroker: c.onLocalEndpointSliceSynced,
BrokerResourceType: &discovery.EndpointSlice{},
TransformBrokerToLocal: c.onRemoteEndpointSlice,
OnSuccessfulSyncFromBroker: func(obj runtime.Object, _ syncer.Operation) bool {
c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice))
OnSuccessfulSyncFromBroker: func(obj runtime.Object, op syncer.Operation) bool {
c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice), op)
return false
},
BrokerResyncPeriod: brokerResyncePeriod,
BrokerResyncPeriod: BrokerResyncPeriod,
},
}

Expand Down Expand Up @@ -162,7 +164,7 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object,
c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady,
corev1.ConditionTrue, "", "Service was successfully exported to the broker"))

c.enqueueForConflictCheck(endpointSlice)
c.enqueueForConflictCheck(endpointSlice, op)
}
}

Expand Down Expand Up @@ -198,24 +200,21 @@ func (c *EndpointSliceController) hasNoRemainingEndpointSlices(endpointSlice *di
return true
}

func (c *EndpointSliceController) checkForConflicts(key, name, namespace string) (bool, error) {
epsList, err := c.syncer.GetLocalClient().Resource(endpointSliceGVR).Namespace(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: k8slabels.SelectorFromSet(map[string]string{
discovery.LabelManagedBy: constants.LabelValueManagedBy,
mcsv1a1.LabelServiceName: name,
}).String(),
})
if err != nil {
return true, errors.Wrapf(err, "error during conflict check for %q", key)
func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (bool, error) {
localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace)
if localServiceExport == nil {
return false, nil
}

epsList := c.syncer.ListLocalResources(&discovery.EndpointSlice{})

var prevServicePorts []mcsv1a1.ServicePort
var intersectedServicePorts []mcsv1a1.ServicePort
clusterNames := make([]string, 0, len(epsList.Items))
clusterNames := make([]string, 0, len(epsList))
conflict := false

for i := range epsList.Items {
eps := c.serviceExportClient.toEndpointSlice(&epsList.Items[i])
for _, o := range epsList {
eps := o.(*discovery.EndpointSlice)

servicePorts := c.serviceExportClient.toServicePorts(eps.Ports)
if prevServicePorts == nil {
Expand All @@ -236,18 +235,27 @@ func (c *EndpointSliceController) checkForConflicts(key, name, namespace string)
fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+
"The service will expose the intersection of all the ports: %s",
fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts))))
} else {
} else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil {
c.serviceExportClient.removeStatusCondition(name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason)
}

return false, nil
}

func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice) {
func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice, op syncer.Operation) {
if eps.Labels[constants.LabelIsHeadless] != "false" {
return
}

// Since the conflict checking works off of the local cache for efficiency, wait a little bit here for the local cache to be updated
// with the latest state of the EndpointSlice.
_ = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 100*time.Millisecond, true,
func(_ context.Context) (bool, error) {
_, found, _ := c.syncer.GetLocalResource(eps.Name, eps.Namespace, eps)
return (op == syncer.Delete && !found) || (op != syncer.Delete && found), nil
},
)

c.conflictCheckWorkQueue.Enqueue(&discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: eps.Labels[mcsv1a1.LabelServiceName],
Expand Down
11 changes: 10 additions & 1 deletion pkg/agent/controller/service_export_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpd
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
obj, err := c.Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
logger.Infof("ServiceExport (%s/%s) not found - unable to update status", namespace, name)
logger.V(log.TRACE).Infof("ServiceExport (%s/%s) not found - unable to update status", namespace, name)
return nil
} else if err != nil {
return errors.Wrap(err, "error retrieving ServiceExport")
Expand All @@ -127,6 +127,15 @@ func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpd
}
}

func (c *ServiceExportClient) getLocalInstance(name, namespace string) *mcsv1a1.ServiceExport {
obj, found, _ := c.localSyncer.GetResource(name, namespace)
if !found {
return nil
}

return obj.(*mcsv1a1.ServiceExport)
}

func serviceExportConditionEqual(c1, c2 *mcsv1a1.ServiceExportCondition) bool {
return c1.Type == c2.Type && c1.Status == c2.Status && reflect.DeepEqual(c1.Reason, c2.Reason) &&
reflect.DeepEqual(c1.Message, c2.Message)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen
Transform: controller.onRemoteServiceImport,
OnSuccessfulSync: controller.serviceImportMigrator.onSuccessfulSyncFromBroker,
Scheme: syncerConfig.Scheme,
ResyncPeriod: brokerResyncePeriod,
ResyncPeriod: BrokerResyncPeriod,
SyncCounterOpts: &prometheus.GaugeOpts{
Name: syncerMetricNames.ServiceImportCounterName,
Help: "Count of imported services",
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
portConflictReason = "ConflictingPorts"
)

var brokerResyncePeriod = time.Minute * 2
var BrokerResyncPeriod = time.Minute * 2

type converter struct {
scheme *runtime.Scheme
Expand Down Expand Up @@ -118,6 +118,7 @@ type EndpointSliceController struct {
type ServiceExportClient struct {
dynamic.NamespaceableResourceInterface
converter
localSyncer syncer.Interface
}

type globalIngressIPCache struct {
Expand Down
Loading