Automated backport of #2743: Use leader election in globalnet controller #2759

Merged
19 changes: 0 additions & 19 deletions pkg/globalnet/controllers/controllers_suite_test.go
@@ -46,7 +46,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
@@ -119,15 +118,8 @@ func newTestDriverBase() *testDriverBase {
     Expect(submarinerv1.AddToScheme(t.scheme)).To(Succeed())
     Expect(corev1.AddToScheme(t.scheme)).To(Succeed())

-    // TODO: Remove this workaround for https://github.com/kubernetes/client-go/issues/949 once
-    // admiral has been updated
-    t.scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "fake-dynamic-client-group", Version: "v1", Kind: "List"},
-        &unstructured.UnstructuredList{})
-
     t.dynClient = fakeDynClient.NewDynamicClient(t.scheme)

-    t.watches = fakeDynClient.NewWatchReactor(&t.dynClient.Fake)
-
     t.globalEgressIPs = t.dynClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &submarinerv1.GlobalEgressIP{})).
         Namespace(namespace)

@@ -323,17 +315,6 @@ func getGlobalEgressIPStatus(client dynamic.ResourceInterface, name string) *sub
     return status
 }

-func awaitNoAllocatedIPs(client dynamic.ResourceInterface, name string) {
-    Consistently(func() int {
-        status := getGlobalEgressIPStatus(client, name)
-        if status == nil {
-            return 0
-        }
-
-        return len(status.AllocatedIPs)
-    }, 200*time.Millisecond).Should(Equal(0))
-}
-
 func (t *testDriverBase) awaitEgressIPStatus(client dynamic.ResourceInterface, name string, expNumIPS int, expCond ...metav1.Condition) {
     t.awaitStatusConditions(client, name, expCond...)

189 changes: 164 additions & 25 deletions pkg/globalnet/controllers/gateway_monitor.go
@@ -19,9 +19,11 @@ limitations under the License.
 package controllers

 import (
+    "context"
     "os"
     "strings"
     "sync/atomic"
+    "time"

     "github.com/pkg/errors"
     "github.com/submariner-io/admiral/pkg/log"
@@ -39,23 +41,55 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
k8snet "k8s.io/utils/net"
"k8s.io/utils/set"
)

func NewGatewayMonitor(spec Specification, localCIDRs []string, config *watcher.Config) (Interface, error) {
func NewGatewayMonitor(config *GatewayMonitorConfig) (Interface, error) {
// We'll panic if config is nil, this is intentional
gatewayMonitor := &gatewayMonitor{
baseController: newBaseController(),
spec: spec,
spec: config.Spec,
kubeClient: config.KubeClient,
isGatewayNode: atomic.Bool{},
localSubnets: set.New(localCIDRs...).UnsortedList(),
shuttingDown: atomic.Bool{},
localSubnets: set.New(config.LocalCIDRs...).UnsortedList(),
remoteSubnets: set.New[string](),
remoteEndpointTimeStamp: map[string]metav1.Time{},
leaderElectionInfo: atomic.Pointer[LeaderElectionInfo]{},
LeaderElectionConfig: config.LeaderElectionConfig,
}

// When transitioning to a non-gateway or shutting down, the GatewayMonitor cancels leader election which causes it to release the lock,
// which clears the HolderIdentity field and sets LeaseDuration to one second. This enables the next instance to quickly acquire the
// lock. However, if the controller instance crashes and doesn't properly release the lock, the next instance will have to await the
// LeaseDuration period before acquiring the lock. So we don't want LeaseDuration set too high, and we don't want it too low either to
// give the current instance enough time to complete stopping its controllers.
//
// The K8s leader election functionality periodically renews the lease. If it can't be renewed prior to the RenewDeadline, it stops.
// For our usage, we don't have instances concurrently vying for leadership, so we really don't need to keep renewing the lease. Ideally
// we would set the RenewDeadline very high to essentially disable it, but it needs to be less than the LeaseDuration setting which we
// don't want too high.

if gatewayMonitor.LeaseDuration == 0 {
gatewayMonitor.LeaseDuration = 20 * time.Second
}

if gatewayMonitor.RenewDeadline == 0 {
gatewayMonitor.RenewDeadline = 15 * time.Second
}

if gatewayMonitor.RetryPeriod == 0 {
gatewayMonitor.RetryPeriod = 2 * time.Second
}

gatewayMonitor.leaderElectionInfo.Store(&LeaderElectionInfo{})

var err error

gatewayMonitor.ipt, err = iptables.New()
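Note on the timings above: for readers new to client-go leader election, here is a minimal standalone sketch of the pattern being configured. It is illustrative, not part of this diff; the namespace, lock name, and identity are placeholders. client-go requires LeaseDuration > RenewDeadline > 1.2 * RetryPeriod, and RunOrDie panics on an invalid config.

package main

import (
    "context"
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLease blocks, invoking onStarted while this identity holds the Lease.
func runWithLease(ctx context.Context, client kubernetes.Interface, identity string,
    onStarted func(context.Context), onStopped func(),
) error {
    // A coordination.k8s.io/v1 Lease named "example-lock" in "example-ns" backs the election.
    lock, err := resourcelock.New(resourcelock.LeasesResourceLock, "example-ns", "example-lock",
        client.CoreV1(), client.CoordinationV1(), resourcelock.ResourceLockConfig{Identity: identity})
    if err != nil {
        return err
    }

    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        LeaseDuration:   20 * time.Second, // how long a crashed holder blocks its successor
        RenewDeadline:   15 * time.Second, // give up leadership if renewal takes longer than this
        RetryPeriod:     2 * time.Second,  // how often to try acquiring or renewing the lock
        ReleaseOnCancel: true,             // on ctx cancel, clear HolderIdentity so a successor can acquire quickly
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: onStarted,
            OnStoppedLeading: onStopped,
        },
    })

    return nil
}

func main() {} // wiring a real kubernetes.Interface (e.g. via clientcmd) is omitted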
@@ -88,11 +122,11 @@ func NewGatewayMonitor(spec Specification, localCIDRs []string, config *watcher.Config) (Interface, error) {
                 OnUpdateFunc: gatewayMonitor.handleCreatedOrUpdatedEndpoint,
                 OnDeleteFunc: gatewayMonitor.handleRemovedEndpoint,
             },
-            SourceNamespace: spec.Namespace,
+            SourceNamespace: config.Spec.Namespace,
         },
     }

-    gatewayMonitor.endpointWatcher, err = watcher.New(config)
+    gatewayMonitor.endpointWatcher, err = watcher.New(&config.Config)
     if err != nil {
         return nil, errors.Wrap(err, "error creating the Endpoint watcher")
     }
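With the constructor change, callers now pass a single config struct. A rough sketch of the wiring follows; the GatewayMonitorConfig and LeaderElectionConfig definitions and the import paths live outside this diff, so the exact shapes here are assumptions inferred from the field usages above (config.Spec, config.KubeClient, config.LocalCIDRs, config.LeaderElectionConfig, and the embedded watcher.Config).

package wiring

import (
    "time"

    "github.com/submariner-io/admiral/pkg/watcher"
    "github.com/submariner-io/submariner/pkg/globalnet/controllers"
    "k8s.io/client-go/kubernetes"
)

// buildGatewayMonitor shows the approximate shape of the new constructor call.
func buildGatewayMonitor(watcherConfig watcher.Config, spec controllers.Specification,
    kubeClient kubernetes.Interface, localCIDRs []string,
) (controllers.Interface, error) {
    return controllers.NewGatewayMonitor(&controllers.GatewayMonitorConfig{
        Config:     watcherConfig, // previously passed directly as *watcher.Config
        Spec:       spec,          // previously passed directly as Specification
        KubeClient: kubeClient,    // used for the leader-election Lease lock
        LocalCIDRs: localCIDRs,
        LeaderElectionConfig: controllers.LeaderElectionConfig{
            LeaseDuration: 20 * time.Second, // zero values fall back to the defaults set above
            RenewDeadline: 15 * time.Second,
            RetryPeriod:   2 * time.Second,
        },
    })
}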
@@ -134,15 +168,22 @@ func (g *gatewayMonitor) Start() error {
 func (g *gatewayMonitor) Stop() {
     logger.Info("GatewayMonitor stopping")

+    g.shuttingDown.Store(true)
+
     g.baseController.Stop()

-    g.stopControllers()
+    // stopControllers should be pretty quick but put a deadline on it, so we don't block shutdown for a long time.
+
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+    defer cancel()
+
+    g.stopControllers(ctx, true)
 }

 func (g *gatewayMonitor) handleCreatedOrUpdatedEndpoint(obj runtime.Object, _ int) bool {
     endpoint := obj.(*v1.Endpoint)

-    logger.V(log.DEBUG).Infof("In processNextEndpoint, endpoint info: %+v", endpoint)
+    logger.V(log.DEBUG).Infof("Gateway monitor informed of create/updated endpoint: %v", endpoint)

     if endpoint.Spec.ClusterID != g.spec.ClusterID {
         lastProcessedTime, ok := g.remoteEndpointTimeStamp[endpoint.Spec.ClusterID]
@@ -194,22 +235,17 @@ func (g *gatewayMonitor) handleCreatedOrUpdatedEndpoint(obj runtime.Object, _ in

 // If the endpoint hostname matches with our hostname, it implies we are on gateway node
     if endpoint.Spec.Hostname == hostname {
-        logger.V(log.DEBUG).Infof("Transitioned to gateway node with endpoint private IP %s", endpoint.Spec.PrivateIP)
-
         configureTCPMTUProbe()

         if g.isGatewayNode.CompareAndSwap(false, true) {
-            err := g.startControllers()
-            if err != nil {
-                logger.Fatalf("Error starting the controllers: %v", err)
-            }
-        }
-    } else {
-        logger.V(log.DEBUG).Infof("Transitioned to non-gateway node with endpoint private IP %s", endpoint.Spec.PrivateIP)
+            logger.Infof("Transitioned to gateway node %q with endpoint private IP %s", hostname, endpoint.Spec.PrivateIP)

-        if g.isGatewayNode.CompareAndSwap(true, false) {
-            g.stopControllers()
+            g.startLeaderElection()
         }
+    } else if g.isGatewayNode.CompareAndSwap(true, false) {
+        logger.Infof("Transitioned to non-gateway node %q", endpoint.Spec.Hostname)
+
+        g.stopControllers(context.Background(), true)
     }

     return false
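The CompareAndSwap calls above are what make repeated Endpoint events idempotent: only the event that actually flips the gateway flag performs the transition work. A self-contained illustration of the guard (names here are illustrative, not taken from the PR):

package main

import (
    "fmt"
    "sync/atomic"
)

type monitor struct{ isGatewayNode atomic.Bool }

func (m *monitor) onEndpoint(onGatewayNode bool) {
    if onGatewayNode {
        // Only the caller that flips false -> true performs the start work.
        if m.isGatewayNode.CompareAndSwap(false, true) {
            fmt.Println("transition: start leader election")
        }
    } else if m.isGatewayNode.CompareAndSwap(true, false) {
        // Likewise, only the flip from true -> false performs the stop work.
        fmt.Println("transition: stop controllers")
    }
}

func main() {
    m := &monitor{}
    m.onEndpoint(true)  // prints: transition: start leader election
    m.onEndpoint(true)  // no-op: flag already true
    m.onEndpoint(false) // prints: transition: stop controllers
}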
@@ -228,7 +264,7 @@ func (g *gatewayMonitor) handleRemovedEndpoint(obj runtime.Object, _ int) bool {

 delete(g.remoteEndpointTimeStamp, endpoint.Spec.ClusterID)

-    logger.V(log.DEBUG).Infof("Informed of removed endpoint for gateway monitor: %v", endpoint)
+    logger.V(log.DEBUG).Infof("Gateway monitor informed of removed endpoint: %v", endpoint)

     hostname, err := os.Hostname()
     if err != nil {
@@ -237,7 +273,9 @@

 if endpoint.Spec.Hostname == hostname && endpoint.Spec.ClusterID == g.spec.ClusterID {
         if g.isGatewayNode.CompareAndSwap(true, false) {
-            g.stopControllers()
+            logger.Infof("Gateway node %q endpoint removed", hostname)
+
+            g.stopControllers(context.Background(), true)
         }
     } else if endpoint.Spec.ClusterID != g.spec.ClusterID {
         // Endpoint associated with remote cluster is removed, delete the associated flows.
@@ -252,11 +290,81 @@
 return false
 }

+func (g *gatewayMonitor) startLeaderElection() {
+    if g.shuttingDown.Load() {
+        return
+    }
+
+    g.controllersMutex.Lock()
+    defer g.controllersMutex.Unlock()
+
+    // Usually when leadership is lost it's due to a transition to a non-gateway; however, if it's due to renewal failure, we try to regain
+    // leadership, in which case we'll still be on the gateway. Hence the isGatewayNode check here.
+    if !g.isGatewayNode.Load() {
+        return
+    }
+
+    ctx, stop := context.WithCancel(context.Background())
+
+    leaderElectionInfo := &LeaderElectionInfo{
+        stopFunc: stop,
+        stopped:  make(chan struct{}),
+    }
+
+    g.leaderElectionInfo.Store(leaderElectionInfo)
+
+    logger.Info("On Gateway node - starting leader election")
+
+    lock, err := resourcelock.New(resourcelock.LeasesResourceLock, g.spec.Namespace, LeaderElectionLockName,
+        g.kubeClient.CoreV1(), g.kubeClient.CoordinationV1(), resourcelock.ResourceLockConfig{
+            Identity: g.nodeName + "-submariner-gateway",
+        })
+    utilruntime.Must(err)
+
+    go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+        Lock:            lock,
+        LeaseDuration:   g.LeaseDuration,
+        RenewDeadline:   g.RenewDeadline,
+        RetryPeriod:     g.RetryPeriod,
+        ReleaseOnCancel: true,
+        Callbacks: leaderelection.LeaderCallbacks{
+            OnStartedLeading: func(_ context.Context) {
+                err := g.startControllers() //nolint:contextcheck // Intentional to not pass context
+                if err != nil {
+                    logger.Fatalf("Error starting the controllers: %v", err)
+                }
+            },
+            OnStoppedLeading: func() {
+                logger.Info("Leader election stopped")
+
+                close(leaderElectionInfo.stopped)
+
+                // We may have lost leadership due to failed renewal rather than a gateway node transition or shutdown, in which case we
+                // want to try to regain leadership. We also stop the controllers in case there's a gateway transition during the time that
+                // leadership is lost and a new instance is able to contact the API server to acquire the lock. This avoids a potential
+                // window where both instances are running, should this instance finally observe the gateway transition. Note that we don't
+                // clear the globalnet chains, to avoid datapath disruption should leadership be regained.
+                if !g.shuttingDown.Load() && g.isGatewayNode.Load() {
+                    g.stopControllers(context.Background(), false)
+                    g.startLeaderElection()
+                }
+            },
+        },
+    })
+}
+
+//nolint:gocyclo // Ignore cyclomatic complexity here
 func (g *gatewayMonitor) startControllers() error {
     g.controllersMutex.Lock()
     defer g.controllersMutex.Unlock()

-    logger.Infof("On Gateway node - starting controllers")
+    // Since this is called asynchronously when leadership is gained, check that we're still on the gateway node and that we're
+    // not shutting down. Also, we may have regained leadership so ensure the controllers weren't already started.
+    if g.shuttingDown.Load() || !g.isGatewayNode.Load() || len(g.controllers) > 0 {
+        return nil
+    }
+
+    logger.Info("Starting controllers")

     err := g.createGlobalnetChains()
     if err != nil {
@@ -348,22 +456,35 @@ func (g *gatewayMonitor) startControllers() error {
 }
     }

-    logger.Infof("Successfully started the controllers")
+    logger.Info("Successfully started the controllers")

     return nil
 }

-func (g *gatewayMonitor) stopControllers() {
+func (g *gatewayMonitor) stopControllers(ctx context.Context, clearGlobalnetChains bool) {
     g.controllersMutex.Lock()
-    defer g.controllersMutex.Unlock()

+    logger.Infof("Stopping %d controllers", len(g.controllers))
+
     for _, c := range g.controllers {
         c.Stop()
     }

     g.controllers = nil

-    g.clearGlobalnetChains()
+    if clearGlobalnetChains {
+        g.clearGlobalnetChains()
+    }

+    leaderElectionInfo := g.leaderElectionInfo.Swap(&LeaderElectionInfo{})
+
+    leaderElectionInfo.stop()
+
+    g.controllersMutex.Unlock()
+
+    leaderElectionInfo.awaitStopped(ctx)
+
+    logger.Info("Controllers stopped")
 }

 func (g *gatewayMonitor) createGlobalNetMarkingChain() error {
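Note the lock discipline in stopControllers above: it signals the leader-election goroutine to stop while holding controllersMutex, but releases the mutex before waiting, since the OnStoppedLeading callback itself takes that mutex; the caller-supplied context bounds the wait. A distilled, standalone version of this signal-then-await pattern (all names illustrative):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type worker struct {
    mu      sync.Mutex
    cancel  context.CancelFunc
    stopped chan struct{}
}

func (w *worker) stop(ctx context.Context) {
    w.mu.Lock()
    cancel, stopped := w.cancel, w.stopped
    w.mu.Unlock() // release before waiting; the worker may need the lock to finish

    if cancel != nil {
        cancel()
    }

    select {
    case <-stopped:
        fmt.Println("worker stopped cleanly")
    case <-ctx.Done():
        fmt.Println("timed out waiting for worker to stop")
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    w := &worker{cancel: cancel, stopped: make(chan struct{})}

    go func() { // simulated worker goroutine
        <-ctx.Done()
        close(w.stopped)
    }()

    waitCtx, waitCancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer waitCancel()
    w.stop(waitCtx)
}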
@@ -506,6 +627,24 @@ func (g *gatewayMonitor) markRemoteClusterTraffic(remoteCidr string, addRules bo
 }
 }

+func (l *LeaderElectionInfo) stop() {
+    if l.stopFunc != nil {
+        l.stopFunc()
+    }
+}
+
+func (l *LeaderElectionInfo) awaitStopped(ctx context.Context) {
+    if l.stopped == nil {
+        return
+    }
+
+    select {
+    case <-l.stopped:
+    case <-ctx.Done():
+        logger.Warning("Timed out waiting for leader election to stop")
+    }
+}
+
 func configureTCPMTUProbe() {
     // An mtuProbe value of 2 enables PLPMTUD. Along with this change, we also configure
     // base mss to 1024 as per RFC4821 recommendation.
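The diff is truncated here and the body of configureTCPMTUProbe is not shown. On Linux, the settings its comment describes correspond to the standard tcp_mtu_probing and tcp_base_mss sysctls, so an implementation would plausibly resemble the following sketch (an assumption, not the actual function body):

package main

import (
    "fmt"
    "os"
)

func main() {
    // net.ipv4.tcp_mtu_probing = 2 enables Packetization Layer Path MTU
    // Discovery (RFC 4821) unconditionally, per ip-sysctl documentation.
    if err := os.WriteFile("/proc/sys/net/ipv4/tcp_mtu_probing", []byte("2"), 0o644); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }

    // net.ipv4.tcp_base_mss = 1024 sets the starting MSS used by probing.
    if err := os.WriteFile("/proc/sys/net/ipv4/tcp_base_mss", []byte("1024"), 0o644); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}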