diff --git a/CHANGELOG.md b/CHANGELOG.md index b54340a56a..2c42808d8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor. - [#6352](https://github.com/thanos-io/thanos/pull/6352) Store: Expose store gateway query stats in series response hints. - [#6420](https://github.com/thanos-io/thanos/pull/6420) Index Cache: Cache expanded postings. +- [#6441](https://github.com/thanos-io/thanos/pull/6441) Compact: Compactor will set `index_stats` in `meta.json` file with max series and chunk size information. ### Fixed diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 5b2dbfeca7..366eb16145 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -385,10 +385,29 @@ func processDownsampling( "from", m.ULID, "to", id, "duration", downsampleDuration, "duration_ms", downsampleDuration.Milliseconds()) metrics.downsampleDuration.WithLabelValues(m.Thanos.GroupKey()).Observe(downsampleDuration.Seconds()) - if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex { + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime) + if err == nil { + err = stats.AnyErr() + } + if err != nil && !acceptMalformedIndex { return errors.Wrap(err, "output block index not valid") } + meta, err := metadata.ReadFromDir(resdir) + if err != nil { + return errors.Wrap(err, "read meta") + } + + if stats.ChunkMaxSize > 0 { + meta.Thanos.IndexStats.ChunkMaxSize = stats.ChunkMaxSize + } + if stats.SeriesMaxSize > 0 { + meta.Thanos.IndexStats.SeriesMaxSize = stats.SeriesMaxSize + } + if err := meta.WriteToDir(logger, resdir); err != nil { + return errors.Wrap(err, "write meta") + } + begin = time.Now() err = block.Upload(ctx, logger, bkt, resdir, hashFunc) diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index b373124122..6d977d3044 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -142,7 +142,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 4, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) // File stats are gathered. testutil.Equals(t, fmt.Sprintf(`{ @@ -181,7 +181,8 @@ func TestUpload(t *testing.T) { { "rel_path": "meta.json" } - ] + ], + "index_stats": {} } } `, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) @@ -192,7 +193,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 4, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) } { // Upload with no external labels should be blocked. @@ -224,7 +225,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 8, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)])) - testutil.Equals(t, 525, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)])) + testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)])) } } diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 787a03c241..89ce68c1a2 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -90,6 +90,14 @@ type Thanos struct { // Rewrites is present when any rewrite (deletion, relabel etc) were applied to this block. Optional. Rewrites []Rewrite `json:"rewrites,omitempty"` + + // IndexStats contains stats info related to block index. + IndexStats IndexStats `json:"index_stats,omitempty"` +} + +type IndexStats struct { + SeriesMaxSize int64 `json:"series_max_size,omitempty"` + ChunkMaxSize int64 `json:"chunk_max_size,omitempty"` } type Rewrite struct { diff --git a/pkg/block/metadata/meta_test.go b/pkg/block/metadata/meta_test.go index 0be53197e8..94de50da64 100644 --- a/pkg/block/metadata/meta_test.go +++ b/pkg/block/metadata/meta_test.go @@ -31,7 +31,8 @@ func TestMeta_ReadWrite(t *testing.T) { "downsample": { "resolution": 0 }, - "source": "" + "source": "", + "index_stats": {} } } `, b.String()) @@ -73,6 +74,10 @@ func TestMeta_ReadWrite(t *testing.T) { Downsample: ThanosDownsample{ Resolution: 123144, }, + IndexStats: IndexStats{ + SeriesMaxSize: 2000, + ChunkMaxSize: 1000, + }, }, } testutil.Ok(t, m1.Write(&b)) @@ -121,7 +126,11 @@ func TestMeta_ReadWrite(t *testing.T) { { "rel_path": "meta.json" } - ] + ], + "index_stats": { + "series_max_size": 2000, + "chunk_max_size": 1000 + } } } `, b.String()) @@ -199,7 +208,8 @@ func TestMeta_ReadWrite(t *testing.T) { "rel_path": "index", "size_bytes": 1313 } - ] + ], + "index_stats": {} } } `, b.String()) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 585d5d6b4d..69aa553713 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -818,7 +818,7 @@ func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksErro return OutOfOrderChunksError{err: err, id: brokenBlock} } -// IsOutOfOrderChunk returns true if the base error is a OutOfOrderChunkError. +// IsOutOfOrderChunkError returns true if the base error is a OutOfOrderChunkError. func IsOutOfOrderChunkError(err error) bool { _, ok := errors.Cause(err).(OutOfOrderChunksError) return ok @@ -1100,28 +1100,45 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp bdir := filepath.Join(dir, compID.String()) index := filepath.Join(bdir, block.IndexFilename) - newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{ - Labels: cg.labels.Map(), - Downsample: metadata.ThanosDownsample{Resolution: cg.resolution}, - Source: metadata.CompactorSource, - SegmentFiles: block.GetSegmentFiles(bdir), - }, nil) - if err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir) + if err := os.Remove(filepath.Join(bdir, "tombstones")); err != nil { + return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones") } - if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones") + newMeta, err := metadata.ReadFromDir(bdir) + if err != nil { + return false, ulid.ULID{}, errors.Wrap(err, "read new meta") } + var stats block.HealthStats // Ensure the output block is valid. err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error { - return block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) + stats, err = block.GatherIndexHealthStats(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) + if err != nil { + return err + } + return stats.AnyErr() }) if !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } + thanosMeta := metadata.Thanos{ + Labels: cg.labels.Map(), + Downsample: metadata.ThanosDownsample{Resolution: cg.resolution}, + Source: metadata.CompactorSource, + SegmentFiles: block.GetSegmentFiles(bdir), + } + if stats.ChunkMaxSize > 0 { + thanosMeta.IndexStats.ChunkMaxSize = stats.ChunkMaxSize + } + if stats.SeriesMaxSize > 0 { + thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize + } + newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil) + if err != nil { + return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir) + } + // Ensure the output block is not overlapping with anything else, // unless vertical compaction is enabled. if !cg.enableVerticalCompaction { diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 9b40da16dc..9a798ece61 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -399,6 +399,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Assert(t, labels.Equal(extLabels, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match") testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution) testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set") + // Only one chunk will be generated in that block, so we won't set chunk size. + testutil.Assert(t, meta.Thanos.IndexStats.SeriesMaxSize > 0, "compacted blocks have index stats series max size set") } { meta, ok := others[groupKey2] @@ -415,6 +417,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Assert(t, labels.Equal(extLabels2, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match") testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution) testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set") + // Only one chunk will be generated in that block, so we won't set chunk size. + testutil.Assert(t, meta.Thanos.IndexStats.SeriesMaxSize > 0, "compacted blocks have index stats series max size set") } }) }