Skip to content

Commit

Permalink
fixed a race condition on reload and exit
Browse files Browse the repository at this point in the history
  • Loading branch information
HeavyHorst committed Oct 20, 2016
1 parent 36646d9 commit b09a2ad
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 41 deletions.
48 changes: 30 additions & 18 deletions configWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package main

import (
"fmt"
"sync"
"time"

"github.com/HeavyHorst/easyKV"
Expand All @@ -19,19 +18,17 @@ import (

// the configWatcher watches the config file for changes
type configWatcher struct {
waitAndExit *sync.WaitGroup
stopped chan struct{}
stoppedCW chan struct{}
stoppedW chan struct{}
stopWatchConf chan bool
stopWatch chan bool
filePath string
}

func (w *configWatcher) startWatch(c tomlConf) {
w.waitAndExit.Add(1)
go func() {
defer func() {
w.waitAndExit.Done()
w.stopped <- struct{}{}
w.stoppedW <- struct{}{}
}()
c.run(w.stopWatch)
}()
Expand All @@ -55,43 +52,58 @@ func (w *configWatcher) reload() {
log.Error(err)
return
}
// waitAndExit is temporally increased by 1, so that the program don't terminate after stopWatch
w.waitAndExit.Add(1)
// stop the old watcher and wait until it has stopped
w.stopWatch <- true
<-w.stopped
<-w.stoppedW
// start a new watcher
w.startWatch(newConf)
w.waitAndExit.Done()
}

func newConfigWatcher(filepath string, watcher easyKV.ReadWatcher, config tomlConf) *configWatcher {
func newConfigWatcher(filepath string, watcher easyKV.ReadWatcher, config tomlConf, done chan struct{}) *configWatcher {
w := &configWatcher{
waitAndExit: &sync.WaitGroup{},
stopped: make(chan struct{}),
stoppedW: make(chan struct{}),
stoppedCW: make(chan struct{}),
stopWatchConf: make(chan bool),
stopWatch: make(chan bool),
filePath: filepath,
}

w.startWatch(config)

reload := make(chan struct{})
go func() {
// watch the config for changes
for {
select {
case <-w.stopWatchConf:
w.stopped <- struct{}{}
w.stoppedCW <- struct{}{}
return
default:
_, err := watcher.WatchPrefix("", w.stopWatchConf, easyKV.WithKeys([]string{""}))
if err != nil {
log.Error(err)
time.Sleep(2 * time.Second)
if err != easyKV.ErrWatchCanceled {
log.Error(err)
time.Sleep(2 * time.Second)
}
continue
}
time.Sleep(1 * time.Second)
//w.reload()
reload <- struct{}{}
}
}
}()

go func() {
for {
select {
case <-reload:
w.reload()
case <-w.stoppedW:
close(done)
// there is no runnign startWatch function which can answer to the stop method
// we need to send to the w.stoppedW channel so that we not block
w.stoppedW <- struct{}{}
}
}
}()
Expand All @@ -102,6 +114,6 @@ func newConfigWatcher(filepath string, watcher easyKV.ReadWatcher, config tomlCo
func (w *configWatcher) stop() {
close(w.stopWatchConf)
close(w.stopWatch)
<-w.stopped
<-w.stopped
<-w.stoppedW
<-w.stoppedCW
}
23 changes: 3 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,9 @@ var (
)

func init() {
const (
defaultConfig = "/etc/remco/config"
)
const defaultConfig = "/etc/remco/config"
flag.StringVar(&fileConfig.Filepath, "config", defaultConfig, "path to the configuration file")
flag.BoolVar(&printVersionAndExit, "version", false, "print version and exit")

// glog(used in the kubernetes-client) registers some flags that we don't want to show
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [options]\n", os.Args[0])
fmt.Fprintln(os.Stderr, "Parameters:")
fmt.Fprintln(os.Stderr, " -config string\n\tpath to the configuration file (default \"/etc/remco/config\")")
fmt.Fprintln(os.Stderr, " -version\n\tprint version and exit")
}
}

func run() {
Expand All @@ -56,16 +46,9 @@ func run() {
log.Fatal(err.Error())
}

cfgWatcher := newConfigWatcher(fileConfig.Filepath, s.ReadWatcher, c)
defer cfgWatcher.stop()

done := make(chan struct{})
go func() {
// If there is no goroutine left - quit
// this is needed for the onetime mode
cfgWatcher.waitAndExit.Wait()
close(done)
}()
cfgWatcher := newConfigWatcher(fileConfig.Filepath, s.ReadWatcher, c, done)
defer cfgWatcher.stop()

for {
select {
Expand Down
8 changes: 5 additions & 3 deletions template/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,12 @@ func (s Backend) watch(stopChan chan bool, processChan chan Backend) {
default:
index, err := s.WatchPrefix(s.Prefix, stopChan, easyKV.WithKeys(keysPrefix), easyKV.WithWaitIndex(lastIndex))
if err != nil {
log.Error(err)
// Prevent backend errors from consuming all resources.
time.Sleep(time.Second * 2)
if err != easyKV.ErrWatchCanceled {
log.Error(err)
time.Sleep(2 * time.Second)
}
continue

}
processChan <- s
lastIndex = index
Expand Down

0 comments on commit b09a2ad

Please sign in to comment.