Skip to content

Commit

Permalink
Implement conflict checking for SessionAffinity
Browse files Browse the repository at this point in the history
The MCS spec's conflict resolution policy states that a "conflict will
be resolved by assigning precedence based on each ServiceExport's
creationTimestamp, from oldest to newest". We don't have a central
MCS controller with access to all cluster's ServiceExports but we
can store each cluster's ServiceExport creationTimestamp as
annotations in the aggregated ServiceImport and use them to
resolve conflicts. The SessionAffinity and SessionAffinityConfig
fields on the aggregated ServiceImport will be set by the cluster
with the oldest timestamp. The other clusters will set a
ServiceExportConflict condition if their corresponding local
service's fields do not match those on the aggregated ServiceImport.
If a local service is updated in any cluster, each cluster
re-evaluates the updated aggregated ServiceImport and either clears
or sets the conflict conditon. Also if the service from the
precedent cluster is unexported, the next precedent cluster will
set the fields.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and aswinsuryan committed Sep 5, 2024
1 parent 9476467 commit 687c11c
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 78 deletions.
207 changes: 193 additions & 14 deletions pkg/agent/controller/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package controller_test
import (
"fmt"
"strconv"
"time"

. "github.com/onsi/ginkgo/v2"
"github.com/submariner-io/admiral/pkg/fake"
Expand Down Expand Up @@ -201,6 +202,9 @@ func testClusterIPServiceInOneCluster() {
t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}

t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

It("should be propagated to the ServiceImport", func() {
Expand Down Expand Up @@ -388,39 +392,55 @@ func testClusterIPServiceInTwoClusters() {

BeforeEach(func() {
t = newTestDiver()

t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}
})

JustBeforeEach(func() {
t.cluster1.createServiceEndpointSlices()
t.cluster1.createService()
t.cluster1.createServiceExport()

t.justBeforeEach()
t.cluster1.start(t, *t.syncerConfig)

// Sleep a little before starting the second cluster to ensure its resource CreationTimestamps will be
// later than the first cluster to ensure conflict checking in deterministic.
time.Sleep(100 * time.Millisecond)

t.cluster2.createServiceEndpointSlices()
t.cluster2.createService()
t.cluster2.createServiceExport()

t.cluster2.start(t, *t.syncerConfig)
})

AfterEach(func() {
t.afterEach()
})

It("should export the service in both clusters", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
Context("", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}

t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig

By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

t.cluster1.ensureNoServiceExportActions()
It("should export the service in both clusters", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)

By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")

t.cluster1.ensureNoServiceExportActions()
})
})

Context("with differing ports", func() {
Expand Down Expand Up @@ -492,6 +512,165 @@ func testClusterIPServiceInTwoClusters() {
})
})
})

Context("with differing service SessionAffinity", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
})

It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})

Context("initially and after updating the SessionAffinity on the conflicting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))

By("Updating the SessionAffinity on the service")

t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.cluster2.updateService()

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after updating the SessionAffinity on the oldest exporting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))

By("Updating the SessionAffinity on the service")

t.cluster1.service.Spec.SessionAffinity = t.cluster2.service.Spec.SessionAffinity
t.cluster1.updateService()

t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after the service on the oldest exporting cluster is unexported", func() {
It("should update the SessionAffinity on the aggregated ServiceImport and clear the Conflict status condition", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))

By("Unexporting the service")

t.cluster1.deleteServiceExport()

t.aggregatedSessionAffinity = t.cluster2.service.Spec.SessionAffinity
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})
})

Context("with differing service SessionAffinityConfig", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity

t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})

Context("initially and after updating the SessionAffinityConfig on the conflicting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))

By("Updating the SessionAffinityConfig on the service")

t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
t.cluster2.updateService()

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after updating the SessionAffinityConfig on the oldest exporting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))

By("Updating the SessionAffinityConfig on the service")

t.cluster1.service.Spec.SessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
t.cluster1.updateService()

t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after the service on the oldest exporting cluster is unexported", func() {
BeforeEach(func() {
t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(20))},
}
})

It("should update the SessionAffinity on the aggregated ServiceImport and clear the Conflict status condition", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))

By("Unexporting the service")

t.cluster1.deleteServiceExport()

t.aggregatedSessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})
})

Context("with differing service SessionAffinity and SessionAffinityConfig", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity

t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

It("should resolve the conflicts and set the Conflict status condition on the conflicting cluster", func() {
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
fmt.Sprintf("%s,%s", controller.SessionAffinityConflictReason, controller.SessionAffinityConfigConflictReason)))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})
})
}

