Skip to content

Commit

Permalink
sidecar: fix sidecar crashing with lstat errors during a reload from …
Browse files Browse the repository at this point in the history
…a configmap update

During a reload it's possible that files are added/removed while
we walk the directory and we try to lstat a file that doesn't exist.

This is especially common if running in k8s with the configuration mounted
as a configmap, and the configmap contents change.

With this change we will retry for a short while if we hit any
filesystem errors during the reload while trying to calculate the hash.

Signed-off-by: Ergin Babani <[email protected]>
  • Loading branch information
Ergin Babani committed Jul 31, 2020
1 parent ee52915 commit 464fb83
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block.
- [#2936](https://github.com/thanos-io/thanos/pull/2936) Compact: Fix ReplicaLabelRemover panic when replicaLabels are not specified.
- [#2956](https://github.com/thanos-io/thanos/pull/2956) Store: Fix fetching of chunks bigger than 16000 bytes.
- [#2952](https://github.com/thanos-io/thanos/pull/2952) Sidecar: Fix sidecar crashing with lstat errors during a reload from a configmap update

### Added

Expand Down
87 changes: 53 additions & 34 deletions pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ func (r *Reloader) Watch(ctx context.Context) error {
}

if err := r.apply(ctx); err != nil {
// Critical error.
// TODO(bwplotka): There is no need to get process down in this case and decrease availability, handle the error in different way.
return err
}
}
Expand Down Expand Up @@ -277,40 +275,18 @@ func (r *Reloader) apply(ctx context.Context) error {
}
}

h := sha256.New()
for _, ruleDir := range r.ruleDirs {
walkDir, err := filepath.EvalSymlinks(ruleDir)
if err != nil {
return errors.Wrap(err, "ruleDir symlink eval")
}
err = filepath.Walk(walkDir, func(path string, f os.FileInfo, err error) error {
if err != nil {
return err
}

// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
// follow symlinks. Make sure to follow a symlink before checking
// if it is a directory.
targetFile, err := os.Stat(path)
if err != nil {
return err
}

if targetFile.IsDir() {
return nil
}

if err := hashFile(h, path); err != nil {
return err
}
return nil
})
// Retry calculating the rule hash for a short while. This can hit errors due symlinks changing,
// especially if reading the config from a k8s volume mounted configmap
hashCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
if err := runutil.RetryWithLog(r.logger, 1*time.Second, hashCtx.Done(), func() error {
var err error
ruleHash, err = hashRules(r.ruleDirs)
if err != nil {
return errors.Wrap(err, "build hash")
return err
}
}
if len(r.ruleDirs) > 0 {
ruleHash = h.Sum(nil)
return nil
}); err != nil {
return err
}

if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastRuleHash, ruleHash) {
Expand Down Expand Up @@ -344,6 +320,49 @@ func (r *Reloader) apply(ctx context.Context) error {
return nil
}

func hashRules(ruleDirs []string) ([]byte, error) {
ruleHash := []byte{}
h := sha256.New()

for _, ruleDir := range ruleDirs {
walkDir, err := filepath.EvalSymlinks(ruleDir)
if err != nil {
return nil, errors.Wrap(err, "ruleDir symlink eval")
}
err = filepath.Walk(walkDir, func(path string, f os.FileInfo, err error) error {
if err != nil {
return err
}

// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
// follow symlinks. Make sure to follow a symlink before checking
// if it is a directory.
targetFile, err := os.Stat(path)
if err != nil {
return errors.Wrap(err, "stat")
}

if targetFile.IsDir() {
return nil
}

if err := hashFile(h, path); err != nil {
return errors.Wrap(err, "hashfile")
}
return nil
})
if err != nil {
return nil, err
}
}

if len(ruleDirs) > 0 {
ruleHash = h.Sum(nil)
}

return ruleHash, nil
}

func hashFile(h hash.Hash, fn string) error {
f, err := os.Open(fn)
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,124 @@ func TestReloader_RuleApply(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 5, reloads.Load().(int))
}

func TestReloader_SymlinkWatch(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)

reloads := &atomic.Value{}
reloads.Store(0)
srv := &http.Server{}
srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) {
reloads.Store(reloads.Load().(int) + 1) // The only writer.
resp.WriteHeader(http.StatusOK)
})
go func() {
_ = srv.Serve(l)
}()
defer func() { testutil.Ok(t, srv.Close()) }()

reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String()))
testutil.Ok(t, err)

dir, err := ioutil.TempDir("", "reloader-watch-dir")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

// The following setup is similar to the one when k8s projects a configmap contents as a volume
// Directory setup ( -> = symlink):
// in/config.yaml -> in/data/config.yaml
// in/rules.yaml -> in/data/rules.yaml
// data -> first-config
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm))
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm))

// Setup first config
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in", "first-config"), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "first-config", "cfg.yaml"), []byte(`
config:
a: a
`), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "first-config", "rule.yaml"), []byte(`rule1`), os.ModePerm))
// Setup data
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "first-config"), path.Join(dir, "in", "data")))

// Setup config symlinks to data
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "data", "cfg.yaml"), path.Join(dir, "in", "cfg.yaml")))
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "data", "rule.yaml"), path.Join(dir, "in", "rule.yaml")))

reloader := New(nil, nil, reloadURL, path.Join(dir, "in", "cfg.yaml"), path.Join(dir, "out", "cfg.yaml"), []string{dir, path.Join(dir, "in")})

reloader.watchInterval = 100 * time.Millisecond
reloader.retryInterval = 100 * time.Millisecond

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
g := sync.WaitGroup{}
g.Add(1)
go func() {
defer g.Done()
defer cancel()

reloadsSeen := 0
init := false
for {
select {
case <-ctx.Done():
return
case <-time.After(300 * time.Millisecond):
}

rel := reloads.Load().(int)
if init && rel <= reloadsSeen {
continue
}
init = true

reloadsSeen = rel

t.Log("Performing step number", rel)

switch rel {
case 1:
// The following steps try to reproduce the file change behavior when k8s projects a configmap as a volume
// and the configmap changes. https://github.com/kubernetes/kubernetes/blob/release-1.19/pkg/volume/util/atomic_writer.go#L87-L120
// Setup second config
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in", "second-config"), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "second-config", "cfg.yaml"), []byte(`
config:
b: b
`), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "second-config", "rule.yaml"), []byte(`rule2`), os.ModePerm))

// Create temp data dir
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "second-config"), path.Join(dir, "in", "tmp_data")))

// Rename data
testutil.Ok(t, os.Rename(path.Join(dir, "in", "tmp_data"), path.Join(dir, "in", "data")))

// Remove first config
testutil.Ok(t, os.RemoveAll(path.Join(dir, "in", "first-config", "rule.yaml")))
testutil.Ok(t, os.RemoveAll(path.Join(dir, "in", "first-config", "cfg.yaml")))
testutil.Ok(t, os.RemoveAll(path.Join(dir, "in", "first-config")))
}

if rel > 1 {
// All good.
return
}
}
}()
err = reloader.Watch(ctx)
cancel()
g.Wait()

testutil.Ok(t, err)
output, err := ioutil.ReadFile(path.Join(dir, "out", "cfg.yaml"))
testutil.Ok(t, err)
testutil.Equals(t, output, []byte(`
config:
b: b
`))
}

0 comments on commit 464fb83

Please sign in to comment.