store: Postings fetching optimizations (#2294)
* Avoid fetching duplicate keys.
Simplified groups with add/remove keys.

Signed-off-by: Peter Štibraný <[email protected]>

* Added shortcuts

Signed-off-by: Peter Štibraný <[email protected]>

* Optimize away fetching of ALL postings, if possible.
Only remove postings for each key once.

Signed-off-by: Peter Štibraný <[email protected]>

* Don't do individual index.Without calls; merge the removed postings first (see the sketch after this message).

Signed-off-by: Peter Štibraný <[email protected]>

* Don't use a map for fetching postings, but return a slice instead.

This is in line with the original code. Using a map was nicer, but
more expensive in terms of allocations and hashing of labels. (The
key-ordering contract this relies on is sketched after the diff below.)

Signed-off-by: Peter Štibraný <[email protected]>

* Renamed 'all' to 'allRequested'.

Signed-off-by: Peter Štibraný <[email protected]>

* Typo

Signed-off-by: Peter Štibraný <[email protected]>

* Make linter happy.

Signed-off-by: Peter Štibraný <[email protected]>

* Added comment to fetchPostings.

Signed-off-by: Peter Štibraný <[email protected]>

* Group vars

Signed-off-by: Peter Štibraný <[email protected]>

* Comments

Signed-off-by: Peter Štibraný <[email protected]>

* Use allPostings and emptyPostings variables for special cases.

Signed-off-by: Peter Štibraný <[email protected]>

* Unify terminology to "special All postings"

Signed-off-by: Peter Štibraný <[email protected]>

* Address feedback.

Signed-off-by: Peter Štibraný <[email protected]>

* Added CHANGELOG.md entry.

Signed-off-by: Peter Štibraný <[email protected]>

* Fix check for empty group.

Signed-off-by: Peter Štibraný <[email protected]>

* Comment

Signed-off-by: Peter Štibraný <[email protected]>

* Special All postings is now added as a new group

No special handling required anymore.

Signed-off-by: Peter Štibraný <[email protected]>

* Updated comment

Signed-off-by: Peter Štibraný <[email protected]>
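For illustration, here is a minimal, self-contained sketch of the evaluation these changes converge on: per group, merge the postings of its addKeys; intersect the groups; then subtract the postings of all removeKeys, merged once. It is not part of the commit — plain sorted slices of series IDs stand in for the real index.Postings iterators, so none of the Thanos or Prometheus APIs appear below.

package main

import "fmt"

// postings is a sorted, deduplicated list of series IDs, standing in
// for the index.Postings iterators used by the real code.
type postings []uint64

// merge returns the sorted union of a and b.
func merge(a, b postings) postings {
	out := make(postings, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			out = append(out, a[i])
			i++
		case a[i] > b[j]:
			out = append(out, b[j])
			j++
		default:
			out = append(out, a[i])
			i++
			j++
		}
	}
	return append(append(out, a[i:]...), b[j:]...)
}

// intersect returns the sorted intersection of a and b.
func intersect(a, b postings) postings {
	var out postings
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			i++
		case a[i] > b[j]:
			j++
		default:
			out = append(out, a[i])
			i++
			j++
		}
	}
	return out
}

// without returns a minus b.
func without(a, b postings) postings {
	var out postings
	j := 0
	for _, v := range a {
		for j < len(b) && b[j] < v {
			j++
		}
		if j == len(b) || b[j] != v {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	// job=~"api|web" expands to two addKeys whose postings are merged into one group.
	g1 := merge(postings{1, 3, 5}, postings{2, 6})
	// instance="a" is a single addKey, i.e. a one-element group.
	g2 := postings{1, 2, 3, 9}
	// status!="500" contributes removeKeys; all removals are merged and subtracted
	// once, instead of one index.Without call per group as before.
	removed := merge(postings{3}, postings{9})

	fmt.Println(without(intersect(g1, g2), removed)) // [1 2]
}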
pstibrany authored Mar 23, 2020
1 parent 594fed9 commit 29251fe
Showing 2 changed files with 129 additions and 89 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.
 
 - [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.
+- [#2294](https://github.com/thanos-io/thanos/pull/2294) store: optimizations for fetching postings. Queries using `=~".*"` matchers or negation matchers (`!=...` or `!~...`) benefit the most.
 
 ## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02
 
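As a rough guide to the entry above, these are the matcher shapes that hit the new fast paths in toPostingGroup — a sketch assuming Prometheus's pkg/labels API as vendored at the time; the comments paraphrase the logic from the diff below:

package main

import "github.com/prometheus/prometheus/pkg/labels"

func main() {
	// =~".*" matches every value, including series missing the label entirely:
	// it becomes allPostingsGroup, and if any other matcher adds keys, fetching
	// the special All postings is skipped altogether.
	_ = labels.MustNewMatcher(labels.MatchRegexp, "pod", ".*")

	// !~".*" matches nothing: it becomes emptyPostingsGroup and the whole
	// query short-circuits to an empty result without fetching anything.
	_ = labels.MustNewMatcher(labels.MatchNotRegexp, "pod", ".*")

	// !="500" also matches the empty value, so it becomes an addAll group
	// whose removeKeys carry the postings for status="500".
	_ = labels.MustNewMatcher(labels.MatchNotEqual, "status", "500")
}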
217 changes: 128 additions & 89 deletions pkg/store/bucket.go
@@ -1310,7 +1310,12 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR
 // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
 // single label name=value.
 func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) {
-	var postingGroups []*postingGroup
+	var (
+		postingGroups []*postingGroup
+		allRequested  = false
+		hasAdds       = false
+		keys          []labels.Label
+	)
 
 	// NOTE: Derived from tsdb.PostingsForMatchers.
 	for _, m := range ms {
@@ -1320,23 +1325,71 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
 			return nil, errors.Wrap(err, "toPostingGroup")
 		}
 
+		// If this group adds nothing, it's an empty group. We can shortcut this, since intersection with empty
+		// postings would return no postings anyway.
+		// E.g. label="non-existing-value" returns empty group.
+		if !pg.addAll && len(pg.addKeys) == 0 {
+			return nil, nil
+		}
+
 		postingGroups = append(postingGroups, pg)
+		allRequested = allRequested || pg.addAll
+		hasAdds = hasAdds || len(pg.addKeys) > 0
+
+		// Postings returned by fetchPostings will be in the same order as keys
+		// so it's important that we iterate them in the same order later.
+		// We don't have any other way of pairing keys and fetched postings.
+		keys = append(keys, pg.addKeys...)
+		keys = append(keys, pg.removeKeys...)
 	}
 
 	if len(postingGroups) == 0 {
 		return nil, nil
 	}
 
-	if err := r.fetchPostings(postingGroups); err != nil {
+	// We only need special All postings if there are no other adds. If there are, we can skip fetching
+	// special All postings completely.
+	if allRequested && !hasAdds {
+		// Add group with label to fetch "special All postings".
+		name, value := index.AllPostingsKey()
+		allPostingsLabel := labels.Label{Name: name, Value: value}
+
+		postingGroups = append(postingGroups, newPostingGroup(true, []labels.Label{allPostingsLabel}, nil))
+		keys = append(keys, allPostingsLabel)
+	}
+
+	fetchedPostings, err := r.fetchPostings(keys)
+	if err != nil {
 		return nil, errors.Wrap(err, "get postings")
 	}
 
-	var postings []index.Postings
+	// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
+	// again, and this is exactly the same order as before (when building the groups), so we can simply
+	// use one incrementing index to fetch postings from returned slice.
+	postingIndex := 0
+
+	var groupAdds, groupRemovals []index.Postings
 	for _, g := range postingGroups {
-		postings = append(postings, g.Postings())
+		// We cannot add empty set to groupAdds, since they are intersected.
+		if len(g.addKeys) > 0 {
+			toMerge := make([]index.Postings, 0, len(g.addKeys))
+			for _, l := range g.addKeys {
+				toMerge = append(toMerge, checkNilPosting(l, fetchedPostings[postingIndex]))
+				postingIndex++
+			}
+
+			groupAdds = append(groupAdds, index.Merge(toMerge...))
+		}
+
+		for _, l := range g.removeKeys {
+			groupRemovals = append(groupRemovals, checkNilPosting(l, fetchedPostings[postingIndex]))
+			postingIndex++
+		}
 	}
 
-	ps, err := index.ExpandPostings(index.Intersect(postings...))
+	result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))
+
+	ps, err := index.ExpandPostings(result)
 	if err != nil {
 		return nil, errors.Wrap(err, "expand")
 	}
Expand All @@ -1352,150 +1405,136 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
 	return ps, nil
 }
 
+// postingGroup keeps posting keys for single matcher. Logical result of the group is:
+// If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case.
+// If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels
+// This computation happens in ExpandedPostings.
 type postingGroup struct {
-	keys     labels.Labels
-	postings []index.Postings
-
-	aggregate func(postings []index.Postings) index.Postings
+	addAll     bool
+	addKeys    []labels.Label
+	removeKeys []labels.Label
 }
 
-func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
+func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGroup {
 	return &postingGroup{
-		keys:      keys,
-		postings:  make([]index.Postings, len(keys)),
-		aggregate: aggr,
+		addAll:     addAll,
+		addKeys:    addKeys,
+		removeKeys: removeKeys,
 	}
 }
 
-func (p *postingGroup) Fill(i int, posting index.Postings) {
-	p.postings[i] = posting
-}
-
-func (p *postingGroup) Postings() index.Postings {
-	if len(p.keys) == 0 {
-		return index.EmptyPostings()
+func checkNilPosting(l labels.Label, p index.Postings) index.Postings {
+	if p == nil {
+		// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
+		return index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l))
 	}
-
-	for i, posting := range p.postings {
-		if posting == nil {
-			// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
-			return index.ErrPostings(errors.Errorf("at least one of %d postings is nil for %s. It was never fetched.", i, p.keys[i]))
-		}
-	}
-
-	return p.aggregate(p.postings)
-}
-
-func merge(p []index.Postings) index.Postings {
-	return index.Merge(p...)
+	return p
 }
 
-func allWithout(p []index.Postings) index.Postings {
-	return index.Without(p[0], index.Merge(p[1:]...))
-}
+var (
+	allPostingsGroup   = newPostingGroup(true, nil, nil)
+	emptyPostingsGroup = newPostingGroup(false, nil, nil)
+)
 
 // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
 func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
-	var matchingLabels labels.Labels
+	// This matches any label value, and also series that don't have this label at all.
+	if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") {
+		return allPostingsGroup, nil
+	}
+
+	// NOT matching any value = match nothing. We can shortcut this easily.
+	if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") {
+		return emptyPostingsGroup, nil
+	}
 
 	// If the matcher selects an empty value, it selects all the series which don't
 	// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
 	// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555.
 	if m.Matches("") {
-		allName, allValue := index.AllPostingsKey()
-
-		matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
 		vals, err := lvalsFn(m.Name)
 		if err != nil {
 			return nil, err
 		}
+
+		var toRemove []labels.Label
 		for _, val := range vals {
 			if !m.Matches(val) {
-				matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
+				toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val})
 			}
 		}
 
-		if len(matchingLabels) == 1 {
-			// This is known hack to return all series.
-			// Ask for x != <not existing value>. Allow for that as Prometheus does,
-			// even though it is expensive.
-			return newPostingGroup(matchingLabels, merge), nil
-		}
-
-		return newPostingGroup(matchingLabels, allWithout), nil
+		return newPostingGroup(true, nil, toRemove), nil
 	}
 
 	// Fast-path for equal matching.
 	if m.Type == labels.MatchEqual {
-		return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge), nil
+		return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil
 	}
 
 	vals, err := lvalsFn(m.Name)
 	if err != nil {
 		return nil, err
 	}
+
+	var toAdd []labels.Label
 	for _, val := range vals {
 		if m.Matches(val) {
-			matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
+			toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val})
 		}
 	}
 
-	return newPostingGroup(matchingLabels, merge), nil
+	return newPostingGroup(false, toAdd, nil), nil
}
 
 type postingPtr struct {
-	groupID int
-	keyID   int
-	ptr     index.Range
+	keyID int
+	ptr   index.Range
 }
 
 // fetchPostings fill postings requested by posting groups.
-func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
+// It returns one postings for each key, in the same order.
+// If postings for given key is not fetched, entry at given index will be nil.
+func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
 	var ptrs []postingPtr
 
-	// Fetch postings from the cache with a single call.
-	keys := make([]labels.Label, 0)
-	for _, g := range groups {
-		keys = append(keys, g.keys...)
-	}
+	output := make([]index.Postings, len(keys))
 
+	// Fetch postings from the cache with a single call.
 	fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys)
 
 	// Iterate over all groups and fetch posting from cache.
 	// If we have a miss, mark key to be fetched in `ptrs` slice.
 	// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
-	for i, g := range groups {
-		for j, key := range g.keys {
-			// Get postings for the given key from cache first.
-			if b, ok := fromCache[key]; ok {
-				r.stats.postingsTouched++
-				r.stats.postingsTouchedSizeSum += len(b)
-
-				_, l, err := r.dec.Postings(b)
-				if err != nil {
-					return errors.Wrap(err, "decode postings")
-				}
+	for ix, key := range keys {
+		// Get postings for the given key from cache first.
+		if b, ok := fromCache[key]; ok {
+			r.stats.postingsTouched++
+			r.stats.postingsTouchedSizeSum += len(b)
 
-				g.Fill(j, l)
-				continue
+			_, l, err := r.dec.Postings(b)
+			if err != nil {
+				return nil, errors.Wrap(err, "decode postings")
			}
 
-			// Cache miss; save pointer for actual posting in index stored in object store.
-			ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
-			if err == indexheader.NotFoundRangeErr {
-				// This block does not have any posting for given key.
-				g.Fill(j, index.EmptyPostings())
-				continue
-			}
+			output[ix] = l
+			continue
+		}
 
-			if err != nil {
-				return errors.Wrap(err, "index header PostingsOffset")
-			}
+		// Cache miss; save pointer for actual posting in index stored in object store.
+		ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
+		if err == indexheader.NotFoundRangeErr {
+			// This block does not have any posting for given key.
+			output[ix] = index.EmptyPostings()
+			continue
+		}
 
-			r.stats.postingsToFetch++
-			ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
-		}
+		if err != nil {
+			return nil, errors.Wrap(err, "index header PostingsOffset")
+		}
+
+		r.stats.postingsToFetch++
+		ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: ix})
+	}
 
 	sort.Slice(ptrs, func(i, j int) bool {
@@ -1543,8 +1582,8 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 				r.mtx.Lock()
 				// Return postings and fill LRU cache.
 				// Truncate first 4 bytes which are length of posting.
-				groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:]))
-				r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes)
+				output[p.keyID] = newBigEndianPostings(pBytes[4:])
+				r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)
 
 				// If we just fetched it we still have to update the stats for touched postings.
 				r.stats.postingsTouched++
@@ -1555,7 +1594,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 		})
 	}
 
-	return g.Wait()
+	return output, g.Wait()
 }
 
 func resizePostings(b []byte) ([]byte, error) {
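The map-to-slice change in fetchPostings rests on an ordering contract: the returned slice is positionally aligned with keys, so ExpandedPostings can replay the groups in the same order with a single incrementing cursor instead of map lookups that allocate and hash labels. A minimal sketch of that contract follows — the fetch helper is hypothetical, and plain strings stand in for labels.Label and index.Postings:

package main

import "fmt"

type group struct {
	addKeys    []string
	removeKeys []string
}

// fetch returns one result per key, in the same order as keys — the same
// contract the real fetchPostings provides for its []index.Postings result.
func fetch(keys []string) []string {
	out := make([]string, len(keys))
	for i, k := range keys {
		out[i] = "postings(" + k + ")"
	}
	return out
}

func main() {
	groups := []group{
		{addKeys: []string{`job="api"`, `job="web"`}},
		{removeKeys: []string{`status="500"`}},
	}

	// Build keys exactly as ExpandedPostings does: addKeys, then removeKeys, per group.
	var keys []string
	for _, g := range groups {
		keys = append(keys, g.addKeys...)
		keys = append(keys, g.removeKeys...)
	}

	fetched := fetch(keys)

	// Replay the same iteration order; one cursor pairs each key with its result.
	ix := 0
	for _, g := range groups {
		for _, k := range g.addKeys {
			fmt.Println("add:", k, "=>", fetched[ix])
			ix++
		}
		for _, k := range g.removeKeys {
			fmt.Println("remove:", k, "=>", fetched[ix])
			ix++
		}
	}
}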