From cc862d1a1e4bb02e37984b14fc1a64c3ae998544 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Tue, 8 Jan 2019 15:45:01 -0700 Subject: [PATCH 1/3] fixes issue #396, split response into chunks no bigger than 2^16 to avoid overflowing XOR compression Signed-off-by: Robert Sullivan --- pkg/store/prometheus.go | 43 +++++++++++++++++------ pkg/store/prometheus_test.go | 66 ++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 10 deletions(-) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 990d0a2bbb..f060e51344 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "path" @@ -155,19 +156,24 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie continue } - // We generally expect all samples of the requested range to be traversed - // so we just encode all samples into one big chunk regardless of size. - enc, cb, err := p.encodeChunk(e.Samples) - if err != nil { - return status.Error(codes.Unknown, err.Error()) + sampleChunks := chunkSamples(e) + + var aggregatedChunks []storepb.AggrChunk + for _, sampleChunk := range sampleChunks { + enc, cb, err := p.encodeChunk(sampleChunk) + if err != nil { + return status.Error(codes.Unknown, err.Error()) + } + aggregatedChunks = append(aggregatedChunks, storepb.AggrChunk{ + MinTime: int64(sampleChunk[0].Timestamp), + MaxTime: int64(sampleChunk[len(sampleChunk)-1].Timestamp), + Raw: &storepb.Chunk{Type: enc, Data: cb}, + }) } + resp := storepb.NewSeriesResponse(&storepb.Series{ Labels: lset, - Chunks: []storepb.AggrChunk{{ - MinTime: int64(e.Samples[0].Timestamp), - MaxTime: int64(e.Samples[len(e.Samples)-1].Timestamp), - Raw: &storepb.Chunk{Type: enc, Data: cb}, - }}, + Chunks: aggregatedChunks, }) if err := s.Send(resp); err != nil { return err @@ -176,6 +182,23 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return nil } +// XoR encoding supports a max size of 2^16 - 1 samples, so we need +// to chunk all samples into groups of no more than 2^16 - 1 +func chunkSamples(series prompb.TimeSeries) [][]prompb.Sample { + var sampleChunks [][]prompb.Sample + var currentSampleChunk []prompb.Sample + for i, sample := range series.Samples { + if i%math.MaxUint16 == 0 && i != 0 { + sampleChunks = append(sampleChunks, currentSampleChunk) + currentSampleChunk = []prompb.Sample{sample} + } else { + currentSampleChunk = append(currentSampleChunk, sample) + } + } + sampleChunks = append(sampleChunks, currentSampleChunk) + return sampleChunks +} + func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prompb.ReadResponse, error) { span, ctx := tracing.StartSpan(ctx, "query_prometheus") defer span.Finish() diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 2bfca8876a..34a2a9a446 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -3,6 +3,7 @@ package store import ( "context" "fmt" + "math" "net/url" "testing" "time" @@ -272,3 +273,68 @@ func TestPrometheusStore_Info(t *testing.T) { testutil.Equals(t, int64(123), resp.MinTime) testutil.Equals(t, int64(456), resp.MaxTime) } + +// Regression test for https://github.com/improbable-eng/thanos/issues/396. +func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + p, err := testutil.NewPrometheus() + testutil.Ok(t, err) + + baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000 + + a := p.Appender() + + offset := int64(math.MaxUint16 + 5) + for i := int64(0); i < offset; i++ { + _, err = a.Add(labels.FromStrings("a", "b"), baseT + i, 1) + testutil.Ok(t, err) + } + + testutil.Ok(t, a.Commit()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testutil.Ok(t, p.Start()) + defer func() { testutil.Ok(t, p.Stop()) }() + + u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) + testutil.Ok(t, err) + + proxy, err := NewPrometheusStore(nil, nil, u, + func() labels.Labels { + return labels.FromStrings("region", "eu-west") + }, nil) + testutil.Ok(t, err) + srv := newStoreSeriesServer(ctx) + + err = proxy.Series(&storepb.SeriesRequest{ + MinTime: baseT, + MaxTime: baseT + offset, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, + }, + }, srv) + testutil.Ok(t, err) + + testutil.Equals(t, 1, len(srv.SeriesSet)) + + firstSeries := srv.SeriesSet[0] + + testutil.Equals(t, []storepb.Label{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + }, firstSeries.Labels) + + testutil.Equals(t, 2, len(firstSeries.Chunks)) + + chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[0].Raw.Data) + testutil.Ok(t, err) + testutil.Equals(t, math.MaxUint16, chunk.NumSamples()) + + chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[1].Raw.Data) + testutil.Ok(t, err) + testutil.Equals(t, 5, chunk.NumSamples()) +} From b2cf675ea8d98134f76094fa0acd2400deec57e2 Mon Sep 17 00:00:00 2001 From: Max Eshleman Date: Wed, 9 Jan 2019 10:54:44 -0700 Subject: [PATCH 2/3] use single underlying array for chunkSamples --- pkg/store/prometheus.go | 54 +++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index f060e51344..8c2010a0f2 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -156,19 +156,11 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie continue } - sampleChunks := chunkSamples(e) - - var aggregatedChunks []storepb.AggrChunk - for _, sampleChunk := range sampleChunks { - enc, cb, err := p.encodeChunk(sampleChunk) - if err != nil { - return status.Error(codes.Unknown, err.Error()) - } - aggregatedChunks = append(aggregatedChunks, storepb.AggrChunk{ - MinTime: int64(sampleChunk[0].Timestamp), - MaxTime: int64(sampleChunk[len(sampleChunk)-1].Timestamp), - Raw: &storepb.Chunk{Type: enc, Data: cb}, - }) + // XoR encoding supports a max size of 2^16 - 1 samples, so we need + // to chunk all samples into groups of no more than 2^16 - 1 + aggregatedChunks, err := p.chunkSamples(e, math.MaxUint16) + if err != nil { + return err } resp := storepb.NewSeriesResponse(&storepb.Series{ @@ -182,21 +174,31 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return nil } -// XoR encoding supports a max size of 2^16 - 1 samples, so we need -// to chunk all samples into groups of no more than 2^16 - 1 -func chunkSamples(series prompb.TimeSeries) [][]prompb.Sample { - var sampleChunks [][]prompb.Sample - var currentSampleChunk []prompb.Sample - for i, sample := range series.Samples { - if i%math.MaxUint16 == 0 && i != 0 { - sampleChunks = append(sampleChunks, currentSampleChunk) - currentSampleChunk = []prompb.Sample{sample} - } else { - currentSampleChunk = append(currentSampleChunk, sample) +func(p *PrometheusStore) chunkSamples(series prompb.TimeSeries, samplesPerChunk int) ([]storepb.AggrChunk, error) { + var aggregatedChunks []storepb.AggrChunk + samples := series.Samples + + for len(samples) > 0 { + chunkSize := len(samples) + if chunkSize > samplesPerChunk { + chunkSize = samplesPerChunk + } + + enc, cb, err := p.encodeChunk(samples[:chunkSize]) + if err != nil { + return nil, status.Error(codes.Unknown, err.Error()) } + + aggregatedChunks = append(aggregatedChunks, storepb.AggrChunk{ + MinTime: int64(samples[0].Timestamp), + MaxTime: int64(samples[chunkSize-1].Timestamp), + Raw: &storepb.Chunk{Type: enc, Data: cb}, + }) + + samples = samples[chunkSize:] } - sampleChunks = append(sampleChunks, currentSampleChunk) - return sampleChunks + + return aggregatedChunks, nil } func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prompb.ReadResponse, error) { From 6b7af4bb9af2867f47fb92632fc5fd932accff4f Mon Sep 17 00:00:00 2001 From: Max Eshleman Date: Wed, 9 Jan 2019 13:53:52 -0700 Subject: [PATCH 3/3] make SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e use 3 chunks --- pkg/store/prometheus.go | 2 +- pkg/store/prometheus_test.go | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 8c2010a0f2..cbcc8e0189 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -156,7 +156,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie continue } - // XoR encoding supports a max size of 2^16 - 1 samples, so we need + // XOR encoding supports a max size of 2^16 - 1 samples, so we need // to chunk all samples into groups of no more than 2^16 - 1 aggregatedChunks, err := p.chunkSamples(e, math.MaxUint16) if err != nil { diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 34a2a9a446..567d6a5fe1 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -285,9 +285,9 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t a := p.Appender() - offset := int64(math.MaxUint16 + 5) + offset := int64(2*math.MaxUint16 + 5) for i := int64(0); i < offset; i++ { - _, err = a.Add(labels.FromStrings("a", "b"), baseT + i, 1) + _, err = a.Add(labels.FromStrings("a", "b"), baseT+i, 1) testutil.Ok(t, err) } @@ -309,15 +309,14 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) - err = proxy.Series(&storepb.SeriesRequest{ + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ MinTime: baseT, MaxTime: baseT + offset, Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, }, - }, srv) - testutil.Ok(t, err) + }, srv)) testutil.Equals(t, 1, len(srv.SeriesSet)) @@ -328,7 +327,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t {Name: "region", Value: "eu-west"}, }, firstSeries.Labels) - testutil.Equals(t, 2, len(firstSeries.Chunks)) + testutil.Equals(t, 3, len(firstSeries.Chunks)) chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[0].Raw.Data) testutil.Ok(t, err) @@ -336,5 +335,9 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[1].Raw.Data) testutil.Ok(t, err) + testutil.Equals(t, math.MaxUint16, chunk.NumSamples()) + + chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[2].Raw.Data) + testutil.Ok(t, err) testutil.Equals(t, 5, chunk.NumSamples()) }