diff --git a/docs/components/receive.md b/docs/components/receive.md index fbf08e4b631..2e2a74e1f5c 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -241,7 +241,7 @@ Under `global`: - `meta_monitoring_http_client`: Optional YAML field specifying HTTP client config for meta-monitoring. Under `default` and per `tenant`: -- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive. +- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive. Set to 0 for unlimited. NOTE: - It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not have the latest data for current number of active series of a tenant at all times. diff --git a/pkg/receive/head_series_limiter.go b/pkg/receive/head_series_limiter.go index c088dc64951..f0e45123865 100644 --- a/pkg/receive/head_series_limiter.go +++ b/pkg/receive/head_series_limiter.go @@ -155,6 +155,11 @@ func (h *headSeriesLimit) isUnderLimit(tenant string) (bool, error) { limit = h.defaultLimit } + // If tenant limit is 0 we treat it as unlimited. + if limit == 0 { + return true, nil + } + if v >= float64(limit) { level.Error(h.logger).Log("msg", "tenant above limit", "tenant", tenant, "currentSeries", v, "limit", limit) h.limitedRequests.WithLabelValues(tenant).Inc() diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 18e111332a6..2053421ef6f 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -500,6 +500,7 @@ type ReceiveBuilder struct { maxExemplars int ingestion bool limit int + tenantsLimits receive.TenantsWriteLimitsConfig metaMonitoring string metaMonitoringQuery string hashringConfigs []receive.HashringConfig @@ -554,9 +555,10 @@ func (r *ReceiveBuilder) WithRelabelConfigs(relabelConfigs []*relabel.Config) *R return r } -func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, query ...string) *ReceiveBuilder { +func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, tenantsLimits receive.TenantsWriteLimitsConfig, query ...string) *ReceiveBuilder { r.limit = limit r.metaMonitoring = metaMonitoring + r.tenantsLimits = tenantsLimits if len(query) > 0 { r.metaMonitoringQuery = query[0] } @@ -611,6 +613,10 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable { }, } + if r.tenantsLimits != nil { + cfg.WriteLimits.TenantsLimits = r.tenantsLimits + } + b, err := yaml.Marshal(cfg) if err != nil { return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(r.Name(), errors.Wrapf(err, "generate limiting file: %v", hashring))} diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 4f729b7c325..63872e3dda9 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -813,9 +813,13 @@ test_metric{a="2", b="2"} 1`) }, } - i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() - i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() - i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() + tenantsLimits := receive.TenantsWriteLimitsConfig{ + "unlimited-tenant": receive.NewEmptyWriteLimitConfig().SetHeadSeriesLimit(0), + } + + i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init() + i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init() + i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init() testutil.Ok(t, e2e.StartAndWaitReady(i1Runnable, i2Runnable, i3Runnable)) @@ -824,7 +828,7 @@ test_metric{a="2", b="2"} 1`) testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - // We run two avalanches, one tenant which exceeds the limit, and one tenant which remains under it. + // We run three avalanches, one tenant which exceeds the limit, one tenant which remains under it, and one for the unlimited tenant. // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. // One request always fails due to TSDB not being ready for new tenant. @@ -864,7 +868,26 @@ test_metric{a="2", b="2"} 1`) TenantID: "under-tenant", }) - testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2)) + // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. + // One request always fails due to TSDB not being ready for new tenant. + // So without limiting we end up with 40 timeseries and 40 samples. + avalanche3 := e2ethanos.NewAvalanche(e, "avalanche-3", + e2ethanos.AvalancheOptions{ + MetricCount: "10", + SeriesCount: "1", + MetricInterval: "30", + SeriesInterval: "3600", + ValueInterval: "3600", + + RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), + RemoteWriteInterval: "30s", + RemoteBatchSize: "10", + RemoteRequestCount: "5", + + TenantID: "unlimited-tenant", + }) + + testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2, avalanche3)) // Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request. // 3 limited requests belong to the exceed-tenant. @@ -876,7 +899,7 @@ test_metric{a="2", b="2"} 1`) ingestor1Name := e.Name() + "-" + ingestor1.Name() // Here for exceed-tenant we go above limit by 10, which results in 0 value. queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name) + return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\", tenant=\"\"}", ingestor1Name) }, time.Now, promclient.QueryOptions{ Deduplicate: true, }, model.Vector{ @@ -888,7 +911,7 @@ test_metric{a="2", b="2"} 1`) // For under-tenant we stay at -5, as we have only pushed 5 series. queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name) + return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\", tenant=\"\"}", ingestor1Name) }, time.Now, promclient.QueryOptions{ Deduplicate: true, }, model.Vector{ @@ -918,6 +941,16 @@ test_metric{a="2", b="2"} 1`) }, }) + // Query meta-monitoring solution to assert that 40 timeseries have been ingested for unlimited-tenant. + queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"unlimited-tenant\"})" }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{}, + Value: model.SampleValue(40), + }, + }) + // Query meta-monitoring solution to assert that 3 requests were limited for exceed-tenant and none for under-tenant. queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "thanos_receive_head_series_limited_requests_total" }, time.Now, promclient.QueryOptions{ Deduplicate: true,