diff --git a/pkg/globalnet/controllers/controllers_suite_test.go b/pkg/globalnet/controllers/controllers_suite_test.go index 404ea0cf1..8926988d5 100644 --- a/pkg/globalnet/controllers/controllers_suite_test.go +++ b/pkg/globalnet/controllers/controllers_suite_test.go @@ -46,7 +46,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" @@ -119,15 +118,8 @@ func newTestDriverBase() *testDriverBase { Expect(submarinerv1.AddToScheme(t.scheme)).To(Succeed()) Expect(corev1.AddToScheme(t.scheme)).To(Succeed()) - // TODO: Remove this workaround for https://github.com/kubernetes/client-go/issues/949 once - // admiral has been updated - t.scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "fake-dynamic-client-group", Version: "v1", Kind: "List"}, - &unstructured.UnstructuredList{}) - t.dynClient = fakeDynClient.NewDynamicClient(t.scheme) - t.watches = fakeDynClient.NewWatchReactor(&t.dynClient.Fake) - t.globalEgressIPs = t.dynClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &submarinerv1.GlobalEgressIP{})). Namespace(namespace) @@ -323,17 +315,6 @@ func getGlobalEgressIPStatus(client dynamic.ResourceInterface, name string) *sub return status } -func awaitNoAllocatedIPs(client dynamic.ResourceInterface, name string) { - Consistently(func() int { - status := getGlobalEgressIPStatus(client, name) - if status == nil { - return 0 - } - - return len(status.AllocatedIPs) - }, 200*time.Millisecond).Should(Equal(0)) -} - func (t *testDriverBase) awaitEgressIPStatus(client dynamic.ResourceInterface, name string, expNumIPS int, expCond ...metav1.Condition) { t.awaitStatusConditions(client, name, expCond...) diff --git a/pkg/globalnet/controllers/gateway_monitor.go b/pkg/globalnet/controllers/gateway_monitor.go index 54b0db068..cfb8813c0 100644 --- a/pkg/globalnet/controllers/gateway_monitor.go +++ b/pkg/globalnet/controllers/gateway_monitor.go @@ -19,9 +19,11 @@ limitations under the License. 
package controllers import ( + "context" "os" "strings" "sync/atomic" + "time" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" @@ -39,23 +41,55 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" k8snet "k8s.io/utils/net" "k8s.io/utils/set" ) -func NewGatewayMonitor(spec Specification, localCIDRs []string, config *watcher.Config) (Interface, error) { +func NewGatewayMonitor(config *GatewayMonitorConfig) (Interface, error) { // We'll panic if config is nil, this is intentional gatewayMonitor := &gatewayMonitor{ baseController: newBaseController(), - spec: spec, + spec: config.Spec, + kubeClient: config.KubeClient, isGatewayNode: atomic.Bool{}, - localSubnets: set.New(localCIDRs...).UnsortedList(), + shuttingDown: atomic.Bool{}, + localSubnets: set.New(config.LocalCIDRs...).UnsortedList(), remoteSubnets: set.New[string](), remoteEndpointTimeStamp: map[string]metav1.Time{}, + leaderElectionInfo: atomic.Pointer[LeaderElectionInfo]{}, + LeaderElectionConfig: config.LeaderElectionConfig, } + // When transitioning to a non-gateway or shutting down, the GatewayMonitor cancels leader election which causes it to release the lock, + // which clears the HolderIdentity field and sets LeaseDuration to one second. This enables the next instance to quickly acquire the + // lock. However, if the controller instance crashes and doesn't properly release the lock, the next instance will have to await the + // LeaseDuration period before acquiring the lock. So we don't want LeaseDuration set too high, and we don't want it too low either to + // give the current instance enough time to complete stopping its controllers. + // + // The K8s leader election functionality periodically renews the lease. If it can't be renewed prior to the RenewDeadline, it stops. + // For our usage, we don't have instances concurrently vying for leadership, so we really don't need to keep renewing the lease. Ideally + // we would set the RenewDeadline very high to essentially disable it, but it needs to be less than the LeaseDuration setting which we + // don't want too high. + + if gatewayMonitor.LeaseDuration == 0 { + gatewayMonitor.LeaseDuration = 20 * time.Second + } + + if gatewayMonitor.RenewDeadline == 0 { + gatewayMonitor.RenewDeadline = 15 * time.Second + } + + if gatewayMonitor.RetryPeriod == 0 { + gatewayMonitor.RetryPeriod = 2 * time.Second + } + + gatewayMonitor.leaderElectionInfo.Store(&LeaderElectionInfo{}) + var err error gatewayMonitor.ipt, err = iptables.New() @@ -88,11 +122,11 @@ func NewGatewayMonitor(spec Specification, localCIDRs []string, config *watcher. 
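The defaults above also have to satisfy client-go's own validation: NewLeaderElector rejects a config unless LeaseDuration > RenewDeadline and RenewDeadline > JitterFactor (1.2) × RetryPeriod, which the 20s/15s/2s fallbacks do. A minimal sketch of the defaulting and that ordering check — the local type and the applyDefaults/validate helpers are illustrative, not part of the change:

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative stand-in for the LeaderElectionConfig fields added in this PR.
type leaderElectionConfig struct {
	LeaseDuration time.Duration
	RenewDeadline time.Duration
	RetryPeriod   time.Duration
}

// applyDefaults mirrors the zero-value fallbacks used by NewGatewayMonitor (20s/15s/2s).
func applyDefaults(c *leaderElectionConfig) {
	if c.LeaseDuration == 0 {
		c.LeaseDuration = 20 * time.Second
	}

	if c.RenewDeadline == 0 {
		c.RenewDeadline = 15 * time.Second
	}

	if c.RetryPeriod == 0 {
		c.RetryPeriod = 2 * time.Second
	}
}

// validate reproduces the ordering that client-go's NewLeaderElector enforces.
func validate(c leaderElectionConfig) error {
	const jitterFactor = 1.2

	if c.LeaseDuration <= c.RenewDeadline {
		return fmt.Errorf("LeaseDuration must be greater than RenewDeadline")
	}

	if c.RenewDeadline <= time.Duration(jitterFactor*float64(c.RetryPeriod)) {
		return fmt.Errorf("RenewDeadline must be greater than RetryPeriod*JitterFactor")
	}

	return nil
}

func main() {
	cfg := leaderElectionConfig{}
	applyDefaults(&cfg)

	// Prints the defaulted durations and a nil error for the 20s/15s/2s combination.
	fmt.Println(cfg, validate(cfg))
}
```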
OnUpdateFunc: gatewayMonitor.handleCreatedOrUpdatedEndpoint, OnDeleteFunc: gatewayMonitor.handleRemovedEndpoint, }, - SourceNamespace: spec.Namespace, + SourceNamespace: config.Spec.Namespace, }, } - gatewayMonitor.endpointWatcher, err = watcher.New(config) + gatewayMonitor.endpointWatcher, err = watcher.New(&config.Config) if err != nil { return nil, errors.Wrap(err, "error creating the Endpoint watcher") } @@ -134,15 +168,22 @@ func (g *gatewayMonitor) Start() error { func (g *gatewayMonitor) Stop() { logger.Info("GatewayMonitor stopping") + g.shuttingDown.Store(true) + g.baseController.Stop() - g.stopControllers() + // stopControllers should be pretty quick but put a deadline on it, so we don't block shutdown for a long time. + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + g.stopControllers(ctx, true) } func (g *gatewayMonitor) handleCreatedOrUpdatedEndpoint(obj runtime.Object, _ int) bool { endpoint := obj.(*v1.Endpoint) - logger.V(log.DEBUG).Infof("In processNextEndpoint, endpoint info: %+v", endpoint) + logger.V(log.DEBUG).Infof("Gateway monitor informed of create/updated endpoint: %v", endpoint) if endpoint.Spec.ClusterID != g.spec.ClusterID { lastProcessedTime, ok := g.remoteEndpointTimeStamp[endpoint.Spec.ClusterID] @@ -194,22 +235,17 @@ func (g *gatewayMonitor) handleCreatedOrUpdatedEndpoint(obj runtime.Object, _ in // If the endpoint hostname matches with our hostname, it implies we are on gateway node if endpoint.Spec.Hostname == hostname { - logger.V(log.DEBUG).Infof("Transitioned to gateway node with endpoint private IP %s", endpoint.Spec.PrivateIP) - configureTCPMTUProbe() if g.isGatewayNode.CompareAndSwap(false, true) { - err := g.startControllers() - if err != nil { - logger.Fatalf("Error starting the controllers: %v", err) - } - } - } else { - logger.V(log.DEBUG).Infof("Transitioned to non-gateway node with endpoint private IP %s", endpoint.Spec.PrivateIP) + logger.Infof("Transitioned to gateway node %q with endpoint private IP %s", hostname, endpoint.Spec.PrivateIP) - if g.isGatewayNode.CompareAndSwap(true, false) { - g.stopControllers() + g.startLeaderElection() } + } else if g.isGatewayNode.CompareAndSwap(true, false) { + logger.Infof("Transitioned to non-gateway node %q", endpoint.Spec.Hostname) + + g.stopControllers(context.Background(), true) } return false @@ -228,7 +264,7 @@ func (g *gatewayMonitor) handleRemovedEndpoint(obj runtime.Object, _ int) bool { delete(g.remoteEndpointTimeStamp, endpoint.Spec.ClusterID) - logger.V(log.DEBUG).Infof("Informed of removed endpoint for gateway monitor: %v", endpoint) + logger.V(log.DEBUG).Infof("Gateway monitor informed of removed endpoint: %v", endpoint) hostname, err := os.Hostname() if err != nil { @@ -237,7 +273,9 @@ func (g *gatewayMonitor) handleRemovedEndpoint(obj runtime.Object, _ int) bool { if endpoint.Spec.Hostname == hostname && endpoint.Spec.ClusterID == g.spec.ClusterID { if g.isGatewayNode.CompareAndSwap(true, false) { - g.stopControllers() + logger.Infof("Gateway node %q endpoint removed", hostname) + + g.stopControllers(context.Background(), true) } } else if endpoint.Spec.ClusterID != g.spec.ClusterID { // Endpoint associated with remote cluster is removed, delete the associated flows. 
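The gateway transitions above rely on isGatewayNode.CompareAndSwap so that the start/stop side effects run exactly once per transition, even if the Endpoint event is delivered more than once. A reduced sketch of that pattern, with an illustrative monitor type and messages:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// monitor sketches the CompareAndSwap guard used in handleCreatedOrUpdatedEndpoint:
// the swap only succeeds for the call that actually flips the state, so leader
// election is started (or the controllers stopped) once per transition.
type monitor struct {
	isGatewayNode atomic.Bool
}

func (m *monitor) onEndpoint(onThisHost bool) string {
	if onThisHost {
		if m.isGatewayNode.CompareAndSwap(false, true) {
			return "became gateway: start leader election"
		}

		return "already gateway: nothing to do"
	}

	if m.isGatewayNode.CompareAndSwap(true, false) {
		return "left gateway: stop controllers"
	}

	return "already non-gateway: nothing to do"
}

func main() {
	m := &monitor{}
	fmt.Println(m.onEndpoint(true))  // became gateway: start leader election
	fmt.Println(m.onEndpoint(true))  // already gateway: nothing to do
	fmt.Println(m.onEndpoint(false)) // left gateway: stop controllers
}
```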
@@ -252,11 +290,81 @@ func (g *gatewayMonitor) handleRemovedEndpoint(obj runtime.Object, _ int) bool { return false } +func (g *gatewayMonitor) startLeaderElection() { + if g.shuttingDown.Load() { + return + } + + g.controllersMutex.Lock() + defer g.controllersMutex.Unlock() + + // Usually when leadership is lost it's due to transition to a non-gateway however if it's due to renewal failure then we try to regain + // leadership in which case we'll still be on the gateway. Hence the isGatewayNode check here. + if !g.isGatewayNode.Load() { + return + } + + ctx, stop := context.WithCancel(context.Background()) + + leaderElectionInfo := &LeaderElectionInfo{ + stopFunc: stop, + stopped: make(chan struct{}), + } + + g.leaderElectionInfo.Store(leaderElectionInfo) + + logger.Info("On Gateway node - starting leader election") + + lock, err := resourcelock.New(resourcelock.LeasesResourceLock, g.spec.Namespace, LeaderElectionLockName, + g.kubeClient.CoreV1(), g.kubeClient.CoordinationV1(), resourcelock.ResourceLockConfig{ + Identity: g.nodeName + "-submariner-gateway", + }) + utilruntime.Must(err) + + go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: g.LeaseDuration, + RenewDeadline: g.RenewDeadline, + RetryPeriod: g.RetryPeriod, + ReleaseOnCancel: true, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + err := g.startControllers() //nolint:contextcheck // Intentional to not pass context + if err != nil { + logger.Fatalf("Error starting the controllers: %v", err) + } + }, + OnStoppedLeading: func() { + logger.Info("Leader election stopped") + + close(leaderElectionInfo.stopped) + + // We may have lost leadership due to failed renewal and not gateway node transition or shutdown, in which case we want to + // try to regain leadership. We also stop the controllers in case there's a gateway transition during the time that + // leadership is lost and a new instance is able to contact the API server to acquire the lock. This avoids a potential + // window where both instances are running should this instance finally observe the gateway transition. Note that we don't + // clear the globalnet chains to avoid datapath disruption should leadership be regained. + if !g.shuttingDown.Load() && g.isGatewayNode.Load() { + g.stopControllers(context.Background(), false) + g.startLeaderElection() + } + }, + }, + }) +} + +//nolint:gocyclo // Ignore cyclomatic complexity here func (g *gatewayMonitor) startControllers() error { g.controllersMutex.Lock() defer g.controllersMutex.Unlock() - logger.Infof("On Gateway node - starting controllers") + // Since this is called asynchronously when leadership is gained, check that we're still on the gateway node and that we're + // not shutting down. Also, we may have regained leadership so ensure the controllers weren't already started. 
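startLeaderElection is built on client-go's leaderelection and resourcelock packages. A self-contained sketch of those primitives run against a fake clientset — the namespace, lock name, identity, and sleeps below are placeholders chosen for illustration, not values from the change:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	client := fake.NewSimpleClientset()

	// A Lease-backed lock, as used by the gateway monitor.
	lock, err := resourcelock.New(resourcelock.LeasesResourceLock, "submariner-operator", "demo-lock",
		client.CoreV1(), client.CoordinationV1(), resourcelock.ResourceLockConfig{
			Identity: "node-a-submariner-gateway",
		})
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 20 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   2 * time.Second,
		// Clears HolderIdentity on cancel so the next instance can acquire quickly.
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(_ context.Context) { fmt.Println("leading: start controllers") },
			OnStoppedLeading: func() { fmt.Println("stopped leading: stop controllers") },
		},
	})

	time.Sleep(time.Second) // let the lease be acquired
	cancel()                // simulates a gateway -> non-gateway transition or shutdown
	time.Sleep(time.Second) // let ReleaseOnCancel update the Lease before exiting
}
```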
+ if g.shuttingDown.Load() || !g.isGatewayNode.Load() || len(g.controllers) > 0 { + return nil + } + + logger.Info("Starting controllers") err := g.createGlobalnetChains() if err != nil { @@ -348,14 +456,15 @@ func (g *gatewayMonitor) startControllers() error { } } - logger.Infof("Successfully started the controllers") + logger.Info("Successfully started the controllers") return nil } -func (g *gatewayMonitor) stopControllers() { +func (g *gatewayMonitor) stopControllers(ctx context.Context, clearGlobalnetChains bool) { g.controllersMutex.Lock() - defer g.controllersMutex.Unlock() + + logger.Infof("Stopping %d controllers", len(g.controllers)) for _, c := range g.controllers { c.Stop() @@ -363,7 +472,19 @@ func (g *gatewayMonitor) stopControllers() { g.controllers = nil - g.clearGlobalnetChains() + if clearGlobalnetChains { + g.clearGlobalnetChains() + } + + leaderElectionInfo := g.leaderElectionInfo.Swap(&LeaderElectionInfo{}) + + leaderElectionInfo.stop() + + g.controllersMutex.Unlock() + + leaderElectionInfo.awaitStopped(ctx) + + logger.Info("Controllers stopped") } func (g *gatewayMonitor) createGlobalNetMarkingChain() error { @@ -506,6 +627,24 @@ func (g *gatewayMonitor) markRemoteClusterTraffic(remoteCidr string, addRules bo } } +func (l *LeaderElectionInfo) stop() { + if l.stopFunc != nil { + l.stopFunc() + } +} + +func (l *LeaderElectionInfo) awaitStopped(ctx context.Context) { + if l.stopped == nil { + return + } + + select { + case <-l.stopped: + case <-ctx.Done(): + logger.Warning("Timed out waiting for leader election to stop") + } +} + func configureTCPMTUProbe() { // An mtuProbe value of 2 enables PLPMTUD. Along with this change, we also configure // base mss to 1024 as per RFC4821 recommendation. diff --git a/pkg/globalnet/controllers/gateway_monitor_test.go b/pkg/globalnet/controllers/gateway_monitor_test.go index 278c652b3..b56f8ae24 100644 --- a/pkg/globalnet/controllers/gateway_monitor_test.go +++ b/pkg/globalnet/controllers/gateway_monitor_test.go @@ -26,14 +26,20 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/fake" "github.com/submariner-io/admiral/pkg/syncer/test" + testutil "github.com/submariner-io/admiral/pkg/test" "github.com/submariner-io/admiral/pkg/watcher" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/globalnet/constants" "github.com/submariner-io/submariner/pkg/globalnet/controllers" routeAgent "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/leaderelection/resourcelock" ) const ( @@ -46,24 +52,17 @@ const ( var _ = Describe("Endpoint monitoring", func() { t := newGatewayMonitorTestDriver() - var endpointName string + var endpoint *submarinerv1.Endpoint - When("a local Endpoint is created", func() { + When("a local gateway Endpoint corresponding to the controller host is created", func() { JustBeforeEach(func() { t.createNode(nodeName, "", "") - endpointName = t.createEndpoint(newEndpointSpec(clusterID, t.hostName, localCIDR)) + endpoint = t.createEndpoint(newEndpointSpec(clusterID, t.hostName, localCIDR)) t.createIPTableChain("nat", kubeProxyIPTableChainName) }) - It("should add the appropriate IP table chains", func() { - t.ipt.AwaitChain("nat", constants.SmGlobalnetIngressChain) - t.ipt.AwaitChain("nat", constants.SmGlobalnetEgressChain) - t.ipt.AwaitChain("nat", routeAgent.SmPostRoutingChain) - t.ipt.AwaitChain("nat", constants.SmGlobalnetMarkChain) - }) - It("should start the controllers", func() { - t.awaitClusterGlobalEgressIPStatusAllocated(controllers.DefaultNumberOfClusterEgressIPs) + t.awaitControllersStarted() t.createGlobalEgressIP(newGlobalEgressIP(globalEgressIPName, nil, nil)) t.awaitGlobalEgressIPStatusAllocated(globalEgressIPName, 1) @@ -80,38 +79,149 @@ var _ = Describe("Endpoint monitoring", func() { t.awaitHeadlessGlobalIngressIP(service.Name, backendPod.Name) }) - Context("and then removed", func() { + Context("and then deleted and recreated", func() { + JustBeforeEach(func() { + t.awaitControllersStarted() + + By("Deleting the Endpoint") + + Expect(t.endpoints.Delete(context.TODO(), endpoint.Name, metav1.DeleteOptions{})).To(Succeed()) + }) + + It("should stop and restart the controllers", func() { + t.awaitLeaderLockReleased() + t.awaitNoGlobalnetChains() + t.ensureControllersStopped() + + By("Recreating the Endpoint") + + time.Sleep(time.Millisecond * 300) + t.createEndpoint(newEndpointSpec(clusterID, t.hostName, localCIDR)) + + t.awaitLeaderLockAcquired() + t.awaitGlobalnetChains() + + t.awaitIngressIPStatusAllocated(serviceName) + }) + }) + + Context("and then updated", func() { + BeforeEach(func() { + t.leaderElectionConfig.LeaseDuration = time.Hour * 3 + t.leaderElectionConfig.RenewDeadline = time.Hour * 2 + t.leaderElectionConfig.RetryPeriod = time.Hour + }) + JustBeforeEach(func() { - t.awaitClusterGlobalEgressIPStatusAllocated(controllers.DefaultNumberOfClusterEgressIPs) + t.awaitControllersStarted() + + // Since the RenewDeadline and RetryPeriod are set very high and the leader lock has been acquired, leader election should + // not try to renew the leader lock at this point, but we'll wait a bit more just in case to give it plenty of time. After + // that and after we update the Endpoint below, any updates to the leader lock means it tried to re-acquire it. 
+				time.Sleep(time.Millisecond * 500)
+				t.kubeClient.ClearActions()
-				Expect(t.endpoints.Delete(context.TODO(), endpointName, metav1.DeleteOptions{})).To(Succeed())
+				By("Updating the Endpoint")
+
+				endpoint.Annotations = map[string]string{"foo": "bar"}
+				test.UpdateResource(t.endpoints, endpoint)
+			})
+
+			It("should not try to re-acquire the leader lock", func() {
+				testutil.EnsureNoActionsForResource(&t.kubeClient.Fake, "leases", "update")
			})
+		})
+
+		Context("and then a local gateway Endpoint corresponding to another host is created", func() {
+			JustBeforeEach(func() {
+				t.awaitControllersStarted()
+
+				By("Creating other Endpoint")
-			It("should remove the appropriate IP table chains", func() {
-				t.ipt.AwaitNoChain("nat", constants.SmGlobalnetIngressChain)
-				t.ipt.AwaitNoChain("nat", constants.SmGlobalnetEgressChain)
-				t.ipt.AwaitNoChain("nat", constants.SmGlobalnetMarkChain)
+				t.createEndpoint(newEndpointSpec(clusterID, t.hostName+"-other", localCIDR))
			})
			It("should stop the controllers", func() {
-				t.ipt.AwaitNoChain("nat", constants.SmGlobalnetMarkChain)
+				t.awaitLeaderLockReleased()
+				t.awaitNoGlobalnetChains()
+				t.ensureControllersStopped()
+			})
+		})
+
+		Context("and then renewal of the leader lock fails", func() {
+			var leasesReactor *fake.FailOnActionReactor
+
+			BeforeEach(func() {
+				t.leaderElectionConfig.RenewDeadline = time.Millisecond * 200
+				t.leaderElectionConfig.RetryPeriod = time.Millisecond * 20
+
+				leasesReactor = fake.FailOnAction(&t.kubeClient.Fake, "leases", "update", nil, false)
+				leasesReactor.Fail(false)
+			})
+
+			JustBeforeEach(func() {
+				t.awaitControllersStarted()
+
+				By("Setting leases resource updates to fail")
+
+				leasesReactor.Fail(true)
+
+				// Wait enough time for the renewal deadline to be reached
+				time.Sleep(t.leaderElectionConfig.RenewDeadline + 100*time.Millisecond)
+
+				By("Ensuring controllers are stopped and globalnet chains are not cleared")
+
+				t.ensureControllersStopped()
+				t.awaitGlobalnetChains()
+			})
+
+			It("should re-acquire the leader lock after the failure is cleared", func() {
+				now := metav1.NewTime(time.Now())
+
+				By("Setting leases resource updates to succeed")
+
+				leasesReactor.Fail(false)
+
+				By("Ensuring lease was renewed")
+
+				Eventually(func() int64 {
+					return t.getLeaderElectionRecord().RenewTime.UnixNano()
+				}).Should(BeNumerically(">=", now.UnixNano()), "Lease was not renewed")
+
+				t.awaitIngressIPStatusAllocated(serviceName)
+			})
+
+			Context("and then the gateway Endpoint is deleted", func() {
+				It("should clear the globalnet chains", func() {
+					By("Deleting the Endpoint")
-				time.Sleep(300 * time.Millisecond)
-				t.createGlobalEgressIP(newGlobalEgressIP(globalEgressIPName, nil, nil))
-				awaitNoAllocatedIPs(t.globalEgressIPs, globalEgressIPName)
+					Expect(t.endpoints.Delete(context.TODO(), endpoint.Name, metav1.DeleteOptions{})).To(Succeed())
-				t.createServiceExport(t.createService(newClusterIPService()))
-				t.ensureNoGlobalIngressIP(serviceName)
+					t.awaitNoGlobalnetChains()
+				})
			})
		})
	})
+	Context("and a local gateway Endpoint corresponding to another host is created", func() {
+		JustBeforeEach(func() {
+			endpoint = t.createEndpoint(newEndpointSpec(clusterID, t.hostName+"-other", localCIDR))
+		})
+
+		It("should not start the controllers", func() {
+			t.ensureLeaderLockNotAcquired()
+
+			t.createServiceExport(t.createService(newClusterIPService()))
+			t.ensureNoGlobalIngressIP(serviceName)
+		})
+	})
+
	When("a remote Endpoint with non-overlapping CIDRs is created then removed", func() {
		It("should add/remove appropriate IP table rule(s)", func() {
-
endpointName := t.createEndpoint(newEndpointSpec(remoteClusterID, t.hostName, remoteCIDR)) + endpoint := t.createEndpoint(newEndpointSpec(remoteClusterID, t.hostName, remoteCIDR)) t.ipt.AwaitRule("nat", constants.SmGlobalnetMarkChain, ContainSubstring(remoteCIDR)) - Expect(t.endpoints.Delete(context.TODO(), endpointName, metav1.DeleteOptions{})).To(Succeed()) + Expect(t.endpoints.Delete(context.TODO(), endpoint.Name, metav1.DeleteOptions{})).To(Succeed()) t.ipt.AwaitNoRule("nat", constants.SmGlobalnetMarkChain, ContainSubstring(remoteCIDR)) }) }) @@ -127,8 +237,10 @@ var _ = Describe("Endpoint monitoring", func() { type gatewayMonitorTestDriver struct { *testDriverBase - endpoints dynamic.ResourceInterface - hostName string + endpoints dynamic.ResourceInterface + hostName string + kubeClient *k8sfake.Clientset + leaderElectionConfig controllers.LeaderElectionConfig } func newGatewayMonitorTestDriver() *gatewayMonitorTestDriver { @@ -139,6 +251,10 @@ func newGatewayMonitorTestDriver() *gatewayMonitorTestDriver { t.endpoints = t.dynClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &submarinerv1.Endpoint{})). Namespace(namespace) + + t.kubeClient = k8sfake.NewSimpleClientset() + + t.leaderElectionConfig = controllers.LeaderElectionConfig{} }) JustBeforeEach(func() { @@ -160,14 +276,20 @@ func (t *gatewayMonitorTestDriver) start() { t.hostName, err = os.Hostname() Expect(err).To(Succeed()) - t.controller, err = controllers.NewGatewayMonitor(controllers.Specification{ - ClusterID: clusterID, - Namespace: namespace, - GlobalCIDR: []string{localCIDR}, - }, localSubnets, &watcher.Config{ - RestMapper: t.restMapper, - Client: t.dynClient, - Scheme: t.scheme, + t.controller, err = controllers.NewGatewayMonitor(&controllers.GatewayMonitorConfig{ + Config: watcher.Config{ + RestMapper: t.restMapper, + Client: t.dynClient, + Scheme: t.scheme, + }, + Spec: controllers.Specification{ + ClusterID: clusterID, + Namespace: namespace, + GlobalCIDR: []string{localCIDR}, + }, + LocalCIDRs: localSubnets, + KubeClient: t.kubeClient, + LeaderElectionConfig: t.leaderElectionConfig, }) Expect(err).To(Succeed()) @@ -176,25 +298,96 @@ func (t *gatewayMonitorTestDriver) start() { t.ipt.AwaitChain("nat", constants.SmGlobalnetMarkChain) } -func (t *gatewayMonitorTestDriver) createEndpoint(spec *submarinerv1.EndpointSpec) string { +func (t *gatewayMonitorTestDriver) createEndpoint(spec *submarinerv1.EndpointSpec) *submarinerv1.Endpoint { endpointName, err := spec.GenerateName() Expect(err).To(Succeed()) - test.CreateResource(t.endpoints, &submarinerv1.Endpoint{ + endpoint := &submarinerv1.Endpoint{ ObjectMeta: metav1.ObjectMeta{ Name: endpointName, }, Spec: *spec, - }) + } + + obj := test.CreateResource(t.endpoints, endpoint) + + err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, endpoint) + Expect(err).To(Succeed()) + + return endpoint +} + +func (t *gatewayMonitorTestDriver) getLeaderElectionRecord() *resourcelock.LeaderElectionRecord { + lock, err := resourcelock.New(resourcelock.LeasesResourceLock, namespace, controllers.LeaderElectionLockName, + t.kubeClient.CoreV1(), t.kubeClient.CoordinationV1(), resourcelock.ResourceLockConfig{}) + Expect(err).To(Succeed()) + + le, _, err := lock.Get(context.Background()) + if apierrors.IsNotFound(err) { + return nil + } + + Expect(err).To(Succeed()) + + return le +} + +func (t *gatewayMonitorTestDriver) awaitLeaderLockAcquired() { + Eventually(func() string { + le := t.getLeaderElectionRecord() + if le == nil { + return "" + } + + return 
le.HolderIdentity + }, 3).ShouldNot(BeEmpty(), "Leader lock was not acquired") +} + +func (t *gatewayMonitorTestDriver) ensureLeaderLockNotAcquired() { + Consistently(func() any { + return t.getLeaderElectionRecord() + }, 300*time.Millisecond).Should(BeNil(), "Leader lock was acquired") +} + +func (t *gatewayMonitorTestDriver) awaitLeaderLockReleased() { + Eventually(func() string { + le := t.getLeaderElectionRecord() + Expect(le).ToNot(BeNil(), "LeaderElectionRecord not found") + + return le.HolderIdentity + }, 3).Should(BeEmpty(), "Leader lock was not released") +} + +func (t *gatewayMonitorTestDriver) awaitControllersStarted() { + t.awaitLeaderLockAcquired() + t.awaitGlobalnetChains() + t.awaitClusterGlobalEgressIPStatusAllocated(controllers.DefaultNumberOfClusterEgressIPs) +} + +func (t *gatewayMonitorTestDriver) ensureControllersStopped() { + time.Sleep(300 * time.Millisecond) + t.createServiceExport(t.createService(newClusterIPService())) + t.ensureNoGlobalIngressIP(serviceName) +} + +func (t *gatewayMonitorTestDriver) awaitGlobalnetChains() { + t.ipt.AwaitChain("nat", constants.SmGlobalnetIngressChain) + t.ipt.AwaitChain("nat", constants.SmGlobalnetEgressChain) + t.ipt.AwaitChain("nat", routeAgent.SmPostRoutingChain) + t.ipt.AwaitChain("nat", constants.SmGlobalnetMarkChain) +} - return endpointName +func (t *gatewayMonitorTestDriver) awaitNoGlobalnetChains() { + t.ipt.AwaitNoChain("nat", constants.SmGlobalnetIngressChain) + t.ipt.AwaitNoChain("nat", constants.SmGlobalnetEgressChain) + t.ipt.AwaitNoChain("nat", constants.SmGlobalnetMarkChain) } func newEndpointSpec(clusterID, hostname, subnet string) *submarinerv1.EndpointSpec { return &submarinerv1.EndpointSpec{ - CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-2", clusterID), + CableName: fmt.Sprintf("submariner-cable-%s-%s", clusterID, hostname), ClusterID: clusterID, - PrivateIP: "192-68-1-2", + PrivateIP: "192.68.1.2", Hostname: hostname, Subnets: []string{subnet}, } diff --git a/pkg/globalnet/controllers/global_egressip_controller_test.go b/pkg/globalnet/controllers/global_egressip_controller_test.go index f6848c0ec..45e9442b1 100644 --- a/pkg/globalnet/controllers/global_egressip_controller_test.go +++ b/pkg/globalnet/controllers/global_egressip_controller_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + fakeDynClient "github.com/submariner-io/admiral/pkg/fake" "github.com/submariner-io/admiral/pkg/syncer" "github.com/submariner-io/admiral/pkg/syncer/test" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" @@ -592,6 +593,8 @@ func newGlobalEgressIPControllerTestDriver() *globalEgressIPControllerTestDriver t.pool, err = ipam.NewIPPool(t.globalCIDR) Expect(err).To(Succeed()) + + t.watches = fakeDynClient.NewWatchReactor(&t.dynClient.Fake) }) JustBeforeEach(func() { diff --git a/pkg/globalnet/controllers/types.go b/pkg/globalnet/controllers/types.go index 0dc889f21..597a35f06 100644 --- a/pkg/globalnet/controllers/types.go +++ b/pkg/globalnet/controllers/types.go @@ -19,8 +19,10 @@ limitations under the License. 
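The createEndpoint helper above converts the unstructured object returned by the fake dynamic client back into the typed Endpoint via runtime.DefaultUnstructuredConverter. A small sketch of that round trip using a core/v1 Pod, so the snippet carries no Submariner dependencies:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}

	// Typed -> unstructured: the representation the dynamic client works with.
	raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod)
	if err != nil {
		panic(err)
	}

	// Unstructured -> typed: what createEndpoint does with the object returned by
	// test.CreateResource so callers get the stored Endpoint back as a typed struct.
	roundTripped := &corev1.Pod{}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(raw, roundTripped); err != nil {
		panic(err)
	}

	fmt.Println(roundTripped.Namespace + "/" + roundTripped.Name)
}
```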
package controllers import ( + "context" "sync" "sync/atomic" + "time" "github.com/submariner-io/admiral/pkg/log" "github.com/submariner-io/admiral/pkg/syncer" @@ -32,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" "k8s.io/utils/set" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -70,6 +73,8 @@ const ( DeleteRules = false DefaultNumberOfClusterEgressIPs = 8 + + LeaderElectionLockName = "submariner-globalnet-lock" ) type Interface interface { @@ -85,18 +90,41 @@ type Specification struct { Uninstall bool } +type LeaderElectionConfig struct { + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration +} + +type GatewayMonitorConfig struct { + watcher.Config + LeaderElectionConfig + Spec Specification + LocalCIDRs []string + KubeClient kubernetes.Interface +} + type baseController struct { stopCh chan struct{} } +type LeaderElectionInfo struct { + stopFunc context.CancelFunc + stopped chan struct{} +} + type gatewayMonitor struct { *baseController + LeaderElectionConfig syncerConfig *syncer.ResourceSyncerConfig endpointWatcher watcher.Interface + kubeClient kubernetes.Interface remoteEndpointTimeStamp map[string]metav1.Time spec Specification ipt iptables.Interface isGatewayNode atomic.Bool + shuttingDown atomic.Bool + leaderElectionInfo atomic.Pointer[LeaderElectionInfo] nodeName string localSubnets []string remoteSubnets set.Set[string] diff --git a/pkg/globalnet/main.go b/pkg/globalnet/main.go index 9830f5587..8de260762 100644 --- a/pkg/globalnet/main.go +++ b/pkg/globalnet/main.go @@ -39,6 +39,7 @@ import ( "github.com/submariner-io/submariner/pkg/globalnet/controllers" "github.com/submariner-io/submariner/pkg/versions" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -121,9 +122,16 @@ func main() { logger.Fatalf("Cluster %s is not configured to use globalCidr", spec.ClusterID) } - gatewayMonitor, err := controllers.NewGatewayMonitor(spec, append(cidr.ExtractIPv4Subnets(localCluster.Spec.ClusterCIDR), - cidr.ExtractIPv4Subnets(localCluster.Spec.ServiceCIDR)...), - &watcher.Config{RestConfig: cfg}) + k8sClient, err := kubernetes.NewForConfig(cfg) + logger.FatalOnError(err, "Error creating Kubernetes clientset") + + gatewayMonitor, err := controllers.NewGatewayMonitor(&controllers.GatewayMonitorConfig{ + Config: watcher.Config{RestConfig: cfg}, + Spec: spec, + LocalCIDRs: append(cidr.ExtractIPv4Subnets(localCluster.Spec.ClusterCIDR), + cidr.ExtractIPv4Subnets(localCluster.Spec.ServiceCIDR)...), + KubeClient: k8sClient, + }) logger.FatalOnError(err, "Error creating gatewayMonitor") err = gatewayMonitor.Start()
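GatewayMonitorConfig embeds both watcher.Config and LeaderElectionConfig, which is why main.go can populate everything in a single literal while NewGatewayMonitor still passes &config.Config to watcher.New and reads the promoted lease-duration fields directly. A stripped-down sketch of that embedding pattern, using placeholder types rather than the real ones:

```go
package main

import (
	"fmt"
	"time"
)

// Minimal stand-ins for the embedded config types; the real ones are
// watcher.Config and the LeaderElectionConfig added in types.go.
type WatcherConfig struct{ Namespace string }

type LeaderElectionConfig struct {
	LeaseDuration time.Duration
	RenewDeadline time.Duration
	RetryPeriod   time.Duration
}

// GatewayMonitorConfig embeds both, so callers set watcher and leader-election
// fields inline and the embedded structs can still be copied out wholesale.
type GatewayMonitorConfig struct {
	WatcherConfig
	LeaderElectionConfig
	LocalCIDRs []string
}

func main() {
	cfg := GatewayMonitorConfig{
		WatcherConfig:        WatcherConfig{Namespace: "submariner-operator"},
		LeaderElectionConfig: LeaderElectionConfig{LeaseDuration: 20 * time.Second},
		LocalCIDRs:           []string{"10.0.0.0/16"},
	}

	// Promoted fields from both embedded structs are reachable directly.
	fmt.Println(cfg.Namespace, cfg.LeaseDuration, cfg.LocalCIDRs)
}
```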