From 63ef382fc335969fa2fb3e9c9025eb0511fbc3af Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 28 May 2020 09:52:11 +0200 Subject: [PATCH] Removed legacy index.cache.json support (#2667) * Removed legacy index.cache.json support Signed-off-by: Marco Pracucci * Added CHANGELOG entry Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 + cmd/thanos/compact.go | 116 ------ cmd/thanos/main_test.go | 66 ---- cmd/thanos/store.go | 13 +- pkg/block/block.go | 8 - pkg/block/indexheader/binary_reader.go | 14 + pkg/block/indexheader/header_test.go | 161 +++----- pkg/block/indexheader/json_reader.go | 353 ------------------ pkg/compact/compact.go | 6 - .../downsample/streamed_block_writer.go | 9 - pkg/store/bucket.go | 17 +- pkg/store/bucket_e2e_test.go | 1 - pkg/store/bucket_test.go | 3 - 13 files changed, 83 insertions(+), 686 deletions(-) delete mode 100644 pkg/block/indexheader/json_reader.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a278eb566d..77396faa09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2658](https://github.com/thanos-io/thanos/pull/2658) Upgrade to Prometheus [@f4dd45609a05](https://github.com/prometheus/prometheus/commit/f4dd45609a05) which is after v2.18.1. - TSDB now does memory-mapping of Head chunks and reduces memory usage. +- [#2667](https://github.com/thanos-io/thanos/pull/2667) Store: removed support for the legacy `index.cache.json`. The hidden flag `--store.disable-index-header` was removed. +- [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed. 
## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 55517f0862..6f572f2748 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -8,7 +8,6 @@ import ( "fmt" "os" "path" - "path/filepath" "strconv" "strings" "time" @@ -16,7 +15,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" - "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -25,15 +23,12 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/block/indexheader" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" - "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" @@ -42,11 +37,6 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) -const ( - metricIndexGenerateName = "thanos_compact_generated_index_total" - metricIndexGenerateHelp = "Total number of generated indexes." -) - var ( compactions = compactionSet{ 1 * time.Hour, @@ -265,7 +255,6 @@ func runCompact( var ( compactDir = path.Join(conf.dataDir, "compact") downsamplingDir = path.Join(conf.dataDir, "downsample") - indexCacheDir = path.Join(conf.dataDir, "index_cache") ) if err := os.RemoveAll(downsamplingDir); err != nil { @@ -345,17 +334,6 @@ func runCompact( g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - // Generate index files. - // TODO(bwplotka): Remove this in next release. 
- if conf.generateMissingIndexCacheFiles { - if err := sy.SyncMetas(ctx); err != nil { - return err - } - if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, sy.Metas(), indexCacheDir); err != nil { - return err - } - } - if !conf.wait { return compactMainFn() } @@ -439,96 +417,6 @@ func runCompact( return nil } -// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. -func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string) error { - genIndex := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: metricIndexGenerateName, - Help: metricIndexGenerateHelp, - }) - - if err := os.RemoveAll(dir); err != nil { - return errors.Wrap(err, "clean index cache directory") - } - if err := os.MkdirAll(dir, 0777); err != nil { - return errors.Wrap(err, "create dir") - } - - defer func() { - if err := os.RemoveAll(dir); err != nil { - level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err) - } - }() - - level.Info(logger).Log("msg", "start index cache processing") - - for _, meta := range metas { - // New version of compactor pushes index cache along with data block. - // Skip uncompacted blocks. 
- if meta.Compaction.Level == 1 { - continue - } - - if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil { - return err - } - genIndex.Inc() - } - - level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`") - return nil -} - -func generateIndexCacheFile( - ctx context.Context, - bkt objstore.Bucket, - logger log.Logger, - indexCacheDir string, - meta *metadata.Meta, -) error { - id := meta.ULID - - bdir := filepath.Join(indexCacheDir, id.String()) - if err := os.MkdirAll(bdir, 0777); err != nil { - return errors.Wrap(err, "create block dir") - } - - defer func() { - if err := os.RemoveAll(bdir); err != nil { - level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err) - } - }() - - cachePath := filepath.Join(bdir, block.IndexCacheFilename) - cache := path.Join(meta.ULID.String(), block.IndexCacheFilename) - - ok, err := bkt.Exists(ctx, cache) - if ok { - return nil - } - if err != nil { - return errors.Wrapf(err, "attempt to check if a cached index file exists") - } - - level.Debug(logger).Log("msg", "make index cache", "block", id) - - // Try to download index file from obj store. 
- indexPath := filepath.Join(bdir, block.IndexFilename) - index := path.Join(id.String(), block.IndexFilename) - - if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil { - return errors.Wrap(err, "download index file") - } - - if err := indexheader.WriteJSON(logger, indexPath, cachePath); err != nil { - return errors.Wrap(err, "write index cache") - } - - if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil { - return errors.Wrap(err, "upload index cache") - } - return nil -} - type compactConfig struct { haltOnError bool acceptMalformedIndex bool @@ -540,7 +428,6 @@ type compactConfig struct { retentionRaw, retentionFiveMin, retentionOneHr model.Duration wait bool waitInterval time.Duration - generateMissingIndexCacheFiles bool disableDownsampling bool blockSyncConcurrency int compactionConcurrency int @@ -584,9 +471,6 @@ func (cc *compactConfig) registerFlag(cmd *kingpin.CmdClause) *compactConfig { cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified."). Default("5m").DurationVar(&cc.waitInterval) - cmd.Flag("index.generate-missing-cache-file", "DEPRECATED flag. Will be removed in next release. If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload."). - Hidden().Default("false").BoolVar(&cc.generateMissingIndexCacheFiles) - cmd.Flag("downsampling.disable", "Disables downsampling. This is not recommended "+ "as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway"). 
Default("false").BoolVar(&cc.disableDownsampling) diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index ffd6598bd8..7b119b87da 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -8,18 +8,15 @@ import ( "io/ioutil" "os" "path" - "path/filepath" "testing" "time" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/objstore" @@ -27,69 +24,6 @@ import ( "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) -func TestCleanupIndexCacheFolder(t *testing.T) { - logger := log.NewLogfmtLogger(os.Stderr) - dir, err := ioutil.TempDir("", "test-compact-cleanup") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) - - // Upload one compaction lvl = 2 block, one compaction lvl = 1. - // We generate index cache files only for lvl > 1 blocks. - { - id, err := e2eutil.CreateBlock( - ctx, - dir, - []labels.Labels{{{Name: "a", Value: "1"}}}, - 1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check. 
- labels.Labels{{Name: "e1", Value: "1"}}, - downsample.ResLevel0) - testutil.Ok(t, err) - - meta, err := metadata.Read(filepath.Join(dir, id.String())) - testutil.Ok(t, err) - - meta.Compaction.Level = 2 - - testutil.Ok(t, metadata.Write(logger, filepath.Join(dir, id.String()), meta)) - testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()))) - } - { - id, err := e2eutil.CreateBlock( - ctx, - dir, - []labels.Labels{{{Name: "a", Value: "1"}}}, - 1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check. - labels.Labels{{Name: "e1", Value: "1"}}, - downsample.ResLevel0) - testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()))) - } - - reg := prometheus.NewRegistry() - expReg := prometheus.NewRegistry() - genIndexExp := promauto.With(expReg).NewCounter(prometheus.CounterOpts{ - Name: metricIndexGenerateName, - Help: metricIndexGenerateHelp, - }) - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil) - testutil.Ok(t, err) - - metas, _, err := metaFetcher.Fetch(ctx) - testutil.Ok(t, err) - testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metas, dir)) - - genIndexExp.Inc() - testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName) - - _, err = os.Stat(dir) - testutil.Assert(t, os.IsNotExist(err), "index cache dir should not exist at the end of execution") -} - func TestCleanupDownsampleCacheFolder(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) dir, err := ioutil.TempDir("", "test-compact-cleanup") diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7536681119..940bec6307 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -87,13 +87,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf := regSelectorRelabelFlags(cmd) - // TODO(bwplotka): Remove in v0.13.0 if no issues. 
- disableIndexHeader := cmd.Flag("store.disable-index-header", "If specified, Store Gateway will use index-cache.json for each block instead of recreating binary index-header"). - Hidden().Default("false").Bool() - postingOffsetsInMemSampling := cmd.Flag("store.index-header-posting-offsets-in-mem-sampling", "Controls what is the ratio of postings offsets store will hold in memory. "+ "Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings. It's meant for setups that want low baseline memory pressure and where less traffic is expected. "+ - "On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance. This works only when --store.disable-index-header is NOT specified."). + "On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance."). Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).Int() enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size."). 
@@ -146,7 +142,6 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { }, selectorRelabelConf, *advertiseCompatibilityLabel, - *disableIndexHeader, *enablePostingsCompression, time.Duration(*consistencyDelay), time.Duration(*ignoreDeletionMarksDelay), @@ -179,7 +174,7 @@ func runStore( blockSyncConcurrency int, filterConf *store.FilterConfig, selectorRelabelConf *extflag.PathOrContent, - advertiseCompatibilityLabel, disableIndexHeader, enablePostingsCompression bool, + advertiseCompatibilityLabel, enablePostingsCompression bool, consistencyDelay time.Duration, ignoreDeletionMarksDelay time.Duration, externalPrefix, prefixHeader string, @@ -281,9 +276,6 @@ func runStore( return errors.Wrap(err, "meta fetcher") } - if !disableIndexHeader { - level.Info(logger).Log("msg", "index-header instead of index-cache.json enabled") - } bs, err := store.NewBucketStore( logger, reg, @@ -298,7 +290,6 @@ func runStore( blockSyncConcurrency, filterConf, advertiseCompatibilityLabel, - !disableIndexHeader, enablePostingsCompression, postingOffsetsInMemSampling, false, diff --git a/pkg/block/block.go b/pkg/block/block.go index 2bc7f86aae..b2492fbf39 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -33,8 +33,6 @@ const ( MetaFilename = "meta.json" // IndexFilename is the known index file for block index. IndexFilename = "index" - // IndexCacheFilename is the canonical name for json index cache file that stores essential information. - IndexCacheFilename = "index.cache.json" // IndexHeaderFilename is the canonical name for binary index header file that stores essential information. IndexHeaderFilename = "index-header" // ChunksDirname is the known dir name for chunks with compressed samples. 
@@ -105,12 +103,6 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index")) } - if meta.Thanos.Source == metadata.CompactorSource { - if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil { - return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index cache")) - } - } - // Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file // to be pending uploads. if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil { diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 0687e74d5d..42c33767f7 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -886,3 +886,17 @@ func (r BinaryReader) LabelNames() []string { } func (r *BinaryReader) Close() error { return r.c.Close() } + +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) index.ByteSlice { + return b[start:end] +} diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 2b1e2d802b..828ef7bb06 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -5,7 +5,6 @@ package indexheader import ( "context" - "io" "io/ioutil" "math" "os" @@ -13,17 +12,16 @@ import ( "testing" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" 
"github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/filesystem" - "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -170,24 +168,6 @@ func TestReaders(t *testing.T) { compareIndexToHeader(t, b, br) }) - - t.Run("json", func(t *testing.T) { - fn := filepath.Join(tmpDir, id.String(), block.IndexCacheFilename) - testutil.Ok(t, WriteJSON(log.NewNopLogger(), filepath.Join(tmpDir, id.String(), "index"), fn)) - - jr, err := NewJSONReader(ctx, log.NewNopLogger(), nil, tmpDir, id) - testutil.Ok(t, err) - - defer func() { testutil.Ok(t, jr.Close()) }() - - if id == id1 { - testutil.Equals(t, 14, len(jr.symbols)) - testutil.Equals(t, 2, len(jr.lvals)) - testutil.Equals(t, 15, len(jr.postings)) - } - - compareIndexToHeader(t, b, jr) - }) }) } @@ -331,9 +311,9 @@ func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *meta return m } -func BenchmarkJSONWrite(t *testing.B) { +func BenchmarkBinaryWrite(t *testing.B) { ctx := context.Background() - logger := log.NewNopLogger() + tmpDir, err := ioutil.TempDir("", "bench-indexheader") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() @@ -343,110 +323,93 @@ func BenchmarkJSONWrite(t *testing.B) { defer func() { testutil.Ok(t, bkt.Close()) }() m := prepareIndexV2Block(t, tmpDir, bkt) - testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "local", m.ULID.String()), os.ModePerm)) - fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexCacheFilename) + fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) + t.ResetTimer() for i := 0; i < t.N; i++ { - testutil.Ok(t, forceDownloadFile( - ctx, - logger, - bkt, - filepath.Join(m.ULID.String(), block.IndexFilename), - filepath.Join(tmpDir, "local", m.ULID.String(), block.IndexFilename), - )) - testutil.Ok(t, 
WriteJSON(logger, filepath.Join(tmpDir, "local", m.ULID.String(), block.IndexFilename), fn)) - } -} - -func forceDownloadFile(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, src, dst string) (err error) { - rc, err := bkt.Get(ctx, src) - if err != nil { - return errors.Wrapf(err, "get file %s", src) - } - defer runutil.CloseWithLogOnErr(logger, rc, "download block's file reader") - - f, err := os.OpenFile(dst, os.O_CREATE|os.O_RDWR, os.ModePerm) - if err != nil { - return errors.Wrap(err, "create file") - } - - if _, err := f.Seek(0, 0); err != nil { - return err - } - defer func() { - if err != nil { - if rerr := os.Remove(dst); rerr != nil { - level.Warn(logger).Log("msg", "failed to remove partially downloaded file", "file", dst, "err", rerr) - } - } - }() - defer runutil.CloseWithLogOnErr(logger, f, "download block's output file") - - if _, err = io.Copy(f, rc); err != nil { - return errors.Wrap(err, "copy object to file") + testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) } - return nil } -func BenchmarkJSONReader(t *testing.B) { - logger := log.NewNopLogger() +func BenchmarkBinaryReader(t *testing.B) { + ctx := context.Background() tmpDir, err := ioutil.TempDir("", "bench-indexheader") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) - defer func() { testutil.Ok(t, bkt.Close()) }() m := prepareIndexV2Block(t, tmpDir, bkt) - fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexCacheFilename) - testutil.Ok(t, WriteJSON(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String(), block.IndexFilename), fn)) + fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) + testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) t.ResetTimer() for i := 0; i < t.N; i++ { - jr, err := newFileJSONReader(logger, fn) + br, err := newFileBinaryReader(fn, 32) testutil.Ok(t, err) - testutil.Ok(t, jr.Close()) + testutil.Ok(t, br.Close()) } } 
-func BenchmarkBinaryWrite(t *testing.B) { - ctx := context.Background() +func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { + version := int(b.Range(4, 5)[0]) - tmpDir, err := ioutil.TempDir("", "bench-indexheader") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + if version != 1 && version != 2 { + return nil, errors.Errorf("unknown index file version %d", version) + } - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - testutil.Ok(t, err) - defer func() { testutil.Ok(t, bkt.Close()) }() + toc, err := index.NewTOCFromByteSlice(b) + if err != nil { + return nil, errors.Wrap(err, "read TOC") + } - m := prepareIndexV2Block(t, tmpDir, bkt) - fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) + symbolsV2, symbolsV1, err := readSymbols(b, version, int(toc.Symbols)) + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } - t.ResetTimer() - for i := 0; i < t.N; i++ { - testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) + symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2)) + for o, s := range symbolsV1 { + symbolsTable[o] = s } + for o, s := range symbolsV2 { + symbolsTable[uint32(o)] = s + } + return symbolsTable, nil } -func BenchmarkBinaryReader(t *testing.B) { - ctx := context.Background() - tmpDir, err := ioutil.TempDir("", "bench-indexheader") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() - - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - testutil.Ok(t, err) +// readSymbols reads the symbol table fully into memory and allocates proper strings for them. +// Strings backed by the mmap'd memory would cause memory faults if applications keep using them +// after the reader is closed. 
+func readSymbols(bs index.ByteSlice, version int, off int) ([]string, map[uint32]string, error) { + if off == 0 { + return nil, nil, nil + } + d := encoding.NewDecbufAt(bs, off, castagnoliTable) + + var ( + origLen = d.Len() + cnt = d.Be32int() + basePos = uint32(off) + 4 + nextPos = basePos + uint32(origLen-d.Len()) + symbolSlice []string + symbols = map[uint32]string{} + ) + if version == index.FormatV2 { + symbolSlice = make([]string, 0, cnt) + } - m := prepareIndexV2Block(t, tmpDir, bkt) - fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) - testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) + for d.Err() == nil && d.Len() > 0 && cnt > 0 { + s := d.UvarintStr() - t.ResetTimer() - for i := 0; i < t.N; i++ { - br, err := newFileBinaryReader(fn, 32) - testutil.Ok(t, err) - testutil.Ok(t, br.Close()) + if version == index.FormatV2 { + symbolSlice = append(symbolSlice, s) + } else { + symbols[nextPos] = s + nextPos = basePos + uint32(origLen-d.Len()) + } + cnt-- } + return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") } diff --git a/pkg/block/indexheader/json_reader.go b/pkg/block/indexheader/json_reader.go deleted file mode 100644 index fe54534972..0000000000 --- a/pkg/block/indexheader/json_reader.go +++ /dev/null @@ -1,353 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. 
- -package indexheader - -import ( - "context" - "encoding/json" - "io/ioutil" - "os" - "path/filepath" - "sort" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/tsdb/encoding" - "github.com/prometheus/prometheus/tsdb/fileutil" - "github.com/prometheus/prometheus/tsdb/index" - "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/runutil" -) - -const ( - // JSONVersion1 is a enumeration of index-cache.json versions supported by Thanos. - JSONVersion1 = iota + 1 -) - -var ( - jsonUnmarshalError = errors.New("unmarshal index cache") -) - -type postingsRange struct { - Name, Value string - Start, End int64 -} - -type indexCache struct { - Version int - CacheVersion int - Symbols map[uint32]string - LabelValues map[string][]string - Postings []postingsRange -} - -type realByteSlice []byte - -func (b realByteSlice) Len() int { - return len(b) -} - -func (b realByteSlice) Range(start, end int) []byte { - return b[start:end] -} - -func (b realByteSlice) Sub(start, end int) index.ByteSlice { - return b[start:end] -} - -// readSymbols reads the symbol table fully into memory and allocates proper strings for them. -// Strings backed by the mmap'd memory would cause memory faults if applications keep using them -// after the reader is closed. 
-func readSymbols(bs index.ByteSlice, version int, off int) ([]string, map[uint32]string, error) { - if off == 0 { - return nil, nil, nil - } - d := encoding.NewDecbufAt(bs, off, castagnoliTable) - - var ( - origLen = d.Len() - cnt = d.Be32int() - basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d.Len()) - symbolSlice []string - symbols = map[uint32]string{} - ) - if version == index.FormatV2 { - symbolSlice = make([]string, 0, cnt) - } - - for d.Err() == nil && d.Len() > 0 && cnt > 0 { - s := d.UvarintStr() - - if version == index.FormatV2 { - symbolSlice = append(symbolSlice, s) - } else { - symbols[nextPos] = s - nextPos = basePos + uint32(origLen-d.Len()) - } - cnt-- - } - return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") -} - -func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { - version := int(b.Range(4, 5)[0]) - - if version != 1 && version != 2 { - return nil, errors.Errorf("unknown index file version %d", version) - } - - toc, err := index.NewTOCFromByteSlice(b) - if err != nil { - return nil, errors.Wrap(err, "read TOC") - } - - symbolsV2, symbolsV1, err := readSymbols(b, version, int(toc.Symbols)) - if err != nil { - return nil, errors.Wrap(err, "read symbols") - } - - symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2)) - for o, s := range symbolsV1 { - symbolsTable[o] = s - } - for o, s := range symbolsV2 { - symbolsTable[uint32(o)] = s - } - return symbolsTable, nil -} - -// WriteJSON writes a cache file containing the first lookup stages -// for an index file. 
-func WriteJSON(logger log.Logger, indexFn string, fn string) error { - indexFile, err := fileutil.OpenMmapFile(indexFn) - if err != nil { - return errors.Wrapf(err, "open mmap index file %s", indexFn) - } - defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn) - - b := realByteSlice(indexFile.Bytes()) - indexr, err := index.NewReader(b) - if err != nil { - return errors.Wrap(err, "open index reader") - } - defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader") - - // We assume reader verified index already. - symbols, err := getSymbolTable(b) - if err != nil { - return err - } - - f, err := os.Create(fn) - if err != nil { - return errors.Wrap(err, "create index cache file") - } - defer runutil.CloseWithLogOnErr(logger, f, "index cache writer") - - v := indexCache{ - Version: indexr.Version(), - CacheVersion: JSONVersion1, - Symbols: symbols, - LabelValues: map[string][]string{}, - } - - // Extract label value indices. - lnames, err := indexr.LabelNames() - if err != nil { - return errors.Wrap(err, "read label indices") - } - for _, ln := range lnames { - vals, err := indexr.LabelValues(ln) - if err != nil { - return errors.Wrap(err, "get label values") - } - v.LabelValues[ln] = vals - } - - // Extract postings ranges. - pranges, err := indexr.PostingsRanges() - if err != nil { - return errors.Wrap(err, "read postings ranges") - } - for l, rng := range pranges { - v.Postings = append(v.Postings, postingsRange{ - Name: l.Name, - Value: l.Value, - Start: rng.Start, - End: rng.End, - }) - } - - if err := json.NewEncoder(f).Encode(&v); err != nil { - return errors.Wrap(err, "encode file") - } - return nil -} - -// JSONReader is a reader based on index-cache.json files. -type JSONReader struct { - indexVersion int - symbols []string - lvals map[string][]string - postings map[labels.Label]index.Range -} - -// NewJSONReader loads or builds new index-cache.json if not present on disk or object storage. 
-func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, id ulid.ULID) (*JSONReader, error) { - cachefn := filepath.Join(dir, id.String(), block.IndexCacheFilename) - jr, err := newFileJSONReader(logger, cachefn) - if err == nil { - return jr, nil - } - - if !os.IsNotExist(errors.Cause(err)) && errors.Cause(err) != jsonUnmarshalError { - return nil, errors.Wrap(err, "read index cache") - } - - // Just in case the dir was not created. - if err := os.MkdirAll(filepath.Join(dir, id.String()), os.ModePerm); err != nil { - return nil, errors.Wrap(err, "create dir") - } - - // Try to download index cache file from object store. - if err = objstore.DownloadFile(ctx, logger, bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr), filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil { - return newFileJSONReader(logger, cachefn) - } - - if !bkt.IsObjNotFoundErr(errors.Cause(err)) && errors.Cause(err) != jsonUnmarshalError { - return nil, errors.Wrap(err, "download index cache file") - } - - // No cache exists on disk yet, build it from the downloaded index and retry. - fn := filepath.Join(dir, id.String(), block.IndexFilename) - - if err := objstore.DownloadFile(ctx, logger, bkt, filepath.Join(id.String(), block.IndexFilename), fn); err != nil { - return nil, errors.Wrap(err, "download index file") - } - - defer func() { - if rerr := os.Remove(fn); rerr != nil { - level.Error(logger).Log("msg", "failed to remove temp index file", "path", fn, "err", rerr) - } - }() - - if err := WriteJSON(logger, fn, cachefn); err != nil { - return nil, errors.Wrap(err, "write index cache") - } - - return newFileJSONReader(logger, cachefn) -} - -// ReadJSON reads an index cache file. 
-func newFileJSONReader(logger log.Logger, fn string) (*JSONReader, error) { - f, err := os.Open(fn) - if err != nil { - return nil, errors.Wrap(err, "open file") - } - defer runutil.CloseWithLogOnErr(logger, f, "index cache json close") - - var v indexCache - - bytes, err := ioutil.ReadFile(fn) - if err != nil { - return nil, errors.Wrap(err, "read file") - } - - if err = json.Unmarshal(bytes, &v); err != nil { - return nil, errors.Wrap(jsonUnmarshalError, err.Error()) - } - - strs := map[string]string{} - var maxSymbolID uint32 - for o := range v.Symbols { - if o > maxSymbolID { - maxSymbolID = o - } - } - - jr := &JSONReader{ - indexVersion: v.Version, - lvals: make(map[string][]string, len(v.LabelValues)), - postings: make(map[labels.Label]index.Range, len(v.Postings)), - symbols: make([]string, maxSymbolID+1), - } - - // Most strings we encounter are duplicates. Dedup string objects that we keep - // around after the function returns to reduce total memory usage. - // NOTE(fabxc): it could even make sense to deduplicate globally. - getStr := func(s string) string { - if cs, ok := strs[s]; ok { - return cs - } - strs[s] = s - return s - } - - for o, s := range v.Symbols { - jr.symbols[o] = getStr(s) - } - for ln, vals := range v.LabelValues { - for i := range vals { - vals[i] = getStr(vals[i]) - } - jr.lvals[getStr(ln)] = vals - } - for _, e := range v.Postings { - l := labels.Label{ - Name: getStr(e.Name), - Value: getStr(e.Value), - } - jr.postings[l] = index.Range{Start: e.Start, End: e.End} - } - return jr, nil -} - -func (r *JSONReader) IndexVersion() int { - return r.indexVersion -} - -func (r *JSONReader) LookupSymbol(o uint32) (string, error) { - idx := int(o) - if idx >= len(r.symbols) { - return "", errors.Errorf("indexJSONReader: unknown symbol offset %d", o) - } - // NOTE: This is not entirely correct, symbols slice can have gaps. Not fixing as JSON reader - // is replaced by index-header. 
- return r.symbols[idx], nil -} - -func (r *JSONReader) PostingsOffset(name, value string) (index.Range, error) { - rng, ok := r.postings[labels.Label{Name: name, Value: value}] - if !ok { - return index.Range{}, NotFoundRangeErr - } - return rng, nil -} - -// LabelValues returns label values for single name. -func (r *JSONReader) LabelValues(name string) ([]string, error) { - vals, ok := r.lvals[name] - if !ok { - return nil, nil - } - res := make([]string, 0, len(vals)) - return append(res, vals...), nil -} - -// LabelNames returns a list of label names. -func (r *JSONReader) LabelNames() []string { - res := make([]string, 0, len(r.lvals)) - for ln := range r.lvals { - res = append(res, ln) - } - sort.Strings(res) - return res -} - -func (r *JSONReader) Close() error { return nil } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 26951b0124..4fa0048dfa 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -23,7 +23,6 @@ import ( "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/objstore" @@ -724,7 +723,6 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( bdir := filepath.Join(dir, compID.String()) index := filepath.Join(bdir, block.IndexFilename) - indexCache := filepath.Join(bdir, block.IndexCacheFilename) newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{ Labels: cg.labels.Map(), @@ -752,10 +750,6 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( } } - if err := indexheader.WriteJSON(cg.logger, index, indexCache); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "write index cache") - } - begin = time.Now() if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err 
!= nil { diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index a987c6fa95..8ea34794bb 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -160,14 +159,6 @@ func (w *streamedBlockWriter) Close() error { merr.Add(cl.Close()) } - if err := indexheader.WriteJSON( - w.logger, - filepath.Join(w.blockDir, block.IndexFilename), - filepath.Join(w.blockDir, block.IndexCacheFilename), - ); err != nil { - return errors.Wrap(err, "write index cache") - } - if err := w.writeMetaFile(); err != nil { return errors.Wrap(err, "write meta meta") } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index c9a0e0aed3..ee04b97f8a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -252,7 +252,6 @@ type BucketStore struct { filterConfig *FilterConfig advLabelSets []storepb.LabelSet enableCompatibilityLabel bool - enableIndexHeader bool // Reencode postings using diff+varint+snappy when storing to cache. // This makes them smaller, but takes extra CPU and memory. @@ -280,7 +279,6 @@ func NewBucketStore( blockSyncConcurrency int, filterConfig *FilterConfig, enableCompatibilityLabel bool, - enableIndexHeader bool, enablePostingsCompression bool, postingOffsetsInMemSampling int, enableSeriesHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. 
@@ -318,7 +316,6 @@ func NewBucketStore( samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, enableCompatibilityLabel: enableCompatibilityLabel, - enableIndexHeader: enableIndexHeader, enablePostingsCompression: enablePostingsCompression, postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesHints: enableSeriesHints, @@ -478,17 +475,9 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er lset := labels.FromMap(meta.Thanos.Labels) h := lset.Hash() - var indexHeaderReader indexheader.Reader - if s.enableIndexHeader { - indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling) - if err != nil { - return errors.Wrap(err, "create index header reader") - } - } else { - indexHeaderReader, err = indexheader.NewJSONReader(ctx, s.logger, s.bkt, s.dir, meta.ULID) - if err != nil { - return errors.Wrap(err, "create index cache reader") - } + indexHeaderReader, err := indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling) + if err != nil { + return errors.Wrap(err, "create index header reader") } defer func() { if err != nil { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 94e7ef5b26..6664c30825 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -167,7 +167,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m filterConf, true, true, - true, DefaultPostingOffsetInMemorySampling, true, ) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c3abb926fd..9dee06da0c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -482,7 +482,6 @@ func TestBucketStore_Info(t *testing.T) { allowAllFilterConf, true, true, - true, DefaultPostingOffsetInMemorySampling, false, ) @@ -733,7 +732,6 @@ func testSharding(t *testing.T, 
reuseDisk string, bkt objstore.Bucket, all ...ul allowAllFilterConf, true, true, - true, DefaultPostingOffsetInMemorySampling, false, ) @@ -1685,7 +1683,6 @@ func TestSeries_HintsEnabled(t *testing.T) { nil, false, true, - true, DefaultPostingOffsetInMemorySampling, true, )