Pass ctx to datastoreSyncer Start method
...and propagate to the Federator methods so the caller can cancel and/or
put a deadline on the operations. This also required further propagating the
ctx params where appropriate to satisfy the contextcheck linter.

Signed-off-by: Tom Pantelis <[email protected]>
tpantelis authored and skitt committed Oct 31, 2023
1 parent 3f4a3ba commit c936fd6
Showing 15 changed files with 89 additions and 76 deletions.
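
For illustration, here is a minimal, hypothetical sketch of how a caller can now cancel or put a deadline on these operations through the context passed to Start and Cleanup. The datastoreSyncer interface, runSyncer function, and the 30-second cleanup timeout below are stand-ins invented for this example; only the context-aware Start/Cleanup signatures and the context returned by signals.SetupSignalHandler() come from this commit.

// Hypothetical usage sketch (not part of this commit): wiring a cancellable,
// deadline-bounded context into the new Start/Cleanup signatures.
package example

import (
	"context"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

// datastoreSyncer is a stand-in for *datastoresyncer.DatastoreSyncer, reduced
// to the two methods whose signatures this commit changes.
type datastoreSyncer interface {
	Start(ctx context.Context) error
	Cleanup(ctx context.Context) error
}

func runSyncer(ds datastoreSyncer) error {
	// signals.SetupSignalHandler() returns a context.Context that is cancelled
	// on SIGTERM/SIGINT, so the syncer and the Federator calls underneath it
	// stop when the process is asked to shut down (see the main.go change below).
	ctx := signals.SetupSignalHandler()

	if err := ds.Start(ctx); err != nil {
		return err
	}

	<-ctx.Done()

	// Bound shutdown-time cleanup with an (arbitrary) deadline instead of
	// letting it block indefinitely.
	cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	return ds.Cleanup(cleanupCtx)
}
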
4 changes: 2 additions & 2 deletions go.mod
@@ -15,8 +15,8 @@ require (
github.com/projectcalico/api v0.0.0-20230602153125-fb7148692637
github.com/prometheus-community/pro-bing v0.3.0
github.com/prometheus/client_golang v1.16.0
github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c
github.com/submariner-io/shipyard v0.16.1-0.20231025063959-1e68200a75e6
github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca
github.com/submariner-io/shipyard v0.16.1-0.20231031115937-e306910cd742
github.com/uw-labs/lichen v0.1.7
github.com/vishvananda/netlink v1.2.1-beta.2
golang.org/x/sys v0.13.0
8 changes: 4 additions & 4 deletions go.sum
@@ -506,10 +506,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c h1:zy5mZZrB885JAuLPqpb/RoGhtd9N9tUCFE5OGAZEzWw=
github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c/go.mod h1:GP0TCJkt444r2ONKVHKBbSPaKjJb0S5Qj0MyNUl2keQ=
github.com/submariner-io/shipyard v0.16.1-0.20231025063959-1e68200a75e6 h1:UbExEBgaVP9q4XTNYlw3DnMb/RWXLSFdxWedxfstoN0=
github.com/submariner-io/shipyard v0.16.1-0.20231025063959-1e68200a75e6/go.mod h1:op6JLbiJv3dteWxde6GpG5zp0oljZWVDU1FXmB1j6cs=
github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca h1:O3gUAdYldm4LQJD/IpqaFcOlJVf9ikM+uBWsfVADmjg=
github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca/go.mod h1:GP0TCJkt444r2ONKVHKBbSPaKjJb0S5Qj0MyNUl2keQ=
github.com/submariner-io/shipyard v0.16.1-0.20231031115937-e306910cd742 h1:IxgmM0WOPYnYkBc1eNQdiMu9ozGax8FV0PYhHFHPbuE=
github.com/submariner-io/shipyard v0.16.1-0.20231031115937-e306910cd742/go.mod h1:op6JLbiJv3dteWxde6GpG5zp0oljZWVDU1FXmB1j6cs=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
7 changes: 5 additions & 2 deletions main.go
@@ -144,9 +144,12 @@ func main() {
})
logger.FatalOnError(err, "Error creating gateway instance")

err = gw.Run(signals.SetupSignalHandler().Done())
err = gw.Run(signals.SetupSignalHandler())

if err := httpServer.Shutdown(context.Background()); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

if err := httpServer.Shutdown(ctx); err != nil {
logger.Errorf(err, "Error shutting down metrics HTTP server")
}

24 changes: 12 additions & 12 deletions pkg/cableengine/syncer/syncer.go
@@ -84,7 +84,7 @@ func NewGatewaySyncer(engine cableengine.Engine, client v1typed.GatewayInterface

func (gs *GatewaySyncer) Run(stopCh <-chan struct{}) {
wait.Until(gs.syncGatewayStatus, GatewayUpdateInterval, stopCh)
gs.CleanupGatewayEntry()
gs.CleanupGatewayEntry(context.Background())

logger.Info("CableEngine syncer stopped")
}
@@ -93,15 +93,15 @@ func (gs *GatewaySyncer) syncGatewayStatus() {
gs.mutex.Lock()
defer gs.mutex.Unlock()

gs.syncGatewayStatusSafe()
gs.syncGatewayStatusSafe(context.Background())
}

func (gs *GatewaySyncer) SetGatewayStatusError(err error) {
func (gs *GatewaySyncer) SetGatewayStatusError(ctx context.Context, err error) {
gs.mutex.Lock()
defer gs.mutex.Unlock()

gs.statusError = err
gs.syncGatewayStatusSafe()
gs.syncGatewayStatusSafe(ctx)
}

func (gs *GatewaySyncer) gatewayResourceInterface() resource.Interface {
@@ -122,13 +122,13 @@ func (gs *GatewaySyncer) gatewayResourceInterface() {
}
}

func (gs *GatewaySyncer) syncGatewayStatusSafe() {
func (gs *GatewaySyncer) syncGatewayStatusSafe(ctx context.Context) {
logger.V(log.TRACE).Info("Running Gateway status sync")
gatewaySyncIterations.Inc()

gatewayObj := gs.generateGatewayObject()

result, err := util.CreateOrUpdate(context.TODO(), gs.gatewayResourceInterface(), gatewayObj,
result, err := util.CreateOrUpdate(ctx, gs.gatewayResourceInterface(), gatewayObj,
func(existing runtime.Object) (runtime.Object, error) {
existingGw := existing.(*v1.Gateway)
existingGw.Status = gatewayObj.Status
@@ -150,15 +150,15 @@ func (gs *GatewaySyncer) syncGatewayStatusSafe() {
}

if gatewayObj.Status.HAStatus == v1.HAStatusActive {
err := gs.cleanupStaleGatewayEntries(gatewayObj.Name)
err := gs.cleanupStaleGatewayEntries(ctx, gatewayObj.Name)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error cleaning up stale gateway entries: %w", err))
}
}
}

func (gs *GatewaySyncer) cleanupStaleGatewayEntries(localGatewayName string) error {
gateways, err := gs.client.List(context.TODO(), metav1.ListOptions{})
func (gs *GatewaySyncer) cleanupStaleGatewayEntries(ctx context.Context, localGatewayName string) error {
gateways, err := gs.client.List(ctx, metav1.ListOptions{})
if err != nil {
return errors.Wrap(err, "error listing Gateways")
}
@@ -176,7 +176,7 @@ func (gs *GatewaySyncer) cleanupStaleGatewayEntries(localGatewayName string) err
}

if stale {
err := gs.client.Delete(context.TODO(), gw.Name, metav1.DeleteOptions{})
err := gs.client.Delete(ctx, gw.Name, metav1.DeleteOptions{})
if err != nil {
// In this case we don't want to stop the cleanup loop and just log it.
utilruntime.HandleError(fmt.Errorf("error deleting stale Gateway %+v: %w", gw, err))
@@ -278,10 +278,10 @@ func (gs *GatewaySyncer) generateGatewayObject() *v1.Gateway {

// CleanupGatewayEntry removes this Gateway entry from the k8s API, it does not
// propagate error up because it's a termination function that we also provide externally.
func (gs *GatewaySyncer) CleanupGatewayEntry() {
func (gs *GatewaySyncer) CleanupGatewayEntry(ctx context.Context) {
hostName := gs.engine.GetLocalEndpoint().Spec.Hostname

err := gs.client.Delete(context.TODO(), hostName, metav1.DeleteOptions{})
err := gs.client.Delete(ctx, hostName, metav1.DeleteOptions{})
if err != nil {
logger.Errorf(err, "Error while trying to delete own Gateway %q", hostName)
return
2 changes: 1 addition & 1 deletion pkg/cableengine/syncer/syncer_test.go
@@ -189,7 +189,7 @@ func testGatewaySyncing() {
statusErr := errors.New("fake error")
t.expectedGateway.Status.StatusFailure = statusErr.Error()

t.syncer.SetGatewayStatusError(statusErr)
t.syncer.SetGatewayStatusError(context.Background(), statusErr)
t.awaitGatewayUpdated(t.expectedGateway)
})
})
@@ -134,7 +134,7 @@ func testClusterCleanup() {
})

It("should remove local Clusters from the remote datastore", func() {
Expect(t.syncer.Cleanup()).To(Succeed())
Expect(t.syncer.Cleanup(context.Background())).To(Succeed())

test.AwaitNoResource(t.brokerClusters, clusterID)

@@ -143,7 +143,7 @@
})

It("should remove all Clusters from the local datastore", func() {
Expect(t.syncer.Cleanup()).To(Succeed())
Expect(t.syncer.Cleanup(context.Background())).To(Succeed())

test.AwaitNoResource(t.localClusters, clusterID)
test.AwaitNoResource(t.localClusters, otherClusterID)
@@ -262,7 +262,7 @@ func testEndpointCleanup() {
})

It("should remove local Endpoints from the remote datastore", func() {
Expect(t.syncer.Cleanup()).To(Succeed())
Expect(t.syncer.Cleanup(context.Background())).To(Succeed())

test.AwaitNoResource(t.brokerEndpoints, existingLocalEndpoint.GetName())

@@ -271,7 +271,7 @@
})

It("should remove all Endpoints from the local datastore", func() {
Expect(t.syncer.Cleanup()).To(Succeed())
Expect(t.syncer.Cleanup(context.Background())).To(Succeed())

test.AwaitNoResource(t.localEndpoints, existingLocalEndpoint.GetName())
test.AwaitNoResource(t.localEndpoints, existingRemoteEndpoint.GetName())
40 changes: 21 additions & 19 deletions pkg/controllers/datastoresyncer/datastoresyncer.go
@@ -68,7 +68,7 @@ func New(syncerConfig *broker.SyncerConfig, localCluster *types.SubmarinerCluste
}
}

func (d *DatastoreSyncer) Start(stopCh <-chan struct{}) error {
func (d *DatastoreSyncer) Start(ctx context.Context) error {
defer utilruntime.HandleCrash()

logger.Info("Starting the datastore syncer")
@@ -81,25 +81,25 @@ func (d *DatastoreSyncer) Start(stopCh <-chan struct{}) error {
return err
}

err = syncer.Start(stopCh)
err = syncer.Start(ctx.Done())
if err != nil {
return errors.WithMessage(err, "error starting the syncer")
}

if err := d.ensureExclusiveEndpoint(syncer); err != nil {
if err := d.ensureExclusiveEndpoint(ctx, syncer); err != nil {
return errors.WithMessage(err, "could not ensure exclusive submariner Endpoint")
}

if err := d.createLocalCluster(syncer.GetLocalFederator()); err != nil {
if err := d.createLocalCluster(ctx, syncer.GetLocalFederator()); err != nil {
return errors.WithMessage(err, "error creating the local submariner Cluster")
}

if err := d.createOrUpdateLocalEndpoint(syncer.GetLocalFederator()); err != nil {
if err := d.createOrUpdateLocalEndpoint(ctx, syncer.GetLocalFederator()); err != nil {
return errors.WithMessage(err, "error creating the local submariner Endpoint")
}

if len(d.localCluster.Spec.GlobalCIDR) > 0 {
if err := d.startNodeWatcher(stopCh); err != nil {
if err := d.startNodeWatcher(ctx.Done()); err != nil {
return errors.WithMessage(err, "startNodeWatcher returned error")
}
}
@@ -109,7 +109,7 @@ func (d *DatastoreSyncer) Start(stopCh <-chan struct{}) error {
return nil
}

func (d *DatastoreSyncer) Cleanup() error {
func (d *DatastoreSyncer) Cleanup(ctx context.Context) error {
syncer, err := d.createSyncer()
if err != nil {
return err
@@ -123,7 +123,7 @@ func (d *DatastoreSyncer) Cleanup() error {
}
}

err = d.cleanupResources(localClient.Resource(schema.GroupVersionResource{
err = d.cleanupResources(ctx, localClient.Resource(schema.GroupVersionResource{
Group: submarinerv1.SchemeGroupVersion.Group,
Version: submarinerv1.SchemeGroupVersion.Version,
Resource: "endpoints",
Expand All @@ -132,7 +132,7 @@ func (d *DatastoreSyncer) Cleanup() error {
return err
}

err = d.cleanupResources(localClient.Resource(schema.GroupVersionResource{
err = d.cleanupResources(ctx, localClient.Resource(schema.GroupVersionResource{
Group: submarinerv1.SchemeGroupVersion.Group,
Version: submarinerv1.SchemeGroupVersion.Version,
Resource: "clusters",
@@ -132,7 +132,7 @@ func (d *DatastoreSyncer) Cleanup() error {
return nil
}

func (d *DatastoreSyncer) cleanupResources(client dynamic.NamespaceableResourceInterface, syncer *broker.Syncer) error {
list, err := client.Namespace(d.syncerConfig.LocalNamespace).List(context.TODO(), metav1.ListOptions{})
func (d *DatastoreSyncer) cleanupResources(ctx context.Context, client dynamic.NamespaceableResourceInterface,
syncer *broker.Syncer,
) error {
list, err := client.Namespace(d.syncerConfig.LocalNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return errors.Wrap(err, "error retrieving submariner resources")
}

for i := range list.Items {
obj := &list.Items[i]

err = syncer.GetLocalFederator().Delete(obj)
err = syncer.GetLocalFederator().Delete(ctx, obj)
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting submariner %s %q from the local datastore", obj.GetKind(), obj.GetName())
}
@@ -165,7 +167,7 @@ func (d *DatastoreSyncer) cleanupResources(client dynamic.NamespaceableResourceI
continue
}

err = syncer.GetBrokerFederator().Delete(obj)
err = syncer.GetBrokerFederator().Delete(ctx, obj)
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting submariner %s %q from the remote datastore", obj.GetKind(), obj.GetName())
}
@@ -239,7 +241,7 @@ func (d *DatastoreSyncer) shouldSyncRemoteEndpoint(obj runtime.Object, _ int,
return obj, false
}

func (d *DatastoreSyncer) ensureExclusiveEndpoint(syncer *broker.Syncer) error {
func (d *DatastoreSyncer) ensureExclusiveEndpoint(ctx context.Context, syncer *broker.Syncer) error {
logger.Info("Ensuring we are the only endpoint active for this cluster")

endpoints := syncer.ListLocalResources(&submarinerv1.Endpoint{})
@@ -259,7 +261,7 @@ func (d *DatastoreSyncer) ensureExclusiveEndpoint(syncer *broker.Syncer) error {
continue
}

err = syncer.GetLocalFederator().Delete(endpoint)
err = syncer.GetLocalFederator().Delete(ctx, endpoint)
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting submariner Endpoint %q from the local datastore", endpointName)
}
@@ -314,7 +316,7 @@ func (d *DatastoreSyncer) createNodeWatcher(stopCh <-chan struct{}) error {
return nil
}

func (d *DatastoreSyncer) createLocalCluster(federator federate.Federator) error {
func (d *DatastoreSyncer) createLocalCluster(ctx context.Context, federator federate.Federator) error {
logger.Infof("Creating local submariner Cluster: %#v ", d.localCluster)

cluster := &submarinerv1.Cluster{
@@ -324,10 +326,10 @@ func (d *DatastoreSyncer) createLocalCluster(federator federate.Federator) error
Spec: d.localCluster.Spec,
}

return federator.Distribute(cluster) //nolint:wrapcheck // Let the caller wrap it
return federator.Distribute(ctx, cluster) //nolint:wrapcheck // Let the caller wrap it
}

func (d *DatastoreSyncer) createOrUpdateLocalEndpoint(federator federate.Federator) error {
func (d *DatastoreSyncer) createOrUpdateLocalEndpoint(ctx context.Context, federator federate.Federator) error {
logger.Infof("Creating local submariner Endpoint: %#v ", d.localEndpoint)

endpointName, err := d.localEndpoint.Spec.GenerateName()
@@ -342,5 +344,5 @@ func (d *DatastoreSyncer) createOrUpdateLocalEndpoint(federator federate.Federat
Spec: d.localEndpoint.Spec,
}

return federator.Distribute(endpoint) //nolint:wrapcheck // Let the caller wrap it
return federator.Distribute(ctx, endpoint) //nolint:wrapcheck // Let the caller wrap it
}
11 changes: 7 additions & 4 deletions pkg/controllers/datastoresyncer/datastoresyncer_suite_test.go
@@ -19,6 +19,7 @@ limitations under the License.
package datastoresyncer_test

import (
"context"
"fmt"
"os"
"reflect"
@@ -84,7 +85,7 @@ type testDriver struct {
brokerEndpoints dynamic.ResourceInterface
syncerScheme *runtime.Scheme
restMapper meta.RESTMapper
stopCh chan struct{}
stopFn context.CancelFunc
startCompleted chan error
expectedStartErr error
doStart bool
@@ -162,11 +163,13 @@ func (t *testDriver) run() {
}, t.localCluster, t.localEndpoint)

if t.doStart {
t.stopCh = make(chan struct{})
var ctx context.Context

ctx, t.stopFn = context.WithCancel(context.Background())
t.startCompleted = make(chan error, 1)

go func() {
t.startCompleted <- t.syncer.Start(t.stopCh)
t.startCompleted <- t.syncer.Start(ctx)
}()
}
}
@@ -186,7 +189,7 @@ func (t *testDriver) stop() {
}
}()

close(t.stopCh)
t.stopFn()

if t.expectedStartErr == nil {
Expect(err).To(Succeed())
3 changes: 2 additions & 1 deletion pkg/controllers/datastoresyncer/node_handler.go
@@ -19,6 +19,7 @@ limitations under the License.
package datastoresyncer

import (
"context"
"net"

"github.com/submariner-io/submariner/pkg/globalnet/constants"
@@ -75,7 +76,7 @@ func (d *DatastoreSyncer) updateLocalEndpointIfNecessary(globalIPOfNode string)
prevHealthCheckIP := d.localEndpoint.Spec.HealthCheckIP
d.localEndpoint.Spec.HealthCheckIP = globalIPOfNode

if err := d.createOrUpdateLocalEndpoint(d.updateFederator); err != nil {
if err := d.createOrUpdateLocalEndpoint(context.TODO(), d.updateFederator); err != nil {
logger.Warningf("Error updating the local submariner Endpoint with HealthcheckIP: %v", err)

d.localEndpoint.Spec.HealthCheckIP = prevHealthCheckIP