diff --git a/CHANGELOG.md b/CHANGELOG.md index 014a220550..2383be3121 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5255](https://github.com/thanos-io/thanos/pull/5296) Query: Use k-way merging for the proxying logic. The proxying sub-system now uses much less resources (~25-80% less CPU usage, ~30-50% less RAM usage according to our benchmarks). Reduces query duration by a few percent on queries with lots of series. - [#5690](https://github.com/thanos-io/thanos/pull/5690) Compact: update `--debug.accept-malformed-index` flag to apply to downsampling. Previously the flag only applied to compaction, and fatal errors would still occur when downsampling was attempted. - [#5707](https://github.com/thanos-io/thanos/pull/5707) Objstore: Update objstore to latest version which includes a refactored Azure Storage Account implementation with a new SDK. +- [#5685](https://github.com/thanos-io/thanos/pull/5685) Receive: Make active/head series limiting configuration per tenant by adding it to new limiting config. ### Removed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c6e0b8ed56..4733fa26a1 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -5,7 +5,6 @@ package main import ( "context" - "net/url" "os" "path" "strings" @@ -205,9 +204,7 @@ func runReceive( return errors.Wrap(err, "parse limit configuration") } } - - // Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided. - seriesLimitSupported := (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0 + limiter := receive.NewLimiter(limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) dbs := receive.NewMultiTSDB( conf.dataDir, @@ -222,28 +219,23 @@ func runReceive( ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ - Writer: writer, - ListenAddress: conf.rwAddress, - Registry: reg, - Endpoint: conf.endpoint, - TenantHeader: conf.tenantHeader, - TenantField: conf.tenantField, - DefaultTenantID: conf.defaultTenantID, - ReplicaHeader: conf.replicaHeader, - ReplicationFactor: conf.replicationFactor, - RelabelConfigs: relabelConfig, - ReceiverMode: receiveMode, - Tracer: tracer, - TLSConfig: rwTLSConfig, - DialOpts: dialOpts, - ForwardTimeout: time.Duration(*conf.forwardTimeout), - TSDBStats: dbs, - LimitsConfig: limitsConfig, - SeriesLimitSupported: seriesLimitSupported, - MaxPerTenantLimit: conf.maxPerTenantLimit, - MetaMonitoringUrl: conf.metaMonitoringUrl, - MetaMonitoringHttpClient: conf.metaMonitoringHttpClient, - MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery, + Writer: writer, + ListenAddress: conf.rwAddress, + Registry: reg, + Endpoint: conf.endpoint, + TenantHeader: conf.tenantHeader, + TenantField: conf.tenantField, + DefaultTenantID: conf.defaultTenantID, + ReplicaHeader: conf.replicaHeader, + ReplicationFactor: conf.replicationFactor, + RelabelConfigs: relabelConfig, + ReceiverMode: receiveMode, + Tracer: tracer, + TLSConfig: rwTLSConfig, + DialOpts: dialOpts, + ForwardTimeout: time.Duration(*conf.forwardTimeout), + TSDBStats: dbs, + Limiter: limiter, }) grpcProbe := prober.NewGRPC() @@ -373,13 +365,13 @@ func runReceive( ) } - if seriesLimitSupported { + if limitsConfig.AreHeadSeriesLimitsConfigured() { level.Info(logger).Log("msg", 
"setting up periodic (every 15s) meta-monitoring query for limiting cache") { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(15*time.Second, ctx.Done(), func() error { - if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil { + if err := limiter.HeadSeriesLimiter.QueryMetaMonitoring(ctx); err != nil { level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error()) } return nil @@ -737,11 +729,6 @@ type receiveConfig struct { rwClientServerCA string rwClientServerName string - maxPerTenantLimit uint64 - metaMonitoringLimitQuery string - metaMonitoringUrl *url.URL - metaMonitoringHttpClient *extflag.PathOrContent - dataDir string labelStrs []string @@ -842,14 +829,6 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor) - cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit) - - cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl) - - cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery) - - rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent(cmd, "receive.tenant-limits.meta-monitoring-client", "YAML file or string with http client configs for meta-monitoring.", extflag.WithHidden()) - rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution()) diff --git a/docs/components/receive.md b/docs/components/receive.md index c292de6369..9fc1440e91 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -101,16 +101,17 @@ The configuration file follows a few standards: All the configuration for the remote write endpoint of Receive is contained in the `write` key. Inside it there are 3 subsections: -- `global`: limits and/or gates that are applied considering all the requests. +- `global`: limits, gates and/or options that are applied considering all the requests. - `default`: the default values for limits in case a given tenant doesn't have any specified. - `tenants`: the limits for a given tenant. -From the example configuration below, it's understood that: +For a Receive instance with configuration like below, it's understood that: -1. This Receive instance has a max concurrency of 30. -2. This Receive instance has some default request limits that apply of all tenants, **unless** a given tenant has their own limits (i.e. the `acme` tenant and partially for the `ajax` tenant). -3. Tenant `acme` has no request limits. -4. Tenant `ajax` has a request series limit of 50000 and samples limit of 500. 
Their request size bytes limit is inherited from the default, 1024 bytes. +1. The Receive instance has a max concurrency of 30. +2. The Receive instance has head series limiting enabled as it has `meta_monitoring_.*` options in `global`. +3. The Receive instance has some default request limits as well as head series limits that apply to all tenants, **unless** a given tenant has their own limits (i.e. the `acme` tenant and partially for the `ajax` tenant). +4. Tenant `acme` has no request limits, but has a higher `head_series_limit`. +5. Tenant `ajax` has a request series limit of 50000 and samples limit of 500. Their request size bytes limit is inherited from the default, 1024 bytes. Their head series limit is also inherited from the default, i.e. 1000. The next sections explain what each configuration value means. @@ -118,17 +119,21 @@ write: global: max_concurrency: 30 + meta_monitoring_url: "http://localhost:9090" + meta_monitoring_limit_query: "sum(prometheus_tsdb_head_series) by (tenant)" default: request: size_bytes_limit: 1024 series_limit: 1000 samples_limit: 10 + head_series_limit: 1000 tenants: acme: request: size_bytes_limit: 0 series_limit: 0 samples_limit: 0 + head_series_limit: 2000 ajax: request: series_limit: 50000 @@ -168,11 +173,15 @@ Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant activ Every Receive Router/RouterIngestor node, queries meta-monitoring for active series of all tenants, every 15 seconds, and caches the results in a map. This cached result is used to limit all incoming remote write requests. -To use the feature, one should specify the following (hidden) flags: -- `--receive.tenant-limits.max-head-series`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive. -- `--receive.tenant-limits.meta-monitoring-url`: Specifies Prometheus Query API compatible meta-monitoring endpoint. -- `--receive.tenant-limits.meta-monitoring-query`: Optional flag to specify PromQL query to execute against meta-monitoring. -- `--receive.tenant-limits.meta-monitoring-client`: Optional YAML file/string specifying HTTP client config for meta-monitoring. +To use the feature, one should specify the following limiting config options: + +Under `global`: +- `meta_monitoring_url`: Specifies Prometheus Query API compatible meta-monitoring endpoint. +- `meta_monitoring_limit_query`: Optional PromQL query to execute against meta-monitoring. If not specified, it defaults to `sum(prometheus_tsdb_head_series) by (tenant)`. +- `meta_monitoring_http_client`: Optional YAML field specifying HTTP client config for meta-monitoring. + +Under `default` and per `tenant`: +- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive. NOTE: - It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not have the latest data for current number of active series of a tenant at all times.
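For readers following the new wiring end-to-end, here is a minimal, hypothetical Go sketch (not part of this PR) of how the pieces above fit together: parse the limits YAML with `receive.ParseRootLimitConfig`, build the limiter with `receive.NewLimiter`, and gate the periodic meta-monitoring loop on `AreHeadSeriesLimitsConfigured`, mirroring what `cmd/thanos/receive.go` does. The file path, logger, and registry setup are illustrative assumptions.

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/receive"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	reg := prometheus.NewRegistry()

	// Hypothetical path; the real component obtains this content via its
	// limits-config flag machinery rather than a hard-coded file.
	content, err := os.ReadFile("limits.yaml")
	if err != nil {
		panic(err)
	}

	// Parse the root limits config. This validates meta_monitoring_url and
	// fills in the default meta_monitoring_limit_query when unset.
	limitsConfig, err := receive.ParseRootLimitConfig(content)
	if err != nil {
		panic(err)
	}

	// NewLimiter wires request limiting and, when head series limits plus
	// meta-monitoring are configured and the node is in Router or
	// RouterIngestor mode, the head-series limiter; otherwise a no-op is used.
	limiter := receive.NewLimiter(limitsConfig, reg, receive.RouterIngestor, logger)
	_ = limiter

	if limitsConfig.AreHeadSeriesLimitsConfigured() {
		// cmd/thanos/receive.go starts a 15s loop here that calls
		// limiter.HeadSeriesLimiter.QueryMetaMonitoring(ctx) to refresh the
		// per-tenant head series cache.
	}
}
```

In the actual component the YAML is supplied via the `--receive.limits-config-file` flag (as the e2e helper below now does) rather than read from a hard-coded path.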
diff --git a/pkg/httpconfig/http.go b/pkg/httpconfig/http.go index ed5fdda13a..de5dc6b3b1 100644 --- a/pkg/httpconfig/http.go +++ b/pkg/httpconfig/http.go @@ -97,6 +97,10 @@ var defaultTransportConfig TransportConfig = TransportConfig{ TLSHandshakeTimeout: int64(10 * time.Second), } +func NewDefaultClientConfig() ClientConfig { + return ClientConfig{TransportConfig: defaultTransportConfig} +} + func NewClientConfigFromYAML(cfg []byte) (*ClientConfig, error) { conf := &ClientConfig{TransportConfig: defaultTransportConfig} if err := yaml.Unmarshal(cfg, conf); err != nil { diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index cd25c464c6..156bb74566 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -12,18 +12,14 @@ import ( stdlog "log" "net" "net/http" - "net/url" "sort" "strconv" "sync" "time" - extflag "github.com/efficientgo/tools/extkingpin" "github.com/thanos-io/thanos/pkg/api" statusapi "github.com/thanos-io/thanos/pkg/api/status" - "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/logging" - "github.com/thanos-io/thanos/pkg/promclient" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -87,34 +83,23 @@ var ( // Options for the web Handler. type Options struct { - Writer *Writer - ListenAddress string - Registry *prometheus.Registry - TenantHeader string - TenantField string - DefaultTenantID string - ReplicaHeader string - Endpoint string - ReplicationFactor uint64 - ReceiverMode ReceiverMode - Tracer opentracing.Tracer - TLSConfig *tls.Config - DialOpts []grpc.DialOption - ForwardTimeout time.Duration - RelabelConfigs []*relabel.Config - TSDBStats TSDBStats - LimitsConfig *RootLimitsConfig - SeriesLimitSupported bool - MaxPerTenantLimit uint64 - MetaMonitoringUrl *url.URL - MetaMonitoringHttpClient *extflag.PathOrContent - MetaMonitoringLimitQuery string -} - -// activeSeriesLimiter encompasses active series limiting logic. -type activeSeriesLimiter interface { - QueryMetaMonitoring(context.Context, log.Logger) error - isUnderLimit(string, log.Logger) (bool, error) + Writer *Writer + ListenAddress string + Registry *prometheus.Registry + TenantHeader string + TenantField string + DefaultTenantID string + ReplicaHeader string + Endpoint string + ReplicationFactor uint64 + ReceiverMode ReceiverMode + Tracer opentracing.Tracer + TLSConfig *tls.Config + DialOpts []grpc.DialOption + ForwardTimeout time.Duration + RelabelConfigs []*relabel.Config + TSDBStats TSDBStats + Limiter *limiter } // Handler serves a Prometheus remote write receiving HTTP endpoint. 
@@ -125,13 +110,12 @@ type Handler struct { options *Options listener net.Listener - mtx sync.RWMutex - hashring Hashring - peers *peerGroup - expBackoff backoff.Backoff - peerStates map[string]*retryState - receiverMode ReceiverMode - ActiveSeriesLimit activeSeriesLimiter + mtx sync.RWMutex + hashring Hashring + peers *peerGroup + expBackoff backoff.Backoff + peerStates map[string]*retryState + receiverMode ReceiverMode forwardRequests *prometheus.CounterVec replications *prometheus.CounterVec @@ -166,7 +150,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Max: 30 * time.Second, Jitter: true, }, - limiter: newLimiter(o.LimitsConfig, registerer), + limiter: o.Limiter, forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -216,11 +200,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler { h.replicationFactor.Set(1) } - h.ActiveSeriesLimit = NewNopSeriesLimit() - if h.options.SeriesLimitSupported { - h.ActiveSeriesLimit = NewActiveSeriesLimit(h.options, registerer, h.receiverMode, logger) - } - ins := extpromhttp.NewNopInstrumentationMiddleware() if o.Registry != nil { ins = extpromhttp.NewTenantInstrumentationMiddleware( @@ -438,7 +417,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } defer h.limiter.writeGate.Done() - under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger) + under, err := h.limiter.HeadSeriesLimiter.isUnderLimit(tenant) if err != nil { level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error()) } @@ -553,141 +532,6 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples)) } -// activeSeriesLimit implements activeSeriesLimiter interface. -type activeSeriesLimit struct { - mtx sync.RWMutex - limit uint64 - tenantCurrentSeriesMap map[string]float64 - - metaMonitoringURL *url.URL - metaMonitoringClient *http.Client - metaMonitoringQuery string - - configuredTenantLimit prometheus.Gauge - limitedRequests *prometheus.CounterVec - metaMonitoringErr prometheus.Counter -} - -func NewActiveSeriesLimit(o *Options, registerer prometheus.Registerer, r ReceiverMode, logger log.Logger) *activeSeriesLimit { - limit := &activeSeriesLimit{ - limit: o.MaxPerTenantLimit, - metaMonitoringURL: o.MetaMonitoringUrl, - metaMonitoringQuery: o.MetaMonitoringLimitQuery, - configuredTenantLimit: promauto.With(registerer).NewGauge( - prometheus.GaugeOpts{ - Name: "thanos_receive_tenant_head_series_limit", - Help: "The configured limit for active (head) series of tenants.", - }, - ), - limitedRequests: promauto.With(registerer).NewCounterVec( - prometheus.CounterOpts{ - Name: "thanos_receive_head_series_limited_requests_total", - Help: "The total number of remote write requests that have been dropped due to active series limiting.", - }, []string{"tenant"}, - ), - metaMonitoringErr: promauto.With(registerer).NewCounter( - prometheus.CounterOpts{ - Name: "thanos_receive_metamonitoring_failed_queries_total", - Help: "The total number of meta-monitoring queries that failed while limiting.", - }, - ), - } - - limit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit)) - limit.tenantCurrentSeriesMap = map[string]float64{} - - // Use specified HTTPConfig to make requests to meta-monitoring. 
- httpConfContentYaml, err := o.MetaMonitoringHttpClient.Content() - if err != nil { - level.Error(logger).Log("msg", "getting http client config", "err", err.Error()) - } - - httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml) - if err != nil { - level.Error(logger).Log("msg", "parsing http config YAML", "err", err.Error()) - } - - limit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit") - if err != nil { - level.Error(logger).Log("msg", "improper http client config", "err", err.Error()) - } - - return limit -} - -// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring -// solution with the configured query for getting current active (head) series of all tenants. -// It then populates tenantCurrentSeries map with result. -func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error { - c := promclient.NewWithTracingClient(logger, a.metaMonitoringClient, httpconfig.ThanosUserAgent) - - vectorRes, _, err := c.QueryInstant(ctx, a.metaMonitoringURL, a.metaMonitoringQuery, time.Now(), promclient.QueryOptions{}) - if err != nil { - a.metaMonitoringErr.Inc() - return err - } - - level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes)) - - a.mtx.Lock() - defer a.mtx.Unlock() - // Construct map of tenant name and current HEAD series. - for _, e := range vectorRes { - for k, v := range e.Metric { - if k == "tenant" { - a.tenantCurrentSeriesMap[string(v)] = float64(e.Value) - level.Debug(logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value) - } - } - } - - return nil -} - -// isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit. -// It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits. -// TODO(saswatamcode): Add capability to configure different limits for different tenants. -func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) { - a.mtx.RLock() - defer a.mtx.RUnlock() - if a.limit == 0 || a.metaMonitoringURL.Host == "" { - return true, nil - } - - // In such limiting flow, we ingest the first remote write request - // and then check meta-monitoring metric to ascertain current active - // series. As such metric is updated in intervals, it is possible - // that Receive ingests more series than the limit, before detecting that - // a tenant has exceeded the set limits. - v, ok := a.tenantCurrentSeriesMap[tenant] - if !ok { - return true, errors.New("tenant not in current series map") - } - - if v >= float64(a.limit) { - level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", a.limit) - a.limitedRequests.WithLabelValues(tenant).Inc() - return false, nil - } - - return true, nil -} - -// nopSeriesLimit implements activeSeriesLimiter interface as no-op. -type nopSeriesLimit struct{} - -func NewNopSeriesLimit() *nopSeriesLimit { - return &nopSeriesLimit{} -} - -func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error { - return nil -} - -func (a *nopSeriesLimit) isUnderLimit(_ string, _ log.Logger) (bool, error) { - return true, nil -} - // forward accepts a write request, batches its time series by // corresponding endpoint, and forwards them in parallel to the // correct endpoint. 
Requests destined for the local node are written diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 81406b3585..44076de141 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -369,6 +369,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin ReplicationFactor: replicationFactor, ForwardTimeout: 5 * time.Second, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), + Limiter: NewLimiter(nil, nil, RouterIngestor, nil), }) handlers = append(handlers, h) h.peers = peers @@ -775,11 +776,11 @@ func TestReceiveWriteRequestLimits(t *testing.T) { handlers, _ := newTestHandlerHashring(appendables, 3) handler := handlers[0] tenant := "test" - handler.limiter = newLimiter( + handler.limiter = NewLimiter( &RootLimitsConfig{ - WriteLimits: writeLimitsConfig{ - TenantsLimits: tenantsWriteLimitsConfig{ - tenant: &writeLimitConfig{ + WriteLimits: WriteLimitsConfig{ + TenantsLimits: TenantsWriteLimitsConfig{ + tenant: &WriteLimitConfig{ RequestLimits: newEmptyRequestLimitsConfig(). SetSizeBytesLimit(int64(1 * units.Megabyte)). SetSeriesLimit(20). @@ -789,6 +790,8 @@ func TestReceiveWriteRequestLimits(t *testing.T) { }, }, nil, + RouterIngestor, + log.NewNopLogger(), ) wreq := &prompb.WriteRequest{ diff --git a/pkg/receive/head_series_limiter.go b/pkg/receive/head_series_limiter.go new file mode 100644 index 0000000000..746b92d990 --- /dev/null +++ b/pkg/receive/head_series_limiter.go @@ -0,0 +1,180 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + "net/http" + "net/url" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/httpconfig" + "github.com/thanos-io/thanos/pkg/promclient" +) + +// headSeriesLimit implements headSeriesLimiter interface. 
+type headSeriesLimit struct { + mtx sync.RWMutex + limitsPerTenant map[string]uint64 + tenantCurrentSeriesMap map[string]float64 + defaultLimit uint64 + + metaMonitoringURL *url.URL + metaMonitoringClient *http.Client + metaMonitoringQuery string + + configuredTenantLimit *prometheus.GaugeVec + limitedRequests *prometheus.CounterVec + metaMonitoringErr prometheus.Counter + + logger log.Logger +} + +func NewHeadSeriesLimit(w WriteLimitsConfig, registerer prometheus.Registerer, logger log.Logger) *headSeriesLimit { + limit := &headSeriesLimit{ + metaMonitoringURL: w.GlobalLimits.metaMonitoringURL, + metaMonitoringQuery: w.GlobalLimits.MetaMonitoringLimitQuery, + defaultLimit: w.DefaultLimits.HeadSeriesLimit, + configuredTenantLimit: promauto.With(registerer).NewGaugeVec( + prometheus.GaugeOpts{ + Name: "thanos_receive_head_series_limit", + Help: "The configured limit for active (head) series of tenants.", + }, []string{"tenant"}, + ), + limitedRequests: promauto.With(registerer).NewCounterVec( + prometheus.CounterOpts{ + Name: "thanos_receive_head_series_limited_requests_total", + Help: "The total number of remote write requests that have been dropped due to active series limiting.", + }, []string{"tenant"}, + ), + metaMonitoringErr: promauto.With(registerer).NewCounter( + prometheus.CounterOpts{ + Name: "thanos_receive_metamonitoring_failed_queries_total", + Help: "The total number of meta-monitoring queries that failed while limiting.", + }, + ), + logger: logger, + } + + // Record default limit with empty tenant label. + limit.configuredTenantLimit.WithLabelValues("").Set(float64(limit.defaultLimit)) + + // Initialize map for configured limits of each tenant. + limit.limitsPerTenant = map[string]uint64{} + for t, w := range w.TenantsLimits { + // No limit set for tenant so inherit default, which could be unlimited as well. + if w.HeadSeriesLimit == nil { + limit.limitsPerTenant[t] = limit.defaultLimit + limit.configuredTenantLimit.WithLabelValues(t).Set(float64(limit.defaultLimit)) + continue + } + + // Limit set to provided one for tenant that could be unlimited or some value. + // Default not inherited. + limit.limitsPerTenant[t] = *w.HeadSeriesLimit + limit.configuredTenantLimit.WithLabelValues(t).Set(float64(*w.HeadSeriesLimit)) + } + + // Initialize map for current head series of each tenant. + limit.tenantCurrentSeriesMap = map[string]float64{} + + // Use specified HTTPConfig (if any) to make requests to meta-monitoring. + c := httpconfig.NewDefaultClientConfig() + if w.GlobalLimits.MetaMonitoringHTTPClient != nil { + c = *w.GlobalLimits.MetaMonitoringHTTPClient + } + + var err error + limit.metaMonitoringClient, err = httpconfig.NewHTTPClient(c, "meta-mon-for-limit") + if err != nil { + level.Error(logger).Log("msg", "improper http client config", "err", err.Error()) + } + + return limit +} + +// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring +// solution with the configured query for getting current active (head) series of all tenants. +// It then populates tenantCurrentSeries map with result. 
+func (h *headSeriesLimit) QueryMetaMonitoring(ctx context.Context) error { + c := promclient.NewWithTracingClient(h.logger, h.metaMonitoringClient, httpconfig.ThanosUserAgent) + + vectorRes, _, err := c.QueryInstant(ctx, h.metaMonitoringURL, h.metaMonitoringQuery, time.Now(), promclient.QueryOptions{}) + if err != nil { + h.metaMonitoringErr.Inc() + return err + } + + level.Debug(h.logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes)) + + h.mtx.Lock() + defer h.mtx.Unlock() + // Construct map of tenant name and current head series. + for _, e := range vectorRes { + for k, v := range e.Metric { + if k == "tenant" { + h.tenantCurrentSeriesMap[string(v)] = float64(e.Value) + level.Debug(h.logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value) + } + } + } + + return nil +} + +// isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit. +// It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits. +func (h *headSeriesLimit) isUnderLimit(tenant string) (bool, error) { + h.mtx.RLock() + defer h.mtx.RUnlock() + if len(h.limitsPerTenant) == 0 && h.defaultLimit == 0 { + return true, nil + } + + // In such limiting flow, we ingest the first remote write request + // and then check meta-monitoring metric to ascertain current active + // series. As such metric is updated in intervals, it is possible + // that Receive ingests more series than the limit, before detecting that + // a tenant has exceeded the set limits. + v, ok := h.tenantCurrentSeriesMap[tenant] + if !ok { + return true, errors.Newf("tenant not in current series map") + } + + var limit uint64 + limit, ok = h.limitsPerTenant[tenant] + if !ok { + // Tenant has not been defined in config, so fallback to default. + limit = h.defaultLimit + } + + if v >= float64(limit) { + level.Error(h.logger).Log("msg", "tenant above limit", "tenant", tenant, "currentSeries", v, "limit", limit) + h.limitedRequests.WithLabelValues(tenant).Inc() + return false, nil + } + + return true, nil +} + +// nopSeriesLimit implements activeSeriesLimiter interface as no-op. +type nopSeriesLimit struct{} + +func NewNopSeriesLimit() *nopSeriesLimit { + return &nopSeriesLimit{} +} + +func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context) error { + return nil +} + +func (a *nopSeriesLimit) isUnderLimit(_ string) (bool, error) { + return true, nil +} diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index c90a79ab0f..bc3c4d8358 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -4,28 +4,38 @@ package receive import ( + "context" + + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" ) type limiter struct { - requestLimiter requestLimiter - writeGate gate.Gate - // TODO: extract active series limiting logic into a self-contained type and - // move it here. + requestLimiter requestLimiter + writeGate gate.Gate + HeadSeriesLimiter headSeriesLimiter } +// requestLimiter encompasses logic for limiting remote write requests. type requestLimiter interface { AllowSizeBytes(tenant string, contentLengthBytes int64) bool AllowSeries(tenant string, amount int64) bool AllowSamples(tenant string, amount int64) bool } -func newLimiter(root *RootLimitsConfig, reg prometheus.Registerer) *limiter { +// headSeriesLimiter encompasses active/head series limiting logic. 
+type headSeriesLimiter interface { + QueryMetaMonitoring(context.Context) error + isUnderLimit(tenant string) (bool, error) +} + +func NewLimiter(root *RootLimitsConfig, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) *limiter { limiter := &limiter{ - writeGate: gate.NewNoop(), - requestLimiter: &noopRequestLimiter{}, + writeGate: gate.NewNoop(), + requestLimiter: &noopRequestLimiter{}, + HeadSeriesLimiter: NewNopSeriesLimit(), } if root == nil { return limiter @@ -43,5 +53,11 @@ func newLimiter(root *RootLimitsConfig, reg prometheus.Registerer) *limiter { } limiter.requestLimiter = newConfigRequestLimiter(reg, &root.WriteLimits) + // Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided. + seriesLimitSupported := (r == RouterOnly || r == RouterIngestor) && (len(root.WriteLimits.TenantsLimits) != 0 || root.WriteLimits.DefaultLimits.HeadSeriesLimit != 0) + if seriesLimitSupported { + limiter.HeadSeriesLimiter = NewHeadSeriesLimit(root.WriteLimits, reg, logger) + } + return limiter } diff --git a/pkg/receive/limiter_config.go b/pkg/receive/limiter_config.go index 8e84e1653b..67aa5ef93a 100644 --- a/pkg/receive/limiter_config.go +++ b/pkg/receive/limiter_config.go @@ -4,14 +4,17 @@ package receive import ( + "net/url" + "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/httpconfig" "gopkg.in/yaml.v2" ) // RootLimitsConfig is the root configuration for limits. type RootLimitsConfig struct { // WriteLimits hold the limits for writing data. - WriteLimits writeLimitsConfig `yaml:"write"` + WriteLimits WriteLimitsConfig `yaml:"write"` } // ParseRootLimitConfig parses the root limit configuration. Even though @@ -21,37 +24,84 @@ func ParseRootLimitConfig(content []byte) (*RootLimitsConfig, error) { if err := yaml.UnmarshalStrict(content, &root); err != nil { return nil, errors.Wrapf(err, "parsing config YAML file") } + + if root.WriteLimits.GlobalLimits.MetaMonitoringURL != "" { + u, err := url.Parse(root.WriteLimits.GlobalLimits.MetaMonitoringURL) + if err != nil { + return nil, errors.Wrapf(err, "parsing meta-monitoring URL") + } + + // url.Parse might pass a URL with only path, so need to check here for scheme and host. + // As per docs: https://pkg.go.dev/net/url#Parse. + if u.Host == "" || u.Scheme == "" { + return nil, errors.Newf("%s is not a valid meta-monitoring URL (scheme: %s,host: %s)", u, u.Scheme, u.Host) + } + root.WriteLimits.GlobalLimits.metaMonitoringURL = u + } + + // Set default query if none specified. + if root.WriteLimits.GlobalLimits.MetaMonitoringLimitQuery == "" { + root.WriteLimits.GlobalLimits.MetaMonitoringLimitQuery = "sum(prometheus_tsdb_head_series) by (tenant)" + } + return &root, nil } -type writeLimitsConfig struct { +func (r RootLimitsConfig) AreHeadSeriesLimitsConfigured() bool { + return r.WriteLimits.GlobalLimits.MetaMonitoringURL != "" && (len(r.WriteLimits.TenantsLimits) != 0 || r.WriteLimits.DefaultLimits.HeadSeriesLimit != 0) +} + +type WriteLimitsConfig struct { // GlobalLimits are limits that are shared across all tenants. - GlobalLimits globalLimitsConfig `yaml:"global"` + GlobalLimits GlobalLimitsConfig `yaml:"global"` // DefaultLimits are the default limits for tenants without specified limits. - DefaultLimits defaultLimitsConfig `yaml:"default"` + DefaultLimits DefaultLimitsConfig `yaml:"default"` // TenantsLimits are the limits per tenant. 
- TenantsLimits tenantsWriteLimitsConfig `yaml:"tenants"` + TenantsLimits TenantsWriteLimitsConfig `yaml:"tenants"` } -type globalLimitsConfig struct { +type GlobalLimitsConfig struct { // MaxConcurrency represents the maximum concurrency during write operations. MaxConcurrency int64 `yaml:"max_concurrency"` + // MetaMonitoring options specify the query, url and client for Query API address used in head series limiting. + MetaMonitoringURL string `yaml:"meta_monitoring_url"` + MetaMonitoringHTTPClient *httpconfig.ClientConfig `yaml:"meta_monitoring_http_client"` + MetaMonitoringLimitQuery string `yaml:"meta_monitoring_limit_query"` + + metaMonitoringURL *url.URL } -type defaultLimitsConfig struct { +type DefaultLimitsConfig struct { // RequestLimits holds the difficult per-request limits. RequestLimits requestLimitsConfig `yaml:"request"` - // HeadSeriesConfig *headSeriesLimiter `yaml:"head_series"` + // HeadSeriesLimit specifies the maximum number of head series allowed for any tenant. + HeadSeriesLimit uint64 `yaml:"head_series_limit"` } -type tenantsWriteLimitsConfig map[string]*writeLimitConfig +type TenantsWriteLimitsConfig map[string]*WriteLimitConfig // A tenant might not always have limits configured, so things here must // use pointers. -type writeLimitConfig struct { +type WriteLimitConfig struct { // RequestLimits holds the difficult per-request limits. RequestLimits *requestLimitsConfig `yaml:"request"` - // HeadSeriesConfig *headSeriesLimiter `yaml:"head_series"` + // HeadSeriesLimit specifies the maximum number of head series allowed for a tenant. + HeadSeriesLimit *uint64 `yaml:"head_series_limit"` +} + +// Utils for initializing. +func NewEmptyWriteLimitConfig() *WriteLimitConfig { + return &WriteLimitConfig{} +} + +func (w *WriteLimitConfig) SetRequestLimits(rl *requestLimitsConfig) *WriteLimitConfig { + w.RequestLimits = rl + return w +} + +func (w *WriteLimitConfig) SetHeadSeriesLimit(val uint64) *WriteLimitConfig { + w.HeadSeriesLimit = &val + return w } type requestLimitsConfig struct { @@ -60,10 +110,26 @@ type requestLimitsConfig struct { SamplesLimit *int64 `yaml:"samples_limit"` } +// Utils for initializing. func newEmptyRequestLimitsConfig() *requestLimitsConfig { return &requestLimitsConfig{} } +func (rl *requestLimitsConfig) SetSizeBytesLimit(value int64) *requestLimitsConfig { + rl.SizeBytesLimit = &value + return rl +} + +func (rl *requestLimitsConfig) SetSeriesLimit(value int64) *requestLimitsConfig { + rl.SeriesLimit = &value + return rl +} + +func (rl *requestLimitsConfig) SetSamplesLimit(value int64) *requestLimitsConfig { + rl.SamplesLimit = &value + return rl +} + // OverlayWith overlays the current configuration with another one. This means // that limit values that are not set (have a nil value) will be overwritten in // the caller. 
@@ -79,18 +145,3 @@ func (rl *requestLimitsConfig) OverlayWith(other *requestLimitsConfig) *requestL } return rl } - -func (rl *requestLimitsConfig) SetSizeBytesLimit(value int64) *requestLimitsConfig { - rl.SizeBytesLimit = &value - return rl -} - -func (rl *requestLimitsConfig) SetSeriesLimit(value int64) *requestLimitsConfig { - rl.SeriesLimit = &value - return rl -} - -func (rl *requestLimitsConfig) SetSamplesLimit(value int64) *requestLimitsConfig { - rl.SamplesLimit = &value - return rl -} diff --git a/pkg/receive/limiter_config_test.go b/pkg/receive/limiter_config_test.go index a58940dd9a..b080680162 100644 --- a/pkg/receive/limiter_config_test.go +++ b/pkg/receive/limiter_config_test.go @@ -4,6 +4,7 @@ package receive import ( + "net/url" "os" "path" "testing" @@ -23,26 +24,38 @@ func TestParseLimiterConfig(t *testing.T) { configFileName: "good_limits.yaml", wantErr: false, want: &RootLimitsConfig{ - WriteLimits: writeLimitsConfig{ - GlobalLimits: globalLimitsConfig{MaxConcurrency: 30}, - DefaultLimits: defaultLimitsConfig{ + WriteLimits: WriteLimitsConfig{ + GlobalLimits: GlobalLimitsConfig{ + MaxConcurrency: 30, + MetaMonitoringURL: "http://localhost:9090", + MetaMonitoringLimitQuery: "sum(prometheus_tsdb_head_series) by (tenant)", + metaMonitoringURL: &url.URL{ + Scheme: "http", + Host: "localhost:9090", + }, + }, + DefaultLimits: DefaultLimitsConfig{ RequestLimits: *newEmptyRequestLimitsConfig(). SetSizeBytesLimit(1024). SetSeriesLimit(1000). SetSamplesLimit(10), + HeadSeriesLimit: 1000, }, - TenantsLimits: tenantsWriteLimitsConfig{ - "acme": &writeLimitConfig{ - RequestLimits: newEmptyRequestLimitsConfig(). - SetSizeBytesLimit(0). - SetSeriesLimit(0). - SetSamplesLimit(0), - }, - "ajax": &writeLimitConfig{ - RequestLimits: newEmptyRequestLimitsConfig(). - SetSeriesLimit(50000). - SetSamplesLimit(500), - }, + TenantsLimits: TenantsWriteLimitsConfig{ + "acme": NewEmptyWriteLimitConfig(). + SetRequestLimits( + newEmptyRequestLimitsConfig(). + SetSizeBytesLimit(0). + SetSeriesLimit(0). + SetSamplesLimit(0), + ). + SetHeadSeriesLimit(2000), + "ajax": NewEmptyWriteLimitConfig(). + SetRequestLimits( + newEmptyRequestLimitsConfig(). + SetSeriesLimit(50000). + SetSamplesLimit(500), + ), }, }, }, diff --git a/pkg/receive/request_limiter.go b/pkg/receive/request_limiter.go index 8479c95960..de7554de2f 100644 --- a/pkg/receive/request_limiter.go +++ b/pkg/receive/request_limiter.go @@ -19,6 +19,7 @@ var unlimitedRequestLimitsConfig = newEmptyRequestLimitsConfig(). SetSeriesLimit(0). SetSamplesLimit(0) +// configRequestLimiter implements requestLimiter interface. type configRequestLimiter struct { tenantLimits map[string]*requestLimitsConfig cachedDefaultLimits *requestLimitsConfig @@ -26,7 +27,7 @@ type configRequestLimiter struct { configuredLimits *prometheus.GaugeVec } -func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *writeLimitsConfig) *configRequestLimiter { +func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *WriteLimitsConfig) *configRequestLimiter { // Merge the default limits configuration with an unlimited configuration // to ensure the nils are overwritten with zeroes. 
defaultRequestLimits := writeLimits.DefaultLimits.RequestLimits.OverlayWith(unlimitedRequestLimitsConfig) @@ -39,7 +40,9 @@ func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *writeLimits tenantsLimits := writeLimits.TenantsLimits tenantRequestLimits := make(map[string]*requestLimitsConfig) for tenant, limitConfig := range tenantsLimits { - tenantRequestLimits[tenant] = limitConfig.RequestLimits.OverlayWith(defaultRequestLimits) + if limitConfig.RequestLimits != nil { + tenantRequestLimits[tenant] = limitConfig.RequestLimits.OverlayWith(defaultRequestLimits) + } } limiter := configRequestLimiter{ diff --git a/pkg/receive/request_limiter_test.go b/pkg/receive/request_limiter_test.go index fc5a968c73..e654cd1cdf 100644 --- a/pkg/receive/request_limiter_test.go +++ b/pkg/receive/request_limiter_test.go @@ -13,13 +13,13 @@ func TestRequestLimiter_limitsFor(t *testing.T) { tenantWithLimits := "limited-tenant" tenantWithoutLimits := "unlimited-tenant" - limits := writeLimitsConfig{ - DefaultLimits: defaultLimitsConfig{ + limits := WriteLimitsConfig{ + DefaultLimits: DefaultLimitsConfig{ RequestLimits: *newEmptyRequestLimitsConfig(). SetSeriesLimit(10), }, - TenantsLimits: tenantsWriteLimitsConfig{ - tenantWithLimits: &writeLimitConfig{ + TenantsLimits: TenantsWriteLimitsConfig{ + tenantWithLimits: &WriteLimitConfig{ RequestLimits: newEmptyRequestLimitsConfig(). SetSeriesLimit(30), }, @@ -100,12 +100,12 @@ func TestRequestLimiter_AllowRequestBodySizeBytes(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tenant := "tenant" - limits := writeLimitsConfig{ - DefaultLimits: defaultLimitsConfig{ + limits := WriteLimitsConfig{ + DefaultLimits: DefaultLimitsConfig{ RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), }, - TenantsLimits: tenantsWriteLimitsConfig{ - tenant: &writeLimitConfig{ + TenantsLimits: TenantsWriteLimitsConfig{ + tenant: &WriteLimitConfig{ RequestLimits: newEmptyRequestLimitsConfig().SetSizeBytesLimit(tt.sizeByteLimit), }, }, @@ -157,12 +157,12 @@ func TestRequestLimiter_AllowSeries(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tenant := "tenant" - limits := writeLimitsConfig{ - DefaultLimits: defaultLimitsConfig{ + limits := WriteLimitsConfig{ + DefaultLimits: DefaultLimitsConfig{ RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), }, - TenantsLimits: tenantsWriteLimitsConfig{ - tenant: &writeLimitConfig{ + TenantsLimits: TenantsWriteLimitsConfig{ + tenant: &WriteLimitConfig{ RequestLimits: newEmptyRequestLimitsConfig().SetSeriesLimit(tt.seriesLimit), }, }, @@ -215,12 +215,12 @@ func TestRequestLimiter_AllowSamples(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tenant := "tenant" - limits := writeLimitsConfig{ - DefaultLimits: defaultLimitsConfig{ + limits := WriteLimitsConfig{ + DefaultLimits: DefaultLimitsConfig{ RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), }, - TenantsLimits: tenantsWriteLimitsConfig{ - tenant: &writeLimitConfig{ + TenantsLimits: TenantsWriteLimitsConfig{ + tenant: &WriteLimitConfig{ RequestLimits: newEmptyRequestLimitsConfig().SetSamplesLimit(tt.samplesLimit), }, }, diff --git a/pkg/receive/testdata/limits_config/good_limits.yaml b/pkg/receive/testdata/limits_config/good_limits.yaml index 66017cd59c..2345756179 100644 --- a/pkg/receive/testdata/limits_config/good_limits.yaml +++ b/pkg/receive/testdata/limits_config/good_limits.yaml @@ -1,17 +1,21 @@ write: global: max_concurrency: 30 + 
meta_monitoring_url: "http://localhost:9090" + meta_monitoring_limit_query: "sum(prometheus_tsdb_head_series) by (tenant)" default: request: size_bytes_limit: 1024 series_limit: 1000 samples_limit: 10 + head_series_limit: 1000 tenants: acme: request: size_bytes_limit: 0 series_limit: 0 samples_limit: 0 + head_series_limit: 2000 ajax: request: series_limit: 50000 diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index a638a3b9f0..bddc07afd0 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -502,11 +502,28 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable { } if r.limit != 0 && r.metaMonitoring != "" { - args["--receive.tenant-limits.max-head-series"] = fmt.Sprintf("%v", r.limit) - args["--receive.tenant-limits.meta-monitoring-url"] = r.metaMonitoring - if r.metaMonitoringQuery != "" { - args["--receive.tenant-limits.meta-monitoring-query"] = r.metaMonitoringQuery + cfg := receive.RootLimitsConfig{ + WriteLimits: receive.WriteLimitsConfig{ + GlobalLimits: receive.GlobalLimitsConfig{ + MetaMonitoringURL: r.metaMonitoring, + MetaMonitoringLimitQuery: r.metaMonitoringQuery, + }, + DefaultLimits: receive.DefaultLimitsConfig{ + HeadSeriesLimit: uint64(r.limit), + }, + }, } + + b, err := yaml.Marshal(cfg) + if err != nil { + return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(r.Name(), errors.Wrapf(err, "generate limiting file: %v", hashring))} + } + + if err := os.WriteFile(filepath.Join(r.Dir(), "limits.yaml"), b, 0600); err != nil { + return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(r.Name(), errors.Wrap(err, "creating limitin config"))} + } + + args["--receive.limits-config-file"] = filepath.Join(r.InternalDir(), "limits.yaml") } if err := os.MkdirAll(filepath.Join(r.Dir(), "data"), 0750); err != nil { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 0ff47d75f0..ba1537e895 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -729,7 +729,7 @@ func TestReceive(t *testing.T) { ingestor1Name := e.Name() + "-" + ingestor1.Name() // Here for exceed-tenant we go above limit by 10, which results in 0 value. queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name) + return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name) }, time.Now, promclient.QueryOptions{ Deduplicate: true, }, model.Vector{ @@ -741,7 +741,7 @@ func TestReceive(t *testing.T) { // For under-tenant we stay at -5, as we have only pushed 5 series. queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name) + return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name) }, time.Now, promclient.QueryOptions{ Deduplicate: true, }, model.Vector{