diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 990d0a2bbb..cbcc8e0189 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "path" @@ -155,19 +156,16 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie continue } - // We generally expect all samples of the requested range to be traversed - // so we just encode all samples into one big chunk regardless of size. - enc, cb, err := p.encodeChunk(e.Samples) + // XOR encoding supports a max size of 2^16 - 1 samples, so we need + // to chunk all samples into groups of no more than 2^16 - 1 + aggregatedChunks, err := p.chunkSamples(e, math.MaxUint16) if err != nil { - return status.Error(codes.Unknown, err.Error()) + return err } + resp := storepb.NewSeriesResponse(&storepb.Series{ Labels: lset, - Chunks: []storepb.AggrChunk{{ - MinTime: int64(e.Samples[0].Timestamp), - MaxTime: int64(e.Samples[len(e.Samples)-1].Timestamp), - Raw: &storepb.Chunk{Type: enc, Data: cb}, - }}, + Chunks: aggregatedChunks, }) if err := s.Send(resp); err != nil { return err @@ -176,6 +174,33 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return nil } +func(p *PrometheusStore) chunkSamples(series prompb.TimeSeries, samplesPerChunk int) ([]storepb.AggrChunk, error) { + var aggregatedChunks []storepb.AggrChunk + samples := series.Samples + + for len(samples) > 0 { + chunkSize := len(samples) + if chunkSize > samplesPerChunk { + chunkSize = samplesPerChunk + } + + enc, cb, err := p.encodeChunk(samples[:chunkSize]) + if err != nil { + return nil, status.Error(codes.Unknown, err.Error()) + } + + aggregatedChunks = append(aggregatedChunks, storepb.AggrChunk{ + MinTime: int64(samples[0].Timestamp), + MaxTime: int64(samples[chunkSize-1].Timestamp), + Raw: &storepb.Chunk{Type: enc, Data: cb}, + }) + + samples = samples[chunkSize:] + } + + return aggregatedChunks, nil +} + func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prompb.ReadResponse, error) { span, ctx := tracing.StartSpan(ctx, "query_prometheus") defer span.Finish() diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 2bfca8876a..567d6a5fe1 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -3,6 +3,7 @@ package store import ( "context" "fmt" + "math" "net/url" "testing" "time" @@ -272,3 +273,71 @@ func TestPrometheusStore_Info(t *testing.T) { testutil.Equals(t, int64(123), resp.MinTime) testutil.Equals(t, int64(456), resp.MaxTime) } + +// Regression test for https://github.com/improbable-eng/thanos/issues/396. +func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + p, err := testutil.NewPrometheus() + testutil.Ok(t, err) + + baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000 + + a := p.Appender() + + offset := int64(2*math.MaxUint16 + 5) + for i := int64(0); i < offset; i++ { + _, err = a.Add(labels.FromStrings("a", "b"), baseT+i, 1) + testutil.Ok(t, err) + } + + testutil.Ok(t, a.Commit()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testutil.Ok(t, p.Start()) + defer func() { testutil.Ok(t, p.Stop()) }() + + u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) + testutil.Ok(t, err) + + proxy, err := NewPrometheusStore(nil, nil, u, + func() labels.Labels { + return labels.FromStrings("region", "eu-west") + }, nil) + testutil.Ok(t, err) + srv := newStoreSeriesServer(ctx) + + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ + MinTime: baseT, + MaxTime: baseT + offset, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, + }, + }, srv)) + + testutil.Equals(t, 1, len(srv.SeriesSet)) + + firstSeries := srv.SeriesSet[0] + + testutil.Equals(t, []storepb.Label{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + }, firstSeries.Labels) + + testutil.Equals(t, 3, len(firstSeries.Chunks)) + + chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[0].Raw.Data) + testutil.Ok(t, err) + testutil.Equals(t, math.MaxUint16, chunk.NumSamples()) + + chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[1].Raw.Data) + testutil.Ok(t, err) + testutil.Equals(t, math.MaxUint16, chunk.NumSamples()) + + chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[2].Raw.Data) + testutil.Ok(t, err) + testutil.Equals(t, 5, chunk.NumSamples()) +}