Skip to content

Commit

Permalink
Aggregate ServiceExport conflict conditions
Browse files Browse the repository at this point in the history
It's possible there could be more than one concurrent conflict
however the MCS API only defines one conflict type so a second
conflict would overwrite the first. We could define our own
conflict types but the MCS spec does specifically reference the
ServiceExportConflict type, defined as "Conflict", in the conflict
resolution section. It doesn't mention what the behavior should
be if there's more than one conflict.

To represent multiple conflicts, We can still use the
ServiceExportConflict type but aggregate the differing condition
reasons and messages. For the Reason field, join multiple reasons
separated by a comma and, for the Message field, join with a new line.
When a particular conflict is resolved, remove it's reason and message.
When there's no more conflicts, ie Reason is empty, set the condition
Status to False.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Aug 26, 2024
1 parent 9b1ddf8 commit 098cdcc
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 35 deletions.
25 changes: 16 additions & 9 deletions pkg/agent/controller/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/syncer/test"
testutil "github.com/submariner-io/admiral/pkg/test"
"github.com/submariner-io/lighthouse/pkg/agent/controller"
"github.com/submariner-io/lighthouse/pkg/constants"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
Expand Down Expand Up @@ -369,6 +370,12 @@ func testClusterIPServiceInOneCluster() {
}

func testClusterIPServiceInTwoClusters() {
noConflictCondition := &mcsv1a1.ServiceExportCondition{
Type: mcsv1a1.ServiceExportConflict,
Status: corev1.ConditionFalse,
Reason: ptr.To(""),
}

var t *testDriver

BeforeEach(func() {
Expand Down Expand Up @@ -417,7 +424,7 @@ func testClusterIPServiceInTwoClusters() {
It("should correctly set the ports in the aggregated ServiceImport and set the Conflict status condition", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)

condition := newServiceExportConflictCondition("ConflictingPorts")
condition := newServiceExportConflictCondition(controller.PortConflictReason)
t.cluster1.awaitServiceExportCondition(condition)
t.cluster2.awaitServiceExportCondition(condition)
})
Expand All @@ -431,22 +438,22 @@ func testClusterIPServiceInTwoClusters() {

t.awaitNoEndpointSlice(&t.cluster1)
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster2.service.Name, t.cluster2.service.Namespace, &t.cluster2)
t.cluster2.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after updating the ports to match", func() {
It("should correctly update the ports in the aggregated ServiceImport and clear the Conflict status condition", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingPorts"))
t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition(controller.PortConflictReason))

t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2}
t.cluster2.service.Spec.Ports = []corev1.ServicePort{toServicePort(port1), toServicePort(port2)}
t.cluster2.updateService()

t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster1.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster1.awaitServiceExportCondition(noConflictCondition)
t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})
})
Expand All @@ -460,20 +467,20 @@ func testClusterIPServiceInTwoClusters() {
t.cluster2.ensureNoEndpointSlice()
t.awaitNonHeadlessServiceExported(&t.cluster1)

t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingType"))
t.cluster2.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed"))
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.TypeConflictReason))
t.cluster2.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})

Context("initially and after updating the service types to match", func() {
It("should export the service in both clusters", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingType"))
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.TypeConflictReason))

t.cluster2.service.Spec.ClusterIP = t.cluster2.serviceIP
t.cluster2.updateService()

t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})
})
Expand Down
12 changes: 0 additions & 12 deletions pkg/agent/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,18 +584,6 @@ func (c *cluster) ensureNoServiceExportCondition(condType mcsv1a1.ServiceExportC
}
}

func (c *cluster) awaitNoServiceExportCondition(condType mcsv1a1.ServiceExportConditionType, serviceExports ...*mcsv1a1.ServiceExport) {
if len(serviceExports) == 0 {
serviceExports = []*mcsv1a1.ServiceExport{c.serviceExport}
}

for _, se := range serviceExports {
Eventually(func() interface{} {
return c.retrieveServiceExportCondition(se, condType)
}).Should(BeNil(), "Unexpected ServiceExport status condition")
}
}

