From 9f6868fde26b6db40a71e79dd293a4319f226cb6 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 20 Mar 2023 14:28:25 -0700 Subject: [PATCH] Update Thanos to fix issues with vertical sharding Signed-off-by: Justin Jung --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- .../thanos-io/thanos/pkg/extkingpin/flags.go | 27 ------ .../pkg/extprom/http/instrument_server.go | 4 +- .../thanos/pkg/querysharding/analyzer.go | 18 +++- .../thanos-io/thanos/pkg/store/bucket.go | 6 +- .../thanos/pkg/store/storepb/custom.go | 28 ------ .../pkg/store/storepb/prompb/samples.go | 93 +++++++++++++++++-- .../thanos-io/thanos/pkg/store/tsdb.go | 6 +- .../thanos/pkg/testutil/e2eutil/prometheus.go | 3 +- vendor/modules.txt | 2 +- 12 files changed, 118 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aeca128bd8e..28a4e1fc7c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ * [BUGFIX] Querier: Fix `/api/v1/series` returning 5XX instead of 4XX when limits are hit. #5169 * [BUGFIX] Compactor: Fix issue that shuffle sharding planner return error if block is under visit by other compactor. #5188 * [FEATURE] Alertmanager: Add support for time_intervals. #5102 +* [BUGFIX] Query Frontend: Disable `absent`, `absent_over_time` and `scalar` for vertical sharding. #5221 ## 1.14.0 2022-12-02 diff --git a/go.mod b/go.mod index 4672d75823e..7d489751f9f 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/thanos-community/promql-engine v0.0.0-20230224075812-ae04bbea7613 github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204 - github.com/thanos-io/thanos v0.31.0-rc.1 + github.com/thanos-io/thanos v0.29.1-0.20230314065129-06d9da40244f github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d go.etcd.io/etcd/api/v3 v3.5.7 diff --git a/go.sum b/go.sum index 9baa28c9906..9a558b94ce5 100644 --- a/go.sum +++ b/go.sum @@ -1513,8 +1513,8 @@ github.com/thanos-community/promql-engine v0.0.0-20230224075812-ae04bbea7613 h1: github.com/thanos-community/promql-engine v0.0.0-20230224075812-ae04bbea7613/go.mod h1:gREn4JarQ2DZdWirOtqZQd3p+c1xH+UVpGRjGKVoWx8= github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204 h1:W4w5Iph7j32Sf1QFWLJDCqvO0WgZS0jHGID+qnq3wV0= github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204/go.mod h1:STSgpY8M6EKF2G/raUFdbIMf2U9GgYlEjAEHJxjvpAo= -github.com/thanos-io/thanos v0.31.0-rc.1 h1:0BXE8CTwx6/MSfOMEc0Lz8r35OkxQGZEgHDyxrdy+60= -github.com/thanos-io/thanos v0.31.0-rc.1/go.mod h1:5ux+jb2oKr59+3XsCC0mX+JuAbPGJEMijjhcmnL/PMo= +github.com/thanos-io/thanos v0.29.1-0.20230314065129-06d9da40244f h1:zp6JvrGDrUb6FNUXF++7EGg9JpETUY6GdLNuKRZbIK4= +github.com/thanos-io/thanos v0.29.1-0.20230314065129-06d9da40244f/go.mod h1:AARtONvIIYyIm7w452siKssMT71kTAe4E3cORnLBj2g= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go index 59aaccf8b87..cee8c5df765 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go +++ b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go @@ -6,7 +6,6 @@ package extkingpin import ( "fmt" "strings" - "time" extflag "github.com/efficientgo/tools/extkingpin" "github.com/pkg/errors" @@ -73,32 +72,6 @@ func validateAddrs(addrs addressSlice) error { return nil } -// RegisterGRPCFlags registers flags commonly used to configure gRPC servers with. -func RegisterGRPCFlags(cmd FlagClause) ( - grpcBindAddr *string, - grpcGracePeriod *model.Duration, - grpcTLSSrvCert *string, - grpcTLSSrvKey *string, - grpcTLSSrvClientCA *string, - grpcMaxConnectionAge *time.Duration, -) { - grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components."). - Default("0.0.0.0:10901").String() - grpcGracePeriod = ModelDuration(cmd.Flag("grpc-grace-period", "Time to wait after an interrupt received for GRPC Server.").Default("2m")) // by default it's the same as query.timeout. - - grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String() - grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String() - grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String() - grpcMaxConnectionAge = cmd.Flag("grpc-server-max-connection-age", "The grpc server max connection age. This controls how often to re-read the tls certificates and redo the TLS handshake ").Default("60m").Duration() - - return grpcBindAddr, - grpcGracePeriod, - grpcTLSSrvCert, - grpcTLSSrvKey, - grpcTLSSrvClientCA, - grpcMaxConnectionAge -} - // RegisterCommonObjStoreFlags register flags commonly used to configure http servers with. func RegisterHTTPFlags(cmd FlagClause) (httpBindAddr *string, httpGracePeriod *model.Duration, httpTLSConfig *string) { httpBindAddr = cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String() diff --git a/vendor/github.com/thanos-io/thanos/pkg/extprom/http/instrument_server.go b/vendor/github.com/thanos-io/thanos/pkg/extprom/http/instrument_server.go index d06c5261ddb..6c0e6262674 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/extprom/http/instrument_server.go +++ b/vendor/github.com/thanos-io/thanos/pkg/extprom/http/instrument_server.go @@ -97,8 +97,8 @@ func httpInstrumentationHandler(baseLabels prometheus.Labels, metrics *defaultMe // If OpenTracing span not found, try OTEL. if !OTfound { span := trace.SpanFromContext(r.Context()) - if span != nil { - traceID = span.SpanContext().SpanID().String() + if span != nil && span.SpanContext().IsSampled() { + traceID = span.SpanContext().TraceID().String() } } diff --git a/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go b/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go index dc8b16ca72f..12cb1b9a1a0 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go +++ b/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go @@ -17,11 +17,17 @@ package querysharding import ( + "fmt" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" ) +var ( + notShardableErr = fmt.Errorf("expressions are not shardable") +) + type Analyzer interface { Analyze(string) (QueryAnalysis, error) } @@ -72,7 +78,9 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { // Analyze uses the following algorithm: // - if a query has functions which cannot be sharded such as -// label_join or label_replace, then treat the query as non shardable. +// absent or absent_over_time, then treat the query as non shardable. +// - if a query has functions `label_join` or `label_replace`, +// calculate the shard labels based on grouping labels. // - Walk the query and find the least common labelset // used in grouping expressions. If non-empty, treat the query // as shardable by those labels. @@ -89,6 +97,7 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { analysis QueryAnalysis dynamicLabels []string ) + isShardable := true parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error { switch n := node.(type) { case *parser.Call: @@ -96,6 +105,9 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { if n.Func.Name == "label_join" || n.Func.Name == "label_replace" { dstLabel := stringFromArg(n.Args[1]) dynamicLabels = append(dynamicLabels, dstLabel) + } else if n.Func.Name == "absent_over_time" || n.Func.Name == "absent" || n.Func.Name == "scalar" { + isShardable = false + return notShardableErr } } case *parser.BinaryExpr: @@ -117,6 +129,10 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { return nil }) + if !isShardable { + return nonShardableQuery(), nil + } + // If currently it is shard by, it is still shardable if there is // any label left after removing the dynamic labels. // If currently it is shard without, it is still shardable if we diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index cc0ca30e385..dfe99e2ee3c 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "github.com/weaveworks/common/httpgrpc" + "github.com/cespare/xxhash" "github.com/alecthomas/units" @@ -932,7 +934,7 @@ func (b *blockSeriesClient) ExpandPostings( } if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { - return errors.Wrap(err, "exceeded series limit") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) } b.postings = ps @@ -1031,7 +1033,7 @@ func (b *blockSeriesClient) nextBatch() error { // Ensure sample limit through chunksLimiter if we return chunks. if err := b.chunksLimiter.Reserve(uint64(len(b.chkMetas))); err != nil { - return errors.Wrap(err, "exceeded chunks limit") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded chunks limit: %s", err) } b.entries = append(b.entries, s) diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go index 85f858562bb..c1f4b9b8bfe 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go @@ -13,12 +13,10 @@ import ( "github.com/gogo/protobuf/types" "github.com/pkg/errors" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc/codes" "github.com/thanos-io/thanos/pkg/store/labelpb" - prompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) var PartialResponseStrategyValues = func() []string { @@ -554,29 +552,3 @@ func (m *QueryHints) IsSafeToExecute() bool { return false } - -// HistogramProtoToHistogram extracts a (normal integer) Histogram from the -// provided proto message. The caller has to make sure that the proto message -// represents an interger histogram and not a float histogram. -func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram { - return &histogram.Histogram{ - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountInt(), - Count: hp.GetCountInt(), - Sum: hp.Sum, - PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveDeltas(), - NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeDeltas(), - } -} - -func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span { - spans := make([]histogram.Span, len(s)) - for i := 0; i < len(s); i++ { - spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} - } - - return spans -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/samples.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/samples.go index 6ec77d58e67..ac0aa57e16d 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/samples.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/samples.go @@ -5,6 +5,7 @@ package prompb import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql" ) @@ -24,14 +25,92 @@ func SamplesFromSamplePairs(samples []model.SamplePair) []Sample { // SamplesFromPromqlPoints converts a slice of promql.Point // to a slice of Sample. -func SamplesFromPromqlPoints(samples []promql.Point) []Sample { - result := make([]Sample, 0, len(samples)) +func SamplesFromPromqlPoints(samples []promql.Point) ([]Sample, []Histogram) { + floats := make([]Sample, 0, len(samples)) + histograms := make([]Histogram, 0, len(samples)) for _, s := range samples { - result = append(result, Sample{ - Value: s.V, - Timestamp: s.T, - }) + if s.H == nil { + floats = append(floats, Sample{ + Value: s.V, + Timestamp: s.T, + }) + } else { + histograms = append(histograms, FloatHistogramToHistogramProto(s.T, s.H)) + } } - return result + return floats, histograms +} + +// HistogramProtoToHistogram extracts a (normal integer) Histogram from the +// provided proto message. The caller has to make sure that the proto message +// represents an interger histogram and not a float histogram. +// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L529-L542. +func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram { + return &histogram.Histogram{ + Schema: hp.Schema, + ZeroThreshold: hp.ZeroThreshold, + ZeroCount: hp.GetZeroCountInt(), + Count: hp.GetCountInt(), + Sum: hp.Sum, + PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), + PositiveBuckets: hp.GetPositiveDeltas(), + NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), + NegativeBuckets: hp.GetNegativeDeltas(), + } +} + +// FloatHistogramToHistogramProto converts a float histogram to a protobuf type. +// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L587-L601. +func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram { + return Histogram{ + Count: &Histogram_CountFloat{CountFloat: fh.Count}, + Sum: fh.Sum, + Schema: fh.Schema, + ZeroThreshold: fh.ZeroThreshold, + ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, + NegativeSpans: spansToSpansProto(fh.NegativeSpans), + NegativeCounts: fh.NegativeBuckets, + PositiveSpans: spansToSpansProto(fh.PositiveSpans), + PositiveCounts: fh.PositiveBuckets, + ResetHint: Histogram_ResetHint(fh.CounterResetHint), + Timestamp: timestamp, + } +} + +// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the +// provided proto message to a Float Histogram. The caller has to make sure that +// the proto message represents an float histogram and not a integer histogram. +// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L547-L560. +func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram { + return &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(hp.ResetHint), + Schema: hp.Schema, + ZeroThreshold: hp.ZeroThreshold, + ZeroCount: hp.GetZeroCountFloat(), + Count: hp.GetCountFloat(), + Sum: hp.Sum, + PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), + PositiveBuckets: hp.GetPositiveCounts(), + NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), + NegativeBuckets: hp.GetNegativeCounts(), + } +} + +func spansToSpansProto(s []histogram.Span) []*BucketSpan { + spans := make([]*BucketSpan, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + +func spansProtoToSpans(s []*BucketSpan) []histogram.Span { + spans := make([]histogram.Span, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go b/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go index 00d21b59196..1d3a6879153 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go @@ -202,15 +202,13 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref) } - chunkBytes := make([]byte, len(chk.Chunk.Bytes())) - copy(chunkBytes, chk.Chunk.Bytes()) c := storepb.AggrChunk{ MinTime: chk.MinTime, MaxTime: chk.MaxTime, Raw: &storepb.Chunk{ Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one. - Data: chunkBytes, - Hash: hashChunk(hasher, chunkBytes, enableChunkHashCalculation), + Data: chk.Chunk.Bytes(), + Hash: hashChunk(hasher, chk.Chunk.Bytes(), enableChunkHashCalculation), }, } frameBytesLeft -= c.Size() diff --git a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go index e35f424590d..98c009ac3cb 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go +++ b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go @@ -484,6 +484,7 @@ func createBlock( var g errgroup.Group var timeStepSize = (maxt - mint) / int64(numSamples+1) var batchSize = len(series) / runtime.GOMAXPROCS(0) + r := rand.New(rand.NewSource(int64(numSamples))) for len(series) > 0 { l := batchSize @@ -506,7 +507,7 @@ func createBlock( var err error if sampleType == chunkenc.ValFloat { - _, err = app.Append(0, lset, t, rand.Float64()) + _, err = app.Append(0, lset, t, r.Float64()) } else if sampleType == chunkenc.ValHistogram { _, err = app.AppendHistogram(0, lset, t, &histogramSample, nil) } diff --git a/vendor/modules.txt b/vendor/modules.txt index b9de790e133..d54b2ef9fe4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -845,7 +845,7 @@ github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing -# github.com/thanos-io/thanos v0.31.0-rc.1 +# github.com/thanos-io/thanos v0.29.1-0.20230314065129-06d9da40244f ## explicit; go 1.18 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader