Skip to content

Commit

Permalink
rework mapsession
Browse files Browse the repository at this point in the history
Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Apr 9, 2024
1 parent 8a8e25a commit 528799e
Show file tree
Hide file tree
Showing 35 changed files with 1,540 additions and 1,436 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ jobs:
- TestTaildrop
- TestResolveMagicDNS
- TestExpireNode
- TestNodeOnlineLastSeenStatus
- TestNodeOnlineStatus
- TestPingAllByIPManyUpDown
- TestEnablingRoutes
- TestHASubnetRouterFailover
- TestEnableDisableAutoApprovedRoute
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ require (
github.com/opencontainers/image-spec v1.1.0-rc6 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand All @@ -161,6 +162,7 @@ require (
github.com/safchain/ethtool v0.3.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/philip-bui/grpc-zerolog v1.0.1 h1:EMacvLRUd2O1K0eWod27ZP5CY1iTNkhBDLSN+Q4JEvA=
github.com/philip-bui/grpc-zerolog v1.0.1/go.mod h1:qXbiq/2X4ZUMMshsqlWyTHOcw7ns+GZmlqZZN05ZHcQ=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
Expand Down Expand Up @@ -392,6 +394,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
Expand Down
61 changes: 41 additions & 20 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/juanfont/headscale/hscontrol/db"
"github.com/juanfont/headscale/hscontrol/derp"
derpServer "github.com/juanfont/headscale/hscontrol/derp/server"
"github.com/juanfont/headscale/hscontrol/mapper"
"github.com/juanfont/headscale/hscontrol/notifier"
"github.com/juanfont/headscale/hscontrol/policy"
"github.com/juanfont/headscale/hscontrol/types"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
zl "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/sasha-s/go-deadlock"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/oauth2"
Expand Down Expand Up @@ -77,6 +79,11 @@ const (
registerCacheCleanup = time.Minute * 20
)

func init() {
deadlock.Opts.DeadlockTimeout = 15 * time.Second
deadlock.Opts.PrintAllCurrentGoroutines = true
}

// Headscale represents the base app of the service.
type Headscale struct {
cfg *types.Config
Expand All @@ -89,15 +96,18 @@ type Headscale struct {

ACLPolicy *policy.ACLPolicy

mapper *mapper.Mapper
nodeNotifier *notifier.Notifier

oidcProvider *oidc.Provider
oauth2Config *oauth2.Config

registrationCache *cache.Cache

shutdownChan chan struct{}
pollNetMapStreamWG sync.WaitGroup

mapSessions map[types.NodeID]*mapSession
mapSessionMu deadlock.Mutex
}

var (
Expand Down Expand Up @@ -129,6 +139,7 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
registrationCache: registrationCache,
pollNetMapStreamWG: sync.WaitGroup{},
nodeNotifier: notifier.NewNotifier(),
mapSessions: make(map[types.NodeID]*mapSession),
}

app.db, err = db.NewHeadscaleDatabase(
Expand Down Expand Up @@ -199,26 +210,37 @@ func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, target, http.StatusFound)
}

// expireEphemeralNodes deletes ephemeral node records that have not been
// deleteExpireEphemeralNodes deletes ephemeral node records that have not been
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
func (h *Headscale) expireEphemeralNodes(milliSeconds int64) {
func (h *Headscale) deleteExpireEphemeralNodes(milliSeconds int64) {
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)

var update types.StateUpdate
var changed bool
for range ticker.C {
var removed []types.NodeID
var changed []types.NodeID
if err := h.db.DB.Transaction(func(tx *gorm.DB) error {
update, changed = db.ExpireEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)
removed, changed = db.DeleteExpiredEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)

return nil
}); err != nil {
log.Error().Err(err).Msg("database error while expiring ephemeral nodes")
continue
}

if changed && update.Valid() {
if removed != nil {
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, update)
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerRemoved,
Removed: removed,
})
}

if changed != nil {
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: changed,
})
}
}
}
Expand All @@ -243,8 +265,9 @@ func (h *Headscale) expireExpiredMachines(intervalMs int64) {
continue
}

