Skip to content

Commit

Permalink
apply patches after changes if not in peer list
Browse files Browse the repository at this point in the history
Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Nov 24, 2023
1 parent 99d76a2 commit 29053cc
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 7 deletions.
5 changes: 4 additions & 1 deletion hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
}

if derpServerKey.Equal(*noisePrivateKey) {
return nil, fmt.Errorf("DERP server private key and noise private key are the same: %w", err)
return nil, fmt.Errorf(
"DERP server private key and noise private key are the same: %w",
err,
)
}

embeddedDERPServer, err := derpServer.NewDERPServer(
Expand Down
6 changes: 4 additions & 2 deletions hscontrol/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ func NewHeadscaleDatabase(
dKey = "discokey:" + node.DiscoKey
}

err := db.db.Exec("UPDATE nodes SET machine_key = @mKey, node_key = @nKey, disco_key = @dKey WHERE ID = @id",
err := db.db.Exec(
"UPDATE nodes SET machine_key = @mKey, node_key = @nKey, disco_key = @dKey WHERE ID = @id",
sql.Named("mKey", mKey),
sql.Named("nKey", nKey),
sql.Named("dKey", dKey),
sql.Named("id", node.ID)).Error
sql.Named("id", node.ID),
).Error
if err != nil {
return nil, err
}
Expand Down
74 changes: 70 additions & 4 deletions hscontrol/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"
"path"
"slices"
"sort"
"strings"
"sync"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"golang.org/x/exp/maps"
"tailscale.com/envknob"
"tailscale.com/smallzstd"
"tailscale.com/tailcfg"
Expand Down Expand Up @@ -62,8 +64,14 @@ type Mapper struct {

// Map isnt concurrency safe, so we need to ensure
// only one func is accessing it over time.
mu sync.Mutex
peers map[uint64]*types.Node
mu sync.Mutex
peers map[uint64]*types.Node
patches map[uint64][]patch
}

type patch struct {
timestamp time.Time
change *tailcfg.PeerChange
}

func NewMapper(
Expand Down Expand Up @@ -236,6 +244,19 @@ func (m *Mapper) FullMapResponse(
m.mu.Lock()
defer m.mu.Unlock()

peers := maps.Keys(m.peers)
peersWithPatches := maps.Keys(m.patches)
slices.Sort(peers)
slices.Sort(peersWithPatches)

if len(peersWithPatches) > 0 {
log.Debug().
Str("node", node.Hostname).
Uints64("peers", peers).
Uints64("pending_patches", peersWithPatches).
Msgf("node requested full map response, but has pending patches")
}

resp, err := m.fullMapResponse(node, pol, mapRequest.Version)
if err != nil {
return nil, err
Expand Down Expand Up @@ -294,6 +315,21 @@ func (m *Mapper) PeerChangedResponse(

// Update our internal map.
for _, node := range changed {
if patches, ok := m.patches[node.ID]; ok {
// preserve online status in case the patch has an outdated one
online := node.IsOnline

for _, p := range patches {
// TODO(kradalby): Figure if this needs to be sorted by timestamp
node.ApplyPeerChange(p.change)
}

// Ensure the patches are not applied again later
delete(m.patches, node.ID)

node.IsOnline = online
}

m.peers[node.ID] = node
}

Expand Down Expand Up @@ -331,6 +367,21 @@ func (m *Mapper) PeerChangedWithoutACLResponse(

// Update our internal map.
for _, node := range changed {
if patches, ok := m.patches[node.ID]; ok {
// preserve online status in case the patch has an outdated one
online := node.IsOnline

for _, p := range patches {
// TODO(kradalby): Figure if this needs to be sorted by timestamp
node.ApplyPeerChange(p.change)
}

// Ensure the patches are not applied again later
delete(m.patches, node.ID)

node.IsOnline = online
}

m.peers[node.ID] = node
}

Expand Down Expand Up @@ -367,9 +418,23 @@ func (m *Mapper) PeerChangedPatchResponse(
m.mu.Lock()
defer m.mu.Unlock()

sendUpdate := false
// patch the internal map
for _, patch := range changed {
m.peers[uint64(patch.NodeID)].ApplyPeerChange(patch)
for _, change := range changed {
if peer, ok := m.peers[uint64(change.NodeID)]; ok {
peer.ApplyPeerChange(change)
sendUpdate = true
} else {
log.Trace().Str("node", node.Hostname).Msgf("Node with ID %s is missing from mapper for Node %s, saving change...", change.NodeID, node.Hostname)
m.patches[uint64(change.NodeID)] = append(m.patches[uint64(change.NodeID)], patch{
timestamp: time.Now(),
change: change,
})
}
}

if !sendUpdate {
return nil, nil
}

resp := m.baseMapResponse()
Expand All @@ -378,6 +443,7 @@ func (m *Mapper) PeerChangedPatchResponse(
return m.marshalMapResponse(mapRequest, &resp, node, mapRequest.Compress)
}

// TODO(kradalby): We need some integration tests for this.
func (m *Mapper) PeerRemovedResponse(
mapRequest tailcfg.MapRequest,
node *types.Node,
Expand Down
15 changes: 15 additions & 0 deletions integration/tsic/tsic.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ func (t *TailscaleInContainer) hasTLS() bool {

// Shutdown stops and cleans up the Tailscale container.
func (t *TailscaleInContainer) Shutdown() error {
err := t.SaveLog("/tmp/control")
if err != nil {
log.Printf(
"Failed to save log from %s: %s",
t.hostname,
fmt.Errorf("failed to save log: %w", err),
)
}

return t.pool.Purge(t.container)
}

Expand Down Expand Up @@ -812,3 +821,9 @@ func (t *TailscaleInContainer) Curl(url string, opts ...CurlOption) (string, err
func (t *TailscaleInContainer) WriteFile(path string, data []byte) error {
return integrationutil.WriteFileToContainer(t.pool, t.container, path, data)
}

// SaveLog saves the current stdout log of the container to a path
// on the host system.
func (t *TailscaleInContainer) SaveLog(path string) error {
return dockertestutil.SaveLog(t.pool, t.container, path)
}

0 comments on commit 29053cc

Please sign in to comment.