Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-655 #628

Merged
merged 25 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b6aff1b
NET-655 draft
uGiFarukh Nov 10, 2023
41756c5
* fixes for NET-655
uGiFarukh Nov 13, 2023
974b0c6
added fallback pull function
uGiFarukh Nov 15, 2023
7acc520
fixed server version empty string
uGiFarukh Nov 16, 2023
3ae167f
fallback mechanism optimization
uGiFarukh Nov 17, 2023
d6b95a6
further optimization of mq fallback goroutine
uGiFarukh Nov 21, 2023
0f3e55a
reset interface if new node
uGiFarukh Nov 22, 2023
2e6ab77
fallback goroutine context and waitgroup changed
uGiFarukh Nov 27, 2023
7d79582
mq connection re-attempt after pull
uGiFarukh Nov 27, 2023
57602fa
reset interface network change scenario
uGiFarukh Nov 29, 2023
737a095
Merge branch 'develop' into NET-655
uGiFarukh Nov 29, 2023
5e718ab
resolving merge conflicts
uGiFarukh Nov 29, 2023
8b99920
changed the network ip change check logic
uGiFarukh Dec 1, 2023
a2318cf
no current server skip tick logic changed
uGiFarukh Dec 12, 2023
a9f8b77
resolve merge conflicts
abhishek9686 Dec 14, 2023
92f3e5f
checking through api during fallback and replace peers when neccessary
abhishek9686 Dec 14, 2023
19a5a9d
add node cfg mutex
abhishek9686 Dec 15, 2023
74f1491
sever cfg mutex lock
abhishek9686 Dec 18, 2023
c11e12c
netclient cfg mutex lock
abhishek9686 Dec 18, 2023
f37008a
replace peers when required
abhishek9686 Dec 18, 2023
6ce79ca
update mq password on pull
abhishek9686 Dec 19, 2023
17b5c82
set host creds when broker is emqx
abhishek9686 Dec 19, 2023
e44b39f
set host creds for mq connection when emqx
abhishek9686 Dec 20, 2023
63ffd93
Merge branch 'develop' of https://github.com/gravitl/netclient into N…
abhishek9686 Dec 20, 2023
07b0164
resolve merge conflicts
abhishek9686 Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var pullCmd = &cobra.Command{
Short: "get the latest host configuration",
Long: `get the latest host configuration and peers from all connected servers`,
Run: func(cmd *cobra.Command, args []string) {
_, err := functions.Pull(true)
_, _, _, err := functions.Pull(true)
if err != nil {
logger.Log(0, "failed to pull", err.Error())
}
Expand Down
24 changes: 21 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -57,7 +58,8 @@ func (i InitType) String() string {
}

var (
netclient Config // netclient contains the netclient config
netclientCfgMutex = &sync.RWMutex{}
netclient Config // netclient contains the netclient config
// Version - default version string
Version = "dev"
// FwClose - firewall manager shutdown func
Expand Down Expand Up @@ -85,10 +87,14 @@ func init() {
Nodes = make(map[string]Node)
}

// UpdateNetcllient updates the in memory version of the host configuration
// UpdateNetclient updates the in memory version of the host configuration
func UpdateNetclient(c Config) {
netclientCfgMutex.Lock()
defer netclientCfgMutex.Unlock()
if c.Verbosity != logger.Verbosity {
logger.Log(3, "Logging verbosity updated to", strconv.Itoa(logger.Verbosity))
}
logger.Verbosity = c.Verbosity
logger.Log(3, "Logging verbosity updated to", strconv.Itoa(logger.Verbosity))
netclient = c
}

Expand Down Expand Up @@ -130,21 +136,28 @@ func UpdateHost(host *models.Host) (resetInterface, restart, sendHostUpdate bool

// Netclient returns a pointer to the im memory version of the host configuration
func Netclient() *Config {
netclientCfgMutex.RLock()
defer netclientCfgMutex.RUnlock()
return &netclient
}

// UpdateHostPeers - updates host peer map in the netclient config
func UpdateHostPeers(peers []wgtypes.PeerConfig) {
netclientCfgMutex.Lock()
defer netclientCfgMutex.Unlock()
netclient.HostPeers = peers
}

// DeleteServerHostPeerCfg - deletes the host peers for the server
func DeleteServerHostPeerCfg() {
netclientCfgMutex.Lock()
defer netclientCfgMutex.Unlock()
netclient.HostPeers = []wgtypes.PeerConfig{}
}

// RemoveServerHostPeerCfg - sets remove flag for all peers on the given server peers
func RemoveServerHostPeerCfg() {
netclient := Netclient()
if netclient.HostPeers == nil {
netclient.HostPeers = []wgtypes.PeerConfig{}
return
Expand All @@ -156,6 +169,7 @@ func RemoveServerHostPeerCfg() {
peers[i] = peer
}
netclient.HostPeers = peers
UpdateNetclient(*netclient)
_ = WriteNetclientConfig()
}

Expand All @@ -177,6 +191,8 @@ func ReadNetclientConfig() (*Config, error) {
return nil, err
}
defer f.Close()
netclientCfgMutex.Lock()
defer netclientCfgMutex.Unlock()
netclient = Config{}
if err := yaml.NewDecoder(f).Decode(&netclient); err != nil {
return nil, err
Expand Down Expand Up @@ -209,6 +225,8 @@ func WriteNetclientConfig() error {
return err
}
defer f.Close()
netclientCfgMutex.Lock()
defer netclientCfgMutex.Unlock()
err = yaml.NewEncoder(f).Encode(netclient)
if err != nil {
return err
Expand Down
21 changes: 18 additions & 3 deletions config/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"net"
"os"
"path/filepath"
"sync"

"github.com/gravitl/netclient/ncutils"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/models"
"gopkg.in/yaml.v3"
)

var nodeMutex = &sync.RWMutex{}

// NodeMap is an in memory map of the all nodes indexed by network name
type NodeMap map[string]Node

Expand Down Expand Up @@ -41,9 +44,9 @@ func ReadNodeConfig() error {
return err
}
defer f.Close()
for k := range Nodes {
delete(Nodes, k)
}
nodeMutex.Lock()
defer nodeMutex.Unlock()
Nodes = make(NodeMap)
if err := yaml.NewDecoder(f).Decode(&Nodes); err != nil {
return err
}
Expand All @@ -52,11 +55,15 @@ func ReadNodeConfig() error {

// GetNodes returns a copy of the NodeMap
func GetNodes() NodeMap {
nodeMutex.RLock()
defer nodeMutex.RUnlock()
return Nodes
}

// GetNode returns returns the node configuation of the specified network name
func GetNode(k string) Node {
nodeMutex.RLock()
defer nodeMutex.RUnlock()
if node, ok := Nodes[k]; ok {
return node
}
Expand All @@ -65,6 +72,8 @@ func GetNode(k string) Node {

// SetNodes - sets server nodes in client config
func SetNodes(nodes []models.Node) {
nodeMutex.Lock()
defer nodeMutex.Unlock()
Nodes = make(NodeMap)
for _, node := range nodes {
Nodes[node.Network] = Node{
Expand All @@ -75,11 +84,15 @@ func SetNodes(nodes []models.Node) {

// UpdateNodeMap updates the in memory nodemap for the specified network
func UpdateNodeMap(k string, value Node) {
nodeMutex.Lock()
defer nodeMutex.Unlock()
Nodes[k] = value
}

// DeleteNode deletes the node from the nodemap for the specified network
func DeleteNode(k string) {
nodeMutex.Lock()
defer nodeMutex.Unlock()
delete(Nodes, k)
}

Expand Down Expand Up @@ -116,6 +129,8 @@ func WriteNodeConfig() error {
return err
}
defer f.Close()
nodeMutex.RLock()
defer nodeMutex.RUnlock()
err = yaml.NewEncoder(f).Encode(Nodes)
if err != nil {
return err
Expand Down
24 changes: 20 additions & 4 deletions config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"os"
"path/filepath"
"strings"
"sync"

"github.com/google/uuid"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/models"
"gopkg.in/yaml.v3"
)

var serverMutex = &sync.RWMutex{}

var serverCtxFile = ".serverctx"

// CurrServer - holds the value of current server of client
Expand Down Expand Up @@ -65,9 +68,9 @@ func ReadServerConf() error {
return err
}
defer f.Close()
for k := range Servers {
delete(Servers, k)
}
serverMutex.Lock()
defer serverMutex.Unlock()
Servers = make(map[string]Server)
if err := yaml.NewDecoder(f).Decode(&Servers); err != nil {
return err
}
Expand Down Expand Up @@ -99,6 +102,8 @@ func WriteServerConfig() error {
return err
}
defer f.Close()
serverMutex.Lock()
defer serverMutex.Unlock()
err = yaml.NewEncoder(f).Encode(Servers)
if err != nil {
return err
Expand All @@ -108,17 +113,23 @@ func WriteServerConfig() error {

// SaveServer updates the server map with current server struct and writes map to disk
func SaveServer(name string, server Server) error {
serverMutex.Lock()
Servers[name] = server
serverMutex.Unlock()
return WriteServerConfig()
}

// UpdateServer updates the in-memory server map
func UpdateServer(name string, server Server) {
serverMutex.Lock()
defer serverMutex.Unlock()
Servers[name] = server
}

// GetServer returns the server struct for the given server name
func GetServer(name string) *Server {
serverMutex.RLock()
defer serverMutex.RUnlock()
if server, ok := Servers[name]; ok {
return &server
}
Expand All @@ -127,6 +138,8 @@ func GetServer(name string) *Server {

// GetServers - gets all the server names host has registered to.
func GetServers() (servers []string) {
serverMutex.RLock()
defer serverMutex.RUnlock()
for _, server := range Servers {
servers = append(servers, server.Name)
}
Expand Down Expand Up @@ -173,6 +186,8 @@ func SetServerCtx() {

// DeleteServer deletes the specified server name from the server map
func DeleteServer(k string) {
serverMutex.Lock()
defer serverMutex.Unlock()
delete(Servers, k)
}

Expand All @@ -194,6 +209,8 @@ func ConvertServerCfg(cfg *OldNetmakerServerConfig) *Server {

// UpdateServerConfig updates the in memory server map with values provided from netmaker server
func UpdateServerConfig(cfg *models.ServerConfig) {
serverMutex.Lock()
defer serverMutex.Unlock()
if cfg == nil {
return
}
Expand All @@ -205,6 +222,5 @@ func UpdateServerConfig(cfg *models.ServerConfig) {
server.Name = cfg.Server
server.MQID = netclient.ID
server.ServerConfig = *cfg

Servers[cfg.Server] = server
}
27 changes: 17 additions & 10 deletions functions/daemon.go
abhishek9686 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,20 @@ func Daemon() {
}
cancel := startGoRoutines(&wg)
//start httpserver on its own -- doesn't need to restart on reset
httpctx, httpCancel := context.WithCancel(context.Background())
httpWg := sync.WaitGroup{}
httpWg.Add(1)
go HttpServer(httpctx, &httpWg)
ctx0, cancel0 := context.WithCancel(context.Background())
wg0 := sync.WaitGroup{}
wg0.Add(1)
go HttpServer(ctx0, &wg0)

for {
select {
case <-quit:
slog.Info("shutting down netclient daemon")
closeRoutines([]context.CancelFunc{
cancel,
}, &wg)
httpCancel()
httpWg.Wait()
cancel0()
wg0.Wait()
config.FwClose()
slog.Info("shutdown complete")
return
Expand Down Expand Up @@ -151,7 +152,7 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
}
}
slog.Info("configuring netmaker wireguard interface")
pullresp, pullErr := Pull(false)
pullresp, _, _, pullErr := Pull(false)
if pullErr != nil {
slog.Error("fail to pull config from server", "error", pullErr.Error())
}
Expand Down Expand Up @@ -181,6 +182,8 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
wg.Add(1)
go watchPeerConnections(ctx, wg)
}
wg.Add(1)
go mqFallback(ctx, wg)

return cancel
}
Expand Down Expand Up @@ -208,8 +211,13 @@ func messageQueue(ctx context.Context, wg *sync.WaitGroup, server *config.Server
func setupMQTT(server *config.Server) error {
opts := mqtt.NewClientOptions()
opts.AddBroker(server.Broker)
opts.SetUsername(server.MQUserName)
opts.SetPassword(server.MQPassword)
if server.BrokerType == "emqx" {
opts.SetUsername(config.Netclient().ID.String())
opts.SetPassword(config.Netclient().HostPass)
} else {
opts.SetUsername(server.MQUserName)
opts.SetPassword(server.MQPassword)
}
//opts.SetClientID(ncutils.MakeRandomString(23))
opts.SetClientID(server.MQID.String())
opts.SetAutoReconnect(true)
Expand All @@ -231,7 +239,6 @@ func setupMQTT(server *config.Server) error {
opts.SetResumeSubs(true)
opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
slog.Warn("detected broker connection lost for", "server", server.Broker)

// restart daemon for new udp hole punch if MQTT connection is lost (can happen on network change)
if !config.Netclient().IsStatic {
daemon.Restart()
Expand Down
2 changes: 1 addition & 1 deletion functions/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func uninstall(c *gin.Context) {

func pull(c *gin.Context) {
net := c.Params.ByName("net")
_, err := Pull(true)
_, _, _, err := Pull(true)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
}
Expand Down
Loading
Loading