func (c *cluster) awaitServiceUnavailableStatus() {
c.awaitServiceExportCondition(newServiceExportValidCondition(corev1.ConditionFalse, "ServiceUnavailable"))
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object,
err = c.serviceImportAggregator.updateOnCreateOrUpdate(ctx, serviceName, serviceNamespace)
if err != nil {
c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace,
newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason,
newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, ExportFailedReason,
fmt.Sprintf("Unable to export: %v", err)))
} else {
c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace,
Expand Down Expand Up @@ -264,12 +264,13 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (

if conflict {
c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition(
mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, portConflictReason,
mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, PortConflictReason,
fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+
"The service will expose the intersection of all the ports: %s",
fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts))))
} else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil {
c.serviceExportClient.RemoveStatusCondition(ctx, name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason)
c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition(
mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, ""))
}

return false, nil
Expand Down
65 changes: 65 additions & 0 deletions pkg/agent/controller/service_export_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ package controller
import (
"context"
"reflect"
goslices "slices"
"strings"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/slices"
"github.com/submariner-io/lighthouse/pkg/constants"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

Expand Down Expand Up @@ -96,12 +100,24 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam
condition := &conditions[i]

prevCond := findStatusCondition(toUpdate.Status.Conditions, condition.Type)

if prevCond == nil {
if condition.Type == mcsv1a1.ServiceExportConflict && condition.Status == corev1.ConditionFalse {
continue
}

logger.V(log.DEBUG).Infof("Add status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q",
namespace, name, condition.Type, condition.Status, *condition.Reason, *condition.Message)

toUpdate.Status.Conditions = append(toUpdate.Status.Conditions, *condition)
updated = true
} else if condition.Type == mcsv1a1.ServiceExportConflict {
updated = updated || c.mergeConflictCondition(prevCond, condition)
if updated {
logger.V(log.DEBUG).Infof(
"Update status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q",
namespace, name, condition.Type, prevCond.Status, *prevCond.Reason, *prevCond.Message)
}
} else if serviceExportConditionEqual(prevCond, condition) {
logger.V(log.TRACE).Infof("Last ServiceExportCondition for (%s/%s) is equal - not updating status: %#v",
namespace, name, prevCond)
Expand All @@ -118,6 +134,55 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam
})
}

func (c *ServiceExportClient) mergeConflictCondition(to, from *mcsv1a1.ServiceExportCondition) bool {
var reasons, messages []string

if ptr.Deref(to.Reason, "") != "" {
reasons = strings.Split(ptr.Deref(to.Reason, ""), ",")
}

if ptr.Deref(to.Message, "") != "" {
messages = strings.Split(ptr.Deref(to.Message, ""), "\n")
}

index := goslices.Index(reasons, *from.Reason)
if index >= 0 {
if from.Status == corev1.ConditionTrue {
if index < len(messages) {
messages[index] = *from.Message
}
} else {
reasons = goslices.Delete(reasons, index, index+1)

if index < len(messages) {
messages = goslices.Delete(messages, index, index+1)
}
}
} else if from.Status == corev1.ConditionTrue {
reasons = append(reasons, *from.Reason)
messages = append(messages, *from.Message)
}

newReason := strings.Join(reasons, ",")
newMessage := strings.Join(messages, "\n")
updated := newReason != ptr.Deref(to.Reason, "") || newMessage != ptr.Deref(to.Message, "")

to.Reason = ptr.To(newReason)
to.Message = ptr.To(newMessage)

if *to.Reason != "" {
to.Status = corev1.ConditionTrue
} else {
to.Status = corev1.ConditionFalse
}

if updated {
to.LastTransitionTime = from.LastTransitionTime
}

return updated
}

func (c *ServiceExportClient) doUpdate(ctx context.Context, name, namespace string, update func(toUpdate *mcsv1a1.ServiceExport) bool) {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
obj, err := c.Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
Expand Down
101 changes: 101 additions & 0 deletions pkg/agent/controller/service_export_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package controller_test

import (
"context"
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -126,4 +127,104 @@ var _ = Describe("ServiceExportClient", func() {
constants.ServiceExportReady)).ToNot(BeNil())
})
})

