From f870ea87e2dcf2ecc64ad47db4087d9d8de67a97 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Thu, 7 Mar 2024 13:12:46 +0100 Subject: [PATCH] close current stream if new is opened Signed-off-by: Kristoffer Dalby --- hscontrol/app.go | 4 ++++ hscontrol/noise.go | 17 +++++++++++++++++ hscontrol/poll.go | 8 ++++---- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/hscontrol/app.go b/hscontrol/app.go index 2061d3a6789..55dbdd1323c 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -99,6 +99,9 @@ type Headscale struct { registrationCache *cache.Cache pollNetMapStreamWG sync.WaitGroup + + mapSessions map[types.NodeID]*mapSession + mapSessionMu sync.Mutex } var ( @@ -130,6 +133,7 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) { registrationCache: registrationCache, pollNetMapStreamWG: sync.WaitGroup{}, nodeNotifier: notifier.NewNotifier(), + mapSessions: make(map[types.NodeID]*mapSession), } app.db, err = db.NewHeadscaleDatabase( diff --git a/hscontrol/noise.go b/hscontrol/noise.go index c8b8f1dc4dd..10408adada1 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -247,5 +247,22 @@ func (ns *noiseServer) NoisePollNetMapHandler( Msg("A node sending a MapRequest with Noise protocol") session := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node) + + // If a streaming mapSession exists for this node, close it + // and start a new one. + if session.isStreaming() { + ns.headscale.mapSessionMu.Lock() + if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok { + log.Info(). + Caller(). + Int("node.id", int(node.ID)). + Msg("Node has an open streaming session, replacing") + oldSession.close() + } + + ns.headscale.mapSessions[node.ID] = session + ns.headscale.mapSessionMu.Unlock() + } + session.serve() } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index ae25ea63bb6..05d6e60ea31 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -325,7 +325,10 @@ func (m *mapSession) serve() { // consume channels with update, keep alives or "batch" blocking signals select { - // Avoid infinite block that would potentially leave + case <-m.cancelCh: + return + + // Avoid infinite block that would potentially leave // some updates in the changed map. case <-blockBreaker.C: continue @@ -398,10 +401,7 @@ func (m *mapSession) serve() { // The connection has been closed, so we can stop polling. return - case <-m.cancelCh: - return } - } }