diff --git a/go.mod b/go.mod
index 8ee3a1feb..4ecbabcff 100644
--- a/go.mod
+++ b/go.mod
@@ -15,8 +15,8 @@ require (
 	github.com/projectcalico/api v0.0.0-20230602153125-fb7148692637
 	github.com/prometheus-community/pro-bing v0.3.0
 	github.com/prometheus/client_golang v1.16.0
-	github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c
-	github.com/submariner-io/shipyard v0.16.1-0.20231025063959-1e68200a75e6
+	github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca
+	github.com/submariner-io/shipyard v0.16.1-0.20231031115937-e306910cd742
 	github.com/uw-labs/lichen v0.1.7
 	github.com/vishvananda/netlink v1.2.1-beta.2
 	golang.org/x/sys v0.13.0
diff --git a/go.sum b/go.sum
index d7b64b818..4fb3d75e3 100644
--- a/go.sum
+++ b/go.sum
@@ -506,10 +506,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
-github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c h1:zy5mZZrB885JAuLPqpb/RoGhtd9N9tUCFE5OGAZEzWw=
-github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c/go.mod h1:GP0TCJkt444r2ONKVHKBbSPaKjJb0S5Qj0MyNUl2keQ=
-github.com/submariner-io/shipyard v0.16.1-0.20231025063959-1e68200a75e6 h1:UbExEBgaVP9q4XTNYlw3DnMb/RWXLSFdxWedxfstoN0=
-github.com/submariner-io/shipyard v0.16.1-0.20231025063959-1e68200a75e6/go.mod h1:op6JLbiJv3dteWxde6GpG5zp0oljZWVDU1FXmB1j6cs=
+github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca h1:O3gUAdYldm4LQJD/IpqaFcOlJVf9ikM+uBWsfVADmjg=
+github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca/go.mod h1:GP0TCJkt444r2ONKVHKBbSPaKjJb0S5Qj0MyNUl2keQ=
+github.com/submariner-io/shipyard v0.16.1-0.20231031115937-e306910cd742 h1:IxgmM0WOPYnYkBc1eNQdiMu9ozGax8FV0PYhHFHPbuE=
+github.com/submariner-io/shipyard v0.16.1-0.20231031115937-e306910cd742/go.mod h1:op6JLbiJv3dteWxde6GpG5zp0oljZWVDU1FXmB1j6cs=
 github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
diff --git a/main.go b/main.go
index 5d1a3a648..c713402b0 100644
--- a/main.go
+++ b/main.go
@@ -144,9 +144,12 @@ func main() {
 	})
 	logger.FatalOnError(err, "Error creating gateway instance")
 
-	err = gw.Run(signals.SetupSignalHandler().Done())
+	err = gw.Run(signals.SetupSignalHandler())
 
-	if err := httpServer.Shutdown(context.Background()); err != nil {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	if err := httpServer.Shutdown(ctx); err != nil {
 		logger.Errorf(err, "Error shutting down metrics HTTP server")
 	}
 
diff --git a/pkg/cableengine/syncer/syncer.go b/pkg/cableengine/syncer/syncer.go
index 5d0df3407..3959e877f 100644
--- a/pkg/cableengine/syncer/syncer.go
+++ b/pkg/cableengine/syncer/syncer.go
@@ -84,7 +84,7 @@ func NewGatewaySyncer(engine cableengine.Engine, client v1typed.GatewayInterface
 func (gs *GatewaySyncer) Run(stopCh <-chan struct{}) {
 	wait.Until(gs.syncGatewayStatus, GatewayUpdateInterval, stopCh)
 
-	gs.CleanupGatewayEntry()
+	gs.CleanupGatewayEntry(context.Background())
 
 	logger.Info("CableEngine syncer stopped")
 }
@@ -93,15 +93,15 @@ func (gs *GatewaySyncer) syncGatewayStatus() {
 	gs.mutex.Lock()
 	defer gs.mutex.Unlock()
 
-	gs.syncGatewayStatusSafe()
+	gs.syncGatewayStatusSafe(context.Background())
 }
 
-func (gs *GatewaySyncer) SetGatewayStatusError(err error) {
+func (gs *GatewaySyncer) SetGatewayStatusError(ctx context.Context, err error) {
 	gs.mutex.Lock()
 	defer gs.mutex.Unlock()
 
 	gs.statusError = err
-	gs.syncGatewayStatusSafe()
+	gs.syncGatewayStatusSafe(ctx)
 }
 
 func (gs *GatewaySyncer) gatewayResourceInterface() resource.Interface {
@@ -122,13 +122,13 @@ func (gs *GatewaySyncer) gatewayResourceInterface() resource.Interface {
 	}
 }
 
-func (gs *GatewaySyncer) syncGatewayStatusSafe() {
+func (gs *GatewaySyncer) syncGatewayStatusSafe(ctx context.Context) {
 	logger.V(log.TRACE).Info("Running Gateway status sync")
 	gatewaySyncIterations.Inc()
 
 	gatewayObj := gs.generateGatewayObject()
 
-	result, err := util.CreateOrUpdate(context.TODO(), gs.gatewayResourceInterface(), gatewayObj,
+	result, err := util.CreateOrUpdate(ctx, gs.gatewayResourceInterface(), gatewayObj,
 		func(existing runtime.Object) (runtime.Object, error) {
 			existingGw := existing.(*v1.Gateway)
 			existingGw.Status = gatewayObj.Status
@@ -150,15 +150,15 @@ func (gs *GatewaySyncer) syncGatewayStatusSafe() {
 	}
 
 	if gatewayObj.Status.HAStatus == v1.HAStatusActive {
-		err := gs.cleanupStaleGatewayEntries(gatewayObj.Name)
+		err := gs.cleanupStaleGatewayEntries(ctx, gatewayObj.Name)
 		if err != nil {
 			utilruntime.HandleError(fmt.Errorf("error cleaning up stale gateway entries: %w", err))
 		}
 	}
 }
 
-func (gs *GatewaySyncer) cleanupStaleGatewayEntries(localGatewayName string) error {
-	gateways, err := gs.client.List(context.TODO(), metav1.ListOptions{})
+func (gs *GatewaySyncer) cleanupStaleGatewayEntries(ctx context.Context, localGatewayName string) error {
+	gateways, err := gs.client.List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return errors.Wrap(err, "error listing Gateways")
 	}
@@ -176,7 +176,7 @@ func (gs *GatewaySyncer) cleanupStaleGatewayEntries(localGatewayName string) err
 		}
 
 		if stale {
-			err := gs.client.Delete(context.TODO(), gw.Name, metav1.DeleteOptions{})
+			err := gs.client.Delete(ctx, gw.Name, metav1.DeleteOptions{})
 			if err != nil {
 				// In this case we don't want to stop the cleanup loop and just log it.
 				utilruntime.HandleError(fmt.Errorf("error deleting stale Gateway %+v: %w", gw, err))
@@ -278,10 +278,10 @@ func (gs *GatewaySyncer) generateGatewayObject() *v1.Gateway {
 
 // CleanupGatewayEntry removes this Gateway entry from the k8s API, it does not
 // propagate error up because it's a termination function that we also provide externally.
-func (gs *GatewaySyncer) CleanupGatewayEntry() {
+func (gs *GatewaySyncer) CleanupGatewayEntry(ctx context.Context) {
 	hostName := gs.engine.GetLocalEndpoint().Spec.Hostname
 
-	err := gs.client.Delete(context.TODO(), hostName, metav1.DeleteOptions{})
+	err := gs.client.Delete(ctx, hostName, metav1.DeleteOptions{})
 	if err != nil {
 		logger.Errorf(err, "Error while trying to delete own Gateway %q", hostName)
 		return
diff --git a/pkg/cableengine/syncer/syncer_test.go b/pkg/cableengine/syncer/syncer_test.go
index ee95e9fc0..037bf4363 100644
--- a/pkg/cableengine/syncer/syncer_test.go
+++ b/pkg/cableengine/syncer/syncer_test.go
@@ -189,7 +189,7 @@ func testGatewaySyncing() {
 			statusErr := errors.New("fake error")
 			t.expectedGateway.Status.StatusFailure = statusErr.Error()
 
-			t.syncer.SetGatewayStatusError(statusErr)
+			t.syncer.SetGatewayStatusError(context.Background(), statusErr)
 			t.awaitGatewayUpdated(t.expectedGateway)
 		})
 	})
diff --git a/pkg/controllers/datastoresyncer/datastore_cluster_sync_test.go b/pkg/controllers/datastoresyncer/datastore_cluster_sync_test.go
index 31ae4f8bf..054c8c9c8 100644
--- a/pkg/controllers/datastoresyncer/datastore_cluster_sync_test.go
+++ b/pkg/controllers/datastoresyncer/datastore_cluster_sync_test.go
@@ -134,7 +134,7 @@ func testClusterCleanup() {
 		})
 
 		It("should remove local Clusters from the remote datastore", func() {
-			Expect(t.syncer.Cleanup()).To(Succeed())
+			Expect(t.syncer.Cleanup(context.Background())).To(Succeed())
 
 			test.AwaitNoResource(t.brokerClusters, clusterID)
 
@@ -143,7 +143,7 @@
 		})
 
 		It("should remove all Clusters from the local datastore", func() {
-			Expect(t.syncer.Cleanup()).To(Succeed())
+			Expect(t.syncer.Cleanup(context.Background())).To(Succeed())
 
 			test.AwaitNoResource(t.localClusters, clusterID)
 			test.AwaitNoResource(t.localClusters, otherClusterID)
diff --git a/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go b/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go
index f0d27aa8d..1b1421f27 100644
--- a/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go
+++ b/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go
@@ -262,7 +262,7 @@ func testEndpointCleanup() {
 		})
 
 		It("should remove local Endpoints from the remote datastore", func() {
-			Expect(t.syncer.Cleanup()).To(Succeed())
+			Expect(t.syncer.Cleanup(context.Background())).To(Succeed())
 
 			test.AwaitNoResource(t.brokerEndpoints, existingLocalEndpoint.GetName())
 
@@ -271,7 +271,7 @@
 		})
 
 		It("should remove all Endpoints from the local datastore", func() {
-			Expect(t.syncer.Cleanup()).To(Succeed())
+			Expect(t.syncer.Cleanup(context.Background())).To(Succeed())
 
 			test.AwaitNoResource(t.localEndpoints, existingLocalEndpoint.GetName())
 			test.AwaitNoResource(t.localEndpoints, existingRemoteEndpoint.GetName())
diff --git a/pkg/controllers/datastoresyncer/datastoresyncer.go b/pkg/controllers/datastoresyncer/datastoresyncer.go
index 757285318..e57ed182f 100644
--- a/pkg/controllers/datastoresyncer/datastoresyncer.go
+++ b/pkg/controllers/datastoresyncer/datastoresyncer.go
@@ -68,7 +68,7 @@ func New(syncerConfig *broker.SyncerConfig, localCluster *types.SubmarinerCluste
 	}
 }
 
-func (d *DatastoreSyncer) Start(stopCh <-chan struct{}) error {
+func (d *DatastoreSyncer) Start(ctx context.Context) error {
 	defer utilruntime.HandleCrash()
 
 	logger.Info("Starting the datastore syncer")
@@ -81,25 +81,25 @@ func (d *DatastoreSyncer) Start(stopCh <-chan struct{}) error {
 		return err
 	}
 
-	err = syncer.Start(stopCh)
+	err = syncer.Start(ctx.Done())
 	if err != nil {
 		return errors.WithMessage(err, "error starting the syncer")
 	}
 
-	if err := d.ensureExclusiveEndpoint(syncer); err != nil {
+	if err := d.ensureExclusiveEndpoint(ctx, syncer); err != nil {
 		return errors.WithMessage(err, "could not ensure exclusive submariner Endpoint")
 	}
 
-	if err := d.createLocalCluster(syncer.GetLocalFederator()); err != nil {
+	if err := d.createLocalCluster(ctx, syncer.GetLocalFederator()); err != nil {
 		return errors.WithMessage(err, "error creating the local submariner Cluster")
 	}
 
-	if err := d.createOrUpdateLocalEndpoint(syncer.GetLocalFederator()); err != nil {
+	if err := d.createOrUpdateLocalEndpoint(ctx, syncer.GetLocalFederator()); err != nil {
 		return errors.WithMessage(err, "error creating the local submariner Endpoint")
 	}
 
 	if len(d.localCluster.Spec.GlobalCIDR) > 0 {
-		if err := d.startNodeWatcher(stopCh); err != nil {
+		if err := d.startNodeWatcher(ctx.Done()); err != nil {
 			return errors.WithMessage(err, "startNodeWatcher returned error")
 		}
 	}
@@ -109,7 +109,7 @@
 	return nil
 }
 
-func (d *DatastoreSyncer) Cleanup() error {
+func (d *DatastoreSyncer) Cleanup(ctx context.Context) error {
 	syncer, err := d.createSyncer()
 	if err != nil {
 		return err
@@ -123,7 +123,7 @@
 		}
 	}
 
-	err = d.cleanupResources(localClient.Resource(schema.GroupVersionResource{
+	err = d.cleanupResources(ctx, localClient.Resource(schema.GroupVersionResource{
 		Group:    submarinerv1.SchemeGroupVersion.Group,
 		Version:  submarinerv1.SchemeGroupVersion.Version,
 		Resource: "endpoints",
@@ -132,7 +132,7 @@
 		return err
 	}
 
-	err = d.cleanupResources(localClient.Resource(schema.GroupVersionResource{
+	err = d.cleanupResources(ctx, localClient.Resource(schema.GroupVersionResource{
 		Group:    submarinerv1.SchemeGroupVersion.Group,
 		Version:  submarinerv1.SchemeGroupVersion.Version,
 		Resource: "clusters",
@@ -144,8 +144,10 @@
 	return nil
 }
 
-func (d *DatastoreSyncer) cleanupResources(client dynamic.NamespaceableResourceInterface, syncer *broker.Syncer) error {
-	list, err := client.Namespace(d.syncerConfig.LocalNamespace).List(context.TODO(), metav1.ListOptions{})
+func (d *DatastoreSyncer) cleanupResources(ctx context.Context, client dynamic.NamespaceableResourceInterface,
+	syncer *broker.Syncer,
+) error {
+	list, err := client.Namespace(d.syncerConfig.LocalNamespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return errors.Wrap(err, "error retrieving submariner resources")
 	}
@@ -153,7 +155,7 @@
 	for i := range list.Items {
 		obj := &list.Items[i]
 
-		err = syncer.GetLocalFederator().Delete(obj)
+		err = syncer.GetLocalFederator().Delete(ctx, obj)
 		if err != nil && !apierrors.IsNotFound(err) {
 			return errors.Wrapf(err, "error deleting submariner %s %q from the local datastore", obj.GetKind(), obj.GetName())
 		}
@@ -165,7 +167,7 @@
 			continue
 		}
 
-		err = syncer.GetBrokerFederator().Delete(obj)
+		err = syncer.GetBrokerFederator().Delete(ctx, obj)
 		if err != nil && !apierrors.IsNotFound(err) {
 			return errors.Wrapf(err, "error deleting submariner %s %q from the remote datastore", obj.GetKind(), obj.GetName())
 		}
@@ -239,7 +241,7 @@ func (d *DatastoreSyncer) shouldSyncRemoteEndpoint(obj runtime.Object, _ int,
 	return obj, false
 }
 
-func (d *DatastoreSyncer) ensureExclusiveEndpoint(syncer *broker.Syncer) error {
+func (d *DatastoreSyncer) ensureExclusiveEndpoint(ctx context.Context, syncer *broker.Syncer) error {
 	logger.Info("Ensuring we are the only endpoint active for this cluster")
 
 	endpoints := syncer.ListLocalResources(&submarinerv1.Endpoint{})
@@ -259,7 +261,7 @@ func (d *DatastoreSyncer) ensureExclusiveEndpoint(syncer *broker.Syncer) error {
 			continue
 		}
 
-		err = syncer.GetLocalFederator().Delete(endpoint)
+		err = syncer.GetLocalFederator().Delete(ctx, endpoint)
 		if err != nil && !apierrors.IsNotFound(err) {
 			return errors.Wrapf(err, "error deleting submariner Endpoint %q from the local datastore", endpointName)
 		}
@@ -314,7 +316,7 @@ func (d *DatastoreSyncer) createNodeWatcher(stopCh <-chan struct{}) error {
 	return nil
 }
 
-func (d *DatastoreSyncer) createLocalCluster(federator federate.Federator) error {
+func (d *DatastoreSyncer) createLocalCluster(ctx context.Context, federator federate.Federator) error {
 	logger.Infof("Creating local submariner Cluster: %#v ", d.localCluster)
 
 	cluster := &submarinerv1.Cluster{
@@ -324,10 +326,10 @@ func (d *DatastoreSyncer) createLocalCluster(federator federate.Federator) error
 		Spec: d.localCluster.Spec,
 	}
 
-	return federator.Distribute(cluster) //nolint:wrapcheck // Let the caller wrap it
+	return federator.Distribute(ctx, cluster) //nolint:wrapcheck // Let the caller wrap it
 }
 
-func (d *DatastoreSyncer) createOrUpdateLocalEndpoint(federator federate.Federator) error {
+func (d *DatastoreSyncer) createOrUpdateLocalEndpoint(ctx context.Context, federator federate.Federator) error {
 	logger.Infof("Creating local submariner Endpoint: %#v ", d.localEndpoint)
 
 	endpointName, err := d.localEndpoint.Spec.GenerateName()
@@ -342,5 +344,5 @@ func (d *DatastoreSyncer) createOrUpdateLocalEndpoint(federator federate.Federat
 		Spec: d.localEndpoint.Spec,
 	}
 
-	return federator.Distribute(endpoint) //nolint:wrapcheck // Let the caller wrap it
+	return federator.Distribute(ctx, endpoint) //nolint:wrapcheck // Let the caller wrap it
 }
diff --git a/pkg/controllers/datastoresyncer/datastoresyncer_suite_test.go b/pkg/controllers/datastoresyncer/datastoresyncer_suite_test.go
index f4a71736c..7fa3f1fc3 100644
--- a/pkg/controllers/datastoresyncer/datastoresyncer_suite_test.go
+++ b/pkg/controllers/datastoresyncer/datastoresyncer_suite_test.go
@@ -19,6 +19,7 @@ limitations under the License.
 package datastoresyncer_test
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"reflect"
@@ -84,7 +85,7 @@ type testDriver struct {
 	brokerEndpoints  dynamic.ResourceInterface
 	syncerScheme     *runtime.Scheme
 	restMapper       meta.RESTMapper
-	stopCh           chan struct{}
+	stopFn           context.CancelFunc
 	startCompleted   chan error
 	expectedStartErr error
 	doStart          bool
@@ -162,11 +163,13 @@ func (t *testDriver) run() {
 	}, t.localCluster, t.localEndpoint)
 
 	if t.doStart {
-		t.stopCh = make(chan struct{})
+		var ctx context.Context
+
+		ctx, t.stopFn = context.WithCancel(context.Background())
 		t.startCompleted = make(chan error, 1)
 
 		go func() {
-			t.startCompleted <- t.syncer.Start(t.stopCh)
+			t.startCompleted <- t.syncer.Start(ctx)
 		}()
 	}
 }
@@ -186,7 +189,7 @@ func (t *testDriver) stop() {
 		}
 	}()
 
-	close(t.stopCh)
+	t.stopFn()
 
 	if t.expectedStartErr == nil {
 		Expect(err).To(Succeed())
diff --git a/pkg/controllers/datastoresyncer/node_handler.go b/pkg/controllers/datastoresyncer/node_handler.go
index ba1fb79bc..0ad483ae8 100644
--- a/pkg/controllers/datastoresyncer/node_handler.go
+++ b/pkg/controllers/datastoresyncer/node_handler.go
@@ -19,6 +19,7 @@ limitations under the License.
 package datastoresyncer
 
 import (
+	"context"
 	"net"
 
 	"github.com/submariner-io/submariner/pkg/globalnet/constants"
@@ -75,7 +76,7 @@ func (d *DatastoreSyncer) updateLocalEndpointIfNecessary(globalIPOfNode string)
 		prevHealthCheckIP := d.localEndpoint.Spec.HealthCheckIP
 		d.localEndpoint.Spec.HealthCheckIP = globalIPOfNode
 
-		if err := d.createOrUpdateLocalEndpoint(d.updateFederator); err != nil {
+		if err := d.createOrUpdateLocalEndpoint(context.TODO(), d.updateFederator); err != nil {
 			logger.Warningf("Error updating the local submariner Endpoint with HealthcheckIP: %v", err)
 
 			d.localEndpoint.Spec.HealthCheckIP = prevHealthCheckIP
diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go
index 8e9858338..844754a8f 100644
--- a/pkg/gateway/gateway.go
+++ b/pkg/gateway/gateway.go
@@ -63,7 +63,7 @@ const (
 )
 
 type Interface interface {
-	Run(<-chan struct{}) error
+	Run(ctx context.Context) error
 }
 
 type LeaderElectionConfig struct {
@@ -96,7 +96,6 @@ type gatewayType struct {
 	gatewayPod              *pod.GatewayPod
 	hostName                string
 	localEndpoint           *types.SubmarinerEndpoint
-	stopCh                  <-chan struct{}
 	fatalError              chan error
 	leaderComponentsStarted *sync.WaitGroup
 	recorder                record.EventRecorder
@@ -176,25 +175,23 @@ func New(config *Config) (Interface, error) {
 	return g, nil
 }
 
-func (g *gatewayType) Run(stopCh <-chan struct{}) error {
+func (g *gatewayType) Run(ctx context.Context) error {
 	if g.Spec.Uninstall {
 		logger.Info("Uninstalling the submariner gateway engine")
 
-		return g.uninstall()
+		return g.uninstall(ctx)
 	}
 
 	logger.Info("Starting the gateway engine")
 
-	g.stopCh = stopCh
-
 	g.cableEngine.SetupNATDiscovery(g.natDiscovery)
 
-	err := g.natDiscovery.Run(g.stopCh)
+	err := g.natDiscovery.Run(ctx.Done())
 	if err != nil {
 		return errors.Wrap(err, "error starting NAT discovery server")
 	}
 
-	g.gatewayPod, err = pod.NewGatewayPod(g.KubeClient)
+	g.gatewayPod, err = pod.NewGatewayPod(ctx, g.KubeClient)
 	if err != nil {
 		return errors.Wrap(err, "error creating a handler to update the gateway pod")
 	}
@@ -202,24 +199,25 @@ func (g *gatewayType) Run(stopCh <-chan struct{}) error {
 	var waitGroup sync.WaitGroup
 
 	g.runAsync(&waitGroup, func() {
-		g.cableEngineSyncer.Run(g.stopCh)
+		//nolint:contextcheck // Intentionally not passing the context b/c it can't be used after cancellation.
+		g.cableEngineSyncer.Run(ctx.Done())
 	})
 
 	if !g.airGapped {
 		g.initPublicIPWatcher()
 	}
 
-	err = g.startLeaderElection()
+	err = g.startLeaderElection(ctx)
 	if err != nil {
 		return errors.Wrap(err, "error starting leader election")
 	}
 
 	select {
-	case <-g.stopCh:
+	case <-ctx.Done():
 	case fatalErr := <-g.fatalError:
-		g.cableEngineSyncer.SetGatewayStatusError(fatalErr)
+		g.cableEngineSyncer.SetGatewayStatusError(ctx, fatalErr)
 
-		if err := g.gatewayPod.SetHALabels(context.Background(), subv1.HAStatusPassive); err != nil {
+		if err := g.gatewayPod.SetHALabels(ctx, subv1.HAStatusPassive); err != nil {
 			logger.Warningf("Error updating pod label: %s", err)
 		}
 
@@ -242,7 +240,7 @@ func (g *gatewayType) runAsync(waitGroup *sync.WaitGroup, run func()) {
 	}()
 }
 
-func (g *gatewayType) startLeaderElection() error {
+func (g *gatewayType) startLeaderElection(ctx context.Context) error {
 	logger.Info("Starting leader election")
 
 	g.leaderComponentsStarted = &sync.WaitGroup{}
@@ -256,7 +254,7 @@ func (g *gatewayType) startLeaderElection() error {
 		return errors.Wrap(err, "error creating leader election resource lock")
 	}
 
-	go leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
+	go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
 		Lock:          rl,
 		LeaseDuration: g.LeaseDuration,
 		RenewDeadline: g.RenewDeadline,
@@ -295,7 +293,12 @@ func (g *gatewayType) onStartedLeading(ctx context.Context) {
 	})
 
 	g.runAsync(g.leaderComponentsStarted, func() {
-		if err := g.datastoreSyncer.Start(ctx.Done()); err != nil {
+		err := g.datastoreSyncer.Start(ctx)
+		if errors.Is(err, context.Canceled) {
+			return
+		}
+
+		if err != nil {
 			g.fatalError <- errors.Wrap(err, "error running the datastore syncer")
 		}
 	})
@@ -338,7 +341,7 @@ func (g *gatewayType) onStoppedLeading() {
 		logger.Warningf("Error updating pod label to passive: %s", err)
 	}
 
-	err := g.startLeaderElection()
+	err := g.startLeaderElection(context.Background())
 	if err != nil {
 		g.fatalError <- errors.Wrap(err, "error restarting leader election")
 	}
@@ -375,7 +378,7 @@ func (g *gatewayType) initCableHealthChecker() {
 	}
 }
 
-func (g *gatewayType) uninstall() error {
+func (g *gatewayType) uninstall(ctx context.Context) error {
 	err := g.cableEngine.StartEngine()
 	if err != nil {
 		// As we are in the process of cleaning up, ignore any initialization errors.
@@ -383,14 +386,14 @@ func (g *gatewayType) uninstall() error {
 	}
 
 	// The Gateway object has to be deleted before invoking the cableEngine.Cleanup
-	g.cableEngineSyncer.CleanupGatewayEntry()
+	g.cableEngineSyncer.CleanupGatewayEntry(ctx)
 
 	err = g.cableEngine.Cleanup()
 	if err != nil {
 		logger.Errorf(err, "Error while cleaning up of cable drivers")
 	}
 
-	dsErr := g.datastoreSyncer.Cleanup()
+	dsErr := g.datastoreSyncer.Cleanup(ctx)
 
 	return errors.Wrap(dsErr, "Error cleaning up the datastore")
 }
diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go
index 3c02adba8..b5c4bc4be 100644
--- a/pkg/gateway/gateway_test.go
+++ b/pkg/gateway/gateway_test.go
@@ -302,12 +302,12 @@ func newTestDriver() *testDriver {
 		gw, err := gateway.New(&t.config)
 		Expect(err).To(Succeed())
 
-		stopCh := make(chan struct{})
+		ctx, stop := context.WithCancel(context.Background())
 		runCompleted := make(chan error, 1)
 
 		DeferCleanup(func() {
 			if t.expectedRunErr == nil {
-				close(stopCh)
+				stop()
 			}
 
 			err := func() error {
@@ -334,7 +334,7 @@ func newTestDriver() *testDriver {
 		})
 
 		go func() {
-			runCompleted <- gw.Run(stopCh)
+			runCompleted <- gw.Run(ctx)
 		}()
 	})
diff --git a/pkg/globalnet/controllers/base_controllers.go b/pkg/globalnet/controllers/base_controllers.go
index f0be964ef..090b6cd69 100644
--- a/pkg/globalnet/controllers/base_controllers.go
+++ b/pkg/globalnet/controllers/base_controllers.go
@@ -151,7 +151,7 @@ func (c *baseIPAllocationController) reserveAllocatedIPs(federator federate.Fede
 			logger.Infof("Updating %q: %#v", key, obj)
 
-			return federator.Distribute(obj) //nolint:wrapcheck // Let the caller wrap it
+			return federator.Distribute(context.TODO(), obj) //nolint:wrapcheck // Let the caller wrap it
 		}
 
 		logger.Infof("Successfully reserved GlobalIPs %q for %s \"%s/%s\"", reservedIPs, obj.GetKind(),
diff --git a/pkg/globalnet/controllers/node_controller.go b/pkg/globalnet/controllers/node_controller.go
index fbf2d2c05..ce6e17c39 100644
--- a/pkg/globalnet/controllers/node_controller.go
+++ b/pkg/globalnet/controllers/node_controller.go
@@ -187,7 +187,8 @@ func (n *nodeController) reserveAllocatedIP(federator federate.Federator, obj *u
 	if err != nil {
 		logger.Warningf("Could not reserve allocated GlobalIP for Node %q: %v", obj.GetName(), err)
 
-		return errors.Wrap(federator.Distribute(updateNodeAnnotation(obj, "")), "error updating the Node global IP annotation")
+		return errors.Wrap(federator.Distribute(context.TODO(), updateNodeAnnotation(obj, "")),
+			"error updating the Node global IP annotation")
 	}
 
 	logger.Infof("Successfully reserved allocated GlobalIP %q for node %q", existingGlobalIP, obj.GetName())
diff --git a/pkg/pod/pod.go b/pkg/pod/pod.go
index 08b6d6dce..7257e8ca5 100644
--- a/pkg/pod/pod.go
+++ b/pkg/pod/pod.go
@@ -45,7 +45,7 @@ type GatewayPod struct {
 
 var logger = log.Logger{Logger: logf.Log.WithName("Pod")}
 
-func NewGatewayPod(k8sClient kubernetes.Interface) (*GatewayPod, error) {
+func NewGatewayPod(ctx context.Context, k8sClient kubernetes.Interface) (*GatewayPod, error) {
 	gp := &GatewayPod{
 		namespace: os.Getenv("SUBMARINER_NAMESPACE"),
 		node:      os.Getenv("NODE_NAME"),
@@ -65,7 +65,7 @@ func NewGatewayPod(k8sClient kubernetes.Interface) (*GatewayPod, error) {
 		return nil, errors.New("POD_NAME environment variable missing")
 	}
 
-	if err := gp.SetHALabels(context.Background(), submV1.HAStatusPassive); err != nil {
+	if err := gp.SetHALabels(ctx, submV1.HAStatusPassive); err != nil {
 		logger.Warningf("Error updating pod label: %s", err)
 	}
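
Not part of the diff above: a minimal, illustrative Go sketch of the pattern this change applies, built only on the standard library. A context cancelled on SIGINT/SIGTERM stands in for what `signals.SetupSignalHandler()` returns, `run(ctx)` stands in for `gateway.Interface.Run`, and `ctx.Done()` bridges to components that still accept a stop channel. All names in the sketch are hypothetical, not Submariner APIs.

```go
// Illustrative sketch only — demonstrates context propagation replacing stop channels.
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// legacyRun stands in for a component that still takes a stop channel.
func legacyRun(stopCh <-chan struct{}) {
	<-stopCh // do work until told to stop
}

// run stands in for a Run(ctx) method such as gateway.Interface.Run after this change.
func run(ctx context.Context) error {
	go legacyRun(ctx.Done()) // bridge: context cancellation -> stop channel

	<-ctx.Done() // block until the signal handler cancels the context

	return ctx.Err()
}

func main() {
	// Roughly what signals.SetupSignalHandler() provides: a context cancelled on SIGINT/SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	if err := run(ctx); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Fprintln(os.Stderr, "run failed:", err)
	}

	// Bounded shutdown work, mirroring the 5-second httpServer.Shutdown timeout in main.go above.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_ = shutdownCtx // e.g. pass to an HTTP server's Shutdown method
}
```

The same `ctx.Done()` bridge is what the diff uses for `natDiscovery.Run`, `cableEngineSyncer.Run`, and `syncer.Start`, which still take `<-chan struct{}`.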