Skip to content

Commit

Permalink
update code again
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jul 21, 2023
1 parent 513126a commit 9aa311e
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 107 deletions.
2 changes: 1 addition & 1 deletion pkg/block/indexheader/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Reader interface {
IndexVersion() (int, error)

// PostingsOffsets returns start and end offsets for postings for given name and values.
// Input values need to be sorted. If a posting doesn't exist, posting wiht start and end
// Input values need to be sorted. If a posting doesn't exist, posting with start and end
// both set to -1 will be returned.
PostingsOffsets(name string, value ...string) ([]index.Range, error)

Expand Down
4 changes: 3 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2252,6 +2252,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
return r
}

// IndexVersion caches the index header version.
func (r *bucketIndexReader) IndexVersion() (int, error) {
if r.indexVersion != 0 {
return r.indexVersion, nil
Expand Down Expand Up @@ -2308,9 +2309,10 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
for _, pg := range postingGroups {
allRequested = allRequested || pg.addAll
hasAdds = hasAdds || len(pg.addKeys) > 0

// If a posting group doesn't have any keys, like posting group created
// from `=~".*"`, we don't have to keep the posting group as long as we
// we keep track of whether we need to add all postings or not.
// keep track of whether we need to add all postings or not.
if len(pg.addKeys) == 0 && len(pg.removeKeys) == 0 {
continue
}
Expand Down
122 changes: 60 additions & 62 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,71 +1686,69 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {

func TestSeries_RequestAndResponseHints(t *testing.T) {
tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t)
_ = seriesSet2
_ = block2
defer close()

testCases := []*storetestutil.SeriesCase{
//{
// Name: "querying a range containing 1 block should return 1 block in the response hints",
// Req: &storepb.SeriesRequest{
// MinTime: 0,
// MaxTime: 1,
// Matchers: []storepb.LabelMatcher{
// {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
// },
// },
// ExpectedSeries: seriesSet1,
// ExpectedHints: []hintspb.SeriesResponseHints{
// {
// QueriedBlocks: []hintspb.Block{
// {Id: block1.String()},
// },
// },
// },
//},
//{
// Name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
// Req: &storepb.SeriesRequest{
// MinTime: 0,
// MaxTime: 3,
// Matchers: []storepb.LabelMatcher{
// {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
// },
// },
// ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
// ExpectedHints: []hintspb.SeriesResponseHints{
// {
// QueriedBlocks: []hintspb.Block{
// {Id: block1.String()},
// {Id: block2.String()},
// },
// },
// },
//},
//{
// Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
// Req: &storepb.SeriesRequest{
// MinTime: 0,
// MaxTime: 3,
// Matchers: []storepb.LabelMatcher{
// {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
// },
// Hints: mustMarshalAny(&hintspb.SeriesRequestHints{
// BlockMatchers: []storepb.LabelMatcher{
// {Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: block1.String()},
// },
// }),
// },
// ExpectedSeries: seriesSet1,
// ExpectedHints: []hintspb.SeriesResponseHints{
// {
// QueriedBlocks: []hintspb.Block{
// {Id: block1.String()},
// },
// },
// },
//},
{
Name: "querying a range containing 1 block should return 1 block in the response hints",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
},
ExpectedSeries: seriesSet1,
ExpectedHints: []hintspb.SeriesResponseHints{
{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
},
},
},
},
{
Name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 3,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
},
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedHints: []hintspb.SeriesResponseHints{
{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
{Id: block2.String()},
},
},
},
},
{
Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 3,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
Hints: mustMarshalAny(&hintspb.SeriesRequestHints{
BlockMatchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: block1.String()},
},
}),
},
ExpectedSeries: seriesSet1,
ExpectedHints: []hintspb.SeriesResponseHints{
{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
},
},
},
},
{
Name: "Query Stats Enabled",
Req: &storepb.SeriesRequest{
Expand Down
89 changes: 46 additions & 43 deletions pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil}

// lazyExpandedPostings contains expanded postings (series IDs). If lazy posting expansion is
// enabled, it might contain matchers that can be lazily applied during series filtering time.
type lazyExpandedPostings struct {
postings []storage.SeriesRef
matchers []*labels.Matcher
Expand All @@ -33,7 +35,9 @@ func (p *lazyExpandedPostings) lazyExpanded() bool {
}

func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64) ([]*postingGroup, error) {
// Collect posting cardinality of each posting groups.
for _, pg := range postingGroups {
// A posting group can have either add keys or remove keys but not both the same time.
vals := pg.addKeys
if len(pg.removeKeys) > 0 {
vals = pg.removeKeys
Expand All @@ -51,54 +55,53 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
})

/*
Algorithm of choosing what postings we need to fetch right now and what
postings we expand lazily.
Sort posting groups by cardinality, so we can iterate from posting group with the smallest posting size.
The algorithm focuses on fetching fewer data, including postings and series.
We need to fetch at least 1 posting group in order to fetch series. So if we only fetch the first posting group,
the data bytes we need to download is formula F1: P1 * 4 + P1 * S where P1 is the number of postings in group 1
and S is the size per series. 4 is the byte size per posting.
If we are going to fetch 2 posting groups, we can intersect the two postings to reduce series we need to download (hopefully).
Assuming for each intersection, the series matching ratio is R (0 < R < 1). Then the data bytes we need to download is
formula F2: P1 * 4 + P2 * 4 + P1 * S * R.
We can get formula F3 if we are going to fetch 3 posting groups:
F3: P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2.
Let's compare formula F2 and F1 first.
P1 * 4 + P2 * 4 + P1 * S * R < P1 * 4 + P1 * S
=> P2 * 4 < P1 * S * (1 - R)
Left hand side is the posting group size and right hand side is basically the series size we don't need to fetch
by having the additional intersection. In order to fetch less data for F2 than F1, we just need to ensure that
the additional postings size is smaller.
Let's compare formula F3 and F2.
P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2 < P1 * 4 + P2 * 4 + P1 * S * R
=> P3 * 4 < P1 * S * R * (1 - R)
Same as the previous formula.
Compare formula F4 (Cost to fetch up to 4 posting groups) and F3.
P4 * 4 < P1 * S * R^2 * (1 - R)
We can generalize this to formula: Pn * 4 < P1 * S * R^(n - 2) * (1 - R)
The idea of the algorithm:
By iterating the posting group in sorted order of cardinality, we need to make sure that by fetching the current posting group,
the total data fetched is smaller than the previous posting group. If so, then we continue to next posting group,
otherwise we stop.
This ensures that when we stop at one posting group, posting groups after it always need to fetch more data.
Based on formula Pn * 4 < P1 * S * R^(n - 2) * (1 - R), left hand side is always increasing while iterating to larger
posting groups while right hand side value is always decreasing as R < 1.
Algorithm of choosing what postings we need to fetch right now and what
postings we expand lazily.
Sort posting groups by cardinality, so we can iterate from posting group with the smallest posting size.
The algorithm focuses on fetching fewer data, including postings and series.
We need to fetch at least 1 posting group in order to fetch series. So if we only fetch the first posting group,
the data bytes we need to download is formula F1: P1 * 4 + P1 * S where P1 is the number of postings in group 1
and S is the size per series. 4 is the byte size per posting.
If we are going to fetch 2 posting groups, we can intersect the two postings to reduce series we need to download (hopefully).
Assuming for each intersection, the series matching ratio is R (0 < R < 1). Then the data bytes we need to download is
formula F2: P1 * 4 + P2 * 4 + P1 * S * R.
We can get formula F3 if we are going to fetch 3 posting groups:
F3: P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2.
Let's compare formula F2 and F1 first.
P1 * 4 + P2 * 4 + P1 * S * R < P1 * 4 + P1 * S
=> P2 * 4 < P1 * S * (1 - R)
Left hand side is the posting group size and right hand side is basically the series size we don't need to fetch
by having the additional intersection. In order to fetch less data for F2 than F1, we just need to ensure that
the additional postings size is smaller.
Let's compare formula F3 and F2.
P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2 < P1 * 4 + P2 * 4 + P1 * S * R
=> P3 * 4 < P1 * S * R * (1 - R)
Same as the previous formula.
Compare formula F4 (Cost to fetch up to 4 posting groups) and F3.
P4 * 4 < P1 * S * R^2 * (1 - R)
We can generalize this to formula: Pn * 4 < P1 * S * R^(n - 2) * (1 - R)
The idea of the algorithm:
By iterating the posting group in sorted order of cardinality, we need to make sure that by fetching the current posting group,
the total data fetched is smaller than the previous posting group. If so, then we continue to next posting group,
otherwise we stop.
This ensures that when we stop at one posting group, posting groups after it always need to fetch more data.
Based on formula Pn * 4 < P1 * S * R^(n - 2) * (1 - R), left hand side is always increasing while iterating to larger
posting groups while right hand side value is always decreasing as R < 1.
*/
// The maximum number of series we could match for this query can be estimated
// to the posting group with the lowest cardinality times * series size.
seriesBytesToFetch := postingGroups[0].cardinality * seriesMaxSize
p := float64(1)
i := 1
i := 1 // Start from index 1 as we always need to fetch the smallest posting group.
for i < len(postingGroups) {
pg := postingGroups[i]
// Need to fetch more data on postings than series we avoid fetching, stop here and lazy expanding rest of matchers.
if pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) {
break
}
Expand Down

0 comments on commit 9aa311e

Please sign in to comment.