Skip to content

Commit

Permalink
reloader: Force trigger reload when config rollbacked
Browse files Browse the repository at this point in the history
Signed-off-by: heylongdacoder <[email protected]>
  • Loading branch information
heylongdacoder committed May 2, 2022
1 parent b452ce5 commit f144a6d
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed
- [#5281](https://github.com/thanos-io/thanos/pull/5281) Blocks: Use correct separators for filesystem paths and object storage paths respectively.
- [#5300](https://github.com/thanos-io/thanos/pull/5300) Query: Ignore cache on queries with deduplication off.
- [#5324](https://github.com/thanos-io/thanos/pull/5324) Reloader: Force trigger reload when config rollbacked

### Added

Expand Down
5 changes: 4 additions & 1 deletion pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Reloader struct {

lastCfgHash []byte
lastWatchedDirsHash []byte
forceReload bool

reloads prometheus.Counter
reloadErrors prometheus.Counter
Expand Down Expand Up @@ -352,7 +353,7 @@ func (r *Reloader) apply(ctx context.Context) error {
watchedDirsHash = h.Sum(nil)
}

if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
// Nothing to do.
return nil
}
Expand All @@ -363,11 +364,13 @@ func (r *Reloader) apply(ctx context.Context) error {
}
r.reloads.Inc()
if err := r.triggerReload(ctx); err != nil {
r.forceReload = true
r.reloadErrors.Inc()
r.lastReloadSuccess.Set(0)
return errors.Wrap(err, "trigger reload")
}

r.forceReload = false
r.lastCfgHash = cfgHash
r.lastWatchedDirsHash = watchedDirsHash
level.Info(r.logger).Log(
Expand Down
128 changes: 128 additions & 0 deletions pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,134 @@ config:
testutil.Ok(t, os.Unsetenv("TEST_RELOADER_THANOS_ENV2"))
}

func TestReloader_ConfigRollback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)

correctConfig := []byte(`
config:
a: 1
`)
faultyConfig := []byte(`
faulty_config:
a: 1
`)

dir, err := ioutil.TempDir("", "reloader-cfg-test")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm))
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm))

var (
input = filepath.Join(dir, "in", "cfg.yaml.tmpl")
output = filepath.Join(dir, "out", "cfg.yaml")
)

reloads := &atomic.Value{}
reloads.Store(0)
srv := &http.Server{}

seen := map[int]bool{}

srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) {
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)

fin, err := ioutil.ReadFile(input)
testutil.Ok(t, err)

rel := reloads.Load().(int)

if _, ok := seen[rel]; !ok {
t.Logf("reloadsSeen: %d\n", rel)
t.Logf("input: %s\n", string(fin))
t.Logf("output: %s\n", string(f))
// test
seen[rel] = true
}

if string(f) == string(faultyConfig) {
resp.WriteHeader(http.StatusServiceUnavailable)
return
}

reloads.Store(reloads.Load().(int) + 1) // The only writer.
resp.WriteHeader(http.StatusOK)
})
go func() { _ = srv.Serve(l) }()
defer func() { testutil.Ok(t, srv.Close()) }()

reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String()))
testutil.Ok(t, err)

reloader := New(nil, nil, &Options{
ReloadURL: reloadURL,
CfgFile: input,
CfgOutputFile: output,
WatchedDirs: nil,
WatchInterval: 10 * time.Second, // 10 seconds to make the reload of faulty config fail quick
RetryInterval: 100 * time.Millisecond,
DelayInterval: 1 * time.Millisecond,
})

testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm))

rctx, cancel2 := context.WithCancel(ctx)
g := sync.WaitGroup{}
g.Add(1)
go func() {
defer g.Done()
testutil.Ok(t, reloader.Watch(rctx))
}()

reloadsSeen := 0
i := 0

for {
select {
case <-ctx.Done():
t.Fatalf("Timeout with i = %d, reloadsSeen = %d", i, reloadsSeen)
case <-time.After(300 * time.Millisecond):
}

rel := reloads.Load().(int)
reloadsSeen = rel

if reloadsSeen == 1 && i == 0 {
// Initial apply seen (without doing nothing).
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(correctConfig), string(f))

// Change to a faulty config
testutil.Ok(t, ioutil.WriteFile(input, faultyConfig, os.ModePerm))
i++
} else if reloadsSeen == 1 && i == 1 {
// Faulty config will trigger a reload, but reload failed
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(faultyConfig), string(f))

// Rollback config
testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm))
} else if reloadsSeen == 2 {
// Rollback to previous config should trigger a reload
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(correctConfig), string(f))

break
}
}
cancel2()
g.Wait()
}

func TestReloader_DirectoriesApply(t *testing.T) {
l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)
Expand Down

0 comments on commit f144a6d

Please sign in to comment.