Context("with Conflict condition type", func() {
It("should aggregate the different reasons and messages", func() {
// The condition shouldn't be added with Status False.

serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, mcsv1a1.ServiceExportCondition{
Type: mcsv1a1.ServiceExportConflict,
Status: corev1.ConditionFalse,
Reason: ptr.To(""),
Message: ptr.To(""),
})

Expect(controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions,
mcsv1a1.ServiceExportConflict)).To(BeNil())

portConflictMsg := "The service ports conflict"
typeConflictMsg := "The service types conflict"

cond := mcsv1a1.ServiceExportCondition{
Type: mcsv1a1.ServiceExportConflict,
Status: corev1.ConditionTrue,
Reason: ptr.To(controller.PortConflictReason),
Message: ptr.To(portConflictMsg),
}

// Add first condition reason

serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond)

Expect(controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)).To(Equal(&cond))

// Add second condition reason

cond.Reason = ptr.To(controller.TypeConflictReason)
cond.Message = ptr.To(typeConflictMsg)

serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond)

actual := controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)
Expect(strings.Split(*actual.Reason, ",")).To(HaveExactElements(controller.PortConflictReason, controller.TypeConflictReason))
Expect(strings.Split(*actual.Message, "\n")).To(HaveExactElements(portConflictMsg, typeConflictMsg))
Expect(actual.Status).To(Equal(corev1.ConditionTrue))

// Update second condition message

typeConflictMsg = "The service types still conflict"
cond.Message = ptr.To(typeConflictMsg)

serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond)

actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)
Expect(strings.Split(*actual.Reason, ",")).To(HaveExactElements(controller.PortConflictReason, controller.TypeConflictReason))
Expect(strings.Split(*actual.Message, "\n")).To(HaveExactElements(portConflictMsg, typeConflictMsg))
Expect(actual.Status).To(Equal(corev1.ConditionTrue))

// Resolve first condition

cond.Reason = ptr.To(controller.PortConflictReason)
cond.Message = ptr.To("")
cond.Status = corev1.ConditionFalse

serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond)

actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)
Expect(*actual.Reason).To(Equal(controller.TypeConflictReason))
Expect(*actual.Message).To(Equal(typeConflictMsg))
Expect(actual.Status).To(Equal(corev1.ConditionTrue))

// Resolve second condition

cond.Reason = ptr.To(controller.TypeConflictReason)

for i := 1; i <= 2; i++ {
serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond)

actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)
Expect(actual.Status).To(Equal(corev1.ConditionFalse))
Expect(*actual.Reason).To(BeEmpty())
Expect(*actual.Message).To(BeEmpty())
}

// Add the first condition back

serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, mcsv1a1.ServiceExportCondition{
Type: mcsv1a1.ServiceExportConflict,
Status: corev1.ConditionTrue,
Reason: ptr.To(controller.PortConflictReason),
Message: ptr.To(""),
}, mcsv1a1.ServiceExportCondition{
Type: mcsv1a1.ServiceExportConflict,
Status: corev1.ConditionFalse,
Reason: ptr.To(controller.TypeConflictReason),
Message: ptr.To(""),
})

actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)
Expect(*actual.Reason).To(Equal(controller.PortConflictReason))
Expect(actual.Status).To(Equal(corev1.ConditionTrue))
})
})
})
7 changes: 4 additions & 3 deletions pkg/agent/controller/service_export_failures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

. "github.com/onsi/ginkgo/v2"
"github.com/submariner-io/admiral/pkg/fake"
"github.com/submariner-io/lighthouse/pkg/agent/controller"
"github.com/submariner-io/lighthouse/pkg/constants"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -67,7 +68,7 @@ var _ = Describe("Service export failures", func() {
})

It("should eventually export the service", func() {
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed"))
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason))

t.brokerServiceImportReactor.SetFailOnCreate(nil)
t.awaitNonHeadlessServiceExported(&t.cluster1)
Expand All @@ -80,7 +81,7 @@ var _ = Describe("Service export failures", func() {
})

It("should eventually export the service", func() {
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed"))
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason))

t.brokerServiceImportReactor.SetFailOnUpdate(nil)
t.awaitNonHeadlessServiceExported(&t.cluster1)
Expand Down Expand Up @@ -131,7 +132,7 @@ var _ = Describe("Service export failures", func() {
})

It("should eventually export the service", func() {
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed"))
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason))

t.brokerEndpointSliceReactor.SetFailOnList(nil)
t.awaitNonHeadlessServiceExported(&t.cluster1)
Expand Down
Loading

0 comments on commit 098cdcc

Please sign in to comment.