Skip to content

Commit

Permalink
[-] rewrite IPManager with channel logic, fixes #282 (#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub authored Nov 20, 2024
1 parent bf9fece commit 26e4187
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 65 deletions.
10 changes: 5 additions & 5 deletions checker/etcd_leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,28 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) {
func (elc *EtcdLeaderChecker) get(ctx context.Context, out chan<- bool) {
resp, err := elc.Get(ctx, elc.TriggerKey)
if err != nil {
elc.Logger.Error("etcd error:", zap.Error(err))
elc.Logger.Error("Failed to get etcd value:", zap.Error(err))
out <- false
return
}
for _, kv := range resp.Kvs {
elc.Logger.Sugar().Info("current leader from DCS:", string(kv.Value))
elc.Logger.Sugar().Info("Current leader from DCS:", string(kv.Value))
out <- string(kv.Value) == elc.TriggerValue
}
}

// watch monitors the leader change from etcd
func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error {
elc.Logger.Sugar().Info("Setting WATCH on ", elc.TriggerKey)
watchChan := elc.Watch(ctx, elc.TriggerKey)
elc.Logger.Sugar().Info("set WATCH on ", elc.TriggerKey)
for {
select {
case <-ctx.Done():
return ctx.Err()
case watchResp := <-watchChan:
if watchResp.Canceled {
watchChan = elc.Watch(ctx, elc.TriggerKey)
elc.Logger.Sugar().Info("reset cancelled WATCH on ", elc.TriggerKey)
elc.Logger.Sugar().Info("Resetting cancelled WATCH on ", elc.TriggerKey)
continue
}
if err := watchResp.Err(); err != nil {
Expand All @@ -104,7 +104,7 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error
}
for _, event := range watchResp.Events {
out <- string(event.Kv.Value) == elc.TriggerValue
elc.Logger.Sugar().Info("current leader from DCS:", string(event.Kv.Value))
elc.Logger.Sugar().Info("Current leader from DCS: ", string(event.Kv.Value))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ipmanager/basicConfigurer_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *BasicConfigurer) arpSendGratuitous() error {
time.Sleep(time.Duration(c.RetryAfter) * time.Millisecond)
}
if err != nil {
log.Error("too many retries", err)
log.Error("Too many retries", err)
return err
}

Expand Down
6 changes: 3 additions & 3 deletions ipmanager/basicConfigurer_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func (c *BasicConfigurer) configureAddress() bool {
)
iface, err := net.InterfaceByName(c.Iface.Name)
if err != nil {
log.Infof("Got error: %v", err)
log.Error("Failed to access interface: ", err)
return false
}
err = iphlpapi.AddIPAddress(ip, mask, uint32(iface.Index), &c.ntecontext, &nteinstance)
if err != nil {
log.Infof("Got error: %v", err)
log.Error("Failed to add address: ", err)
return false
}
// For now it is save to say that also working even if a
Expand All @@ -37,7 +37,7 @@ func (c *BasicConfigurer) deconfigureAddress() bool {
log.Infof("Removing address %s on %s", c.getCIDR(), c.Iface.Name)
err := iphlpapi.DeleteIPAddress(c.ntecontext)
if err != nil {
log.Error(err)
log.Errorf("Failed to remove address %s: %v", c.getCIDR(), err)
return false
}
return true
Expand Down
85 changes: 29 additions & 56 deletions ipmanager/ip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"net"
"net/netip"
"sync"
"sync/atomic"
"time"

"github.com/cybertec-postgresql/vip-manager/vipconfig"
Expand All @@ -25,10 +25,9 @@ var log *zap.SugaredLogger = zap.L().Sugar()
type IPManager struct {
configurer ipConfigurer

states <-chan bool
currentState bool
stateLock sync.Mutex
recheck *sync.Cond
states <-chan bool
shouldSetIPUp atomic.Bool
recheckChan chan struct{}
}

func getMask(vip netip.Addr, mask int) net.IPMask {
Expand Down Expand Up @@ -69,7 +68,7 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err
states: states,
}
log = conf.Logger.Sugar()
m.recheck = sync.NewCond(&m.stateLock)
m.recheckChan = make(chan struct{})
switch conf.HostingType {
case "hetzner":
m.configurer, err = newHetznerConfigurer(ipConf, conf.Verbose)
Expand All @@ -86,71 +85,45 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err

func (m *IPManager) applyLoop(ctx context.Context) {
strUpDown := map[bool]string{true: "up", false: "down"}
timeout := 0
for {
// Check if we should exit
isIPUp := m.configurer.queryAddress()
shouldSetIPUp := m.shouldSetIPUp.Load()
log.Infof("IP address %s is %s, must be %s",
m.configurer.getCIDR(),
strUpDown[isIPUp],
strUpDown[shouldSetIPUp])
if isIPUp != shouldSetIPUp {
var isOk bool
if shouldSetIPUp {
isOk = m.configurer.configureAddress()
} else {
isOk = m.configurer.deconfigureAddress()
}
if !isOk {
log.Error("Failed to configure virtual ip for this machine")
}
}
select {
case <-ctx.Done():
m.configurer.deconfigureAddress()
return
case <-time.After(time.Duration(timeout) * time.Second):
actualState := m.configurer.queryAddress()
m.stateLock.Lock()
desiredState := m.currentState
log.Infof("IP address %s state is %s, must be %s",
m.configurer.getCIDR(),
strUpDown[actualState],
strUpDown[desiredState])
if actualState != desiredState {
m.stateLock.Unlock()
var configureState bool
if desiredState {
configureState = m.configurer.configureAddress()
} else {
configureState = m.configurer.deconfigureAddress()
}
if !configureState {
log.Error("Error while acquiring virtual ip for this machine")
//Sleep a little bit to avoid busy waiting due to the for loop.
timeout = 10
} else {
timeout = 0
}
} else {
// Wait for notification
m.recheck.Wait()
// Want to query actual state anyway, so unlock
m.stateLock.Unlock()
}
case <-m.recheckChan: // signal to recheck
case <-time.After(time.Duration(10) * time.Second): // recheck every 10 seconds
}
}
}

// SyncStates implements states synchronization
func (m *IPManager) SyncStates(ctx context.Context, states <-chan bool) {
ticker := time.NewTicker(10 * time.Second)

var wg sync.WaitGroup
wg.Add(1)
go func() {
m.applyLoop(ctx)
wg.Done()
}()

go m.applyLoop(ctx)
for {
select {
case newState := <-states:
m.stateLock.Lock()
if m.currentState != newState {
m.currentState = newState
m.recheck.Broadcast()
if m.shouldSetIPUp.Load() != newState {
m.shouldSetIPUp.Store(newState)
m.recheckChan <- struct{}{}
}
m.stateLock.Unlock()
case <-ticker.C:
m.recheck.Broadcast()
case <-ctx.Done():
m.recheck.Broadcast()
wg.Wait()
m.configurer.deconfigureAddress()
m.configurer.cleanupArp()
return
}
Expand Down

0 comments on commit 26e4187

Please sign in to comment.