Skip to content

Commit

Permalink
store: add initial symbol tables support
Browse files Browse the repository at this point in the history
Add initial support for symbol tables; symbol tables are sent via hints
that are used to deduplicate strings. The main benefit of doing this is
to reduce network traffic && reduce number of allocations needed for all
of the different strings.

Strings are referenced by their number. In total, there could be
math.MaxUint64 strings. To avoid blocking in querier, symbol tables /
references are automatically adjusted depending on which store it is in
the stores slice. In other words, the whole math.MaxUint64 space is
divided into equal number of strings for each StoreAPI. If that limit is
breached then raw strings are sent instead. This is taken care of by the
LookupTable builder. We ensure that old versions are compatible with
this one by passing over the maximum number of allowed unique strings
via StoreRequest. If that limit is zero then we disable the building of
the lookup table.

This compression is beneficial in almost all cases. The worst case is
when there are a lot of unique metrics with unique strings in each
metric. However, I strongly believe that this will only happen 1% of
the time due to the nature of monitoring. This is because a set of
metrics will always have some identical dimensions i.e. same labels with
only one or two changing.

I have also attempted an alternative implementation whereas a `Label`
could be `oneof` compressed labels or the regular labels. However, the
implementation wasn't quite as elegant, cumbersome. This is much
cleaner.

For now, only streaming Sidecar + Thanos Store is implemented. We could
add support for this in other components as a follow-up.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Nov 18, 2022
1 parent fc2b800 commit a7553a1
Show file tree
Hide file tree
Showing 17 changed files with 1,280 additions and 191 deletions.
65 changes: 65 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package query

import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/types"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -208,6 +211,47 @@ type seriesServer struct {
seriesSet []storepb.Series
seriesSetStats storepb.SeriesStatsCounter
warnings []string
symbolTables []map[uint64]string

compressedSeriesSet []storepb.CompressedSeries
}

func (s *seriesServer) DecompressSeries() error {
for _, cs := range s.compressedSeriesSet {
newSeries := &storepb.Series{
Chunks: cs.Chunks,
}

lbls := labels.Labels{}

for _, cLabel := range cs.Labels {
var name, val string
for _, symTable := range s.symbolTables {
if foundName, ok := symTable[uint64(cLabel.NameRef)]; ok {
name = foundName
}

if foundValue, ok := symTable[uint64(cLabel.ValueRef)]; ok {
val = foundValue
}
}
if name == "" {
return fmt.Errorf("found no reference for name ref %d", cLabel.NameRef)
}
if val == "" {
return fmt.Errorf("found no reference for value ref %d", cLabel.ValueRef)
}

lbls = append(lbls, labels.Label{
Name: name,
Value: val,
})
}

newSeries.Labels = labelpb.ZLabelsFromPromLabels(lbls)
s.seriesSet = append(s.seriesSet, *newSeries)
}
return nil
}

func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
Expand All @@ -222,6 +266,23 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
return nil
}

if r.GetCompressedSeries() != nil {
s.compressedSeriesSet = append(s.compressedSeriesSet, *r.GetCompressedSeries())
return nil
}

if r.GetHints() != nil {
var seriesResponseHints hintspb.SeriesResponseHints

// Some other, unknown type. Skip it.
if err := types.UnmarshalAny(r.GetHints(), &seriesResponseHints); err != nil {
return nil
}

s.symbolTables = append(s.symbolTables, seriesResponseHints.StringSymbolTable)
return nil
}

// Unsupported field, skip.
return nil
}
Expand Down Expand Up @@ -369,6 +430,10 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
warns = append(warns, errors.New(w))
}

if err := resp.DecompressSeries(); err != nil {
return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "decompressing series")
}

// Delete the metric's name from the result because that's what the
// PromQL does either way and we want our iterator to work with data
// that was either pushed down or not.
Expand Down
47 changes: 40 additions & 7 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ type BucketStore struct {
postingOffsetsInMemSampling int

// Enables hints in the Series() response.
enableSeriesResponseHints bool
enableQueriedBlocksHints bool

enableChunkHashCalculation bool
}
Expand Down Expand Up @@ -454,7 +454,7 @@ func NewBucketStore(
partitioner: partitioner,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
enableQueriedBlocksHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
}

Expand Down Expand Up @@ -1083,6 +1083,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}
}

lookupTable := newLookupTableBuilder(req.MaximumStringSlots)

s.mtx.RLock()
for _, bs := range s.blockSets {
blockMatchers, ok := bs.labelMatchers(matchers...)
Expand All @@ -1100,7 +1102,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
b := b
gctx := gctx

if s.enableSeriesResponseHints {
if s.enableQueriedBlocksHints {
// Keep track of queried blocks.
resHints.AddQueriedBlock(b.meta.ULID)
}
Expand Down Expand Up @@ -1231,9 +1233,38 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
}
series.Labels = labelpb.ZLabelsFromPromLabels(lset)
if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
return

var compressedResponse bool
compressedLabels := make([]labelpb.CompressedLabel, 0, len(lset))

for _, lbl := range lset {
nameRef, nerr := lookupTable.putString(lbl.Name)
valueRef, verr := lookupTable.putString(lbl.Value)

if nerr != nil || verr != nil {
compressedResponse = false
break
} else if compressedResponse && nerr == nil && verr == nil {
compressedLabels = append(compressedLabels, labelpb.CompressedLabel{
NameRef: nameRef,
ValueRef: valueRef,
})
}
}

if compressedResponse {
if err = srv.Send(storepb.NewCompressedSeriesResponse(&storepb.CompressedSeries{
Labels: compressedLabels,
Chunks: series.Chunks,
})); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
return
}
} else {
if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
return
}
}
}
if set.Err() != nil {
Expand All @@ -1246,7 +1277,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
err = nil
})

if s.enableSeriesResponseHints {
resHints.StringSymbolTable = lookupTable.getTable()

{
var anyHints *types.Any

if anyHints, err = types.MarshalAny(resHints); err != nil {
Expand Down
Loading

0 comments on commit a7553a1

Please sign in to comment.