Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-655 #628

Merged
merged 25 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b6aff1b
NET-655 draft
uGiFarukh Nov 10, 2023
41756c5
* fixes for NET-655
uGiFarukh Nov 13, 2023
974b0c6
added fallback pull function
uGiFarukh Nov 15, 2023
7acc520
fixed server version empty string
uGiFarukh Nov 16, 2023
3ae167f
fallback mechanism optimization
uGiFarukh Nov 17, 2023
d6b95a6
further optimization of mq fallback goroutine
uGiFarukh Nov 21, 2023
0f3e55a
reset interface if new node
uGiFarukh Nov 22, 2023
2e6ab77
fallback goroutine context and waitgroup changed
uGiFarukh Nov 27, 2023
7d79582
mq connection re-attempt after pull
uGiFarukh Nov 27, 2023
57602fa
reset interface network change scenario
uGiFarukh Nov 29, 2023
737a095
Merge branch 'develop' into NET-655
uGiFarukh Nov 29, 2023
5e718ab
resolving merge conflicts
uGiFarukh Nov 29, 2023
8b99920
changed the network ip change check logic
uGiFarukh Dec 1, 2023
a2318cf
no current server skip tick logic changed
uGiFarukh Dec 12, 2023
a9f8b77
resolve merge conflicts
abhishek9686 Dec 14, 2023
92f3e5f
checking through api during fallback and replace peers when neccessary
abhishek9686 Dec 14, 2023
19a5a9d
add node cfg mutex
abhishek9686 Dec 15, 2023
74f1491
sever cfg mutex lock
abhishek9686 Dec 18, 2023
c11e12c
netclient cfg mutex lock
abhishek9686 Dec 18, 2023
f37008a
replace peers when required
abhishek9686 Dec 18, 2023
6ce79ca
update mq password on pull
abhishek9686 Dec 19, 2023
17b5c82
set host creds when broker is emqx
abhishek9686 Dec 19, 2023
e44b39f
set host creds for mq connection when emqx
abhishek9686 Dec 20, 2023
63ffd93
Merge branch 'develop' of https://github.com/gravitl/netclient into N…
abhishek9686 Dec 20, 2023
07b0164
resolve merge conflicts
abhishek9686 Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion functions/daemon.go
abhishek9686 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
go Checkin(ctx, wg)
wg.Add(1)
go networking.StartIfaceDetection(ctx, wg, config.Netclient().ListenPort)

// MQTT Fallback Goroutine
wg.Add(1)
go MQFallback(ctx, wg)

return cancel
}

Expand Down Expand Up @@ -249,7 +254,6 @@ func setupMQTT(server *config.Server) error {
nil,
)
}