log.Trace().Str("nodes", update.ChangeNodes.String()).Msgf("expiring nodes")
if changed && update.Valid() {
if changed {
log.Trace().Interface("nodes", update.ChangePatches).Msgf("expiring nodes")

ctx := types.NotifyCtx(context.Background(), "expire-expired", "na")
h.nodeNotifier.NotifyAll(ctx, update)
}
Expand Down Expand Up @@ -272,14 +295,11 @@ func (h *Headscale) scheduledDERPMapUpdateWorker(cancelChan <-chan struct{}) {
h.DERPMap.Regions[region.RegionID] = &region
}

stateUpdate := types.StateUpdate{
ctx := types.NotifyCtx(context.Background(), "derpmap-update", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StateDERPUpdated,
DERPMap: h.DERPMap,
}
if stateUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "derpmap-update", "na")
h.nodeNotifier.NotifyAll(ctx, stateUpdate)
}
})
}
}
}
Expand Down Expand Up @@ -502,6 +522,7 @@ func (h *Headscale) Serve() error {

// Fetch an initial DERP Map before we start serving
h.DERPMap = derp.GetDERPMap(h.cfg.DERP)
h.mapper = mapper.NewMapper(h.db, h.cfg, h.DERPMap, h.nodeNotifier.ConnectedMap())

if h.cfg.DERP.ServerEnabled {
// When embedded DERP is enabled we always need a STUN server
Expand Down Expand Up @@ -533,7 +554,7 @@ func (h *Headscale) Serve() error {

// TODO(kradalby): These should have cancel channels and be cleaned
// up on shutdown.
go h.expireEphemeralNodes(updateInterval)
go h.deleteExpireEphemeralNodes(updateInterval)
go h.expireExpiredMachines(updateInterval)

if zl.GlobalLevel() == zl.TraceLevel {
Expand Down Expand Up @@ -686,6 +707,9 @@ func (h *Headscale) Serve() error {
// no good way to handle streaming timeouts, therefore we need to
// keep this at unlimited and be careful to clean up connections
// https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/#aboutstreaming
// TODO(kradalby): this timeout can now be set per handler with http.ResponseController:
// https://www.alexedwards.net/blog/how-to-use-the-http-responsecontroller-type
// replace this so only the longpoller has no timeout.
WriteTimeout: 0,
}

Expand Down Expand Up @@ -742,7 +766,6 @@ func (h *Headscale) Serve() error {
}

// Handle common process-killing signals so we can gracefully shut down:
h.shutdownChan = make(chan struct{})
sigc := make(chan os.Signal, 1)
signal.Notify(sigc,
syscall.SIGHUP,
Expand Down Expand Up @@ -785,8 +808,6 @@ func (h *Headscale) Serve() error {
Str("signal", sig.String()).
Msg("Received signal to stop, shutting down gracefully")

close(h.shutdownChan)

h.pollNetMapStreamWG.Wait()

// Gracefully shut down servers
Expand Down
33 changes: 14 additions & 19 deletions hscontrol/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,8 @@ func (h *Headscale) handleAuthKey(
}
}

mkey := node.MachineKey
update := types.StateUpdateExpire(node.ID, registerRequest.Expiry)

if update.Valid() {
ctx := types.NotifyCtx(context.Background(), "handle-authkey", "na")
h.nodeNotifier.NotifyWithIgnore(ctx, update, mkey.String())
}
ctx := types.NotifyCtx(context.Background(), "handle-authkey", "na")
h.nodeNotifier.NotifyWithIgnore(ctx, types.StateUpdateExpire(node.ID, registerRequest.Expiry), node.ID)
} else {
now := time.Now().UTC()

Expand Down Expand Up @@ -538,11 +533,8 @@ func (h *Headscale) handleNodeLogOut(
return
}

stateUpdate := types.StateUpdateExpire(node.ID, now)
if stateUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "logout-expiry", "na")
h.nodeNotifier.NotifyWithIgnore(ctx, stateUpdate, node.MachineKey.String())
}
ctx := types.NotifyCtx(context.Background(), "logout-expiry", "na")
h.nodeNotifier.NotifyWithIgnore(ctx, types.StateUpdateExpire(node.ID, now), node.ID)

resp.AuthURL = ""
resp.MachineAuthorized = false
Expand Down Expand Up @@ -572,21 +564,24 @@ func (h *Headscale) handleNodeLogOut(
}

if node.IsEphemeral() {
err = h.db.DeleteNode(&node, h.nodeNotifier.ConnectedMap())
changedNodes, err := h.db.DeleteNode(&node, h.nodeNotifier.ConnectedMap())
if err != nil {
log.Error().
Err(err).
Str("node", node.Hostname).
Msg("Cannot delete ephemeral node from the database")
}

stateUpdate := types.StateUpdate{
ctx := types.NotifyCtx(context.Background(), "logout-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerRemoved,
Removed: []tailcfg.NodeID{tailcfg.NodeID(node.ID)},
}
if stateUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "logout-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, stateUpdate)
Removed: []types.NodeID{node.ID},
})
if changedNodes != nil {
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: changedNodes,
})
}

return
Expand Down
Loading

0 comments on commit 528799e

Please sign in to comment.