Skip to content

Commit

Permalink
compact: clean up directories thoroughly (#3869)
Browse files Browse the repository at this point in the history
* compact: clean up directories properly

I couldn't stop thinking about this code for some reason and I have
figured that I had missed one case in #3031. We need to also clean up
the directories in compaction groups. A compaction could fail leaving
some new directory with a random ULID on the disk. Before attempting to
do another compaction loop, we need to remove it as well because the
compaction process always produces a new, unique directory.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* Remove all non expected dirs.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Addressed comment.

Signed-off-by: Bartlomiej Plotka <[email protected]>

Co-authored-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
GiedriusS and bwplotka authored Mar 24, 2021
1 parent b446fae commit 1dce996
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 22 deletions.
4 changes: 3 additions & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,9 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {

ignoreDirs := []string{}
for _, gr := range groups {
ignoreDirs = append(ignoreDirs, gr.Key())
for _, grID := range gr.IDs() {
ignoreDirs = append(ignoreDirs, filepath.Join(gr.Key(), grID.String()))
}
}

if err := runutil.DeleteAll(c.compactDir, ignoreDirs...); err != nil {
Expand Down
31 changes: 24 additions & 7 deletions pkg/runutil/runutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -162,13 +163,15 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ..

// DeleteAll deletes all files and directories inside the given
// dir except for the ignoreDirs directories.
// NOTE: DeleteAll is not idempotent.
func DeleteAll(dir string, ignoreDirs ...string) error {
entries, err := ioutil.ReadDir(dir)
if err != nil {
return errors.Wrap(err, "read dir")
}
var groupErrs errutil.MultiError

var matchingIgnores []string
for _, d := range entries {
if !d.IsDir() {
if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
Expand All @@ -177,22 +180,36 @@ func DeleteAll(dir string, ignoreDirs ...string) error {
continue
}

var found bool
for _, id := range ignoreDirs {
if id == d.Name() {
found = true
break
// ignoreDirs might be multi-directory paths.
matchingIgnores = matchingIgnores[:0]
ignore := false
for _, ignoreDir := range ignoreDirs {
id := strings.Split(ignoreDir, "/")
if id[0] == d.Name() {
if len(id) == 1 {
ignore = true
break
}
matchingIgnores = append(matchingIgnores, filepath.Join(id[1:]...))
}
}

if !found {
if ignore {
continue
}

if len(matchingIgnores) == 0 {
if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
groupErrs.Add(err)
}
continue
}
if err := DeleteAll(filepath.Join(dir, d.Name()), matchingIgnores...); err != nil {
groupErrs.Add(err)
}
}

if groupErrs != nil {
if groupErrs.Err() != nil {
return errors.Wrap(groupErrs.Err(), "delete file/dir")
}
return nil
Expand Down
34 changes: 21 additions & 13 deletions pkg/runutil/runutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,35 +125,43 @@ func TestCloseMoreThanOnce(t *testing.T) {
testutil.Equals(t, true, lc.WasCalled)
}

func TestClearsDirectoriesFilesProperly(t *testing.T) {
func TestDeleteAll(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
testutil.Ok(t, err)

t.Cleanup(func() {
os.RemoveAll(dir)
testutil.Ok(t, os.RemoveAll(dir))
})

f, err := os.Create(filepath.Join(dir, "test123"))
f, err := os.Create(filepath.Join(dir, "file1"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm))
f, err = os.Create(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "a"), os.ModePerm))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "b"), os.ModePerm))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "c", "innerc"), os.ModePerm))
f, err = os.Create(filepath.Join(dir, "a", "file2"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())
f, err = os.Create(filepath.Join(dir, "c", "file3"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

testutil.Ok(t, DeleteAll(dir, "01EHBQRN4RF0HSRR1772KW0TN9", "01EHBQRN4RF0HSRR1772KW0TN8"))
testutil.Ok(t, DeleteAll(dir, "file1", "a", filepath.Join("c", "innerc")))

_, err = os.Stat(filepath.Join(dir, "test123"))
// Deleted.
_, err = os.Stat(filepath.Join(dir, "file1"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
_, err = os.Stat(filepath.Join(dir, "b/"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8/"))
_, err = os.Stat(filepath.Join(dir, "file3"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/"))
// Exists.
_, err = os.Stat(filepath.Join(dir, "a", "file2"))
testutil.Ok(t, err)
_, err = os.Stat(filepath.Join(dir, "a/"))
testutil.Ok(t, err)
_, err = os.Stat(filepath.Join(dir, "c", "innerc"))
testutil.Ok(t, err)
}
24 changes: 23 additions & 1 deletion test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,26 @@ func TestCompactWithStoreGateway(t *testing.T) {

// No replica label with overlaps should halt compactor. This test is sequential
// because we do not want two Thanos Compact instances deleting the same partially
// uploaded blocks and blocks with deletion marks.
// uploaded blocks and blocks with deletion marks. We also check that Thanos Compactor
// deletes directories inside of a compaction group that do not belong there.
{
// Precreate a directory. It should be deleted.
// In a hypothetical scenario, the directory could be a left-over from
// a compaction that had crashed.
p := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt", "compact")

testutil.Assert(t, len(blocksWithHashes) > 0)

m, err := block.DownloadMeta(ctx, logger, bkt, blocksWithHashes[0])
testutil.Ok(t, err)

randBlockDir := filepath.Join(p, compact.DefaultGroupKey(m.Thanos), "ITISAVERYRANDULIDFORTESTS0")
testutil.Ok(t, os.MkdirAll(randBlockDir, os.ModePerm))

f, err := os.Create(filepath.Join(randBlockDir, "index"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
Expand Down Expand Up @@ -578,6 +596,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))

testutil.Ok(t, s.Stop(c))

_, err = os.Stat(randBlockDir)
testutil.NotOk(t, err)
testutil.Assert(t, os.IsNotExist(err))
}

// Sequential because we want to check that Thanos Compactor does not
Expand Down

0 comments on commit 1dce996

Please sign in to comment.