Skip to content

Commit

Permalink
fixes issue thanos-io#396, split response into chunks no bigger than …
Browse files Browse the repository at this point in the history
…2^16 - 1 samples each, to avoid overflowing the XOR chunk's uint16 sample count

Signed-off-by: Robert Sullivan <[email protected]>
  • Loading branch information
trevorwhitney authored and robertjsullivan committed Jan 8, 2019
1 parent 270d81f commit cc862d1
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 10 deletions.
43 changes: 33 additions & 10 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"path"
Expand Down Expand Up @@ -155,19 +156,24 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
continue
}

// We generally expect all samples of the requested range to be traversed
// so we just encode all samples into one big chunk regardless of size.
enc, cb, err := p.encodeChunk(e.Samples)
if err != nil {
return status.Error(codes.Unknown, err.Error())
sampleChunks := chunkSamples(e)

var aggregatedChunks []storepb.AggrChunk
for _, sampleChunk := range sampleChunks {
enc, cb, err := p.encodeChunk(sampleChunk)
if err != nil {
return status.Error(codes.Unknown, err.Error())
}
aggregatedChunks = append(aggregatedChunks, storepb.AggrChunk{
MinTime: int64(sampleChunk[0].Timestamp),
MaxTime: int64(sampleChunk[len(sampleChunk)-1].Timestamp),
Raw: &storepb.Chunk{Type: enc, Data: cb},
})
}

resp := storepb.NewSeriesResponse(&storepb.Series{
Labels: lset,
Chunks: []storepb.AggrChunk{{
MinTime: int64(e.Samples[0].Timestamp),
MaxTime: int64(e.Samples[len(e.Samples)-1].Timestamp),
Raw: &storepb.Chunk{Type: enc, Data: cb},
}},
Chunks: aggregatedChunks,
})
if err := s.Send(resp); err != nil {
return err
Expand All @@ -176,6 +182,23 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return nil
}

// chunkSamples splits the samples of a time series into groups of at most
// math.MaxUint16 (2^16 - 1) samples. The XOR chunk encoding stores the
// sample count in a uint16 header field, so encoding more samples than
// that into a single chunk would overflow the counter.
func chunkSamples(series prompb.TimeSeries) [][]prompb.Sample {
	samples := series.Samples

	// Guard against an empty series: without this, the function would return
	// a single nil chunk, and callers indexing sampleChunk[0] would panic.
	if len(samples) == 0 {
		return nil
	}

	// Sub-slice the existing samples instead of copying them element by
	// element; each chunk shares the backing array, which is safe because
	// the chunks are only read (encoded) downstream.
	sampleChunks := make([][]prompb.Sample, 0, (len(samples)+math.MaxUint16-1)/math.MaxUint16)
	for len(samples) > math.MaxUint16 {
		sampleChunks = append(sampleChunks, samples[:math.MaxUint16])
		samples = samples[math.MaxUint16:]
	}
	return append(sampleChunks, samples)
}

func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prompb.ReadResponse, error) {
span, ctx := tracing.StartSpan(ctx, "query_prometheus")
defer span.Finish()
Expand Down
66 changes: 66 additions & 0 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"fmt"
"math"
"net/url"
"testing"
"time"
Expand Down Expand Up @@ -272,3 +273,68 @@ func TestPrometheusStore_Info(t *testing.T) {
testutil.Equals(t, int64(123), resp.MinTime)
testutil.Equals(t, int64(456), resp.MaxTime)
}

// Regression test for https://github.com/improbable-eng/thanos/issues/396.
// TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e is a
// regression test for https://github.com/improbable-eng/thanos/issues/396.
// It appends math.MaxUint16+5 samples to one series and verifies the store
// returns them split into two raw chunks: one full chunk of exactly
// math.MaxUint16 samples and a second chunk holding the remaining 5.
func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	prom, err := testutil.NewPrometheus()
	testutil.Ok(t, err)

	// Base the samples two days in the past, rounded down to whole seconds.
	baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000

	// A few samples more than a single XOR chunk can hold.
	const offset = int64(math.MaxUint16 + 5)

	app := prom.Appender()
	for i := int64(0); i < offset; i++ {
		_, err = app.Add(labels.FromStrings("a", "b"), baseT+i, 1)
		testutil.Ok(t, err)
	}
	testutil.Ok(t, app.Commit())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	testutil.Ok(t, prom.Start())
	defer func() { testutil.Ok(t, prom.Stop()) }()

	promURL, err := url.Parse(fmt.Sprintf("http://%s", prom.Addr()))
	testutil.Ok(t, err)

	store, err := NewPrometheusStore(nil, nil, promURL,
		func() labels.Labels {
			return labels.FromStrings("region", "eu-west")
		}, nil)
	testutil.Ok(t, err)

	srv := newStoreSeriesServer(ctx)
	err = store.Series(&storepb.SeriesRequest{
		MinTime: baseT,
		MaxTime: baseT + offset,
		Matchers: []storepb.LabelMatcher{
			{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
			{Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"},
		},
	}, srv)
	testutil.Ok(t, err)

	// Exactly one series, carrying both the original and the external labels.
	testutil.Equals(t, 1, len(srv.SeriesSet))
	series := srv.SeriesSet[0]
	testutil.Equals(t, []storepb.Label{
		{Name: "a", Value: "b"},
		{Name: "region", Value: "eu-west"},
	}, series.Labels)

	// The samples must have been split into a full chunk plus the remainder.
	testutil.Equals(t, 2, len(series.Chunks))

	full, err := chunkenc.FromData(chunkenc.EncXOR, series.Chunks[0].Raw.Data)
	testutil.Ok(t, err)
	testutil.Equals(t, math.MaxUint16, full.NumSamples())

	rest, err := chunkenc.FromData(chunkenc.EncXOR, series.Chunks[1].Raw.Data)
	testutil.Ok(t, err)
	testutil.Equals(t, 5, rest.NumSamples())
}

0 comments on commit cc862d1

Please sign in to comment.