diff --git a/CHANGELOG.md b/CHANGELOG.md index 284db3b71b..e1be2cc3bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6308](https://github.com/thanos-io/thanos/pull/6308) Ruler: Support configuration flag that allows customizing template for alert message. - [#6760](https://github.com/thanos-io/thanos/pull/6760) Query Frontend: Added TLS support in `--query-frontend.downstream-tripper-config` and `--query-frontend.downstream-tripper-config-file` - [#6749](https://github.com/thanos-io/thanos/pull/6308) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache. +- [#6690](https://github.com/thanos-io/thanos/pull/6690) Store: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing dashboard queries to be incorrect due to the added label. +- [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway. ### Changed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 969e2baa1d..44c0003b1d 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -217,9 +217,9 @@ func registerQuery(app *extkingpin.App) { queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Float64List() queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Float64List() - tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).Hidden().String() - defaultTenant := cmd.Flag("query.default-tenant", "Name of the default tenant.").Default(tenancy.DefaultTenant).Hidden().String() - tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Hidden().Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) + tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).String() + defaultTenant := cmd.Flag("query.default-tenant-id", "Default tenant ID to use if tenant header is not present").Default(tenancy.DefaultTenant).String() + tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". 
This setting will cause the query.tenant-header flag value to be ignored.").Default("").Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) diff --git a/docs/components/query.md b/docs/components/query.md index 237eb83794..2c4f473464 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -360,6 +360,9 @@ Flags: = max(rangeSeconds / 250, defaultStep)). This will not work from Grafana, but Grafana has __step variable which can be used. + --query.default-tenant-id="default-tenant" + Default tenant ID to use if tenant header is + not present --query.lookback-delta=QUERY.LOOKBACK-DELTA The maximum lookback duration for retrieving metrics during expression evaluations. @@ -404,6 +407,14 @@ Flags: --query.telemetry.request-series-seconds-quantiles=10... ... The quantiles for exporting metrics about the series count quantiles. + --query.tenant-certificate-field= + Use TLS client's certificate field to determine + tenant for write requests. Must be one of + organization, organizationalUnit or commonName. + This setting will cause the query.tenant-header + flag value to be ignored. + --query.tenant-header="THANOS-TENANT" + HTTP header to determine tenant. --query.timeout=2m Maximum time to process query by query node. --request.logging-config= Alternative to 'request.logging-config-file' diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index f564fade72..f06e9ce54e 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -105,7 +105,7 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent // req2xx sends a request to the given url.URL. If method is http.MethodPost then // the raw query is encoded in the body and the appropriate Content-Type is set. 
-func (c *Client) req2xx(ctx context.Context, u *url.URL, method string) (_ []byte, _ int, err error) { +func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) { var b io.Reader if method == http.MethodPost { rq := u.RawQuery @@ -117,6 +117,10 @@ func (c *Client) req2xx(ctx context.Context, u *url.URL, method string) (_ []byt if err != nil { return nil, 0, errors.Wrapf(err, "create %s request", method) } + if headers != nil { + req.Header = headers + } + if c.userAgent != "" { req.Header.Set("User-Agent", c.userAgent) } @@ -166,7 +170,7 @@ func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labe span, ctx := tracing.StartSpan(ctx, "/prom_config HTTP[client]") defer span.Finish() - body, _, err := c.req2xx(ctx, &u, http.MethodGet) + body, _, err := c.req2xx(ctx, &u, http.MethodGet, nil) if err != nil { return nil, err } @@ -363,6 +367,7 @@ type QueryOptions struct { MaxSourceResolution string Engine string Explain bool + HTTPHeaders http.Header } func (p *QueryOptions) AddTo(values url.Values) error { @@ -423,7 +428,7 @@ func (c *Client) QueryInstant(ctx context.Context, base *url.URL, query string, method = http.MethodGet } - body, _, err := c.req2xx(ctx, &u, method) + body, _, err := c.req2xx(ctx, &u, method, opts.HTTPHeaders) if err != nil { return nil, nil, nil, errors.Wrap(err, "read query instant response") } @@ -529,7 +534,7 @@ func (c *Client) QueryRange(ctx context.Context, base *url.URL, query string, st span, ctx := tracing.StartSpan(ctx, "/prom_query_range HTTP[client]") defer span.Finish() - body, _, err := c.req2xx(ctx, &u, http.MethodGet) + body, _, err := c.req2xx(ctx, &u, http.MethodGet, opts.HTTPHeaders) if err != nil { return nil, nil, nil, errors.Wrap(err, "read query range response") } @@ -612,7 +617,7 @@ func (c *Client) AlertmanagerAlerts(ctx context.Context, base *url.URL) ([]*mode span, ctx := tracing.StartSpan(ctx, "/alertmanager_alerts HTTP[client]") defer span.Finish() - body, _, err := c.req2xx(ctx, &u, http.MethodGet) + body, _, err := c.req2xx(ctx, &u, http.MethodGet, nil) if err != nil { return nil, err } @@ -643,7 +648,7 @@ func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error defer span.Finish() // We get status code 404 or 405 for prometheus versions lower than 2.14.0 - body, code, err := c.req2xx(ctx, &u, http.MethodGet) + body, code, err := c.req2xx(ctx, &u, http.MethodGet, nil) if err != nil { if code == http.StatusNotFound { return "0", nil @@ -675,7 +680,7 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string span, ctx := tracing.StartSpan(ctx, spanName) defer span.Finish() - body, code, err := c.req2xx(ctx, u, http.MethodGet) + body, code, err := c.req2xx(ctx, u, http.MethodGet, nil) if err != nil { if code, exists := statusToCode[code]; exists && code != 0 { return status.Error(code, err.Error()) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 2e40d010a3..01f32f4366 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -127,16 +127,16 @@ type bucketStoreMetrics struct { seriesDataFetched *prometheus.HistogramVec seriesDataSizeTouched *prometheus.HistogramVec seriesDataSizeFetched *prometheus.HistogramVec - seriesBlocksQueried prometheus.Histogram - seriesGetAllDuration prometheus.Histogram - seriesMergeDuration prometheus.Histogram - resultSeriesCount prometheus.Histogram - chunkSizeBytes prometheus.Histogram - postingsSizeBytes prometheus.Histogram + seriesBlocksQueried 
*prometheus.HistogramVec + seriesGetAllDuration *prometheus.HistogramVec + seriesMergeDuration *prometheus.HistogramVec + resultSeriesCount *prometheus.HistogramVec + chunkSizeBytes *prometheus.HistogramVec + postingsSizeBytes *prometheus.HistogramVec queriesDropped *prometheus.CounterVec - seriesRefetches prometheus.Counter - chunkRefetches prometheus.Counter - emptyPostingCount prometheus.Counter + seriesRefetches *prometheus.CounterVec + chunkRefetches *prometheus.CounterVec + emptyPostingCount *prometheus.CounterVec lazyExpandedPostingsCount prometheus.Counter lazyExpandedPostingSizeBytes prometheus.Counter @@ -145,18 +145,18 @@ type bucketStoreMetrics struct { cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec cachedPostingsCompressionTimeSeconds *prometheus.CounterVec - cachedPostingsOriginalSizeBytes prometheus.Counter - cachedPostingsCompressedSizeBytes prometheus.Counter + cachedPostingsOriginalSizeBytes *prometheus.CounterVec + cachedPostingsCompressedSizeBytes *prometheus.CounterVec - seriesFetchDuration prometheus.Histogram + seriesFetchDuration *prometheus.HistogramVec // Counts time for fetching series across all batches. - seriesFetchDurationSum prometheus.Histogram - postingsFetchDuration prometheus.Histogram + seriesFetchDurationSum *prometheus.HistogramVec + postingsFetchDuration *prometheus.HistogramVec // chunkFetchDuration counts total time loading chunks, but since we spawn // multiple goroutines the actual latency is usually much lower than it. - chunkFetchDuration prometheus.Histogram + chunkFetchDuration *prometheus.HistogramVec // Actual absolute total time for loading chunks. - chunkFetchDurationSum prometheus.Histogram + chunkFetchDurationSum *prometheus.HistogramVec } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -196,138 +196,138 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_series_data_touched", Help: "Number of items of a data type touched to fulfill a single Store API series request.", Buckets: prometheus.ExponentialBuckets(200, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) m.seriesDataFetched = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_data_fetched", Help: "Number of items of a data type retrieved to fulfill a single Store API series request.", Buckets: prometheus.ExponentialBuckets(200, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) m.seriesDataSizeTouched = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_data_size_touched_bytes", Help: "Total size of items of a data type touched to fulfill a single Store API series request in Bytes.", Buckets: prometheus.ExponentialBuckets(1024, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) m.seriesDataSizeFetched = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_data_size_fetched_bytes", Help: "Total size of items of a data type fetched to fulfill a single Store API series request in Bytes.", Buckets: prometheus.ExponentialBuckets(1024, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) - m.seriesBlocksQueried = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.seriesBlocksQueried = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: 
"thanos_bucket_store_series_blocks_queried", Help: "Number of blocks in a bucket store that were touched to satisfy a query.", Buckets: prometheus.ExponentialBuckets(1, 2, 10), - }) - m.seriesGetAllDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{tenancy.MetricLabel}) + m.seriesGetAllDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_get_all_duration_seconds", Help: "Time it takes until all per-block prepares and loads for a query are finished.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) - m.seriesMergeDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{tenancy.MetricLabel}) + m.seriesMergeDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_merge_duration_seconds", Help: "Time it takes to merge sub-results from all queried blocks into a single result.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) - m.resultSeriesCount = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{tenancy.MetricLabel}) + m.resultSeriesCount = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_result_series", Help: "Number of series observed in the final result of a query.", Buckets: prometheus.ExponentialBuckets(1, 2, 15), - }) + }, []string{tenancy.MetricLabel}) - m.chunkSizeBytes = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.chunkSizeBytes = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_sent_chunk_size_bytes", Help: "Size in bytes of the chunks for the single series, which is adequate to the gRPC message size sent to querier.", Buckets: []float64{ 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, }, - }) + }, []string{tenancy.MetricLabel}) - m.postingsSizeBytes = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.postingsSizeBytes = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_postings_size_bytes", Help: "Size in bytes of the postings for a single series call.", Buckets: []float64{ 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 128 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, 768 * 1024 * 1024, 1024 * 1024 * 1024, }, - }) + }, []string{tenancy.MetricLabel}) m.queriesDropped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the limit.", - }, []string{"reason"}) - m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"reason", tenancy.MetricLabel}) + m.seriesRefetches = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", Help: "Total number of cases where configured estimated series bytes was not enough was to fetch series from index, resulting in refetch.", - }) - m.chunkRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{tenancy.MetricLabel}) + m.chunkRefetches = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_chunk_refetches_total", Help: "Total number of cases where configured estimated chunk bytes was not enough was to fetch chunks from object store, resulting in refetch.", - }) + }, 
[]string{tenancy.MetricLabel}) m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compressions_total", Help: "Number of postings compressions before storing to index cache.", - }, []string{"op"}) - m.cachedPostingsCompressions.WithLabelValues(labelEncode) - m.cachedPostingsCompressions.WithLabelValues(labelDecode) + }, []string{"op", tenancy.MetricLabel}) + m.cachedPostingsCompressions.WithLabelValues(labelEncode, tenancy.DefaultTenant) + m.cachedPostingsCompressions.WithLabelValues(labelDecode, tenancy.DefaultTenant) m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compression_errors_total", Help: "Number of postings compression errors.", - }, []string{"op"}) - m.cachedPostingsCompressionErrors.WithLabelValues(labelEncode) - m.cachedPostingsCompressionErrors.WithLabelValues(labelDecode) + }, []string{"op", tenancy.MetricLabel}) + m.cachedPostingsCompressionErrors.WithLabelValues(labelEncode, tenancy.DefaultTenant) + m.cachedPostingsCompressionErrors.WithLabelValues(labelDecode, tenancy.DefaultTenant) m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compression_time_seconds_total", Help: "Time spent compressing postings before storing them into postings cache.", - }, []string{"op"}) - m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode) - m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode) + }, []string{"op", tenancy.MetricLabel}) + m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode, tenancy.DefaultTenant) + m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode, tenancy.DefaultTenant) - m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_original_size_bytes_total", Help: "Original size of postings stored into cache.", - }) - m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{tenancy.MetricLabel}) + m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total", Help: "Compressed size of postings stored into cache.", - }) + }, []string{tenancy.MetricLabel}) - m.seriesFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.seriesFetchDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_fetch_duration_seconds", Help: "The time it takes to fetch series to respond to a request sent to a store gateway. It includes both the time to fetch it from the cache and from storage in case of cache misses.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.seriesFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.seriesFetchDurationSum = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_fetch_duration_sum_seconds", Help: "The total time it takes to fetch series to respond to a request sent to a store gateway across all series batches. 
It includes both the time to fetch it from the cache and from storage in case of cache misses.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.postingsFetchDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_postings_fetch_duration_seconds", Help: "The time it takes to fetch postings to respond to a request sent to a store gateway. It includes both the time to fetch it from the cache and from storage in case of cache misses.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.chunkFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.chunkFetchDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_chunks_fetch_duration_seconds", Help: "The total time spent fetching chunks within a single request for one block.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.chunkFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.chunkFetchDurationSum = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_chunks_fetch_duration_sum_seconds", Help: "The total absolute time spent fetching chunks within a single request for one block.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.emptyPostingCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.emptyPostingCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_empty_postings_total", Help: "Total number of empty postings when fetching block series.", - }) + }, []string{tenancy.MetricLabel}) m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_lazy_expanded_postings_total", @@ -423,18 +423,18 @@ func (s *BucketStore) validate() error { type noopCache struct{} -func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte) {} -func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { +func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string) {} +func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) { return map[labels.Label][]byte{}, keys } -func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte) {} -func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher) ([]byte, bool) { +func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {} +func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) { return []byte{}, false } -func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte) {} -func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) { +func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {} +func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, 
[]storage.SeriesRef) { return map[storage.SeriesRef][]byte{}, ids } @@ -955,9 +955,10 @@ type blockSeriesClient struct { shardMatcher *storepb.ShardMatcher blockMatchers []*labels.Matcher calculateChunkHash bool - seriesFetchDurationSum prometheus.Histogram - chunkFetchDuration prometheus.Histogram - chunkFetchDurationSum prometheus.Histogram + seriesFetchDurationSum *prometheus.HistogramVec + chunkFetchDuration *prometheus.HistogramVec + chunkFetchDurationSum *prometheus.HistogramVec + tenant string // Internal state. i uint64 @@ -982,14 +983,15 @@ func newBlockSeriesClient( shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, batchSize int, - seriesFetchDurationSum prometheus.Histogram, - chunkFetchDuration prometheus.Histogram, - chunkFetchDurationSum prometheus.Histogram, + seriesFetchDurationSum *prometheus.HistogramVec, + chunkFetchDuration *prometheus.HistogramVec, + chunkFetchDurationSum *prometheus.HistogramVec, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, lazyExpandedPostingsCount prometheus.Counter, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter, + tenant string, ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { @@ -1029,6 +1031,7 @@ func newBlockSeriesClient( calculateChunkHash: calculateChunkHash, hasMorePostings: true, batchSize: batchSize, + tenant: tenant, } } @@ -1068,7 +1071,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1099,16 +1102,16 @@ func (b *blockSeriesClient) ExpandPostings( func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { for len(b.entries) == 0 && b.hasMorePostings { - if err := b.nextBatch(); err != nil { + if err := b.nextBatch(b.tenant); err != nil { return nil, err } } if len(b.entries) == 0 { - b.seriesFetchDurationSum.Observe(b.indexr.stats.SeriesDownloadLatencySum.Seconds()) + b.seriesFetchDurationSum.WithLabelValues(b.tenant).Observe(b.indexr.stats.SeriesDownloadLatencySum.Seconds()) if b.chunkr != nil { - b.chunkFetchDuration.Observe(b.chunkr.stats.ChunksFetchDurationSum.Seconds()) - b.chunkFetchDurationSum.Observe(b.chunkr.stats.ChunksDownloadLatencySum.Seconds()) + b.chunkFetchDuration.WithLabelValues(b.tenant).Observe(b.chunkr.stats.ChunksFetchDurationSum.Seconds()) + b.chunkFetchDurationSum.WithLabelValues(b.tenant).Observe(b.chunkr.stats.ChunksDownloadLatencySum.Seconds()) } return nil, io.EOF } @@ -1122,7 +1125,7 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { }), nil } -func (b *blockSeriesClient) nextBatch() error { +func (b *blockSeriesClient) nextBatch(tenant string) error { start := b.i end := start + uint64(b.batchSize) if end > uint64(len(b.lazyPostings.postings)) { @@ -1143,7 +1146,7 @@ func (b *blockSeriesClient) nextBatch() error { b.expandedPostings[i] = b.expandedPostings[i] / 16 } } - b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings)) + b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings), tenant) } return nil } @@ 
-1153,7 +1156,7 @@ func (b *blockSeriesClient) nextBatch() error { b.chunkr.reset() } - if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter); err != nil { + if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter, b.tenant); err != nil { return errors.Wrap(err, "preload series") } @@ -1227,7 +1230,7 @@ OUTER: } if !b.skipChunks { - if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { + if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil { return errors.Wrap(err, "load chunks") } } @@ -1376,7 +1379,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } tenant, _ := tenancy.GetTenantFromGRPCMetadata(srv.Context()) - level.Debug(s.logger).Log("msg", "Tenant for Series request", "tenant", tenant) matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) if err != nil { @@ -1386,7 +1388,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store req.MaxTime = s.limitMaxTime(req.MaxTime) var ( - bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) + bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) ctx = srv.Context() stats = &queryStats{} respSets []respSet @@ -1394,8 +1396,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) queryStatsEnabled = false ) @@ -1467,6 +1469,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, + tenant, ) defer blockClient.Close() @@ -1505,7 +1508,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store blockClient, shardMatcher, false, - s.metrics.emptyPostingCount, + s.metrics.emptyPostingCount.WithLabelValues(tenant), nil, ) @@ -1521,28 +1524,28 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.mtx.RUnlock() defer func() { - s.metrics.seriesDataTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouched)) - s.metrics.seriesDataFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.PostingsTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.PostingsFetchedSizeSum)) - s.metrics.seriesDataTouched.WithLabelValues("series").Observe(float64(stats.seriesTouched)) - s.metrics.seriesDataFetched.WithLabelValues("series").Observe(float64(stats.seriesFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.SeriesTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.SeriesFetchedSizeSum)) - s.metrics.seriesDataTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouched)) 
- s.metrics.seriesDataFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.ChunksTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.ChunksFetchedSizeSum)) - s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) - s.metrics.cachedPostingsCompressions.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressions)) - s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions)) - s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors)) - s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors)) - s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds()) - s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds()) - s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.CachedPostingsOriginalSizeSum)) - s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.CachedPostingsCompressedSizeSum)) - s.metrics.postingsSizeBytes.Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum))) + s.metrics.seriesDataTouched.WithLabelValues("postings", tenant).Observe(float64(stats.postingsTouched)) + s.metrics.seriesDataFetched.WithLabelValues("postings", tenant).Observe(float64(stats.postingsFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("postings", tenant).Observe(float64(stats.PostingsTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("postings", tenant).Observe(float64(stats.PostingsFetchedSizeSum)) + s.metrics.seriesDataTouched.WithLabelValues("series", tenant).Observe(float64(stats.seriesTouched)) + s.metrics.seriesDataFetched.WithLabelValues("series", tenant).Observe(float64(stats.seriesFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("series", tenant).Observe(float64(stats.SeriesTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("series", tenant).Observe(float64(stats.SeriesFetchedSizeSum)) + s.metrics.seriesDataTouched.WithLabelValues("chunks", tenant).Observe(float64(stats.chunksTouched)) + s.metrics.seriesDataFetched.WithLabelValues("chunks", tenant).Observe(float64(stats.chunksFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", tenant).Observe(float64(stats.ChunksTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("chunks", tenant).Observe(float64(stats.ChunksFetchedSizeSum)) + s.metrics.resultSeriesCount.WithLabelValues(tenant).Observe(float64(stats.mergedSeriesCount)) + s.metrics.cachedPostingsCompressions.WithLabelValues(labelEncode, tenant).Add(float64(stats.cachedPostingsCompressions)) + s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode, tenant).Add(float64(stats.cachedPostingsDecompressions)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode, tenant).Add(float64(stats.cachedPostingsCompressionErrors)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode, tenant).Add(float64(stats.cachedPostingsDecompressionErrors)) + s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode, tenant).Add(stats.CachedPostingsCompressionTimeSum.Seconds()) + 
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode, tenant).Add(stats.CachedPostingsDecompressionTimeSum.Seconds()) + s.metrics.cachedPostingsOriginalSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsOriginalSizeSum)) + s.metrics.cachedPostingsCompressedSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsCompressedSizeSum)) + s.metrics.postingsSizeBytes.WithLabelValues(tenant).Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum))) level.Debug(s.logger).Log("msg", "stats query processed", "request", req, @@ -1564,8 +1567,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } stats.blocksQueried = len(respSets) stats.GetAllDuration = time.Since(begin) - s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds()) - s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) + s.metrics.seriesGetAllDuration.WithLabelValues(tenant).Observe(stats.GetAllDuration.Seconds()) + s.metrics.seriesBlocksQueried.WithLabelValues(tenant).Observe(float64(stats.blocksQueried)) } // Merge the sub-results from each selected block. @@ -1593,7 +1596,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store stats.mergedSeriesCount++ if !req.SkipChunks { stats.mergedChunksCount += len(series.Chunks) - s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) + s.metrics.chunkSizeBytes.WithLabelValues(tenant).Observe(float64(chunksSize(series.Chunks))) } } if err = srv.Send(at); err != nil { @@ -1602,7 +1605,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } } stats.MergeDuration = time.Since(begin) - s.metrics.seriesMergeDuration.Observe(stats.MergeDuration.Seconds()) + s.metrics.seriesMergeDuration.WithLabelValues(tenant).Observe(stats.MergeDuration.Seconds()) err = nil }) @@ -1648,7 +1651,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq } tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx) - level.Debug(s.logger).Log("msg", "Tenant for LabelNames request", "tenant", tenant) resHints := &hintspb.LabelNamesResponseHints{} @@ -1672,8 +1674,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq var mtx sync.Mutex var sets [][]string - var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) + var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { b := b @@ -1750,6 +1752,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, + tenant, ) defer blockClient.Close() @@ -1848,7 +1851,6 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx) - level.Debug(s.logger).Log("msg", "Tenant for LabelValues request", "tenant", tenant) resHints := &hintspb.LabelValuesResponseHints{} @@ -1872,8 +1874,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR var mtx sync.Mutex var sets [][]string - var seriesLimiter = 
s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) + var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { b := b @@ -1953,6 +1955,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, + tenant, ) defer blockClient.Close() @@ -2394,14 +2397,14 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { return nil, nil } - hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter) + hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant) if err != nil { return nil, err } @@ -2418,7 +2421,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch return nil, errors.Wrap(err, "matchersToPostingGroups") } if postingGroups == nil { - r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0) + r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0, tenant) return nil, nil } i := 0 @@ -2446,13 +2449,13 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } // If postings still have matchers to be applied lazily, cache expanded postings after filtering series so skip here. 
if !ps.lazyExpanded() { - r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) + r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings), tenant) } if len(ps.postings) > 0 { @@ -2743,8 +2746,8 @@ type postingPtr struct { ptr index.Range } -func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) { - dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms) +func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) { + dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant) if !hit { return false, nil, nil } @@ -2788,7 +2791,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, return true, ps, nil } -func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int) { +func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int, tenant string) { // Encode postings to cache. We compress and cache postings before adding // 16 bytes padding in order to make compressed size smaller. dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(ps, length) @@ -2797,7 +2800,7 @@ func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, p r.stats.CachedPostingsCompressionTimeSum += compressionDuration r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize) r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(length * 4) // Estimate the posting list size. - r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache) + r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant) } var bufioReaderPool = sync.Pool{ @@ -2809,10 +2812,10 @@ var bufioReaderPool = sync.Pool{ // fetchPostings fill postings requested by posting groups. // It returns one posting for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. -func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) { var closeFns []func() - timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration) + timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant)) defer timer.ObserveDuration() var ptrs []postingPtr @@ -2820,7 +2823,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab output := make([]index.Postings, len(keys)) // Fetch postings from the cache with a single call. 
- fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) + fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant) for _, dataFromCache := range fromCache { if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) @@ -2938,7 +2941,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression) r.mtx.Unlock() - r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache) + r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant) } r.mtx.Lock() @@ -3049,8 +3052,8 @@ func (it *bigEndianPostings) length() int { return len(it.list) / 4 } -func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error { - timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration) +func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string) error { + timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration.WithLabelValues(tenant)) defer func() { d := timer.ObserveDuration() r.stats.SeriesDownloadLatencySum += d @@ -3058,7 +3061,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser // Load series from cache, overwriting the list of ids to preload // with the missing ones. - fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids) + fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids, tenant) for id, b := range fromCache { r.loadedSeries[id] = b if err := bytesLimiter.Reserve(uint64(len(b))); err != nil { @@ -3077,13 +3080,13 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser i, j := p.ElemRng[0], p.ElemRng[1] g.Go(func() error { - return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter) + return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant) }) } return g.Wait() } -func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter) error { +func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string) error { begin := time.Now() if bytesLimiter != nil { @@ -3120,16 +3123,16 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series } // Inefficient, but should be rare. - r.block.metrics.seriesRefetches.Inc() + r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc() level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize) // Fetch plus to get the size of next one if exists. 
- return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter) + return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant) } c = c[n : n+int(l)] r.mtx.Lock() r.loadedSeries[id] = c - r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c) + r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant) r.mtx.Unlock() } return nil @@ -3367,7 +3370,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) } // load loads all added chunks and saves resulting aggrs to refs. -func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { +func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { r.loadingChunksMtx.Lock() r.loadingChunks = true r.loadingChunksMtx.Unlock() @@ -3405,7 +3408,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ p := p indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]] g.Go(func() error { - return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter) + return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant) }) } } @@ -3414,7 +3417,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. -func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { var locked bool fetchBegin := time.Now() defer func() { @@ -3503,7 +3506,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a continue } - r.block.metrics.chunkRefetches.Inc() + r.block.metrics.chunkRefetches.WithLabelValues(tenant).Inc() // If we didn't fetch enough data for the chunk, fetch more. fetchBegin = time.Now() // Read entire chunk into new buffer. 
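
The pattern applied throughout pkg/store/bucket.go above is the same in every hunk: a plain Counter or Histogram becomes a *Vec keyed by tenancy.MetricLabel, and each observation selects its child via WithLabelValues(tenant), where the tenant is read from the request's gRPC metadata (tenancy.GetTenantFromGRPCMetadata). A minimal sketch of that pattern follows; it is not part of the patch, and the label name "tenant" and value "default-tenant" are stand-in assumptions for tenancy.MetricLabel and tenancy.DefaultTenant.

// Sketch only (not part of the patch): per-tenant metric vectors as used in
// newBucketStoreMetrics. Label name and tenant value are assumed constants.
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Formerly a plain Histogram; now a HistogramVec keyed by the tenant label.
	seriesBlocksQueried := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "thanos_bucket_store_series_blocks_queried",
		Help:    "Number of blocks in a bucket store that were touched to satisfy a query.",
		Buckets: prometheus.ExponentialBuckets(1, 2, 10),
	}, []string{"tenant"})

	// In BucketStore.Series the tenant comes from
	// tenancy.GetTenantFromGRPCMetadata(srv.Context()); here it is a constant.
	tenant := "default-tenant"
	seriesBlocksQueried.WithLabelValues(tenant).Observe(3)
}

This is also what the CHANGELOG's breaking-change note refers to: dashboard queries that aggregated these series without labels may now need an aggregation such as sum without(tenant)(...) or an explicit tenant selector.
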
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index ebd1ffa709..a78b81877f 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -59,28 +59,28 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) { c.ptr = ptr2 } -func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.ptr.StorePostings(blockID, l, v) +func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + c.ptr.StorePostings(blockID, l, v, tenant) } -func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { - return c.ptr.FetchMultiPostings(ctx, blockID, keys) +func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) { + return c.ptr.FetchMultiPostings(ctx, blockID, keys, tenant) } -func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { - c.ptr.StoreExpandedPostings(blockID, matchers, v) +func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant) } -func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - return c.ptr.FetchExpandedPostings(ctx, blockID, matchers) +func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + return c.ptr.FetchExpandedPostings(ctx, blockID, matchers, tenant) } -func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.ptr.StoreSeries(blockID, id, v) +func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.ptr.StoreSeries(blockID, id, v, tenant) } -func (c *swappableCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) { - return c.ptr.FetchMultiSeries(ctx, blockID, ids) +func (c *swappableCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) { + return c.ptr.FetchMultiSeries(ctx, blockID, ids, tenant) } type storeSuite struct { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index f26bdd55b8..c77c9f9f91 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -60,6 +60,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -1017,31 +1018,31 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { } // Success with no refetches. 
- testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil))) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), 24: []byte("cccccccccc"), }, r.loadedSeries) - testutil.Equals(t, float64(0), promtest.ToFloat64(s.seriesRefetches)) + testutil.Equals(t, float64(0), promtest.ToFloat64(s.seriesRefetches.WithLabelValues(tenancy.DefaultTenant))) // Success with 2 refetches. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil))) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), 24: []byte("cccccccccc"), }, r.loadedSeries) - testutil.Equals(t, float64(2), promtest.ToFloat64(s.seriesRefetches)) + testutil.Equals(t, float64(2), promtest.ToFloat64(s.seriesRefetches.WithLabelValues(tenancy.DefaultTenant))) // Success with refetch on first element. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil))) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), }, r.loadedSeries) - testutil.Equals(t, float64(3), promtest.ToFloat64(s.seriesRefetches)) + testutil.Equals(t, float64(3), promtest.ToFloat64(s.seriesRefetches.WithLabelValues(tenancy.DefaultTenant))) buf.Reset() buf.PutByte(0) @@ -1051,7 +1052,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get()))) // Fail, but no recursion at least. 
- testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil))) + testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) } func TestBucketIndexReader_ExpandedPostings(t *testing.T) { @@ -1236,7 +1237,7 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p.postings)) } @@ -1271,7 +1272,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") ctx := context.Background() dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. @@ -1488,7 +1489,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, for _, b := range st.blocks { // NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series. 
- testutil.Equals(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches)) + testutil.Equals(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches.WithLabelValues(tenancy.DefaultTenant))) } } } @@ -2754,7 +2755,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet testutil.Ok(b, err) sortedMatchers := newSortedMatchers(matchers) - dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{}) + dummyHistogram := promauto.NewHistogramVec(prometheus.HistogramOpts{}, []string{tenancy.MetricLabel}) blockClient := newBlockSeriesClient( ctx, nil, @@ -2774,6 +2775,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet dummyCounter, dummyCounter, dummyCounter, + tenancy.DefaultTenant, ) testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter)) defer blockClient.Close() @@ -3364,7 +3366,7 @@ func TestExpandedPostingsRace(t *testing.T) { i := i bb := bb go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter) + refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) defer wg.Done() diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index 360cdd67e5..0811d89cc0 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -15,6 +15,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "golang.org/x/crypto/blake2b" + + "github.com/thanos-io/thanos/pkg/tenancy" ) const ( @@ -36,24 +38,24 @@ var ( // (potentially with a deadline) as in the original user's request. type IndexCache interface { // StorePostings stores postings for a single series. - StorePostings(blockID ulid.ULID, l labels.Label, v []byte) + StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. - FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) + FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) // StoreExpandedPostings stores expanded postings for a set of label matchers. - StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) + StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. - FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) + FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) // StoreSeries stores a single series. - StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) + StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. 
- FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) + FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) } // Common metrics that should be used by all cache implementations. @@ -69,23 +71,23 @@ func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { requestTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_requests_total", Help: "Total number of items requests to the cache.", - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), hitsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_hits_total", Help: "Total number of items requests to the cache that were a hit.", - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), dataSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_store_index_cache_stored_data_size_bytes", Help: "Histogram to track item data size stored in index cache", Buckets: []float64{ 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 64 * 1024 * 1024, 128 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, }, - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_store_index_cache_fetch_duration_seconds", Help: "Histogram to track latency to fetch items from index cache", Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 30, 45, 60, 90, 120}, - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), } } diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index e0077acc35..3312c5faa0 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -21,6 +21,7 @@ import ( "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/tenancy" ) var ( @@ -115,9 +116,9 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *commonMet c.added.WithLabelValues(cacheTypeSeries) c.added.WithLabelValues(cacheTypeExpandedPostings) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings) + c.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_items_overflowed_total", @@ -127,9 +128,9 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *commonMet c.overflow.WithLabelValues(cacheTypeSeries) c.overflow.WithLabelValues(cacheTypeExpandedPostings) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings) + c.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, 
tenancy.DefaultTenant) c.current = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_items", @@ -197,8 +198,8 @@ func (c *InMemoryIndexCache) onEvict(key, val interface{}) { c.curSize -= entrySize } -func (c *InMemoryIndexCache) get(typ string, key cacheKey) ([]byte, bool) { - c.commonMetrics.requestTotal.WithLabelValues(typ).Inc() +func (c *InMemoryIndexCache) get(typ string, key cacheKey, tenant string) ([]byte, bool) { + c.commonMetrics.requestTotal.WithLabelValues(typ, tenant).Inc() c.mtx.Lock() defer c.mtx.Unlock() @@ -207,7 +208,7 @@ func (c *InMemoryIndexCache) get(typ string, key cacheKey) ([]byte, bool) { if !ok { return nil, false } - c.commonMetrics.hitsTotal.WithLabelValues(typ).Inc() + c.commonMetrics.hitsTotal.WithLabelValues(typ, tenant).Inc() return v.([]byte), true } @@ -294,22 +295,22 @@ func copyToKey(l labels.Label) cacheKeyPostings { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings).Observe(float64(len(v))) +func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v))) c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) } // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. -func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings)) +func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings, tenant)) defer timer.ObserveDuration() hits = map[labels.Label][]byte{} blockIDKey := blockID.String() for _, key := range keys { - if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}); ok { + if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}, tenant); ok { hits[key] = b continue } @@ -321,17 +322,17 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. } // StoreExpandedPostings stores expanded postings for a set of label matchers. -func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings).Observe(float64(len(v))) +func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) c.set(cacheTypeExpandedPostings, cacheKey{block: blockID.String(), key: cacheKeyExpandedPostings(labelMatchersToString(matchers))}, v) } // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. 
-func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings)) +func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) defer timer.ObserveDuration() - if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok { + if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}, tenant); ok { return b, true } return nil, false @@ -339,22 +340,22 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ul // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries).Observe(float64(len(v))) +func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v) } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. -func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries)) +func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries, tenant)) defer timer.ObserveDuration() hits = map[storage.SeriesRef][]byte{} blockIDKey := blockID.String() for _, id := range ids { - if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}); ok { + if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}, tenant); ok { hits[id] = b continue } diff --git a/pkg/store/cache/inmemory_test.go b/pkg/store/cache/inmemory_test.go index 16b76a20ea..1460993079 100644 --- a/pkg/store/cache/inmemory_test.go +++ b/pkg/store/cache/inmemory_test.go @@ -20,6 +20,8 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/efficientgo/core/testutil" + + "github.com/thanos-io/thanos/pkg/tenancy" ) func TestNewInMemoryIndexCache(t *testing.T) { @@ -79,14 +81,14 @@ func TestInMemoryIndexCache_AvoidsDeadlock(t *testing.T) { testutil.Ok(t, err) cache.lru = l - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}) + cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}, tenancy.DefaultTenant) testutil.Equals(t, uint64(sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(cache.curSize), 
promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) // This triggers deadlock logic. - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}) + cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}, tenancy.DefaultTenant) testutil.Equals(t, uint64(sliceHeaderSize+1), cache.curSize) testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -132,9 +134,9 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { }{ { typ: cacheTypePostings, - set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b) }, + set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, tenancy.DefaultTenant) }, get: func(id storage.SeriesRef) ([]byte, bool) { - hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}) + hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, tenancy.DefaultTenant) b, ok := hits[lbl] return b, ok @@ -142,9 +144,9 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { }, { typ: cacheTypeSeries, - set: func(id storage.SeriesRef, b []byte) { cache.StoreSeries(uid(id), id, b) }, + set: func(id storage.SeriesRef, b []byte) { cache.StoreSeries(uid(id), id, b, tenancy.DefaultTenant) }, get: func(id storage.SeriesRef) ([]byte, bool) { - hits, _ := cache.FetchMultiSeries(ctx, uid(id), []storage.SeriesRef{id}) + hits, _ := cache.FetchMultiSeries(ctx, uid(id), []storage.SeriesRef{id}, tenancy.DefaultTenant) b, ok := hits[id] return b, ok @@ -153,10 +155,10 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { { typ: cacheTypeExpandedPostings, set: func(id storage.SeriesRef, b []byte) { - cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b) + cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant) }, get: func(id storage.SeriesRef) ([]byte, bool) { - return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}) + return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}, tenancy.DefaultTenant) }, }, } { @@ -220,9 +222,9 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) { id := ulid.MustNew(0, nil) - cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}) - cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}) - cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}) + cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}, tenancy.DefaultTenant) + cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}, tenancy.DefaultTenant) + cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}, tenancy.DefaultTenant) testutil.Equals(t, uint64(2*sliceHeaderSize+4), cache.curSize) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) @@ -231,10 +233,10 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) testutil.Equals(t, float64(3), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypeSeries))) - testutil.Equals(t, float64(0), 
promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings))) - testutil.Equals(t, float64(0), promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries))) - testutil.Equals(t, float64(0), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings))) - testutil.Equals(t, float64(0), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) } func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { @@ -253,12 +255,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { emptySeriesHits := map[storage.SeriesRef][]byte{} emptySeriesMisses := []storage.SeriesRef(nil) - pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) + pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, tenancy.DefaultTenant) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) // Add sliceHeaderSize + 2 bytes. - cache.StorePostings(id, lbls, []byte{42, 33}) + cache.StorePostings(id, lbls, []byte{42, 33}, tenancy.DefaultTenant) testutil.Equals(t, uint64(sliceHeaderSize+2), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -271,20 +273,20 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, tenancy.DefaultTenant) testutil.Equals(t, map[labels.Label][]byte{lbls: {42, 33}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}, tenancy.DefaultTenant) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}, tenancy.DefaultTenant) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{{Name: "test", Value: "124"}}, pMisses) // Add sliceHeaderSize + 3 more bytes. 
- cache.StoreSeries(id, 1234, []byte{222, 223, 224}) + cache.StoreSeries(id, 1234, []byte{222, 223, 224}, tenancy.DefaultTenant) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -297,7 +299,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - sHits, sMisses := cache.FetchMultiSeries(ctx, id, []storage.SeriesRef{1234}) + sHits, sMisses := cache.FetchMultiSeries(ctx, id, []storage.SeriesRef{1234}, tenancy.DefaultTenant) testutil.Equals(t, map[storage.SeriesRef][]byte{1234: {222, 223, 224}}, sHits, "key exists") testutil.Equals(t, emptySeriesMisses, sMisses) @@ -308,7 +310,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { for i := 0; i < sliceHeaderSize; i++ { v = append(v, 3) } - cache.StorePostings(id, lbls2, v) + cache.StorePostings(id, lbls2, v, tenancy.DefaultTenant) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -323,20 +325,20 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) // Eviction. // Evicted. - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, tenancy.DefaultTenant) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) - sHits, sMisses = cache.FetchMultiSeries(ctx, id, []storage.SeriesRef{1234}) + sHits, sMisses = cache.FetchMultiSeries(ctx, id, []storage.SeriesRef{1234}, tenancy.DefaultTenant) testutil.Equals(t, emptySeriesHits, sHits, "no such key") testutil.Equals(t, []storage.SeriesRef{1234}, sMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, tenancy.DefaultTenant) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add same item again. - cache.StorePostings(id, lbls2, v) + cache.StorePostings(id, lbls2, v, tenancy.DefaultTenant) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -350,12 +352,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, tenancy.DefaultTenant) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add too big item. 
- cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5)) + cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5), tenancy.DefaultTenant) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(2*sliceHeaderSize+5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -388,7 +390,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { lbls3 := labels.Label{Name: "test", Value: "124"} - cache.StorePostings(id, lbls3, []byte{}) + cache.StorePostings(id, lbls3, []byte{}, tenancy.DefaultTenant) testutil.Equals(t, uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -402,13 +404,13 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}, tenancy.DefaultTenant) testutil.Equals(t, map[labels.Label][]byte{lbls3: {}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) // nil works and still allocates empty slice. lbls4 := labels.Label{Name: "test", Value: "125"} - cache.StorePostings(id, lbls4, []byte(nil)) + cache.StorePostings(id, lbls4, []byte(nil), tenancy.DefaultTenant) testutil.Equals(t, 2*uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(2), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -422,15 +424,15 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}, tenancy.DefaultTenant) testutil.Equals(t, map[labels.Label][]byte{lbls4: {}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) // Other metrics. 
testutil.Equals(t, float64(4), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypeSeries))) - testutil.Equals(t, float64(9), promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings))) - testutil.Equals(t, float64(2), promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries))) - testutil.Equals(t, float64(5), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings))) - testutil.Equals(t, float64(1), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(9), promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, float64(2), promtest.ToFloat64(cache.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) + testutil.Equals(t, float64(5), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) } diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 104b936e8c..bc8bb5b52c 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/cacheutil" + "github.com/thanos-io/thanos/pkg/tenancy" ) const ( @@ -33,18 +34,10 @@ type RemoteIndexCache struct { compressionScheme string // Metrics. - postingRequests prometheus.Counter - seriesRequests prometheus.Counter - expandedPostingRequests prometheus.Counter - postingHits prometheus.Counter - seriesHits prometheus.Counter - expandedPostingHits prometheus.Counter - postingDataSizeBytes prometheus.Observer - expandedPostingDataSizeBytes prometheus.Observer - seriesDataSizeBytes prometheus.Observer - postingsFetchDuration prometheus.Observer - expandedPostingsFetchDuration prometheus.Observer - seriesFetchDuration prometheus.Observer + requestTotal *prometheus.CounterVec + hitsTotal *prometheus.CounterVec + dataSizeBytes *prometheus.HistogramVec + fetchLatency *prometheus.HistogramVec } // NewRemoteIndexCache makes a new RemoteIndexCache. 
@@ -59,21 +52,23 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli commonMetrics = newCommonMetrics(reg) } - c.postingRequests = commonMetrics.requestTotal.WithLabelValues(cacheTypePostings) - c.seriesRequests = commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries) - c.expandedPostingRequests = commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings) + c.requestTotal = commonMetrics.requestTotal + c.hitsTotal = commonMetrics.hitsTotal + c.dataSizeBytes = commonMetrics.dataSizeBytes + c.fetchLatency = commonMetrics.fetchLatency - c.postingHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings) - c.seriesHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries) - c.expandedPostingHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings) + // Init requestTotal and hitsTotal with the default tenant. + c.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) - c.postingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings) - c.seriesDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries) - c.expandedPostingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings) + c.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) - c.postingsFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings) - c.seriesFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries) - c.expandedPostingsFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings) + c.fetchLatency.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.fetchLatency.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) level.Info(logger).Log("msg", "created index cache") @@ -83,8 +78,8 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // StorePostings sets the postings identified by the ulid and label to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. -func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.postingDataSizeBytes.Observe(float64(len(v))) +func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + c.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -94,8 +89,8 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. 
-func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { - timer := prometheus.NewTimer(c.postingsFetchDuration) +func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypePostings, tenant)) defer timer.ObserveDuration() keys := make([]string, 0, len(lbls)) @@ -107,7 +102,8 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. } // Fetch the keys from memcached in a single request. - c.postingRequests.Add(float64(len(keys))) + c.requestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(len(keys))) + results := c.memcached.GetMulti(ctx, keys) if len(results) == 0 { return nil, lbls @@ -127,16 +123,15 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. hits[lbl] = value } - - c.postingHits.Add(float64(len(hits))) + c.hitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(len(hits))) return hits, misses } // StoreExpandedPostings sets the postings identified by the ulid and label to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. -func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) { - c.expandedPostingDataSizeBytes.Observe(float64(len(v))) +func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte, tenant string) { + c.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { @@ -147,20 +142,20 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe // FetchExpandedPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. -func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher) ([]byte, bool) { - timer := prometheus.NewTimer(c.expandedPostingsFetchDuration) +func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher, tenant string) ([]byte, bool) { + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) defer timer.ObserveDuration() key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), c.compressionScheme}.string() // Fetch the keys from memcached in a single request. - c.expandedPostingRequests.Add(1) + c.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Add(1) results := c.memcached.GetMulti(ctx, []string{key}) if len(results) == 0 { return nil, false } if res, ok := results[key]; ok { - c.expandedPostingHits.Add(1) + c.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Add(1) return res, true } return nil, false @@ -169,8 +164,8 @@ func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ul // StoreSeries sets the series identified by the ulid and id to the value v. 
// The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. -func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.seriesDataSizeBytes.Observe(float64(len(v))) +func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { @@ -181,8 +176,8 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. // In case of error, it logs and return an empty cache hits map. -func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - timer := prometheus.NewTimer(c.seriesFetchDuration) +func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypeSeries, tenant)) defer timer.ObserveDuration() keys := make([]string, 0, len(ids)) @@ -194,7 +189,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL } // Fetch the keys from memcached in a single request. - c.seriesRequests.Add(float64(len(ids))) + c.requestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(len(ids))) results := c.memcached.GetMulti(ctx, keys) if len(results) == 0 { return nil, ids @@ -214,8 +209,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL hits[id] = value } - - c.seriesHits.Add(float64(len(hits))) + c.hitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(len(hits))) return hits, misses } diff --git a/pkg/store/cache/memcached_test.go b/pkg/store/cache/memcached_test.go index cda095a853..b2b9896cc8 100644 --- a/pkg/store/cache/memcached_test.go +++ b/pkg/store/cache/memcached_test.go @@ -16,6 +16,8 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/efficientgo/core/testutil" + + "github.com/thanos-io/thanos/pkg/tenancy" ) func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) { @@ -92,19 +94,19 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) { // Store the postings expected before running the test. ctx := context.Background() for _, p := range testData.setup { - c.StorePostings(p.block, p.label, p.value) + c.StorePostings(p.block, p.label, p.value, tenancy.DefaultTenant) } // Fetch postings from cached and assert on it. - hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels) + hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels, tenancy.DefaultTenant) testutil.Equals(t, testData.expectedHits, hits) testutil.Equals(t, testData.expectedMisses, misses) // Assert on metrics. 
- testutil.Equals(t, float64(len(testData.fetchLabels)), prom_testutil.ToFloat64(c.postingRequests)) - testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.postingHits)) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.seriesRequests)) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.seriesHits)) + testutil.Equals(t, float64(len(testData.fetchLabels)), prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) }) } } @@ -173,25 +175,25 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) { // Store the postings expected before running the test. ctx := context.Background() for _, p := range testData.setup { - c.StoreExpandedPostings(p.block, p.matchers, p.value) + c.StoreExpandedPostings(p.block, p.matchers, p.value, tenancy.DefaultTenant) } // Fetch postings from cached and assert on it. - val, hit := c.FetchExpandedPostings(ctx, testData.fetchBlockID, testData.fetchMatchers) + val, hit := c.FetchExpandedPostings(ctx, testData.fetchBlockID, testData.fetchMatchers, tenancy.DefaultTenant) testutil.Equals(t, testData.expectedHit, hit) if hit { testutil.Equals(t, testData.expectedValue, val) } // Assert on metrics. - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.expandedPostingRequests)) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant))) if testData.expectedHit { - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.expandedPostingHits)) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant))) } - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.postingRequests)) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.postingHits)) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.seriesRequests)) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.seriesHits)) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) }) } } @@ -268,19 +270,19 @@ func TestMemcachedIndexCache_FetchMultiSeries(t *testing.T) { // Store the series expected before running the test. ctx := context.Background() for _, p := range testData.setup { - c.StoreSeries(p.block, p.id, p.value) + c.StoreSeries(p.block, p.id, p.value, tenancy.DefaultTenant) } // Fetch series from cached and assert on it. - hits, misses := c.FetchMultiSeries(ctx, testData.fetchBlockID, testData.fetchIds) + hits, misses := c.FetchMultiSeries(ctx, testData.fetchBlockID, testData.fetchIds, tenancy.DefaultTenant) testutil.Equals(t, testData.expectedHits, hits) testutil.Equals(t, testData.expectedMisses, misses) // Assert on metrics. 
- testutil.Equals(t, float64(len(testData.fetchIds)), prom_testutil.ToFloat64(c.seriesRequests)) - testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.seriesHits)) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.postingRequests)) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.postingHits)) + testutil.Equals(t, float64(len(testData.fetchIds)), prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) + testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant))) }) } } diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 2e02836c0c..4fb4a155f9 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -148,6 +148,7 @@ func fetchLazyExpandedPostings( addAllPostings bool, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, + tenant string, ) (*lazyExpandedPostings, error) { var ( err error @@ -178,7 +179,7 @@ func fetchLazyExpandedPostings( } } - ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter) + ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant) if err != nil { return nil, err } @@ -220,9 +221,9 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label return keys, lazyMatchers } -func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter) ([]storage.SeriesRef, []*labels.Matcher, error) { +func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) { keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant) defer func() { for _, closeFn := range closeFns { closeFn() diff --git a/pkg/tenancy/tenancy.go b/pkg/tenancy/tenancy.go index 13775cf6e1..4a874855fc 100644 --- a/pkg/tenancy/tenancy.go +++ b/pkg/tenancy/tenancy.go @@ -24,6 +24,8 @@ const ( DefaultTenantLabel = "tenant_id" // This key is used to pass tenant information using Context. TenantKey contextKey = 0 + // MetricLabel is the label name used for adding tenant information to exported metrics. + MetricLabel = "tenant" ) // Allowed fields in client certificates. 
diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 9fec13287b..e7ac78f67e 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -37,6 +37,7 @@ import ( "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) @@ -135,9 +136,13 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) t.Run("query works", func(t *testing.T) { + tenant1Header := make(http.Header) + tenant1Header.Add("thanos-tenant", "test-tenant-1") queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return fmt.Sprintf("%s @ end()", testQuery) }, time.Now, promclient.QueryOptions{ Deduplicate: false, + HTTPHeaders: tenant1Header, + // Query as tenant "test-tenant-1" so the Store Gateway exports per-tenant metrics. }, []model.Metric{ { @@ -166,9 +171,17 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(9), "thanos_bucket_store_series_data_fetched")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_series_blocks_queried")) + // Test per-tenant store metrics. + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_touched"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_fetched"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_blocks_queried"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")))) + + tenant2Header := make(http.Header) + tenant2Header.Add("thanos-tenant", "test-tenant-2") queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, time.Now, promclient.QueryOptions{ Deduplicate: true, + HTTPHeaders: tenant2Header, }, []model.Metric{ { @@ -187,6 +200,17 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(18), "thanos_bucket_store_series_data_touched")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_fetched")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3+3), "thanos_bucket_store_series_blocks_queried")) + + // Test some tenant-specific store metrics. + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_touched"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-2")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_data_fetched"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-2")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_blocks_queried"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-2")))) + + // The first tenant's metrics should be unaffected 
by the additional query + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_touched"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_fetched"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_blocks_queried"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")))) + }) t.Run("remove meta.json from id1 block", func(t *testing.T) { testutil.Ok(t, bkt.Delete(ctx, filepath.Join(id1.String(), block.MetaFilename)))