From 26e4187eb43020f437b7373de0a2d31f1656eec3 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Wed, 20 Nov 2024 09:54:51 +0100 Subject: [PATCH] [-] rewrite `IPManager` with channel logic, fixes #282 (#283) --- checker/etcd_leader_checker.go | 10 ++-- ipmanager/basicConfigurer_linux.go | 2 +- ipmanager/basicConfigurer_windows.go | 6 +- ipmanager/ip_manager.go | 85 ++++++++++------------------ 4 files changed, 38 insertions(+), 65 deletions(-) diff --git a/checker/etcd_leader_checker.go b/checker/etcd_leader_checker.go index 24f8bc2..9155db9 100644 --- a/checker/etcd_leader_checker.go +++ b/checker/etcd_leader_checker.go @@ -74,20 +74,20 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) { func (elc *EtcdLeaderChecker) get(ctx context.Context, out chan<- bool) { resp, err := elc.Get(ctx, elc.TriggerKey) if err != nil { - elc.Logger.Error("etcd error:", zap.Error(err)) + elc.Logger.Error("Failed to get etcd value:", zap.Error(err)) out <- false return } for _, kv := range resp.Kvs { - elc.Logger.Sugar().Info("current leader from DCS:", string(kv.Value)) + elc.Logger.Sugar().Info("Current leader from DCS:", string(kv.Value)) out <- string(kv.Value) == elc.TriggerValue } } // watch monitors the leader change from etcd func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error { + elc.Logger.Sugar().Info("Setting WATCH on ", elc.TriggerKey) watchChan := elc.Watch(ctx, elc.TriggerKey) - elc.Logger.Sugar().Info("set WATCH on ", elc.TriggerKey) for { select { case <-ctx.Done(): @@ -95,7 +95,7 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error case watchResp := <-watchChan: if watchResp.Canceled { watchChan = elc.Watch(ctx, elc.TriggerKey) - elc.Logger.Sugar().Info("reset cancelled WATCH on ", elc.TriggerKey) + elc.Logger.Sugar().Info("Resetting cancelled WATCH on ", elc.TriggerKey) continue } if err := watchResp.Err(); err != nil { @@ -104,7 +104,7 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error } for _, event := range watchResp.Events { out <- string(event.Kv.Value) == elc.TriggerValue - elc.Logger.Sugar().Info("current leader from DCS:", string(event.Kv.Value)) + elc.Logger.Sugar().Info("Current leader from DCS: ", string(event.Kv.Value)) } } } diff --git a/ipmanager/basicConfigurer_linux.go b/ipmanager/basicConfigurer_linux.go index b65ddd1..342320b 100644 --- a/ipmanager/basicConfigurer_linux.go +++ b/ipmanager/basicConfigurer_linux.go @@ -151,7 +151,7 @@ func (c *BasicConfigurer) arpSendGratuitous() error { time.Sleep(time.Duration(c.RetryAfter) * time.Millisecond) } if err != nil { - log.Error("too many retries", err) + log.Error("Too many retries", err) return err } diff --git a/ipmanager/basicConfigurer_windows.go b/ipmanager/basicConfigurer_windows.go index 8e0826f..fb6bd6f 100644 --- a/ipmanager/basicConfigurer_windows.go +++ b/ipmanager/basicConfigurer_windows.go @@ -17,12 +17,12 @@ func (c *BasicConfigurer) configureAddress() bool { ) iface, err := net.InterfaceByName(c.Iface.Name) if err != nil { - log.Infof("Got error: %v", err) + log.Error("Failed to access interface: ", err) return false } err = iphlpapi.AddIPAddress(ip, mask, uint32(iface.Index), &c.ntecontext, &nteinstance) if err != nil { - log.Infof("Got error: %v", err) + log.Error("Failed to add address: ", err) return false } // For now it is save to say that also working even if a @@ -37,7 +37,7 @@ func (c *BasicConfigurer) deconfigureAddress() bool { log.Infof("Removing address %s on %s", c.getCIDR(), c.Iface.Name) err := iphlpapi.DeleteIPAddress(c.ntecontext) if err != nil { - log.Error(err) + log.Errorf("Failed to remove address %s: %v", c.getCIDR(), err) return false } return true diff --git a/ipmanager/ip_manager.go b/ipmanager/ip_manager.go index ea0ff47..66f6242 100644 --- a/ipmanager/ip_manager.go +++ b/ipmanager/ip_manager.go @@ -4,7 +4,7 @@ import ( "context" "net" "net/netip" - "sync" + "sync/atomic" "time" "github.com/cybertec-postgresql/vip-manager/vipconfig" @@ -25,10 +25,9 @@ var log *zap.SugaredLogger = zap.L().Sugar() type IPManager struct { configurer ipConfigurer - states <-chan bool - currentState bool - stateLock sync.Mutex - recheck *sync.Cond + states <-chan bool + shouldSetIPUp atomic.Bool + recheckChan chan struct{} } func getMask(vip netip.Addr, mask int) net.IPMask { @@ -69,7 +68,7 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err states: states, } log = conf.Logger.Sugar() - m.recheck = sync.NewCond(&m.stateLock) + m.recheckChan = make(chan struct{}) switch conf.HostingType { case "hetzner": m.configurer, err = newHetznerConfigurer(ipConf, conf.Verbose) @@ -86,71 +85,45 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err func (m *IPManager) applyLoop(ctx context.Context) { strUpDown := map[bool]string{true: "up", false: "down"} - timeout := 0 for { - // Check if we should exit + isIPUp := m.configurer.queryAddress() + shouldSetIPUp := m.shouldSetIPUp.Load() + log.Infof("IP address %s is %s, must be %s", + m.configurer.getCIDR(), + strUpDown[isIPUp], + strUpDown[shouldSetIPUp]) + if isIPUp != shouldSetIPUp { + var isOk bool + if shouldSetIPUp { + isOk = m.configurer.configureAddress() + } else { + isOk = m.configurer.deconfigureAddress() + } + if !isOk { + log.Error("Failed to configure virtual ip for this machine") + } + } select { case <-ctx.Done(): - m.configurer.deconfigureAddress() return - case <-time.After(time.Duration(timeout) * time.Second): - actualState := m.configurer.queryAddress() - m.stateLock.Lock() - desiredState := m.currentState - log.Infof("IP address %s state is %s, must be %s", - m.configurer.getCIDR(), - strUpDown[actualState], - strUpDown[desiredState]) - if actualState != desiredState { - m.stateLock.Unlock() - var configureState bool - if desiredState { - configureState = m.configurer.configureAddress() - } else { - configureState = m.configurer.deconfigureAddress() - } - if !configureState { - log.Error("Error while acquiring virtual ip for this machine") - //Sleep a little bit to avoid busy waiting due to the for loop. - timeout = 10 - } else { - timeout = 0 - } - } else { - // Wait for notification - m.recheck.Wait() - // Want to query actual state anyway, so unlock - m.stateLock.Unlock() - } + case <-m.recheckChan: // signal to recheck + case <-time.After(time.Duration(10) * time.Second): // recheck every 10 seconds } } } // SyncStates implements states synchronization func (m *IPManager) SyncStates(ctx context.Context, states <-chan bool) { - ticker := time.NewTicker(10 * time.Second) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - m.applyLoop(ctx) - wg.Done() - }() - + go m.applyLoop(ctx) for { select { case newState := <-states: - m.stateLock.Lock() - if m.currentState != newState { - m.currentState = newState - m.recheck.Broadcast() + if m.shouldSetIPUp.Load() != newState { + m.shouldSetIPUp.Store(newState) + m.recheckChan <- struct{}{} } - m.stateLock.Unlock() - case <-ticker.C: - m.recheck.Broadcast() case <-ctx.Done(): - m.recheck.Broadcast() - wg.Wait() + m.configurer.deconfigureAddress() m.configurer.cleanupArp() return }