sidecar: Added support for streaming, chunked remote read.
Fixes: #488

Signed-off-by: Bartek Plotka <[email protected]>
bwplotka committed Jun 21, 2019
1 parent 38a9da0 commit f5ddb66
Showing 12 changed files with 1,962 additions and 538 deletions.
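In short, this change makes the sidecar always request Prometheus' new streamed, chunked remote-read response and then pick a decoder based on the Content-Type of the reply, so older Prometheus versions keep working. A minimal sketch of that negotiation, condensed from the pkg/store/prometheus.go diff below (handleSampled and handleStreamed are hypothetical stand-ins for the two branches inlined in Series):

package sketch

import (
	"net/http"
	"strings"

	"github.com/pkg/errors"
)

// Hypothetical stand-ins for the two inlined branches of Series() below.
func handleSampled(resp *http.Response) error  { return nil }
func handleStreamed(resp *http.Response) error { return nil }

// negotiate mirrors the diff: the request always advertises the streamed,
// chunked response type; the server's Content-Type decides which decoder runs.
func negotiate(httpResp *http.Response) error {
	contentType := httpResp.Header.Get("Content-Type")

	// Old Prometheus: one snappy-compressed, sampled prompb.ReadResponse.
	if strings.HasPrefix(contentType, "application/x-protobuf;") {
		return handleSampled(httpResp)
	}

	// New Prometheus: a stream of prompb.ChunkedReadResponse frames.
	if strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") {
		return handleStreamed(httpResp)
	}

	return errors.Errorf("not supported remote read content type: %s", contentType)
}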
1 change: 1 addition & 0 deletions Makefile
@@ -194,6 +194,7 @@ test-deps:
 	$(foreach ver,$(PROM_VERSIONS),$(call fetch_go_bin_version,github.com/prometheus/prometheus/cmd/prometheus,$(ver)))
 	$(call fetch_go_bin_version,github.com/prometheus/alertmanager/cmd/alertmanager,$(ALERTMANAGER_VERSION))
 	$(call fetch_go_bin_version,github.com/minio/minio,$(MINIO_SERVER_VERSION))
+	$(call fetch_go_bin_version,github.com/bplotka/prometheus/cmd/prometheus,v2.10.0-rr1))
 
 # vet vets the code.
 .PHONY: vet
8 changes: 5 additions & 3 deletions go.mod
@@ -1,5 +1,8 @@
 module github.com/improbable-eng/thanos
 
+// v2.10.0-rr1
+replace github.com/prometheus/prometheus => github.com/bplotka/prometheus v0.0.0-20190621085242-f7821d582002
+
 require (
 	cloud.google.com/go v0.34.0
 	github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c
@@ -32,15 +35,14 @@ require (
 	github.com/opentracing/basictracer-go v1.0.0
 	github.com/opentracing/opentracing-go v1.1.0
 	github.com/pkg/errors v0.8.1
-	github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
-	github.com/prometheus/common v0.4.0
+	github.com/prometheus/client_golang v1.0.0
+	github.com/prometheus/common v0.4.1
 	github.com/prometheus/prometheus v2.9.2+incompatible
 	github.com/prometheus/tsdb v0.8.0
 	github.com/uber-go/atomic v1.4.0 // indirect
 	github.com/uber/jaeger-client-go v2.16.0+incompatible
 	github.com/uber/jaeger-lib v2.0.0+incompatible
 	go.uber.org/atomic v1.4.0 // indirect
-	golang.org/x/net v0.0.0-20190522155817-f3200d17e092
 	golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
 	golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
 	golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2
111 changes: 67 additions & 44 deletions go.sum

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions pkg/query/querier.go
@@ -246,37 +246,37 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
 }
 
 // LabelValues returns all potential values for a label name.
-func (q *querier) LabelValues(name string) ([]string, error) {
+func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
 	span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
 	defer span.Finish()
 
 	resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
 	if err != nil {
-		return nil, errors.Wrap(err, "proxy LabelValues()")
+		return nil, nil, errors.Wrap(err, "proxy LabelValues()")
 	}
 
 	for _, w := range resp.Warnings {
 		q.warningReporter(errors.New(w))
 	}
 
-	return resp.Values, nil
+	return resp.Values, nil, nil
 }
 
 // LabelNames returns all the unique label names present in the block in sorted order.
-func (q *querier) LabelNames() ([]string, error) {
+func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
 	span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
 	defer span.Finish()
 
 	resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
 	if err != nil {
-		return nil, errors.Wrap(err, "proxy LabelNames()")
+		return nil, nil, errors.Wrap(err, "proxy LabelNames()")
 	}
 
 	for _, w := range resp.Warnings {
 		q.warningReporter(errors.New(w))
 	}
 
-	return resp.Names, nil
+	return resp.Names, nil, nil
 }
 
 func (q *querier) Close() error {
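For context: the pkg/query/querier.go changes above track the storage.Querier contract of the Prometheus version pinned in go.mod, which threads warnings through the label APIs. A sketch of the assumed interface shape (not a verbatim copy of the upstream definition):

import "github.com/prometheus/prometheus/storage"

// Assumed shape of the label-related part of storage.Querier that the
// querier above now satisfies; the extra nil results are empty warnings.
type labelQuerier interface {
	LabelValues(name string) ([]string, storage.Warnings, error)
	LabelNames() ([]string, storage.Warnings, error)
}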
3 changes: 2 additions & 1 deletion pkg/rule/rule.go
@@ -174,7 +174,8 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st
 			errs = append(errs, errors.Errorf("no updater found for %v", s))
 			continue
 		}
-		if err := updater.Update(evalInterval, fs); err != nil {
+		// TODO(bwplotka): Investigate if we should put ext labels here or not.
+		if err := updater.Update(evalInterval, fs, nil); err != nil {
 			errs = append(errs, err)
 			continue
 		}
232 changes: 164 additions & 68 deletions pkg/store/prometheus.go
@@ -12,15 +12,18 @@ import (
 	"net/url"
 	"path"
 	"sort"
+	"strings"
 	"sync"
 
+	"github.com/prometheus/prometheus/prompb"
+	"github.com/prometheus/prometheus/storage/remote"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
 	"github.com/improbable-eng/thanos/pkg/component"
 	"github.com/improbable-eng/thanos/pkg/runutil"
-	"github.com/improbable-eng/thanos/pkg/store/prompb"
 	"github.com/improbable-eng/thanos/pkg/store/storepb"
 	"github.com/improbable-eng/thanos/pkg/tracing"
 	"github.com/pkg/errors"
@@ -124,12 +127,12 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
 	if !match {
 		return nil
 	}
-	q := prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}
+	q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}
 
 	// TODO(fabxc): import common definitions from prompb once we have a stable gRPC
 	// query API there.
 	for _, m := range newMatchers {
-		pm := prompb.LabelMatcher{Name: m.Name, Value: m.Value}
+		pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value}
 
 		switch m.Type {
 		case storepb.LabelMatcher_EQ:
@@ -146,51 +149,169 @@
 		q.Matchers = append(q.Matchers, pm)
 	}
 
-	resp, err := p.promSeries(s.Context(), q)
+	queryPrometheusSpan, ctx := tracing.StartSpan(s.Context(), "query_prometheus")
+
+	httpResp, err := p.startPromSeries(ctx, q)
 	if err != nil {
+		queryPrometheusSpan.Finish()
 		return errors.Wrap(err, "query Prometheus")
 	}
 
-	span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
-	defer span.Finish()
-	span.SetTag("series_count", len(resp.Results[0].Timeseries))
-
-	for _, e := range resp.Results[0].Timeseries {
-		lset := p.translateAndExtendLabels(e.Labels, ext)
-
-		if len(e.Samples) == 0 {
-			// As found in https://github.com/improbable-eng/thanos/issues/381
-			// Prometheus can give us completely empty time series. Ignore these with log until we figure out that
-			// this is expected from Prometheus perspective.
-			level.Warn(p.logger).Log(
-				"msg",
-				"found timeseries without any chunk. See https://github.com/improbable-eng/thanos/issues/381 for details",
-				"lset",
-				fmt.Sprintf("%v", lset),
-			)
-			continue
-		}
+	// Negotiate content. We requested streamed chunked response type, but still we need to support old versions of
+	// remote read.
+	contentType := httpResp.Header.Get("Content-Type")
+	if strings.HasPrefix(contentType, "application/x-protobuf;") {
+		level.Debug(p.logger).Log("msg", "started handling ReadRequest_READ response type. Inefficient but the only present.")
 
-		// XOR encoding supports a max size of 2^16 - 1 samples, so we need
-		// to chunk all samples into groups of no more than 2^16 - 1
-		// See: https://github.com/improbable-eng/thanos/pull/718
-		aggregatedChunks, err := p.chunkSamples(e, math.MaxUint16)
+		resp, err := p.sampledPrometheusResponse(ctx, httpResp)
+		queryPrometheusSpan.Finish()
 		if err != nil {
 			return err
 		}
 
-		resp := storepb.NewSeriesResponse(&storepb.Series{
-			Labels: lset,
-			Chunks: aggregatedChunks,
-		})
-		if err := s.Send(resp); err != nil {
-			return err
-		}
+		span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
+		defer span.Finish()
+		span.SetTag("series_count", len(resp.Results[0].Timeseries))
+
+		for _, e := range resp.Results[0].Timeseries {
+			lset := p.translateAndExtendLabels(e.Labels, ext)
+
+			if len(e.Samples) == 0 {
+				// As found in https://github.com/improbable-eng/thanos/issues/381
+				// Prometheus can give us completely empty time series. Ignore these with log until we figure out that
+				// this is expected from Prometheus perspective.
+				level.Warn(p.logger).Log(
+					"msg",
+					"found timeseries without any chunk. See https://github.com/improbable-eng/thanos/issues/381 for details",
+					"lset",
+					fmt.Sprintf("%v", lset),
+				)
+				continue
+			}
+
+			// XOR encoding supports a max size of 2^16 - 1 samples, so we need
+			// to chunk all samples into groups of no more than 2^16 - 1
+			// See: https://github.com/improbable-eng/thanos/pull/718
+			aggregatedChunks, err := p.chunkSamples(e, math.MaxUint16)
+			if err != nil {
+				return err
+			}
+
+			if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
+				Labels: lset,
+				Chunks: aggregatedChunks,
+			})); err != nil {
+				return err
+			}
+		}
+		return nil
 	}
+
+	if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") {
+		return errors.Errorf("not supported remote read content type: %s", contentType)
+	}
+
+	level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.")
+
+	framesNum := 0
+	seriesNum := 0
+
+	defer func() {
+		queryPrometheusSpan.SetTag("frames", framesNum)
+		queryPrometheusSpan.SetTag("series", seriesNum)
+		queryPrometheusSpan.Finish()
+
+		level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum)
+	}()
+	defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body")
+
+	lastSeries := ""
+	stream := remote.NewStreamReader(httpResp.Body)
+	for {
+		b, err := stream.Next()
+		if err == io.EOF {
+			return nil
+		}
+
+		res := &prompb.ChunkedReadResponse{}
+		if err := proto.Unmarshal(b, res); err != nil {
+			return errors.Wrap(err, "failed to unmarshal response to *prompb.ChunkedReadResponse")
+		}
+
+		if len(res.ChunkedSeries) != 1 {
+			level.Warn(p.logger).Log("msg", "Prometheus ReadRequest_STREAMED_XOR_CHUNKS returned non 1 series in frame", "series", len(res.ChunkedSeries))
+		}
+
+		framesNum++
+		for _, series := range res.ChunkedSeries {
+			hash := func() string {
+				var m []string
+				for _, l := range series.Labels {
+					m = append(m, l.String())
+				}
+				return strings.Join(m, ";")
+			}()
+			if hash != lastSeries {
+				seriesNum++
+				lastSeries = hash
+			}
+
+			thanosChks := make([]storepb.AggrChunk, len(series.Chunks))
+
+			for i, chk := range series.Chunks {
+				thanosChks[i] = storepb.AggrChunk{
+					MaxTime: chk.MaxTimeMs,
+					MinTime: chk.MinTimeMs,
+					Raw: &storepb.Chunk{
+						Data: chk.Data,
+						Type: storepb.Chunk_Encoding(chk.Type - 1),
+					},
+				}
+				series.Chunks[i].Data = nil
+			}
+
+			if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
+				Labels: p.translateAndExtendLabels(series.Labels, ext),
+				Chunks: thanosChks,
+			})); err != nil {
+				return err
+			}
+		}
+	}
 	return nil
 }
 
-func (p *PrometheusStore) chunkSamples(series prompb.TimeSeries, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) {
+func (p *PrometheusStore) sampledPrometheusResponse(ctx context.Context, resp *http.Response) (*prompb.ReadResponse, error) {
+	defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "prom series request body")
+
+	buf := bytes.NewBuffer(p.getBuffer())
+	defer func() {
+		p.putBuffer(buf.Bytes())
+	}()
+	if _, err := io.Copy(buf, resp.Body); err != nil {
+		return nil, errors.Wrap(err, "copy response")
+	}
+
+	spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response")
+	decomp, err := snappy.Decode(p.getBuffer(), buf.Bytes())
+	spanSnappyDecode.Finish()
+	defer p.putBuffer(decomp)
+	if err != nil {
+		return nil, errors.Wrap(err, "decompress response")
+	}
+
+	var data prompb.ReadResponse
+	spanUnmarshal, ctx := tracing.StartSpan(ctx, "unmarshal_response")
+	if err := proto.Unmarshal(decomp, &data); err != nil {
+		return nil, errors.Wrap(err, "unmarshal response")
+	}
+	spanUnmarshal.Finish()
+	if len(data.Results) != 1 {
+		return nil, errors.Errorf("unexpected result size %d", len(data.Results))
+	}
+
+	return &data, nil
+}
+
+func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) {
 	samples := series.Samples
 
 	for len(samples) > 0 {
@@ -216,11 +337,11 @@ func (p *PrometheusStore) chunkSamples(series prompb.TimeSeries, maxSamplesPerCh
 	return chks, nil
 }
 
-func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prompb.ReadResponse, error) {
-	span, ctx := tracing.StartSpan(ctx, "query_prometheus")
-	defer span.Finish()
-
-	reqb, err := proto.Marshal(&prompb.ReadRequest{Queries: []prompb.Query{q}})
+func (p *PrometheusStore) startPromSeries(ctx context.Context, q *prompb.Query) (*http.Response, error) {
+	reqb, err := proto.Marshal(&prompb.ReadRequest{
+		Queries:      []*prompb.Query{q},
+		ResponseType: prompb.ReadRequest_STREAMED_XOR_CHUNKS,
+	})
 	if err != nil {
 		return nil, errors.Wrap(err, "marshal read request")
 	}
@@ -233,46 +354,21 @@
 		return nil, errors.Wrap(err, "unable to create request")
 	}
 	preq.Header.Add("Content-Encoding", "snappy")
-	preq.Header.Set("Content-Type", "application/x-protobuf")
-	preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
+	preq.Header.Set("Content-Type", "application/x-stream-protobuf")
+	preq.Header.Set("X-Prometheus-Remote-Read-Version", "1.0.0")
 	spanReqDo, ctx := tracing.StartSpan(ctx, "query_prometheus_request")
 	preq = preq.WithContext(ctx)
 	presp, err := p.client.Do(preq)
 	if err != nil {
 		return nil, errors.Wrap(err, "send request")
 	}
	spanReqDo.Finish()
-	defer runutil.CloseWithLogOnErr(p.logger, presp.Body, "prom series request body")
 
 	if presp.StatusCode/100 != 2 {
 		return nil, errors.Errorf("request failed with code %s", presp.Status)
 	}
 
-	buf := bytes.NewBuffer(p.getBuffer())
-	defer func() {
-		p.putBuffer(buf.Bytes())
-	}()
-	if _, err := io.Copy(buf, presp.Body); err != nil {
-		return nil, errors.Wrap(err, "copy response")
-	}
-	spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response")
-	decomp, err := snappy.Decode(p.getBuffer(), buf.Bytes())
-	spanSnappyDecode.Finish()
-	defer p.putBuffer(decomp)
-	if err != nil {
-		return nil, errors.Wrap(err, "decompress response")
-	}
-
-	var data prompb.ReadResponse
-	spanUnmarshal, ctx := tracing.StartSpan(ctx, "unmarshal_response")
-	if err := proto.Unmarshal(decomp, &data); err != nil {
-		return nil, errors.Wrap(err, "unmarshal response")
-	}
-	spanUnmarshal.Finish()
-	if len(data.Results) != 1 {
-		return nil, errors.Errorf("unexepected result size %d", len(data.Results))
-	}
-	return &data, nil
+	return presp, nil
 }
 
 func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []storepb.LabelMatcher, error) {
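A subtle line in the streamed branch above is storepb.Chunk_Encoding(chk.Type - 1): the two chunk-encoding enums are offset by one, because the Prometheus prompb enum reserves 0 for UNKNOWN while the Thanos storepb enum starts XOR at 0. A self-contained illustration, with constants mirroring the assumed enum values instead of importing both packages:

package main

import "fmt"

// Mirrors of the two enum spaces (assumed values: prompb uses UNKNOWN = 0,
// XOR = 1; storepb uses XOR = 0).
const (
	prompbChunkUnknown int32 = 0
	prompbChunkXOR     int32 = 1

	storepbChunkXOR int32 = 0
)

func main() {
	// The conversion from the diff: storepb.Chunk_Encoding(chk.Type - 1).
	chkType := prompbChunkXOR
	fmt.Println(chkType-1 == storepbChunkXOR) // true: prompb XOR maps to storepb XOR
}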
