Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-655 #628

Merged
merged 25 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b6aff1b
NET-655 draft
uGiFarukh Nov 10, 2023
41756c5
* fixes for NET-655
uGiFarukh Nov 13, 2023
974b0c6
added fallback pull function
uGiFarukh Nov 15, 2023
7acc520
fixed server version empty string
uGiFarukh Nov 16, 2023
3ae167f
fallback mechanism optimization
uGiFarukh Nov 17, 2023
d6b95a6
further optimization of mq fallback goroutine
uGiFarukh Nov 21, 2023
0f3e55a
reset interface if new node
uGiFarukh Nov 22, 2023
2e6ab77
fallback goroutine context and waitgroup changed
uGiFarukh Nov 27, 2023
7d79582
mq connection re-attempt after pull
uGiFarukh Nov 27, 2023
57602fa
reset interface network change scenario
uGiFarukh Nov 29, 2023
737a095
Merge branch 'develop' into NET-655
uGiFarukh Nov 29, 2023
5e718ab
resolving merge conflicts
uGiFarukh Nov 29, 2023
8b99920
changed the network ip change check logic
uGiFarukh Dec 1, 2023
a2318cf
no current server skip tick logic changed
uGiFarukh Dec 12, 2023
a9f8b77
resolve merge conflicts
abhishek9686 Dec 14, 2023
92f3e5f
checking through api during fallback and replace peers when neccessary
abhishek9686 Dec 14, 2023
19a5a9d
add node cfg mutex
abhishek9686 Dec 15, 2023
74f1491
sever cfg mutex lock
abhishek9686 Dec 18, 2023
c11e12c
netclient cfg mutex lock
abhishek9686 Dec 18, 2023
f37008a
replace peers when required
abhishek9686 Dec 18, 2023
6ce79ca
update mq password on pull
abhishek9686 Dec 19, 2023
17b5c82
set host creds when broker is emqx
abhishek9686 Dec 19, 2023
e44b39f
set host creds for mq connection when emqx
abhishek9686 Dec 20, 2023
63ffd93
Merge branch 'develop' of https://github.com/gravitl/netclient into N…
abhishek9686 Dec 20, 2023
07b0164
resolve merge conflicts
abhishek9686 Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var pullCmd = &cobra.Command{
Short: "get the latest host configuration",
Long: `get the latest host configuration and peers from all connected servers`,
Run: func(cmd *cobra.Command, args []string) {
_, err := functions.Pull(true)
_, _, err := functions.Pull(true)
if err != nil {
logger.Log(0, "failed to pull", err.Error())
}
Expand Down
9 changes: 7 additions & 2 deletions functions/daemon.go
abhishek9686 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func Daemon() {
httpWg := sync.WaitGroup{}
httpWg.Add(1)
go HttpServer(httpctx, &httpWg)

// MQTT Fallback Goroutine
httpWg.Add(1)
go mqFallback(httpctx, &httpWg)

for {
select {
case <-quit:
Expand Down Expand Up @@ -151,7 +156,7 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
}
}
slog.Info("configuring netmaker wireguard interface")
pullresp, pullErr := Pull(false)
pullresp, _, pullErr := Pull(false)
if pullErr != nil {
slog.Error("fail to pull config from server", "error", pullErr.Error())
}
Expand Down Expand Up @@ -187,6 +192,7 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
go Checkin(ctx, wg)
wg.Add(1)
go networking.StartIfaceDetection(ctx, wg, config.Netclient().ListenPort)

return cancel
}

Expand Down Expand Up @@ -249,7 +255,6 @@ func setupMQTT(server *config.Server) error {
nil,
)
}

// restart daemon for new udp hole punch if MQTT connection is lost (can happen on network change)
if !config.Netclient().IsStatic {
daemon.Restart()
Expand Down
2 changes: 1 addition & 1 deletion functions/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func uninstall(c *gin.Context) {

func pull(c *gin.Context) {
net := c.Params.ByName("net")
_, err := Pull(true)
_, _, err := Pull(true)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
}
Expand Down
122 changes: 122 additions & 0 deletions functions/mqhandlers.go
uGiFarukh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package functions

import (
"context"
"encoding/json"
"log"
"net"
"os"
"strings"
"sync"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -26,6 +28,9 @@ import (
// MQTimeout - time out for mqtt connections
const MQTimeout = 30

// MQTT Fallback Routine
var mqFallbackTicker *time.Ticker = time.NewTicker(time.Second * 30)

abhishek9686 marked this conversation as resolved.
Show resolved Hide resolved
// All -- mqtt message hander for all ('#') topics
var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
slog.Info("default message handler -- received message but not handling", "topic", msg.Topic())
Expand Down Expand Up @@ -573,3 +578,120 @@ func handleFwUpdate(server string, payload *models.FwUpdate) {
}

}

// MQTT Fallback Mechanism
func mqFallback(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
mqFallbackTicker.Stop()
slog.Info("mqfallback routine stop")
return
case <-mqFallbackTicker.C: // Execute pull every 30 seconds
if config.CurrServer == "" {
// skip tick if there is no server
continue
}
if Mqclient == nil || !Mqclient.IsConnectionOpen() {
// Call netclient http config pull
slog.Info("mqfallback routine execute")
response, resetInterface, err := Pull(false)
if err != nil {
slog.Error("pull failed", "error", err)
} else {
mqFallbackPull(response, resetInterface)
server := config.GetServer(config.CurrServer)
if server == nil {
continue
}
slog.Info("re-attempt mqtt connection after pull")
if Mqclient != nil {
Mqclient.Disconnect(0)
}
if err := setupMQTT(server); err != nil {
slog.Error("unable to connect to broker", "server", server.Broker, "error", err)
}
}
}
}
}
}

// MQTT Fallback Config Pull
func mqFallbackPull(pullResponse models.HostPull, resetInterface bool) {
serverName := config.CurrServer
server := config.GetServer(serverName)
if server == nil {
slog.Error("server not found in config", "server", serverName)
return
}
if server.UseTurn {
turn.ResetCh <- struct{}{}
}
if pullResponse.ServerConfig.Version != config.Version {
slog.Warn("server/client version mismatch", "server", pullResponse.ServerConfig.Version, "client", config.Version)
vlt, err := versionLessThan(config.Version, pullResponse.ServerConfig.Version)
if err != nil {
slog.Error("error checking version less than", "error", err)
return
}
if vlt && config.Netclient().Host.AutoUpdate {
slog.Info("updating client to server's version", "version", pullResponse.ServerConfig.Version)
if err := UseVersion(pullResponse.ServerConfig.Version, false); err != nil {
slog.Error("error updating client to server's version", "error", err)
} else {
slog.Info("updated client to server's version", "version", pullResponse.ServerConfig.Version)
daemon.HardRestart()
}
}
}
if pullResponse.ServerConfig.Version != server.Version {
slog.Info("updating server version", "server", serverName, "version", pullResponse.ServerConfig.Version)
server.Version = pullResponse.ServerConfig.Version
config.WriteServerConfig()
}
gwDetected := config.GW4PeerDetected || config.GW6PeerDetected
currentGW4 := config.GW4Addr
currentGW6 := config.GW6Addr
isInetGW := config.UpdateHostPeers(pullResponse.Peers)
_ = config.WriteNetclientConfig()
_ = wireguard.SetPeers(false)
if len(pullResponse.EgressRoutes) > 0 {
wireguard.SetEgressRoutes(pullResponse.EgressRoutes)
}
if err := routes.SetNetmakerPeerEndpointRoutes(config.Netclient().DefaultInterface); err != nil {
slog.Warn("error when setting peer routes on mq fallback pull", "error", err)
}
gwDelta := (currentGW4.IP != nil && !currentGW4.IP.Equal(config.GW4Addr.IP)) ||
(currentGW6.IP != nil && !currentGW6.IP.Equal(config.GW6Addr.IP))
originalGW := currentGW4
if originalGW.IP != nil {
originalGW = currentGW6
}
handlePeerInetGateways(
gwDetected,
isInetGW,
gwDelta,
&originalGW,
)
go handleEndpointDetection(pullResponse.Peers, pullResponse.HostNetworkInfo)
handleFwUpdate(serverName, &pullResponse.FwUpdate)

if resetInterface {
nc := wireguard.GetInterface()
nc.Close()
nc = wireguard.NewNCIface(config.Netclient(), config.GetNodes())
nc.Create()
if err := nc.Configure(); err != nil {
slog.Error("could not configure netmaker interface", "error", err)
return
}
if err := wireguard.SetPeers(false); err == nil {
if err = routes.SetNetmakerPeerEndpointRoutes(config.Netclient().DefaultInterface); err != nil {
slog.Error("error when setting peer routes after host update", "error", err)
}
}
slog.Info("mqfallback reset interface")
}
}
35 changes: 28 additions & 7 deletions functions/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@ import (
"github.com/gravitl/netclient/auth"
"github.com/gravitl/netclient/config"
"github.com/gravitl/netclient/daemon"
"github.com/gravitl/netclient/wireguard"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/models"
)

// Pull - pulls the latest config from the server, if manual it will overwrite
func Pull(restart bool) (models.HostPull, error) {

func Pull(restart bool) (models.HostPull, bool, error) {
resetInterface := false
serverName := config.CurrServer
server := config.GetServer(serverName)
if server == nil {
return models.HostPull{}, errors.New("server config not found")
return models.HostPull{}, resetInterface, errors.New("server config not found")
}
token, err := auth.Authenticate(server, config.Netclient())
if err != nil {
return models.HostPull{}, err
return models.HostPull{}, resetInterface, err
}
endpoint := httpclient.JSONEndpoint[models.HostPull, models.ErrorResponse]{
URL: "https://" + server.API,
Expand All @@ -39,8 +40,28 @@ func Pull(restart bool) (models.HostPull, error) {
if errors.Is(err, httpclient.ErrStatus) {
logger.Log(0, "error pulling server", serverName, strconv.Itoa(errData.Code), errData.Message)
}
return models.HostPull{}, err
return models.HostPull{}, resetInterface, err
}

// MQTT Fallback Reset Interface
var hostPullNode config.Node
for _, pullNode := range pullResponse.Nodes {
hostPullNode.CommonNode = pullNode.CommonNode
nodeMap := config.GetNodes()
currNode, ok := nodeMap[pullNode.Network]
if !ok {
resetInterface = true
break
}
if wireguard.IfaceDelta(&hostPullNode, &currNode) {
resetInterface = true
break
}
}
if len(config.GetNodes()) != len(pullResponse.Nodes) {
resetInterface = true
}

_ = config.UpdateHostPeers(pullResponse.Peers)
pullResponse.ServerConfig.MQPassword = server.MQPassword // pwd can't change currently
config.UpdateServerConfig(&pullResponse.ServerConfig)
Expand All @@ -52,7 +73,7 @@ func Pull(restart bool) (models.HostPull, error) {
_ = config.WriteNodeConfig()
if restart {
logger.Log(3, "restarting daemon")
return models.HostPull{}, daemon.Restart()
return models.HostPull{}, resetInterface, daemon.Restart()
}
return pullResponse, nil
return pullResponse, resetInterface, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/google/nftables v0.1.0
github.com/google/uuid v1.4.0
github.com/gorilla/websocket v1.5.0
github.com/gravitl/netmaker v0.21.2-0.20231031062230-896041e44236
github.com/gravitl/netmaker v0.21.2-0.20231122125326-b17aeff3ef3d
github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06
github.com/gravitl/txeh v0.0.0-20230509181318-3778c58bd69f
github.com/guumaster/hostctl v1.1.4
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ github.com/gravitl/netmaker v0.21.1-0.20231002153631-3f1211795c2c h1:kmjMZcY8HNt
github.com/gravitl/netmaker v0.21.1-0.20231002153631-3f1211795c2c/go.mod h1:99ewh2N2nkrsqCr3l6s6t8RrPF7emACjE6LPPy4w2pM=
github.com/gravitl/netmaker v0.21.2-0.20231031062230-896041e44236 h1:OnXwXGbeMR2f0qJja2zo/axyUouJDEyXCcg0rc2qqWk=
github.com/gravitl/netmaker v0.21.2-0.20231031062230-896041e44236/go.mod h1:XSuEtCr4ZBT2mZlTHVqTZhCPlXWq6W+iEILwC5yuLtw=
github.com/gravitl/netmaker v0.21.2-0.20231115080052-d9a708840602 h1:w3dfpS0mD8/0Df4oyOf0ybuT5yRhkv+om12nvsA2Iqw=
github.com/gravitl/netmaker v0.21.2-0.20231115080052-d9a708840602/go.mod h1:KaMjQ32quN84tGTPmN9qIVZSEQFpd1AFROFgMRnlUss=
github.com/gravitl/netmaker v0.21.2-0.20231116105900-9c7e3dfec85e h1:Q54TZkH3AOubMXnsqF7tsppcDc7QNCVcaNbqvwXO1IA=
github.com/gravitl/netmaker v0.21.2-0.20231116105900-9c7e3dfec85e/go.mod h1:KaMjQ32quN84tGTPmN9qIVZSEQFpd1AFROFgMRnlUss=
github.com/gravitl/netmaker v0.21.2-0.20231122125326-b17aeff3ef3d h1:jVKGRmEC3NXWPDU8v4X1icaFc4RvnSIdZTbgYAVa+Dc=
github.com/gravitl/netmaker v0.21.2-0.20231122125326-b17aeff3ef3d/go.mod h1:KaMjQ32quN84tGTPmN9qIVZSEQFpd1AFROFgMRnlUss=
github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06 h1:g2fBXRNT9eiQohyHcoME3SVmeG7OKoJPWrs7A+009kU=
github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06/go.mod h1:12iViYKWAzRPj5/oEGAaD7Wje+Nuz8M9eDJbV7qhKAA=
github.com/gravitl/txeh v0.0.0-20230509181318-3778c58bd69f h1:XzsYovKdrDvj2z2HEHoeHU67+JIEFMHQKHU6oU+1fVE=
Expand Down
2 changes: 1 addition & 1 deletion gui/exported.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (app *App) GoWriteToClipboard(data string) (any, error) {

// App.GoPullLatestNodeConfig pulls the latest node config from the server and returns the network config
func (app *App) GoPullLatestNodeConfig(network string) (Network, error) {
_, err := functions.Pull(true)
_, _, err := functions.Pull(true)
if err != nil {
return Network{}, err
}
Expand Down