Skip to content

Commit

Permalink
Optimize postings fetching by checking postings and series size (#6465)
Browse files Browse the repository at this point in the history
* optimize postings fetching by checking postings and series size

Signed-off-by: Ben Ye <[email protected]>

* address some review comments

Signed-off-by: Ben Ye <[email protected]>

* add acceptance test and fixed bug of skipping posting groups with add keys

Signed-off-by: Ben Ye <[email protected]>

* add lazy postings param to block series clinet

Signed-off-by: Ben Ye <[email protected]>

* switch to use block estimated max series size

Signed-off-by: Ben Ye <[email protected]>

* added two more metrics

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Sep 11, 2023
1 parent 0bf30e1 commit f6a3950
Show file tree
Hide file tree
Showing 14 changed files with 1,449 additions and 247 deletions.
5 changes: 5 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type storeConfig struct {
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout)

cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb)

cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").
Expand Down Expand Up @@ -382,6 +386,7 @@ func runStore(
}
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
}

if conf.debugLogging {
Expand Down
5 changes: 5 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ Flags:
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--store.enable-lazy-expanded-postings
If true, Store Gateway will estimate postings
size and try to lazily expand postings if
it downloads less data than expanding all
postings.
--store.grpc.downloaded-bytes-limit=0
Maximum amount of downloaded (either
fetched or touched) bytes in a single
Expand Down
10 changes: 6 additions & 4 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))

// File stats are gathered.
testutil.Equals(t, fmt.Sprintf(`{
Expand Down Expand Up @@ -184,7 +184,9 @@ func TestUpload(t *testing.T) {
"rel_path": "meta.json"
}
],
"index_stats": {}
"index_stats": {
"series_max_size": 16
}
}
}
`, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
Expand All @@ -195,7 +197,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
Expand Down Expand Up @@ -227,7 +229,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
postingLengthFieldSize = 4
)

var NotFoundRange = index.Range{Start: -1, End: -1}

// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
Expand Down Expand Up @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) {
return r.indexVersion, nil
}

// PostingsOffsets implements Reader.
func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) {
return r.postingsOffset(name, values...)
}

// TODO(bwplotka): Get advantage of multi value offset fetch.
func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) {
rngs, err := r.postingsOffset(name, value)
if err != nil {
return index.Range{}, err
}
if len(rngs) != 1 {
if len(rngs) != 1 || rngs[0] == NotFoundRange {
return index.Range{}, NotFoundRangeErr
}
return rngs[0], nil
Expand Down Expand Up @@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
rngs = append(rngs, NotFoundRange)
valueIndex++
}

Expand All @@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
if i == len(e.offsets) {
// We're past the end.
for len(rngs) < len(values) {
rngs = append(rngs, NotFoundRange)
}
break
}
if i > 0 && e.offsets[i].value != wantedValue {
Expand Down Expand Up @@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
// Record on the way if wanted value is equal to the current value.
if string(value) == wantedValue {
newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
} else {
rngs = append(rngs, NotFoundRange)
}
valueIndex++
if valueIndex == len(values) {
Expand All @@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
}

if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
// Increment i when wanted value is same as next offset.
if wantedValue == e.offsets[i+1].value {
i++
}
// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
continue
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/block/indexheader/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ type Reader interface {
// IndexVersion returns version of index.
IndexVersion() (int, error)

// PostingsOffsets returns start and end offsets for postings for given name and values.
// Input values need to be sorted.
// If the requested label name doesn't exist, then no posting and error will be returned.
// If the requested label name exists, but some values don't exist, the corresponding index range
// will be set to -1 for both start and end.
PostingsOffsets(name string, value ...string) ([]index.Range, error)

// PostingsOffset returns start and end offsets of postings for given name and value.
// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
// NotFoundRangeErr is returned when no index can be found for given name and value.
// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
PostingsOffset(name string, value string) (index.Range, error)

// LookupSymbol returns string based on given reference.
Expand Down
32 changes: 32 additions & 0 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,38 @@ func TestReaders(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

// single value
rngs, err := br.PostingsOffsets("a", "9")
testutil.Ok(t, err)
for _, rng := range rngs {
testutil.Assert(t, rng.End > rng.Start)
}

rngs, err = br.PostingsOffsets("a", "2", "3", "4", "5", "6", "7", "8", "9")
testutil.Ok(t, err)
for _, rng := range rngs {
testutil.Assert(t, rng.End > rng.Start)
}

rngs, err = br.PostingsOffsets("a", "0")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 1)
testutil.Equals(t, NotFoundRange, rngs[0])

rngs, err = br.PostingsOffsets("a", "0", "10", "99")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 3)
for _, rng := range rngs {
testutil.Equals(t, NotFoundRange, rng)
}

rngs, err = br.PostingsOffsets("a", "1", "10", "9")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 3)
testutil.Assert(t, rngs[0].End > rngs[0].Start)
testutil.Assert(t, rngs[2].End > rngs[2].Start)
testutil.Equals(t, NotFoundRange, rngs[1])

// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
// Most of not existing value was working despite bug, except in certain unlucky cases
// it was causing "invalid size" errors.
Expand Down
13 changes: 13 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) {
return r.reader.IndexVersion()
}

// PostingsOffsets implements Reader.
func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()

if err := r.load(); err != nil {
return nil, err
}

r.usedAt.Store(time.Now().UnixNano())
return r.reader.PostingsOffsets(name, values...)
}

// PostingsOffset implements Reader.
func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) {
r.readerMx.RLock()
Expand Down
148 changes: 78 additions & 70 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,78 +722,86 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
func TestBucketStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")

testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) })

headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, h.Close()) })
logger := log.NewNopLogger()

appendFn(h.Appender(context.Background()))

if h.NumSeries() == 0 {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

id := createBlockFromHead(tt, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)
for _, lazyExpandedPosting := range []bool{false, true} {
testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")

testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) })

headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, h.Close()) })
logger := log.NewNopLogger()

appendFn(h.Appender(context.Background()))

if h.NumSeries() == 0 {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
id := createBlockFromHead(tt, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)

metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(tt, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(10e6),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
WithLazyExpandedPostings(lazyExpandedPosting),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
})
testutil.Ok(tt, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(10e6),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
})
}
}

func TestPrometheusStore_Acceptance(t *testing.T) {
Expand Down
Loading

0 comments on commit f6a3950

Please sign in to comment.