Skip to content

Commit

Permalink
Disable dedup proxy in multi-tsdb
Browse files Browse the repository at this point in the history
The receiver manages independent TSDBs which do not have duplicated series.
For this reason it should be safe to disable deduplication of chunks and
reduce CPU usage for this path.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Oct 1, 2024
1 parent b31a637 commit 17a1c92
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func runReceive(

options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
store.WithoutDedup(),
}

proxy := store.NewProxyStore(
Expand Down
14 changes: 13 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
enableDedup bool
}

type proxyStoreMetrics struct {
Expand Down Expand Up @@ -126,6 +127,13 @@ func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
}
}

// WithoutDedup returns a ProxyStoreOption that disables chunk deduplication
// when streaming series. Deduplication is enabled by default; applying this
// option clears the ProxyStore's enableDedup flag.
func WithoutDedup() ProxyStoreOption {
	disable := func(p *ProxyStore) {
		p.enableDedup = false
	}
	return disable
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implement storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
Expand Down Expand Up @@ -156,6 +164,7 @@ func NewProxyStore(
metrics: metrics,
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
enableDedup: true,
}

for _, option := range options {
Expand Down Expand Up @@ -307,7 +316,10 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.

level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";"))

respHeap := NewResponseDeduplicator(NewProxyResponseLoserTree(storeResponses...))
var respHeap seriesStream = NewProxyResponseLoserTree(storeResponses...)
if s.enableDedup {
respHeap = NewResponseDeduplicator(respHeap)
}

i := 0
for respHeap.Next() {
Expand Down
9 changes: 7 additions & 2 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

// seriesStream is the iterator-style interface the proxy uses to consume
// series responses. In ProxyStore.Series both the value returned by
// NewProxyResponseLoserTree and the one returned by NewResponseDeduplicator
// are assigned to a variable of this type, so deduplication can be layered on
// top of the merged stream or skipped entirely.
type seriesStream interface {
// Next is used as the loop condition when draining the stream.
// NOTE(review): presumably it advances to the next response and reports
// whether one is available — confirm against the implementations.
Next() bool
// At returns a *storepb.SeriesResponse.
// NOTE(review): presumably the response at the current position, valid
// only after Next has returned true — confirm against the implementations.
At() *storepb.SeriesResponse
}

type responseDeduplicator struct {
h *losertree.Tree[*storepb.SeriesResponse, respSet]
h seriesStream

bufferedSameSeries []*storepb.SeriesResponse

Expand All @@ -42,7 +47,7 @@ type responseDeduplicator struct {

// NewResponseDeduplicator returns a wrapper around a series stream (such as a loser tree) that merges duplicated series messages into one.
// It also deduplicates identical chunks identified by the same checksum from each series message.
func NewResponseDeduplicator(h *losertree.Tree[*storepb.SeriesResponse, respSet]) *responseDeduplicator {
func NewResponseDeduplicator(h seriesStream) *responseDeduplicator {
ok := h.Next()
var prev *storepb.SeriesResponse
if ok {
Expand Down

0 comments on commit 17a1c92

Please sign in to comment.