Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sidecar: fix sidecar crashing with lstat errors during a reload from a configmap update #2952

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2956](https://github.com/thanos-io/thanos/pull/2956) Store: Fix fetching of chunks bigger than 16000 bytes.
- [#2970](https://github.com/thanos-io/thanos/pull/2970) Store: Upgrade minio-go/v7 to fix slowness when running on EKS.
- [#2976](https://github.com/thanos-io/thanos/pull/2976) Query: Better rounding for incoming query timestamps.
- [#2952](https://github.com/thanos-io/thanos/pull/2952) Sidecar: Fix sidecar crashing with lstat errors during a reload from a configmap update.

### Added

Expand Down
87 changes: 54 additions & 33 deletions pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ func (r *Reloader) Watch(ctx context.Context) error {
}

if err := r.apply(ctx); err != nil {
// Critical error.
// TODO(bwplotka): There is no need to get process down in this case and decrease availability, handle the error in different way.
return err
}
}
Expand Down Expand Up @@ -277,40 +275,20 @@ func (r *Reloader) apply(ctx context.Context) error {
}
}

h := sha256.New()
for _, ruleDir := range r.ruleDirs {
walkDir, err := filepath.EvalSymlinks(ruleDir)
if err != nil {
return errors.Wrap(err, "ruleDir symlink eval")
}
err = filepath.Walk(walkDir, func(path string, f os.FileInfo, err error) error {
if err != nil {
return err
}

// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
// follow symlinks. Make sure to follow a symlink before checking
// if it is a directory.
targetFile, err := os.Stat(path)
if err != nil {
return err
}

if targetFile.IsDir() {
return nil
}
// Retry calculating the rule hash for a short while. This can hit errors due to symlinks changing,
// especially if reading the config from a k8s volume-mounted configmap.
hashCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

if err := hashFile(h, path); err != nil {
return err
}
return nil
})
if err := runutil.RetryWithLog(r.logger, 1*time.Second, hashCtx.Done(), func() error {
var err error
ruleHash, err = hashRules(r.ruleDirs)
if err != nil {
return errors.Wrap(err, "build hash")
return err
}
}
if len(r.ruleDirs) > 0 {
ruleHash = h.Sum(nil)
return nil
}); err != nil {
return err
}

if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastRuleHash, ruleHash) {
Expand Down Expand Up @@ -344,6 +322,49 @@ func (r *Reloader) apply(ctx context.Context) error {
return nil
}

// hashRules returns one SHA-256 digest covering the contents of every
// non-directory entry found while walking each of ruleDirs, in order.
//
// Each rule directory is first resolved with filepath.EvalSymlinks and the
// resolved path is walked. When ruleDirs is empty an empty (non-nil) slice is
// returned. On any error the returned slice is nil.
func hashRules(ruleDirs []string) ([]byte, error) {
	digest := sha256.New()

	// visit feeds a single walked entry into digest, skipping directories.
	visit := func(path string, _ os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}

		// filepath.Walk hands us Lstat results, which describe a symlink
		// itself rather than its target. Stat the path so that a symlink
		// pointing at a directory is recognised as a directory.
		info, statErr := os.Stat(path)
		if statErr != nil {
			return errors.Wrap(statErr, "stat")
		}
		if info.IsDir() {
			return nil
		}

		if hashErr := hashFile(digest, path); hashErr != nil {
			return errors.Wrap(hashErr, "hashfile")
		}
		return nil
	}

	for _, dir := range ruleDirs {
		resolved, err := filepath.EvalSymlinks(dir)
		if err != nil {
			return nil, errors.Wrap(err, "ruleDir symlink eval")
		}
		if err := filepath.Walk(resolved, visit); err != nil {
			return nil, err
		}
	}

	// Nothing to hash: report an empty hash instead of the digest of no input.
	if len(ruleDirs) == 0 {
		return []byte{}, nil
	}
	return digest.Sum(nil), nil
}

func hashFile(h hash.Hash, fn string) error {
f, err := os.Open(fn)
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,124 @@ func TestReloader_RuleApply(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 5, reloads.Load().(int))
}

// TestReloader_SymlinkWatch runs Reloader.Watch against a symlinked directory layout
// and swaps the "data" symlink to a new directory after the first reload request.
// It passes when a further reload request is observed, Watch returns a nil error,
// and the output config file holds the second config.
func TestReloader_SymlinkWatch(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

// Local HTTP server used as the reload endpoint; it only counts incoming requests.
l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)

reloads := &atomic.Value{}
reloads.Store(0)
srv := &http.Server{}
srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) {
reloads.Store(reloads.Load().(int) + 1) // The only writer.
resp.WriteHeader(http.StatusOK)
})
go func() {
_ = srv.Serve(l)
}()
defer func() { testutil.Ok(t, srv.Close()) }()

reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String()))
testutil.Ok(t, err)

dir, err := ioutil.TempDir("", "reloader-watch-dir")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

// The following setup is similar to the one produced when k8s projects the contents of a
// configmap as a volume.
// Directory setup ( -> = symlink):
// in/cfg.yaml -> in/data/cfg.yaml
// in/rule.yaml -> in/data/rule.yaml
// in/data -> in/first-config
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm))
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm))

// Set up the first config directory with a config file and a rule file.
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in", "first-config"), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "first-config", "cfg.yaml"), []byte(`
config:
a: a
`), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "first-config", "rule.yaml"), []byte(`rule1`), os.ModePerm))
// Point the "data" symlink at the first config directory.
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "first-config"), path.Join(dir, "in", "data")))

// Set up the top-level file symlinks, which go through the "data" symlink.
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "data", "cfg.yaml"), path.Join(dir, "in", "cfg.yaml")))
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "data", "rule.yaml"), path.Join(dir, "in", "rule.yaml")))

// NOTE(review): argument meaning is inferred from the values passed here (presumably logger,
// registerer, reload URL, input config, output config, rule dirs) — confirm against New.
reloader := New(nil, nil, reloadURL, path.Join(dir, "in", "cfg.yaml"), path.Join(dir, "out", "cfg.yaml"), []string{dir, path.Join(dir, "in")})

// Short intervals keep the test fast.
reloader.watchInterval = 100 * time.Millisecond
reloader.retryInterval = 100 * time.Millisecond

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
g := sync.WaitGroup{}
g.Add(1)
// Driver goroutine: polls the reload counter and performs the symlink swap once the first
// reload request has been seen. It cancels ctx when it exits, which is the signal for Watch to stop.
// NOTE(review): testutil.Ok is invoked from this goroutine rather than the test goroutine; if it
// fails via t.FailNow/t.Fatal, that is outside what the testing package permits — confirm.
go func() {
defer g.Done()
defer cancel()

reloadsSeen := 0
init := false
for {
select {
case <-ctx.Done():
return
case <-time.After(300 * time.Millisecond):
}

// Only act when the counter has advanced; the very first iteration always proceeds.
rel := reloads.Load().(int)
if init && rel <= reloadsSeen {
continue
}
init = true

reloadsSeen = rel

t.Log("Performing step number", rel)

switch rel {
case 1:
// The following steps try to reproduce the file change behavior when k8s projects a configmap as a volume
// and the configmap changes. https://github.com/kubernetes/kubernetes/blob/release-1.19/pkg/volume/util/atomic_writer.go#L87-L120
// Set up the second config directory.
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in", "second-config"), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "second-config", "cfg.yaml"), []byte(`
config:
b: b
`), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "in", "second-config", "rule.yaml"), []byte(`rule2`), os.ModePerm))

// Create a temporary symlink pointing at the second config directory.
testutil.Ok(t, os.Symlink(path.Join(dir, "in", "second-config"), path.Join(dir, "in", "tmp_data")))

// Rename the temporary symlink over "data", so "data" now points at the second config.
testutil.Ok(t, os.Rename(path.Join(dir, "in", "tmp_data"), path.Join(dir, "in", "data")))

// Remove the first config's files and then its directory.
testutil.Ok(t, os.RemoveAll(path.Join(dir, "in", "first-config", "rule.yaml")))
testutil.Ok(t, os.RemoveAll(path.Join(dir, "in", "first-config", "cfg.yaml")))
testutil.Ok(t, os.RemoveAll(path.Join(dir, "in", "first-config")))
}

if rel > 1 {
// All good: a reload was requested after the swap.
return
}
}
}()
err = reloader.Watch(ctx)
cancel()
g.Wait()

// Watch must not have failed, and the output file must hold the second config.
testutil.Ok(t, err)
output, err := ioutil.ReadFile(path.Join(dir, "out", "cfg.yaml"))
testutil.Ok(t, err)
testutil.Equals(t, output, []byte(`
config:
b: b
`))
}