Skip to content

Commit

Permalink
Make compact lifecycle more flexible to be overridden for sharded com…
Browse files Browse the repository at this point in the history
…paction (#5964)
  • Loading branch information
alexqyle authored Jul 14, 2023
1 parent dc337b2 commit 723dfd0
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 50 deletions.
24 changes: 14 additions & 10 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,24 +586,28 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*
return nil
}

var _ MetadataFilter = &DeduplicateFilter{}
var _ MetadataFilter = &DefaultDeduplicateFilter{}

// DeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data.
type DeduplicateFilter interface {
DuplicateIDs() []ulid.ULID
}

// DefaultDeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data.
// Not go-routine safe.
type DeduplicateFilter struct {
type DefaultDeduplicateFilter struct {
duplicateIDs []ulid.ULID
concurrency int
mu sync.Mutex
}

// NewDeduplicateFilter creates DeduplicateFilter.
func NewDeduplicateFilter(concurrency int) *DeduplicateFilter {
return &DeduplicateFilter{concurrency: concurrency}
// NewDeduplicateFilter creates DefaultDeduplicateFilter.
func NewDeduplicateFilter(concurrency int) *DefaultDeduplicateFilter {
return &DefaultDeduplicateFilter{concurrency: concurrency}
}

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
f.duplicateIDs = f.duplicateIDs[:0]

var wg sync.WaitGroup
Expand Down Expand Up @@ -635,7 +639,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
return nil
}

func (f *DeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) {
func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
Expand Down Expand Up @@ -677,8 +681,8 @@ childLoop:
f.mu.Unlock()
}

// DuplicateIDs returns slice of block ids that are filtered out by DeduplicateFilter.
func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID {
// DuplicateIDs returns slice of block ids that are filtered out by DefaultDeduplicateFilter.
func (f *DefaultDeduplicateFilter) DuplicateIDs() []ulid.ULID {
return f.duplicateIDs
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,36 @@ type Thanos struct {

// IndexStats contains stats info related to block index.
IndexStats IndexStats `json:"index_stats,omitempty"`

// Extensions are used for plugin any arbitrary additional information for block. Optional.
Extensions any `json:"extensions,omitempty"`
}

type IndexStats struct {
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
}

func (m *Thanos) ParseExtensions(v any) (any, error) {
return ConvertExtensions(m.Extensions, v)
}

// ConvertExtensions converts extensions with `any` type into specific type `v`
// that the caller expects.
func ConvertExtensions(extensions any, v any) (any, error) {
if extensions == nil {
return nil, nil
}
extensionsContent, err := json.Marshal(extensions)
if err != nil {
return nil, err
}
if err = json.Unmarshal(extensionsContent, v); err != nil {
return nil, err
}
return v, nil
}

type Rewrite struct {
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
Expand Down
123 changes: 123 additions & 0 deletions pkg/block/metadata/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,127 @@ func TestMeta_ReadWrite(t *testing.T) {
m1.Thanos.Labels = map[string]string{}
testutil.Equals(t, m1, *retMeta)
})

t.Run("extensions write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Labels: map[string]string{"ext": "lset1"},
Source: ReceiveSource,
Downsample: ThanosDownsample{
Resolution: 123144,
},
Extensions: &TestExtensions{
Field1: 1,
Field2: "test_string",
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123
},
"version": 1,
"thanos": {
"labels": {
"ext": "lset1"
},
"downsample": {
"resolution": 123144
},
"source": "receive",
"index_stats": {},
"extensions": {
"field1": 1,
"field2": "test_string"
}
}
}
`, b.String())
retMeta, err := Read(io.NopCloser(&b))
testutil.Ok(t, err)
retExtensions, err := retMeta.Thanos.ParseExtensions(&TestExtensions{})
_, ok := retExtensions.(*TestExtensions)
testutil.Equals(t, true, ok)
testutil.Ok(t, err)
testutil.Equals(t, m1.Thanos.Extensions, retExtensions)
})

t.Run("empty extensions write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Labels: map[string]string{"ext": "lset1"},
Source: ReceiveSource,
Downsample: ThanosDownsample{
Resolution: 123144,
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123
},
"version": 1,
"thanos": {
"labels": {
"ext": "lset1"
},
"downsample": {
"resolution": 123144
},
"source": "receive",
"index_stats": {}
}
}
`, b.String())
retMeta, err := Read(io.NopCloser(&b))
testutil.Ok(t, err)
retExtensions, err := retMeta.Thanos.ParseExtensions(&TestExtensions{})
testutil.Ok(t, err)
testutil.Equals(t, m1.Thanos.Extensions, retExtensions)
})
}

type TestExtensions struct {
Field1 int `json:"field1"`
Field2 string `json:"field2"`
}
Loading

0 comments on commit 723dfd0

Please sign in to comment.