Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move index values cache to bucket level to prevent race condition #306

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 6 additions & 25 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,6 @@ type postingOffset struct {
tableOff int
}

const valueSymbolsCacheSize = 1024

type BinaryReader struct {
b index.ByteSlice
toc *BinaryTOC
Expand All @@ -438,12 +436,6 @@ type BinaryReader struct {
// Cache of the label name symbol lookups,
// as there are not many and they are half of all lookups.
nameSymbols map[uint32]string
// Direct cache of values. This is much faster than an LRU cache and still provides
// a reasonable cache hit ratio.
valueSymbols [valueSymbolsCacheSize]struct {
index uint32
symbol string
}

dec *index.Decoder

Expand Down Expand Up @@ -646,12 +638,12 @@ func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) {
}, nil
}

func (r BinaryReader) IndexVersion() (int, error) {
func (r *BinaryReader) IndexVersion() (int, error) {
return r.indexVersion, nil
}

// TODO(bwplotka): Get advantage of multi value offset fetch.
func (r BinaryReader) PostingsOffset(name string, value string) (index.Range, error) {
func (r *BinaryReader) PostingsOffset(name string, value string) (index.Range, error) {
rngs, err := r.postingsOffset(name, value)
if err != nil {
return index.Range{}, err
Expand All @@ -674,7 +666,7 @@ func skipNAndName(d *encoding.Decbuf, buf *int) {
}
d.Skip(*buf)
}
func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
rngs := make([]index.Range, 0, len(values))
if r.indexVersion == index.FormatV1 {
e, ok := r.postingsV1[name]
Expand Down Expand Up @@ -811,11 +803,6 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran
}

func (r *BinaryReader) LookupSymbol(o uint32) (string, error) {
cacheIndex := o % valueSymbolsCacheSize
if cached := r.valueSymbols[cacheIndex]; cached.index == o && cached.symbol != "" {
return cached.symbol, nil
}

if s, ok := r.nameSymbols[o]; ok {
return s, nil
}
Expand All @@ -826,16 +813,10 @@ func (r *BinaryReader) LookupSymbol(o uint32) (string, error) {
o += headerLen - index.HeaderLen
}

s, err := r.symbols.Lookup(o)
if err != nil {
return s, err
}
r.valueSymbols[cacheIndex].index = o
r.valueSymbols[cacheIndex].symbol = s
return s, nil
return r.symbols.Lookup(o)
}

func (r BinaryReader) LabelValues(name string) ([]string, error) {
func (r *BinaryReader) LabelValues(name string) ([]string, error) {
if r.indexVersion == index.FormatV1 {
e, ok := r.postingsV1[name]
if !ok {
Expand Down Expand Up @@ -891,7 +872,7 @@ func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}

func (r BinaryReader) LabelNames() ([]string, error) {
func (r *BinaryReader) LabelNames() ([]string, error) {
allPostingsKeyName, _ := index.AllPostingsKey()
labelNames := make([]string, 0, len(r.postings))
for name := range r.postings {
Expand Down
41 changes: 38 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,15 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
return res, true
}

const valueSymbolsCacheSize = 1024

var zeroIndexSymbol = indexSymbol{index: 0, symbol: ""}

type indexSymbol struct {
index uint32
symbol string
}

// bucketBlock represents a block that is located in a bucket. It holds intermediate
// state for the block on local disk.
type bucketBlock struct {
Expand All @@ -1388,6 +1397,9 @@ type bucketBlock struct {
// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
// request hints' BlockMatchers.
relabelLabels labels.Labels

valuesMutex sync.RWMutex
valueSymbols [valueSymbolsCacheSize]indexSymbol
}

func newBucketBlock(
Expand Down Expand Up @@ -1535,6 +1547,7 @@ type bucketIndexReader struct {

mtx sync.Mutex
loadedSeries map[uint64][]byte
valueSymbols [valueSymbolsCacheSize]indexSymbol
}

func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexReader {
Expand All @@ -1547,6 +1560,11 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR
stats: &queryStats{},
loadedSeries: map[uint64][]byte{},
}
block.valuesMutex.RLock()
for k, v := range block.valueSymbols {
r.valueSymbols[k] = v
}
block.valuesMutex.RUnlock()
return r
}

Expand Down Expand Up @@ -2104,6 +2122,15 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref uint64, lset *[]symbolizedLabe
// Close released the underlying resources of the reader.
func (r *bucketIndexReader) Close() error {
r.block.pendingReaders.Done()

r.block.valuesMutex.Lock()
for k, v := range r.valueSymbols {
// Only update the cache if the block cache entry is not set.
if v != zeroIndexSymbol && r.block.valueSymbols[k] == zeroIndexSymbol {
r.block.valueSymbols[k] = v
}
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a problem. Not sure which way is better. Overwrite the entry every time?

}
r.block.valuesMutex.Unlock()
return nil
}

Expand All @@ -2115,9 +2142,17 @@ func (r *bucketIndexReader) LookupLabelsSymbols(symbolized []symbolizedLabel, lb
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := r.dec.LookupSymbol(s.value)
if err != nil {
return errors.Wrap(err, "lookup label value")
var lv string
cacheIndex := s.value % valueSymbolsCacheSize
if cached := r.valueSymbols[cacheIndex]; cached.index == s.value && cached.symbol != "" {
lv = cached.symbol
} else {
lv, err = r.dec.LookupSymbol(s.value)
if err != nil {
return errors.Wrap(err, "lookup label value")
}
r.valueSymbols[cacheIndex].index = s.value
r.valueSymbols[cacheIndex].symbol = lv
}
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
Expand Down