Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make compact lifecycle more flexible to be overridden for sharded compaction #5964

Merged
merged 26 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c61ac66
POC on sharded compaction
alexqyle Dec 14, 2022
45a4cfa
Refactored partition info to be part of meta
alexqyle Jan 3, 2023
05f1048
Merge branch 'main' into sharded-compaction
alexqyle Jan 3, 2023
9dbe33e
Override prometheus populate block func
alexqyle Mar 7, 2023
712e401
Merge branch 'main' into sharded-compaction
alexqyle Apr 11, 2023
c71ed9d
Make block populator plugable through CompactionLifecycleCallback
alexqyle Apr 13, 2023
554b19d
Uncommented code
alexqyle Apr 13, 2023
1fa6b60
Merge branch 'main' into sharded-compaction
alexqyle Apr 13, 2023
66f60d8
refactor
alexqyle Apr 13, 2023
42dbe8a
rename
alexqyle Apr 13, 2023
3408c8f
Merge branch 'main' into sharded-compaction
alexqyle Apr 20, 2023
ed90f79
make Group.partitionInfo nil-able
alexqyle Apr 20, 2023
390496b
Merge branch 'main' into sharded-compaction
alexqyle Apr 26, 2023
53b6992
Merge branch 'main' into sharded-compaction
alexqyle May 1, 2023
ed28417
Merge branch 'main' into sharded-compaction
alexqyle May 9, 2023
1766f20
Merge branch 'main' into sharded-compaction
alexqyle May 10, 2023
bc2be70
add extension field to thanos meta
alexqyle Jun 8, 2023
a8d2b0b
Merge branch 'main' into sharded-compaction
alexqyle Jun 8, 2023
28927a3
Merge branch 'main' into sharded-compaction
alexqyle Jul 11, 2023
ff552b1
fixed merge issue
alexqyle Jul 11, 2023
d20a9a4
Merge branch 'main' into sharded-compaction
alexqyle Jul 11, 2023
8b75147
Clean up
alexqyle Jul 12, 2023
8fe0d04
Merge branch 'main' into sharded-compaction
alexqyle Jul 12, 2023
fba2e9f
Merge branch 'main' into sharded-compaction
alexqyle Jul 13, 2023
c40a67f
Merge branch 'main' into sharded-compaction
alexqyle Jul 14, 2023
169f38e
Added comment to ConvertExtensions func
alexqyle Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
IndexHeaderFilename = "index-header"
// ChunksDirname is the known dir name for chunks with compressed samples.
ChunksDirname = "chunks"
// PartitionInfoFilename is JSON filename for partition information.
PartitionInfoFilename = "partition-info.json"
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

// DebugMetas is a directory for debug meta files that happen in the past. Useful for debugging.
DebugMetas = "debug/metas"
Expand Down Expand Up @@ -153,6 +155,14 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
}

// level 1 blocks should not have partition info file
if meta.Compaction.Level > 1 {
if err := objstore.UploadFile(ctx, logger, bkt, filepath.Join(bdir, PartitionInfoFilename), path.Join(id.String(), PartitionInfoFilename)); err != nil {
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
// Don't call cleanUp here. Partition info file acts in a similar way as meta file.
return errors.Wrap(err, "upload partition info")
}
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads.
if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil {
// Don't call cleanUp here. Despite getting error, meta.json may have been uploaded in certain cases,
Expand Down
4 changes: 4 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,10 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*

var _ MetadataFilter = &DeduplicateFilter{}

type IDeduplicateFilter interface {
DuplicateIDs() []ulid.ULID
}

// DeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data.
// Not go-routine safe.
type DeduplicateFilter struct {
Expand Down
79 changes: 79 additions & 0 deletions pkg/block/partition_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package block

import (
"encoding/json"
"io"
"os"
"path/filepath"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/tsdb/fileutil"

"github.com/thanos-io/thanos/pkg/runutil"
)

type PartitionInfo struct {
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
PartitionedGroupID uint32 `json:"partitionedGroupID"`
PartitionCount int `json:"partitionCount"`
PartitionID int `json:"partitionID"`
}

// WriteToDir writes the encoded partition info into <dir>/partition-info.json.
func (p PartitionInfo) WriteToDir(logger log.Logger, dir string) error {
// Make any changes to the file appear atomic.
path := filepath.Join(dir, PartitionInfoFilename)
tmp := path + ".tmp"

f, err := os.Create(tmp)
if err != nil {
return err
}

if err := p.Write(f); err != nil {
runutil.CloseWithLogOnErr(logger, f, "close partition info")
return err
}
if err := f.Close(); err != nil {
return err
}
return renameFile(logger, tmp, path)
}

// Write writes the given encoded partition info to writer.
func (p PartitionInfo) Write(w io.Writer) error {
enc := json.NewEncoder(w)
enc.SetIndent("", "\t")
return enc.Encode(&p)
}

func renameFile(logger log.Logger, from, to string) error {
if err := os.RemoveAll(to); err != nil {
return err
}
if err := os.Rename(from, to); err != nil {
return err
}

// Directory was renamed; sync parent dir to persist rename.
pdir, err := fileutil.OpenDir(filepath.Dir(to))
if err != nil {
return err
}

if err = fileutil.Fdatasync(pdir); err != nil {
runutil.CloseWithLogOnErr(logger, pdir, "close dir")
return err
}
return pdir.Close()
}

// Read the block partition info from the given reader.
func ReadPartitionInfo(rc io.ReadCloser) (_ *PartitionInfo, err error) {
defer runutil.ExhaustCloseWithErrCapture(&err, rc, "close partition info JSON")

var p PartitionInfo
if err = json.NewDecoder(rc).Decode(&p); err != nil {
return nil, err
}
return &p, nil
}
Loading