// restart daemon for new udp hole punch if MQTT connection is lost (can happen on network change)
if !config.Netclient().IsStatic {
daemon.Restart()
Expand Down
88 changes: 88 additions & 0 deletions functions/mqhandlers.go
uGiFarukh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package functions

import (
"context"
"encoding/json"
"log"
"net"
"os"
"strings"
"sync"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -26,6 +28,9 @@ import (
// MQTimeout - time out for mqtt connections
const MQTimeout = 30

// MQTT Fallback Routine
var mqFallbackTicker *time.Ticker = time.NewTicker(time.Second * 30)

abhishek9686 marked this conversation as resolved.
Show resolved Hide resolved
// All -- mqtt message hander for all ('#') topics
var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
slog.Info("default message handler -- received message but not handling", "topic", msg.Topic())
Expand Down Expand Up @@ -573,3 +578,86 @@ func handleFwUpdate(server string, payload *models.FwUpdate) {
}

}

// MQTT Fallback Mechanism
func MQFallback(ctx context.Context, wg *sync.WaitGroup) {
uGiFarukh marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()
for {
select {
case <-ctx.Done():
mqFallbackTicker.Stop()
return
case <-mqFallbackTicker.C: // Execute pull every 30 seconds
if Mqclient == nil || !Mqclient.IsConnectionOpen() {
// Call netclient http config pull
response, err := Pull(false)
if err != nil {
slog.Error("pull failed", "error", err)
} else {
MQFallbackPull(response)
}
}
}
}
}

// MQTT Fallback Config Pull
func MQFallbackPull(pullResponse models.HostPull) {
serverName := config.CurrServer
server := config.GetServer(serverName)
if server == nil {
slog.Error("server not found in config", "server", serverName)
return
}
if server.UseTurn {
turn.ResetCh <- struct{}{}
}
if pullResponse.ServerConfig.Version != config.Version {
slog.Warn("server/client version mismatch", "server", pullResponse.ServerConfig.Version, "client", config.Version)
vlt, err := versionLessThan(config.Version, pullResponse.ServerConfig.Version)
if err != nil {
slog.Error("error checking version less than", "error", err)
return
}
if vlt && config.Netclient().Host.AutoUpdate {
slog.Info("updating client to server's version", "version", pullResponse.ServerConfig.Version)
if err := UseVersion(pullResponse.ServerConfig.Version, false); err != nil {
slog.Error("error updating client to server's version", "error", err)
} else {
slog.Info("updated client to server's version", "version", pullResponse.ServerConfig.Version)
daemon.HardRestart()
}
}
}
if pullResponse.ServerConfig.Version != server.Version {
slog.Info("updating server version", "server", serverName, "version", pullResponse.ServerConfig.Version)
server.Version = pullResponse.ServerConfig.Version
config.WriteServerConfig()
}
gwDetected := config.GW4PeerDetected || config.GW6PeerDetected
currentGW4 := config.GW4Addr
currentGW6 := config.GW6Addr
isInetGW := config.UpdateHostPeers(pullResponse.Peers)
_ = config.WriteNetclientConfig()
_ = wireguard.SetPeers(false)
if len(pullResponse.EgressRoutes) > 0 {
wireguard.SetEgressRoutes(pullResponse.EgressRoutes)
}
if err := routes.SetNetmakerPeerEndpointRoutes(config.Netclient().DefaultInterface); err != nil {
slog.Warn("error when setting peer routes on mq fallback pull", "error", err)
}
gwDelta := (currentGW4.IP != nil && !currentGW4.IP.Equal(config.GW4Addr.IP)) ||
(currentGW6.IP != nil && !currentGW6.IP.Equal(config.GW6Addr.IP))
originalGW := currentGW4
if originalGW.IP != nil {
originalGW = currentGW6
}
handlePeerInetGateways(
gwDetected,
isInetGW,
gwDelta,
&originalGW,
)
go handleEndpointDetection(pullResponse.Peers, pullResponse.HostNetworkInfo)
handleFwUpdate(serverName, &pullResponse.FwUpdate)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/google/nftables v0.1.0
github.com/google/uuid v1.4.0
github.com/gorilla/websocket v1.5.0
github.com/gravitl/netmaker v0.21.2-0.20231031062230-896041e44236
github.com/gravitl/netmaker v0.21.2-0.20231116105900-9c7e3dfec85e
github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06
github.com/gravitl/txeh v0.0.0-20230509181318-3778c58bd69f
github.com/guumaster/hostctl v1.1.4
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ github.com/gravitl/netmaker v0.21.1-0.20231002153631-3f1211795c2c h1:kmjMZcY8HNt
github.com/gravitl/netmaker v0.21.1-0.20231002153631-3f1211795c2c/go.mod h1:99ewh2N2nkrsqCr3l6s6t8RrPF7emACjE6LPPy4w2pM=
github.com/gravitl/netmaker v0.21.2-0.20231031062230-896041e44236 h1:OnXwXGbeMR2f0qJja2zo/axyUouJDEyXCcg0rc2qqWk=
github.com/gravitl/netmaker v0.21.2-0.20231031062230-896041e44236/go.mod h1:XSuEtCr4ZBT2mZlTHVqTZhCPlXWq6W+iEILwC5yuLtw=
github.com/gravitl/netmaker v0.21.2-0.20231115080052-d9a708840602 h1:w3dfpS0mD8/0Df4oyOf0ybuT5yRhkv+om12nvsA2Iqw=
github.com/gravitl/netmaker v0.21.2-0.20231115080052-d9a708840602/go.mod h1:KaMjQ32quN84tGTPmN9qIVZSEQFpd1AFROFgMRnlUss=
github.com/gravitl/netmaker v0.21.2-0.20231116105900-9c7e3dfec85e h1:Q54TZkH3AOubMXnsqF7tsppcDc7QNCVcaNbqvwXO1IA=
github.com/gravitl/netmaker v0.21.2-0.20231116105900-9c7e3dfec85e/go.mod h1:KaMjQ32quN84tGTPmN9qIVZSEQFpd1AFROFgMRnlUss=
github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06 h1:g2fBXRNT9eiQohyHcoME3SVmeG7OKoJPWrs7A+009kU=
github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06/go.mod h1:12iViYKWAzRPj5/oEGAaD7Wje+Nuz8M9eDJbV7qhKAA=
github.com/gravitl/txeh v0.0.0-20230509181318-3778c58bd69f h1:XzsYovKdrDvj2z2HEHoeHU67+JIEFMHQKHU6oU+1fVE=
Expand Down