Skip to content

Commit

Permalink
close current stream if new is opened
Browse files Browse the repository at this point in the history
Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Mar 7, 2024
1 parent d6d0c3c commit f870ea8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
4 changes: 4 additions & 0 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type Headscale struct {
registrationCache *cache.Cache

pollNetMapStreamWG sync.WaitGroup

mapSessions map[types.NodeID]*mapSession
mapSessionMu sync.Mutex
}

var (
Expand Down Expand Up @@ -130,6 +133,7 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
registrationCache: registrationCache,
pollNetMapStreamWG: sync.WaitGroup{},
nodeNotifier: notifier.NewNotifier(),
mapSessions: make(map[types.NodeID]*mapSession),
}

app.db, err = db.NewHeadscaleDatabase(
Expand Down
17 changes: 17 additions & 0 deletions hscontrol/noise.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,5 +247,22 @@ func (ns *noiseServer) NoisePollNetMapHandler(
Msg("A node sending a MapRequest with Noise protocol")

session := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node)

// If a streaming mapSession exists for this node, close it
// and start a new one.
if session.isStreaming() {
ns.headscale.mapSessionMu.Lock()
if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok {
log.Info().
Caller().
Int("node.id", int(node.ID)).
Msg("Node has an open streaming session, replacing")
oldSession.close()
}

ns.headscale.mapSessions[node.ID] = session
ns.headscale.mapSessionMu.Unlock()
}

session.serve()
}
8 changes: 4 additions & 4 deletions hscontrol/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ func (m *mapSession) serve() {

// consume channels with update, keep alives or "batch" blocking signals
select {
// Avoid infinite block that would potentially leave
case <-m.cancelCh:
return

// Avoid infinite block that would potentially leave
// some updates in the changed map.
case <-blockBreaker.C:
continue
Expand Down Expand Up @@ -398,10 +401,7 @@ func (m *mapSession) serve() {
// The connection has been closed, so we can stop polling.
return

case <-m.cancelCh:
return
}

}
}

Expand Down

0 comments on commit f870ea8

Please sign in to comment.