func testClusterIPServiceWithMultipleEPS() {
Expand Down
60 changes: 34 additions & 26 deletions pkg/agent/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,18 @@ type cluster struct {
}

type testDriver struct {
cluster1 cluster
cluster2 cluster
brokerServiceImportClient dynamic.NamespaceableResourceInterface
brokerEndpointSliceClient dynamic.ResourceInterface
brokerEndpointSliceReactor *fake.FailingReactor
stopCh chan struct{}
syncerConfig *broker.SyncerConfig
doStart bool
brokerServiceImportReactor *fake.FailingReactor
aggregatedServicePorts []mcsv1a1.ServicePort
cluster1 cluster
cluster2 cluster
brokerServiceImportClient dynamic.NamespaceableResourceInterface
brokerEndpointSliceClient dynamic.ResourceInterface
brokerEndpointSliceReactor *fake.FailingReactor
stopCh chan struct{}
syncerConfig *broker.SyncerConfig
doStart bool
brokerServiceImportReactor *fake.FailingReactor
aggregatedServicePorts []mcsv1a1.ServicePort
aggregatedSessionAffinity corev1.ServiceAffinity
aggregatedSessionAffinityConfig *corev1.SessionAffinityConfig
}

func newTestDiver() *testDriver {
Expand All @@ -163,7 +165,8 @@ func newTestDiver() *testDriver {
fake.AddBasicReactors(&brokerClient.Fake)

t := &testDriver{
aggregatedServicePorts: []mcsv1a1.ServicePort{port1, port2},
aggregatedServicePorts: []mcsv1a1.ServicePort{port1, port2},
aggregatedSessionAffinity: corev1.ServiceAffinityNone,
cluster1: cluster{
clusterID: clusterID1,
agentSpec: controller.AgentSpecification{
Expand Down Expand Up @@ -595,8 +598,7 @@ func (c *cluster) findLocalServiceImport() *mcsv1a1.ServiceImport {
for i := range list.Items {
if list.Items[i].GetLabels()[mcsv1a1.LabelServiceName] == c.service.Name &&
list.Items[i].GetLabels()[constants.LabelSourceNamespace] == c.service.Namespace {
serviceImport := &mcsv1a1.ServiceImport{}
Expect(scheme.Scheme.Convert(&list.Items[i], serviceImport, nil)).To(Succeed())
serviceImport := toServiceImport(&list.Items[i])

return serviceImport
}
Expand Down Expand Up @@ -645,8 +647,7 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected
return false, nil
}

serviceImport = &mcsv1a1.ServiceImport{}
Expect(scheme.Scheme.Convert(obj, serviceImport, nil)).To(Succeed())
serviceImport = toServiceImport(obj)

sortSlices(serviceImport)

Expand All @@ -667,6 +668,13 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected
Expect(serviceImport.Labels).To(BeEmpty())
}

func getServiceImport(client dynamic.NamespaceableResourceInterface, namespace, name string) *mcsv1a1.ServiceImport {
obj, err := client.Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
Expect(err).To(Succeed())

return toServiceImport(obj)
}

func findEndpointSlices(client dynamic.ResourceInterface, namespace, name, clusterID string) []*discovery.EndpointSlice {
list, err := client.List(context.TODO(), metav1.ListOptions{})
if resource.IsMissingNamespaceErr(err) {
Expand Down Expand Up @@ -777,9 +785,10 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp
Namespace: test.RemoteNamespace,
},
Spec: mcsv1a1.ServiceImportSpec{
Type: sType,
Ports: []mcsv1a1.ServicePort{},
SessionAffinity: corev1.ServiceAffinityNone,
Type: sType,
Ports: []mcsv1a1.ServicePort{},
SessionAffinity: t.aggregatedSessionAffinity,
SessionAffinityConfig: t.aggregatedSessionAffinityConfig,
},
}

Expand All @@ -791,14 +800,6 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp
for _, c := range clusters {
expServiceImport.Status.Clusters = append(expServiceImport.Status.Clusters,
mcsv1a1.ClusterStatus{Cluster: c.clusterID})

if c.service.Spec.SessionAffinity != corev1.ServiceAffinityNone {
expServiceImport.Spec.SessionAffinity = c.service.Spec.SessionAffinity
}

if c.service.Spec.SessionAffinityConfig != nil {
expServiceImport.Spec.SessionAffinityConfig = c.service.Spec.SessionAffinityConfig
}
}
}

Expand Down Expand Up @@ -938,6 +939,13 @@ func toServiceExport(obj interface{}) *mcsv1a1.ServiceExport {
return se
}

func toServiceImport(obj interface{}) *mcsv1a1.ServiceImport {
si := &mcsv1a1.ServiceImport{}
Expect(scheme.Scheme.Convert(obj, si, nil)).To(Succeed())

return si
}

func (t *testDriver) awaitNonHeadlessServiceExported(clusters ...*cluster) {
t.awaitServiceExported(mcsv1a1.ClusterSetIP, clusters...)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (
fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+
"The service will expose the intersection of all the ports: %s",
fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts))))
} else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil {
} else if c.serviceExportClient.hasCondition(name, namespace, mcsv1a1.ServiceExportConflict, PortConflictReason) {
c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition(
mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, ""))
}
Expand Down
Loading

0 comments on commit 687c11c

Please sign in to comment.