Skip to content

Commit

Permalink
Resort store response set on internal label dedup
Browse files Browse the repository at this point in the history
When deduplicating on labels which are stored internally in TSDB,
the store response set needs to be resorted after replica labels are removed.

In order to detect when deduplication by internal labels happens, this PR adds a
bloom filter with all label names to the Info response. When a replica label is present
in this bloom filter for an individual store, the proxy heap would resort a response set from
that store before merging in the result with the rest of the set.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Apr 25, 2023
1 parent 6d838e7 commit e0a0529
Show file tree
Hide file tree
Showing 16 changed files with 548 additions and 81 deletions.
60 changes: 51 additions & 9 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/thanos-io/objstore/client"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/bloom"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/extkingpin"
Expand Down Expand Up @@ -109,8 +110,9 @@ func runSidecar(
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
labelNamesBloom: bloom.NewAlwaysTrueFilter(),
}

confContentYaml, err := conf.objStore.Content()
Expand Down Expand Up @@ -234,6 +236,19 @@ func runSidecar(
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names bloom filter update")

m.UpdateLabelNamesBloom(context.Background())

level.Debug(logger).Log("msg", "Finished label names bloom filter update")
return nil
})
}, func(err error) {
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -246,7 +261,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesBloom, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand All @@ -267,11 +282,13 @@ func runSidecar(
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
labelNamesBloom := promStore.LabelNamesBloom()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SupportsWithoutReplicaLabels: true,
LabelNamesBloom: infopb.NewBloomFilter(labelNamesBloom),
}
}
return nil
Expand Down Expand Up @@ -405,15 +422,16 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string
limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client

labelNamesBloom bloom.Filter
}

func (s *promMetadata) UpdateLabels(ctx context.Context) error {
Expand Down Expand Up @@ -441,6 +459,30 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
s.maxt = maxt
}

func (s *promMetadata) UpdateLabelNamesBloom(ctx context.Context) {
mint, _ := s.Timestamps()
labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli())
if err != nil {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labelNamesBloom = bloom.NewAlwaysTrueFilter()
return
}

filter := bloom.NewFilterForStrings(labelNames...)
s.mtx.Lock()
s.labelNamesBloom = filter
s.mtx.Unlock()
}

func (s *promMetadata) LabelNamesBloom() bloom.Filter {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.labelNamesBloom
}

func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ require (
golang.org/x/exp v0.0.0-20230307190834-24139beb5833
)

require go4.org/unsafe/assume-no-moving-gc v0.0.0-20230209150437-ee73d164e760 // indirect
require (
github.com/bits-and-blooms/bloom v2.0.3+incompatible // indirect
github.com/willf/bitset v1.1.11 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230209150437-ee73d164e760 // indirect
)

require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bloom v2.0.3+incompatible h1:3ONZFjJoMyfHDil5iCcNkcPJ//PNNo+55RHvPrfUGnY=
github.com/bits-and-blooms/bloom v2.0.3+incompatible/go.mod h1:nEmPH2pqJb3sCXfd7cyDSKC4iPfCAt312JHgNrtnnDE=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
Expand Down Expand Up @@ -920,6 +922,8 @@ github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d h1:9Z/HiqeGN+LOn
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d/go.mod h1:Fnq3+U51tMkPRMC6Wr7zKGUeFFYX4YjNrNK50iU0fcE=
github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M=
github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA=
github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
Expand Down
67 changes: 67 additions & 0 deletions pkg/bloom/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package bloom

import (
"bytes"

"github.com/bits-and-blooms/bloom"
)

const FilterErrorRate = 0.01

type Filter interface {
Test(string) bool
Bytes() []byte
}

type filter struct {
bloom *bloom.BloomFilter
}

func NewFromBytes(bloomBytes []byte) Filter {
if bloomBytes == nil {
return NewAlwaysTrueFilter()
}

bloomFilter := &bloom.BloomFilter{}
byteReader := bytes.NewReader(bloomBytes)
if _, err := bloomFilter.ReadFrom(byteReader); err != nil {
return NewAlwaysTrueFilter()
}

return &filter{bloom: bloomFilter}
}

func NewFilterForStrings(items ...string) Filter {
bloomFilter := bloom.NewWithEstimates(uint(len(items)), FilterErrorRate)
for _, label := range items {
bloomFilter.AddString(label)
}

return &filter{bloom: bloomFilter}
}

func (f filter) Bytes() []byte {
var buf bytes.Buffer
if _, err := f.bloom.WriteTo(&buf); err != nil {
return nil
}
return buf.Bytes()
}

func (f filter) Test(s string) bool {
return f.bloom.TestString(s)
}

type alwaysTrueFilter struct{}

func NewAlwaysTrueFilter() *alwaysTrueFilter {
return &alwaysTrueFilter{}
}

func (e alwaysTrueFilter) Test(s string) bool {
return true
}

func (e alwaysTrueFilter) Bytes() []byte {
return nil
}
11 changes: 11 additions & 0 deletions pkg/info/infopb/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package infopb

import (
"github.com/thanos-io/thanos/pkg/bloom"
)

func NewBloomFilter(filter bloom.Filter) *BloomFilter {
return &BloomFilter{
BloomFilterData: filter.Bytes(),
}
}
Loading

0 comments on commit e0a0529

Please sign in to comment.