diff --git a/docs/img/tracing.png b/docs/img/tracing.png new file mode 100644 index 00000000000..e4ee5fc2cdd Binary files /dev/null and b/docs/img/tracing.png differ diff --git a/docs/img/tracing2.png b/docs/img/tracing2.png new file mode 100644 index 00000000000..d32ff6c319b Binary files /dev/null and b/docs/img/tracing2.png differ diff --git a/docs/tracing.md b/docs/tracing.md index 458b3a3cee9..62870224c7e 100644 --- a/docs/tracing.md +++ b/docs/tracing.md @@ -10,7 +10,7 @@ You can either pass YAML file defined below in `--tracing.config-file` or pass t Don't be afraid of multiline flags! -In Kubernetes it is as easy as (on Thanos sidecar example): +In Kubernetes it is as easy as (using the Thanos sidecar as an example): ```yaml - args: @@ -38,9 +38,37 @@ In Kubernetes it is as easy as (on Thanos sidecar example): At that point, anyone can use your provider by spec. +See [this issue](https://github.com/thanos-io/thanos/issues/1972) to check our progress on moving to the OpenTelemetry Go client library. + +## Usage + +Once tracing is enabled and sampling per backend is configured, Thanos will generate traces for all gRPC and HTTP APIs thanks to generic "middlewares". Some APIs that are more interesting to observe, like `query` or `query_range`, have more low-level spans with focused metadata showing latency for important functionalities. For example, the Jaeger view of an HTTP `query_range` API call might look as follows: + +![view](img/tracing2.png) + +As you can see, it contains both the HTTP request span and spans around gRPC requests, since [Querier](components/query.md) calls gRPC services to fetch series data. + +Each Thanos component generates spans related to its work and sends them to a central place, e.g. Jaeger or an OpenTelemetry collector. That place is then responsible for tying all spans into a single trace, showing a request execution path. + +### Obtaining Trace ID + +A single trace is tied to a single, unique request to the system and is composed of many spans from different components.
A trace is identifiable by its `Trace ID`, which is a unique hash, e.g. `131da78f02aa3525`. This information may also be referred to as `request id` or `operation id` in other systems. In order to use trace data, you want to find the trace IDs that explain the requests you are interested in, e.g. a request with an interesting error, a longer latency, or just a debug call you just made. + +When using tracing with Thanos, you can obtain a trace ID in multiple ways: + +* Search by labels/attributes/tags/time/component/latency e.g. using Jaeger indexing. +* [Exemplars](https://www.bwplotka.dev/2021/correlations-exemplars/) +* If the request was sampled, the response will have an `X-Thanos-Trace-Id` response header with the trace ID of this request as its value. + +![view](img/tracing.png) + +### Forcing Sampling + +Every request against any Thanos component's API with the header `X-Thanos-Force-Tracing` will be sampled if a tracing backend is configured. + ## Configuration -Current tracing supported backends: +Currently supported tracing backends: ### Jaeger diff --git a/examples/interactive/interactive_test.go b/examples/interactive/interactive_test.go index c3fd832bc21..b1756afcafc 100644 --- a/examples/interactive/interactive_test.go +++ b/examples/interactive/interactive_test.go @@ -18,6 +18,8 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/testutil" + tracingclient "github.com/thanos-io/thanos/pkg/tracing/client" + "github.com/thanos-io/thanos/pkg/tracing/jaeger" "gopkg.in/yaml.v2" ) @@ -84,7 +86,8 @@ func createData() (perr error) { return nil } -// Test args: -test.timeout 9999m +// TestReadOnlyThanosSetup runs read only Thanos setup that has data from `maxTimeFresh - 2w` to `maxTimeOld`, with extra monitoring and tracing for full playground experience. +// Run with test args `-test.timeout 9999m`.
func TestReadOnlyThanosSetup(t *testing.T) { t.Skip("This is interactive test - it will until you will kill it or curl 'finish' endpoint. Uncomment and run as normal test to use it!") @@ -121,6 +124,21 @@ func TestReadOnlyThanosSetup(t *testing.T) { testutil.Ok(t, exec("cp", "-r", store1Data+"/.", filepath.Join(m1.Dir(), "bkt1"))) testutil.Ok(t, exec("cp", "-r", store2Data+"/.", filepath.Join(m1.Dir(), "bkt2"))) + // Setup Jaeger. + j := e.Runnable("tracing").WithPorts(map[string]int{"http-front": 16686, "jaeger.thrift": 14268}).Init(e2e.StartOptions{Image: "jaegertracing/all-in-one:1.25"}) + testutil.Ok(t, e2e.StartAndWaitReady(j)) + + jaegerConfig, err := yaml.Marshal(tracingclient.TracingConfig{ + Type: tracingclient.JAEGER, + Config: jaeger.Config{ + ServiceName: "thanos", + SamplerType: "const", + SamplerParam: 1, + Endpoint: "http://" + j.InternalEndpoint("jaeger.thrift") + "/api/traces", + }, + }) + testutil.Ok(t, err) + // Create two store gateways, one for each bucket (access point to long term storage). // ┌───────────┐ // │ │ @@ -144,7 +162,13 @@ func TestReadOnlyThanosSetup(t *testing.T) { }, }) testutil.Ok(t, err) - store1 := e2edb.NewThanosStore(e, "store1", bkt1Config, e2edb.WithImage("thanos:latest")) + store1 := e2edb.NewThanosStore( + e, + "store1", + bkt1Config, + e2edb.WithImage("thanos:latest"), + e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}), + ) bkt2Config, err := yaml.Marshal(client.BucketConfig{ Type: client.S3, @@ -157,7 +181,14 @@ func TestReadOnlyThanosSetup(t *testing.T) { }, }) testutil.Ok(t, err) - store2 := e2edb.NewThanosStore(e, "store2", bkt2Config, e2edb.WithImage("thanos:latest")) + + store2 := e2edb.NewThanosStore( + e, + "store2", + bkt2Config, + e2edb.WithImage("thanos:latest"), + e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}), + ) // Create two Prometheus replicas in HA, and one separate one (short term storage + scraping). // Add a Thanos sidecar. 
@@ -189,8 +220,8 @@ func TestReadOnlyThanosSetup(t *testing.T) { promHA1 := e2edb.NewPrometheus(e, "prom-ha1") prom2 := e2edb.NewPrometheus(e, "prom2") - sidecarHA0 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha0", promHA0, e2edb.WithImage("thanos:latest")) - sidecarHA1 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha1", promHA1, e2edb.WithImage("thanos:latest")) + sidecarHA0 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha0", promHA0, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)})) + sidecarHA1 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha1", promHA1, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)})) sidecar2 := e2edb.NewThanosSidecar(e, "sidecar2", prom2, e2edb.WithImage("thanos:latest")) testutil.Ok(t, exec("cp", "-r", prom1Data+"/.", promHA0.Dir())) @@ -273,7 +304,9 @@ global: sidecarHA0.InternalEndpoint("grpc"), sidecarHA1.InternalEndpoint("grpc"), sidecar2.InternalEndpoint("grpc"), - }, e2edb.WithImage("thanos:latest"), + }, + e2edb.WithImage("thanos:latest"), + e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}), ) testutil.Ok(t, e2e.StartAndWaitReady(query1)) @@ -285,6 +318,9 @@ global: testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s/%s", query1.Endpoint("http"), path))) testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s/%s", prom2.Endpoint("http"), path))) + // Tracing endpoint. + testutil.Ok(t, e2einteractive.OpenInBrowser("http://"+j.Endpoint("http-front"))) + // Monitoring Endpoint. 
testutil.Ok(t, m.OpenUserInterfaceInBrowser()) testutil.Ok(t, e2einteractive.RunUntilEndpointHit()) } diff --git a/pkg/api/api.go b/pkg/api/api.go index 600eef31f20..5e361ffc6fc 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -195,7 +195,7 @@ func GetRuntimeInfoFunc(logger log.Logger) RuntimeInfoFn { type InstrFunc func(name string, f ApiFunc) http.HandlerFunc -// Instr returns a http HandlerFunc with the instrumentation middleware. +// GetInstr returns a http HandlerFunc with the instrumentation middleware. func GetInstr( tracer opentracing.Tracer, logger log.Logger, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8255e1dc186..4563b029c47 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -745,6 +745,7 @@ func (s *bucketSeriesSet) Err() error { // blockSeries returns series matching given matchers, that have some data in given time range. func blockSeries( + ctx context.Context, extLset labels.Labels, // External labels added to the returned series labels. indexr *bucketIndexReader, // Index reader for block. chunkr *bucketChunkReader, // Chunk reader for block. @@ -755,7 +756,7 @@ func blockSeries( minTime, maxTime int64, // Series must have data in this time range to be returned. loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks. ) (storepb.SeriesSet, *queryStats, error) { - ps, err := indexr.ExpandedPostings(matchers) + ps, err := indexr.ExpandedPostings(ctx, matchers) if err != nil { return nil, nil, errors.Wrap(err, "expanded matching posting") } @@ -772,7 +773,7 @@ func blockSeries( // Preload all series index data. // TODO(bwplotka): Consider not keeping all series in memory all the time. // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. 
- if err := indexr.PreloadSeries(ps); err != nil { + if err := indexr.PreloadSeries(ctx, ps); err != nil { return nil, nil, errors.Wrap(err, "preload series") } @@ -829,7 +830,7 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats, nil } - if err := chunkr.load(res, loadAggregates); err != nil { + if err := chunkr.load(ctx, res, loadAggregates); err != nil { return nil, nil, errors.Wrap(err, "load chunks") } @@ -988,7 +989,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } s.mtx.RLock() - for _, bs := range s.blockSets { blockMatchers, ok := bs.labelMatchers(matchers...) if !ok { @@ -1003,6 +1003,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie for _, b := range blocks { b := b + gctx := gctx if s.enableSeriesResponseHints { // Keep track of queried blocks. @@ -1011,9 +1012,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie var chunkr *bucketChunkReader // We must keep the readers open until all their data has been sent. - indexr := b.indexReader(gctx) + indexr := b.indexReader() if !req.SkipChunks { - chunkr = b.chunkReader(gctx) + chunkr = b.chunkReader() defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block") } @@ -1021,7 +1022,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie defer runutil.CloseWithLogOnErr(s.logger, indexr, "series block") g.Go(func() error { + span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ + "block.id": b.meta.ULID, + "block.mint": b.meta.MinTime, + "block.maxt": b.meta.MaxTime, + "block.resolution": b.meta.Thanos.Downsample.Resolution, + }) + defer span.Finish() + part, pstats, err := blockSeries( + newCtx, b.extLset, indexr, chunkr, @@ -1041,6 +1051,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie stats = stats.merge(pstats) mtx.Unlock() + // No info about samples exactly, so pass at least chunks. 
+ span.SetTag("processed.series", pstats.seriesFetched) + span.SetTag("processed.chunks", pstats.chunksFetched) return nil }) } @@ -1187,6 +1200,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq for _, b := range s.blocks { b := b + gctx := gctx + if !b.overlapsClosedInterval(req.Start, req.End) { continue } @@ -1196,9 +1211,16 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq resHints.AddQueriedBlock(b.meta.ULID) - indexr := b.indexReader(gctx) + indexr := b.indexReader() g.Go(func() error { + span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{ + "block.id": b.meta.ULID, + "block.mint": b.meta.MinTime, + "block.maxt": b.meta.MaxTime, + "block.resolution": b.meta.Thanos.Downsample.Resolution, + }) + defer span.Finish() defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") var result []string @@ -1220,7 +1242,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq result = strutil.MergeSlices(res, extRes) } else { - seriesSet, _, err := blockSeries(b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil) + seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) } @@ -1326,9 +1348,15 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR resHints.AddQueriedBlock(b.meta.ULID) - indexr := b.indexReader(gctx) - + indexr := b.indexReader() g.Go(func() error { + span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{ + "block.id": b.meta.ULID, + "block.mint": b.meta.MinTime, + "block.maxt": b.meta.MaxTime, + "block.resolution": b.meta.Thanos.Downsample.Resolution, + }) + defer span.Finish() defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values") var result []string @@
-1345,7 +1373,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } result = res } else { - seriesSet, _, err := blockSeries(b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil) + seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) } @@ -1667,14 +1695,14 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length) } -func (b *bucketBlock) indexReader(ctx context.Context) *bucketIndexReader { +func (b *bucketBlock) indexReader() *bucketIndexReader { b.pendingReaders.Add(1) - return newBucketIndexReader(ctx, b) + return newBucketIndexReader(b) } -func (b *bucketBlock) chunkReader(ctx context.Context) *bucketChunkReader { +func (b *bucketBlock) chunkReader() *bucketChunkReader { b.pendingReaders.Add(1) - return newBucketChunkReader(ctx, b) + return newBucketChunkReader(b) } // matchRelabelLabels verifies whether the block matches the given matchers. @@ -1703,7 +1731,6 @@ func (b *bucketBlock) Close() error { // bucketIndexReader is a custom index reader (not conforming index.Reader interface) that reads index that is stored in // object storage without having to fully download it. 
type bucketIndexReader struct { - ctx context.Context block *bucketBlock dec *index.Decoder stats *queryStats @@ -1712,9 +1739,8 @@ type bucketIndexReader struct { loadedSeries map[uint64][]byte } -func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexReader { +func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { r := &bucketIndexReader{ - ctx: ctx, block: block, dec: &index.Decoder{ LookupSymbol: block.indexHeaderReader.LookupSymbol, @@ -1734,7 +1760,7 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher) ([]uint64, error) { var ( postingGroups []*postingGroup allRequested = false @@ -1783,7 +1809,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er keys = append(keys, allPostingsLabel) } - fetchedPostings, err := r.fetchPostings(keys) + fetchedPostings, err := r.fetchPostings(ctx, keys) if err != nil { return nil, errors.Wrap(err, "get postings") } @@ -1918,7 +1944,7 @@ type postingPtr struct { // fetchPostings fill postings requested by posting groups. // It returns one postings for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. 
-func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label) ([]index.Postings, error) { timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration) defer timer.ObserveDuration() @@ -1927,7 +1953,7 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings output := make([]index.Postings, len(keys)) // Fetch postings from the cache with a single call. - fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys) + fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) // Iterate over all groups and fetch posting from cache. // If we have a miss, mark key to be fetched in `ptrs` slice. @@ -1990,7 +2016,7 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End) }) - g, ctx := errgroup.WithContext(r.ctx) + g, ctx := errgroup.WithContext(ctx) for _, part := range parts { i, j := part.ElemRng[0], part.ElemRng[1] @@ -2048,7 +2074,7 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings // Truncate first 4 bytes which are length of posting. output[p.keyID] = newBigEndianPostings(pBytes[4:]) - r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], dataToCache) + r.block.indexCache.StorePostings(ctx, r.block.meta.ULID, keys[p.keyID], dataToCache) // If we just fetched it we still have to update the stats for touched postings. 
r.stats.postingsTouched++ @@ -2136,13 +2162,13 @@ func (it *bigEndianPostings) length() int { return len(it.list) / 4 } -func (r *bucketIndexReader) PreloadSeries(ids []uint64) error { +func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []uint64) error { timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration) defer timer.ObserveDuration() // Load series from cache, overwriting the list of ids to preload // with the missing ones. - fromCache, ids := r.block.indexCache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids) + fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids) for id, b := range fromCache { r.loadedSeries[id] = b } @@ -2150,7 +2176,7 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error { parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) { return ids[i], ids[i] + maxSeriesSize }) - g, ctx := errgroup.WithContext(r.ctx) + g, ctx := errgroup.WithContext(ctx) for _, p := range parts { s, e := p.Start, p.End i, j := p.ElemRng[0], p.ElemRng[1] @@ -2199,7 +2225,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc c = c[n : n+int(l)] r.mtx.Lock() r.loadedSeries[id] = c - r.block.indexCache.StoreSeries(r.ctx, r.block.meta.ULID, id, c) + r.block.indexCache.StoreSeries(ctx, r.block.meta.ULID, id, c) r.mtx.Unlock() } return nil @@ -2371,7 +2397,6 @@ type loadIdx struct { } type bucketChunkReader struct { - ctx context.Context block *bucketBlock toLoad [][]loadIdx @@ -2383,9 +2408,8 @@ type bucketChunkReader struct { chunkBytes []*[]byte // Byte slice to return to the chunk pool on close. 
} -func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkReader { +func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { return &bucketChunkReader{ - ctx: ctx, block: block, stats: &queryStats{}, toLoad: make([][]loadIdx, len(block.chunkObjs)), @@ -2416,8 +2440,8 @@ func (r *bucketChunkReader) addLoad(id uint64, seriesEntry, chunk int) error { } // load loads all added chunks and saves resulting aggrs to res. -func (r *bucketChunkReader) load(res []seriesEntry, aggrs []storepb.Aggr) error { - g, ctx := errgroup.WithContext(r.ctx) +func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr) error { + g, ctx := errgroup.WithContext(ctx) for seq, pIdxs := range r.toLoad { sort.Slice(pIdxs, func(i, j int) bool { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e449684457f..1e002b3d2ff 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1144,11 +1144,11 @@ func benchmarkExpandedPostings( partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), } - indexr := newBucketIndexReader(context.Background(), b) + indexr := newBucketIndexReader(b) t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(c.matchers) + p, err := indexr.ExpandedPostings(context.Background(), c.matchers) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p)) } @@ -2225,8 +2225,6 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck } func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMeta *metadata.Meta, blk *bucketBlock, aggrs []storepb.Aggr) { - ctx := context.Background() - // Run the same number of queries per goroutine. queriesPerWorker := b.N / concurrency @@ -2263,10 +2261,10 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // must be called only from the goroutine running the Benchmark function. 
testutil.Ok(b, err) - indexReader := blk.indexReader(ctx) - chunkReader := blk.chunkReader(ctx) + indexReader := blk.indexReader() + chunkReader := blk.chunkReader() - seriesSet, _, err := blockSeries(nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates) + seriesSet, _, err := blockSeries(context.Background(), nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates) testutil.Ok(b, err) // Ensure at least 1 series has been returned (as expected). diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 894cc348930..9ad9dea6ad6 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -6,7 +6,6 @@ package store import ( "bytes" "context" - "encoding/json" "fmt" "io" "io/ioutil" @@ -22,7 +21,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" - "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -200,6 +198,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie } queryPrometheusSpan, ctx := tracing.StartSpan(s.Context(), "query_prometheus") + queryPrometheusSpan.SetTag("query.request", q.String()) httpResp, err := p.startPromRemoteRead(ctx, q) if err != nil { @@ -220,7 +219,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset) } -func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, extLset labels.Labels) error { +func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error { ctx := 
s.Context() level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") @@ -266,7 +265,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_Series return nil } -func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, extLset labels.Labels) error { +func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") framesNum := 0 @@ -278,13 +277,14 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie }() defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body") - var ( - data = p.getBuffer() - ) + var data = p.getBuffer() defer p.putBuffer(data) + bodySizer := NewBytesRead(httpResp.Body) + seriesStats := &storepb.SeriesStatsCounter{} + // TODO(bwplotka): Put read limit as a flag. - stream := remote.NewChunkedReader(httpResp.Body, remote.DefaultChunkedReadLimit, *data) + stream := remote.NewChunkedReader(bodySizer, remote.DefaultChunkedReadLimit, *data) for { res := &prompb.ChunkedReadResponse{} err := stream.NextProto(res) @@ -301,6 +301,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie framesNum++ for _, series := range res.ChunkedSeries { + seriesStats.CountSeries(series.Labels) thanosChks := make([]storepb.AggrChunk, len(series.Chunks)) for i, chk := range series.Chunks { thanosChks[i] = storepb.AggrChunk{ @@ -314,6 +315,9 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie Type: storepb.Chunk_Encoding(chk.Type - 1), }, } + seriesStats.Samples += thanosChks[i].Raw.XORNumSamples() + seriesStats.Chunks++ + // Drop the reference to data from non protobuf for GC. 
series.Chunks[i].Data = nil } @@ -328,10 +332,34 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie } } } + + querySpan.SetTag("processed.series", seriesStats.Series) + querySpan.SetTag("processed.chunks", seriesStats.Chunks) + querySpan.SetTag("processed.samples", seriesStats.Samples) + querySpan.SetTag("processed.bytes", bodySizer.BytesCount()) level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum) return nil } +type BytesCounter struct { + io.ReadCloser + bytesCount int +} + +func NewBytesRead(rc io.ReadCloser) *BytesCounter { + return &BytesCounter{ReadCloser: rc} +} + +func (s *BytesCounter) Read(p []byte) (n int, err error) { + n, err = s.ReadCloser.Read(p) + s.bytesCount += n + return n, err +} + +func (s *BytesCounter) BytesCount() int { + return s.bytesCount +} + func (p *PrometheusStore) fetchSampledResponse(ctx context.Context, resp *http.Response) (_ *prompb.ReadResponse, err error) { defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "prom series request body") @@ -392,7 +420,7 @@ func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerC return chks, nil } -func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Query) (presp *http.Response, _ error) { +func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Query) (presp *http.Response, err error) { reqb, err := proto.Marshal(&prompb.ReadRequest{ Queries: []*prompb.Query{q}, AcceptedResponseTypes: p.remoteReadAcceptableResponses, @@ -401,11 +429,6 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que return nil, errors.Wrap(err, "marshal read request") } - qjson, err := json.Marshal(q) - if err != nil { - return nil, errors.Wrap(err, "json encode query for tracing") - } - u := *p.base u.Path = path.Join(u.Path, "api/v1/read") @@ -418,10 +441,7 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q 
*prompb.Que preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") preq.Header.Set("User-Agent", thanoshttp.ThanosUserAgent) - tracing.DoInSpan(ctx, "query_prometheus_request", func(ctx context.Context) { - preq = preq.WithContext(ctx) - presp, err = p.client.Do(preq) - }, opentracing.Tag{Key: "prometheus.query", Value: string(qjson)}) + presp, err = p.client.Do(preq.WithContext(ctx)) if err != nil { return nil, errors.Wrap(err, "send request") } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 74d4612ed09..20287070771 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -38,13 +38,13 @@ const StoreMatcherKey = ctxKey(0) // Client holds meta information about a store. type Client interface { - // Client to access the store. + // StoreClient to access the store. storepb.StoreClient // LabelSets that each apply to some data exposed by the backing store. LabelSets() []labels.Labels - // Minimum and maximum time range of data in the store. + // TimeRange returns minimum and maximum time range of data in the store. TimeRange() (mint int64, maxt int64) String() string @@ -243,20 +243,27 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) - // This is used to cancel this stream when one operations takes too long. + // This is used to cancel this stream when one operation takes too long. 
seriesCtx, closeSeries := context.WithCancel(gctx) seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{ "target": st.Addr(), }) defer closeSeries() + storeID := labelpb.PromLabelSetsToString(st.LabelSets()) + if storeID == "" { + storeID = "Store Gateway" + } + span, seriesCtx := tracing.StartSpan(seriesCtx, "proxy.series", tracing.Tags{ + "store.id": storeID, + "store.addr": st.Addr(), + }) + sc, err := st.Series(seriesCtx, r) if err != nil { - storeID := labelpb.PromLabelSetsToString(st.LabelSets()) - if storeID == "" { - storeID = "Store Gateway" - } err = errors.Wrapf(err, "fetch series for %s %s", storeID, st) + span.SetTag("err", err.Error()) + span.Finish() if r.PartialResponseDisabled { level.Error(reqLogger).Log("err", err, "msg", "partial response disabled; aborting request") return err @@ -267,7 +274,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe // Schedule streamSeriesSet that translates gRPC streamed response // into seriesSet (if series) or respCh if warnings. 
- seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, reqLogger, closeSeries, + seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, reqLogger, span, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses)) } @@ -354,6 +361,7 @@ func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFun func startStreamSeriesSet( ctx context.Context, logger log.Logger, + span tracing.Span, closeSeries context.CancelFunc, wg *sync.WaitGroup, stream storepb.Store_SeriesClient, @@ -377,8 +385,18 @@ func startStreamSeriesSet( wg.Add(1) go func() { - defer wg.Done() - defer close(s.recvCh) + seriesStats := &storepb.SeriesStatsCounter{} + bytesProcessed := 0 + + defer func() { + span.SetTag("processed.series", seriesStats.Series) + span.SetTag("processed.chunks", seriesStats.Chunks) + span.SetTag("processed.samples", seriesStats.Samples) + span.SetTag("processed.bytes", bytesProcessed) + span.Finish() + close(s.recvCh) + wg.Done() + }() numResponses := 0 defer func() { @@ -424,12 +442,15 @@ func startStreamSeriesSet( return } numResponses++ + bytesProcessed += rr.r.Size() if w := rr.r.GetWarning(); w != "" { s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w))) } if series := rr.r.GetSeries(); series != nil { + seriesStats.Count(series) + select { case s.recvCh <- series: case <-ctx.Done(): diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 63830d8eecd..3928f9d199d 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -5,6 +5,7 @@ package storepb import ( "bytes" + "encoding/binary" "fmt" "sort" "strconv" @@ -456,3 +457,62 @@ func CompareLabels(a, b []Label) int { func LabelsToPromLabelsUnsafe(lset []Label) labels.Labels { return labelpb.ZLabelsToPromLabels(lset) } + +// XORNumSamples return number of samples. Returns 0 if it's not XOR chunk. 
+func (m *Chunk) XORNumSamples() int { + if m.Type == Chunk_XOR && len(m.Data) >= 2 { // The sample count is read from the first two bytes (big-endian); shorter data cannot be decoded. + return int(binary.BigEndian.Uint16(m.Data)) + } + return 0 +} + +type SeriesStatsCounter struct { + lastSeriesHash uint64 + + Series int + Chunks int + Samples int +} + +func (c *SeriesStatsCounter) CountSeries(seriesLabels []labelpb.ZLabel) { + seriesHash := labelpb.HashWithPrefix("", seriesLabels) + if c.lastSeriesHash == 0 || seriesHash != c.lastSeriesHash { // Count a series only when its hash differs from the previously seen one. + c.lastSeriesHash = seriesHash + c.Series++ + } +} + +func (c *SeriesStatsCounter) Count(series *Series) { + c.CountSeries(series.Labels) + for _, chk := range series.Chunks { + if chk.Raw != nil { // Only chunk kinds that are present are counted; a nil chunk must not be dereferenced. + c.Chunks++ + c.Samples += chk.Raw.XORNumSamples() + } + + if chk.Count != nil { + c.Chunks++ + c.Samples += chk.Count.XORNumSamples() + } + + if chk.Counter != nil { + c.Chunks++ + c.Samples += chk.Counter.XORNumSamples() + } + + if chk.Max != nil { + c.Chunks++ + c.Samples += chk.Max.XORNumSamples() + } + + if chk.Min != nil { + c.Chunks++ + c.Samples += chk.Min.XORNumSamples() + } + + if chk.Sum != nil { + c.Chunks++ + c.Samples += chk.Sum.XORNumSamples() + } + } +} diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index ec9709737ad..867eef7ce02 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -9,11 +9,19 @@ import ( "github.com/opentracing/opentracing-go" ) -// ForceTracingBaggageKey - force sampling header. -const ForceTracingBaggageKey = "X-Thanos-Force-Tracing" +const ( + // ForceTracingBaggageKey is a request header name that forces tracing sampling. + ForceTracingBaggageKey = "X-Thanos-Force-Tracing" -// traceIdResponseHeader - Trace ID response header. -const traceIDResponseHeader = "X-Thanos-Trace-Id" + // traceIDResponseHeader is a response header name that stores trace ID. + traceIDResponseHeader = "X-Thanos-Trace-Id" +) + +// Aliases to avoid spreading opentracing package to Thanos code.
+ +type Tag = opentracing.Tag +type Tags = opentracing.Tags +type Span = opentracing.Span type contextKey struct{} @@ -49,14 +57,14 @@ func CopyTraceContext(trgt, src context.Context) context.Context { // StartSpan starts and returns span with `operationName` and hooking as child to a span found within given context if any. // It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer without notification. -func StartSpan(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { +func StartSpan(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (Span, context.Context) { tracer := tracerFromContext(ctx) if tracer == nil { // No tracing found, return noop span. return opentracing.NoopTracer{}.StartSpan(operationName), ctx } - var span opentracing.Span + var span Span if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { opts = append(opts, opentracing.ChildOf(parentSpan.Context())) } @@ -74,7 +82,7 @@ func DoInSpan(ctx context.Context, operationName string, doFn func(context.Conte // DoWithSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. // It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. -func DoWithSpan(ctx context.Context, operationName string, doFn func(context.Context, opentracing.Span), opts ...opentracing.StartSpanOption) { +func DoWithSpan(ctx context.Context, operationName string, doFn func(context.Context, Span), opts ...opentracing.StartSpanOption) { span, newCtx := StartSpan(ctx, operationName, opts...) defer span.Finish() doFn(newCtx, span)