diff --git a/backend/extension/extension_network.go b/backend/extension/extension_network.go index b325c3024..bd766d901 100644 --- a/backend/extension/extension_network.go +++ b/backend/extension/extension_network.go @@ -61,11 +61,12 @@ func (n *network) Run(ctx context.Context) { for { select { - case evtBatch := <-evts: + case evtBatch, ok := <-evts: + if !ok { + log.Infof("evts chan closed") + return + } n.handleSubnetEvents(evtBatch) - - case <-ctx.Done(): - return } } } diff --git a/backend/ipsec/ipsec_network.go b/backend/ipsec/ipsec_network.go index 1f49bfadf..632b9b693 100644 --- a/backend/ipsec/ipsec_network.go +++ b/backend/ipsec/ipsec_network.go @@ -94,12 +94,13 @@ func (n *network) Run(ctx context.Context) { for { select { - case evtsBatch := <-evts: + case evtsBatch, ok := <-evts: + if !ok { + log.Infof("evts chan closed") + return + } log.Info("Handling event") n.handleSubnetEvents(evtsBatch) - case <-ctx.Done(): - log.Info("Received DONE") - return } } } diff --git a/backend/route_network.go b/backend/route_network.go index 96ca61b0d..496b43d2f 100644 --- a/backend/route_network.go +++ b/backend/route_network.go @@ -69,11 +69,12 @@ func (n *RouteNetwork) Run(ctx context.Context) { for { select { - case evtBatch := <-evts: + case evtBatch, ok := <-evts: + if !ok { + log.Infof("evts chan closed") + return + } n.handleSubnetEvents(evtBatch) - - case <-ctx.Done(): - return } } } diff --git a/backend/route_network_windows.go b/backend/route_network_windows.go index 2914b9a68..72f0300da 100644 --- a/backend/route_network_windows.go +++ b/backend/route_network_windows.go @@ -67,11 +67,12 @@ func (n *RouteNetwork) Run(ctx context.Context) { for { select { - case evtBatch := <-evts: + case evtBatch, ok := <-evts: + if !ok { + log.Infof("evts chan closed") + return + } n.handleSubnetEvents(evtBatch) - - case <-ctx.Done(): - return } } } diff --git a/backend/udp/udp_network_amd64.go b/backend/udp/udp_network_amd64.go index 92c67807a..608951324 100644 --- a/backend/udp/udp_network_amd64.go +++ b/backend/udp/udp_network_amd64.go @@ -109,12 +109,13 @@ func (n *network) Run(ctx context.Context) { for { select { - case evtBatch := <-evts: + case evtBatch, ok := <-evts: + if !ok { + log.Infof("evts chan closed") + stopProxy(n.ctl) + return + } n.processSubnetEvents(evtBatch) - - case <-ctx.Done(): - stopProxy(n.ctl) - return } } } diff --git a/backend/vxlan/vxlan_network.go b/backend/vxlan/vxlan_network.go index bbab85e26..2d43f1325 100644 --- a/backend/vxlan/vxlan_network.go +++ b/backend/vxlan/vxlan_network.go @@ -69,11 +69,12 @@ func (nw *network) Run(ctx context.Context) { for { select { - case evtBatch := <-events: + case evtBatch, ok := <-events: + if !ok { + log.Infof("evts chan closed") + return + } nw.handleSubnetEvents(evtBatch) - - case <-ctx.Done(): - return } } } diff --git a/backend/vxlan/vxlan_network_windows.go b/backend/vxlan/vxlan_network_windows.go index 8d5d47b46..bfdd05584 100644 --- a/backend/vxlan/vxlan_network_windows.go +++ b/backend/vxlan/vxlan_network_windows.go @@ -73,11 +73,12 @@ func (nw *network) Run(ctx context.Context) { for { select { - case evtBatch := <-events: + case evtBatch, ok := <-events: + if !ok { + log.Infof("evts chan closed") + return + } nw.handleSubnetEvents(evtBatch) - - case <-ctx.Done(): - return } } } diff --git a/main.go b/main.go index 8c8e99800..b6f2e9d61 100644 --- a/main.go +++ b/main.go @@ -426,7 +426,11 @@ func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg log.Info("Lease renewed, new expiration: ", bn.Lease().Expiration) dur = bn.Lease().Expiration.Sub(time.Now()) - renewMargin - case e := <-evts: + case e, ok := <-evts: + if !ok { + log.Infof("Stopped monitoring lease") + return errCanceled + } switch e.Type { case subnet.EventAdded: bn.Lease().Expiration = e.Lease.Expiration @@ -437,10 +441,6 @@ func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg log.Error("Lease has been revoked. Shutting down daemon.") return errInterrupted } - - case <-ctx.Done(): - log.Infof("Stopped monitoring lease") - return errCanceled } } } diff --git a/subnet/etcdv2/mock_etcd.go b/subnet/etcdv2/mock_etcd.go index 82bcf6820..fa7383edd 100644 --- a/subnet/etcdv2/mock_etcd.go +++ b/subnet/etcdv2/mock_etcd.go @@ -178,7 +178,7 @@ func (me *mockEtcd) sendEvent(resp *etcd.Response) { me.events = append(me.events, resp) // and notify watchers - for w, _ := range me.watchers { + for w := range me.watchers { w.notifyEvent(resp) } } diff --git a/subnet/watch.go b/subnet/watch.go index 57c959fc6..1f2da7c0e 100644 --- a/subnet/watch.go +++ b/subnet/watch.go @@ -37,6 +37,8 @@ func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan res, err := sm.WatchLeases(ctx, cursor) if err != nil { if err == context.Canceled || err == context.DeadlineExceeded { + log.Infof("%v, close receiver chan", err) + close(receiver) return } @@ -169,6 +171,8 @@ func WatchLease(ctx context.Context, sm Manager, sn ip.IP4Net, receiver chan Eve wr, err := sm.WatchLease(ctx, sn, cursor) if err != nil { if err == context.Canceled || err == context.DeadlineExceeded { + log.Infof("%v, close receiver chan", err) + close(receiver) return }