Skip to content

Commit

Permalink
reloader: Force trigger reload when config rollbacked
Browse files Browse the repository at this point in the history
Signed-off-by: heylongdacoder <[email protected]>
  • Loading branch information
heylongdacoder committed May 9, 2022
1 parent b452ce5 commit c11de77
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed
- [#5281](https://github.com/thanos-io/thanos/pull/5281) Blocks: Use correct separators for filesystem paths and object storage paths respectively.
- [#5300](https://github.com/thanos-io/thanos/pull/5300) Query: Ignore cache on queries with deduplication off.
- [#5324](https://github.com/thanos-io/thanos/pull/5324) Reloader: Force trigger reload when config rollbacked

### Added

Expand Down
5 changes: 4 additions & 1 deletion pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Reloader struct {

lastCfgHash []byte
lastWatchedDirsHash []byte
forceReload bool

reloads prometheus.Counter
reloadErrors prometheus.Counter
Expand Down Expand Up @@ -352,7 +353,7 @@ func (r *Reloader) apply(ctx context.Context) error {
watchedDirsHash = h.Sum(nil)
}

if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
// Nothing to do.
return nil
}
Expand All @@ -368,6 +369,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return errors.Wrap(err, "trigger reload")
}

r.forceReload = false
r.lastCfgHash = cfgHash
r.lastWatchedDirsHash = watchedDirsHash
level.Info(r.logger).Log(
Expand All @@ -379,6 +381,7 @@ func (r *Reloader) apply(ctx context.Context) error {
r.lastReloadSuccessTimestamp.SetToCurrentTime()
return nil
}); err != nil {
r.forceReload = true
level.Error(r.logger).Log("msg", "Failed to trigger reload. Retrying.", "err", err)
}

Expand Down
113 changes: 113 additions & 0 deletions pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,119 @@ config:
testutil.Ok(t, os.Unsetenv("TEST_RELOADER_THANOS_ENV2"))
}

func TestReloader_ConfigRollback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)

correctConfig := []byte(`
config:
a: 1
`)
faultyConfig := []byte(`
faulty_config:
a: 1
`)

dir, err := ioutil.TempDir("", "reloader-cfg-test")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm))
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm))

var (
input = filepath.Join(dir, "in", "cfg.yaml.tmpl")
output = filepath.Join(dir, "out", "cfg.yaml")
)

reloads := &atomic.Value{}
reloads.Store(0)
srv := &http.Server{}

srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) {
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)

if string(f) == string(faultyConfig) {
resp.WriteHeader(http.StatusServiceUnavailable)
return
}

reloads.Store(reloads.Load().(int) + 1) // The only writer.
resp.WriteHeader(http.StatusOK)
})
go func() { _ = srv.Serve(l) }()
defer func() { testutil.Ok(t, srv.Close()) }()

reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String()))
testutil.Ok(t, err)

reloader := New(nil, nil, &Options{
ReloadURL: reloadURL,
CfgFile: input,
CfgOutputFile: output,
WatchedDirs: nil,
WatchInterval: 10 * time.Second, // 10 seconds to make the reload of faulty config fail quick
RetryInterval: 100 * time.Millisecond,
DelayInterval: 1 * time.Millisecond,
})

testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm))

rctx, cancel2 := context.WithCancel(ctx)
g := sync.WaitGroup{}
g.Add(1)
go func() {
defer g.Done()
testutil.Ok(t, reloader.Watch(rctx))
}()

reloadsSeen := 0
faulty := false

for {
select {
case <-ctx.Done():
t.Fatalf("Timeout with faulty = %t, reloadsSeen = %d", faulty, reloadsSeen)
case <-time.After(300 * time.Millisecond):
}

rel := reloads.Load().(int)
reloadsSeen = rel

if reloadsSeen == 1 && !faulty {
// Initial apply seen (without doing anything).
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(correctConfig), string(f))

// Change to a faulty config
testutil.Ok(t, ioutil.WriteFile(input, faultyConfig, os.ModePerm))
faulty = true
} else if reloadsSeen == 1 && faulty {
// Faulty config will trigger a reload, but reload failed
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(faultyConfig), string(f))

// Rollback config
testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm))
} else if reloadsSeen >= 2 {
// Rollback to previous config should trigger a reload
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(correctConfig), string(f))

break
}
}
cancel2()
g.Wait()
}

func TestReloader_DirectoriesApply(t *testing.T) {
l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)
Expand Down

0 comments on commit c11de77

Please sign in to comment.