diff --git a/CHANGELOG.md b/CHANGELOG.md
index a47446ee212..52394775770 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
 * [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rules.
 * [FEATURE] Alertmanager: Add support for time_intervals. #5102
 * [FEATURE] Added `snappy-block` as an option for grpc compression. #5215
+* [FEATURE] Enable experimental out-of-order samples support. Added 2 new configs `ingester.out-of-order-time-window` and `blocks-storage.tsdb.out-of-order-cap-max`. #4964
 * [ENHANCEMENT] Querier: Limit series queries to only ingesters if the `start` param is not specified. #4976
 * [ENHANCEMENT] Query-frontend/scheduler: Add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and changed their default value to 0. Now if both the old flag and the new flag are specified, the old flag's queue size will be picked. #5005
 * [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 89546149346..40bea8cd55f 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -1227,4 +1227,9 @@ blocks_storage:
   # down.
   # CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
   [memory_snapshot_on_shutdown: <boolean> | default = false]
+
+  # [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
+  # be out-of-order.
+  # CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
+  [out_of_order_cap_max: <int> | default = 32]
 ```
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index 907e7cbbf7f..6ee5c8cd304 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -1304,4 +1304,9 @@ blocks_storage:
   # down.
   # CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
   [memory_snapshot_on_shutdown: <boolean> | default = false]
+
+  # [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
+  # be out-of-order.
+  # CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
+  [out_of_order_cap_max: <int> | default = 32]
 ```
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index d24b22b9266..7a5e0c8fa61 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -1735,6 +1735,11 @@ tsdb:
   # down.
   # CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
   [memory_snapshot_on_shutdown: <boolean> | default = false]
+
+  # [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
+  # be out-of-order.
+  # CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
+  [out_of_order_cap_max: <int> | default = 32]
 ```
 
 ### `compactor_config`
diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index 004f8aabb25..1384357c26a 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -100,4 +100,8 @@ Currently experimental features are:
 - `-frontend.query-vertical-shard-size` (int) CLI flag
 - `query_vertical_shard_size` (int) field in runtime config file
 - Snapshotting of in-memory TSDB on disk during shutdown
-  - `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
\ No newline at end of file
+  - `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
+- Out-of-order samples support
+  - `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
+  - `-ingester.out-of-order-time-window` (duration) CLI flag
+  - `out_of_order_time_window` (duration) field in runtime config file
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index be5de572ed4..6a9648dbbbf 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -3036,7 +3036,6 @@ func TestDistributorValidation(t *testing.T) {
 			}},
 			err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
 		},
-
 		// Test validation fails for samples from the future.
 		{
 			labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 247c83a263f..8417bcd019e 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -842,11 +842,13 @@ func (i *Ingester) updateUserTSDBConfigs() {
 			ExemplarsConfig: &config.ExemplarsConfig{
 				MaxExemplars: i.getMaxExemplars(userID),
 			},
+			TSDBConfig: &config.TSDBConfig{
+				OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
+			},
 		},
 	}
 
-	// This method currently updates the MaxExemplars and OutOfOrderTimeWindow. Invoking this method
-	// with a 0 value of OutOfOrderTimeWindow simply updates Max Exemplars.
+	// This method currently updates MaxExemplars and OutOfOrderTimeWindow.
 	err := userDB.db.ApplyConfig(cfg)
 	if err != nil {
 		level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
@@ -995,6 +997,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 		startAppend               = time.Now()
 		sampleOutOfBoundsCount    = 0
 		sampleOutOfOrderCount     = 0
+		sampleTooOldCount         = 0
 		newValueForTimestampCount = 0
 		perUserSeriesLimitCount   = 0
 		perMetricSeriesLimitCount = 0
@@ -1063,6 +1066,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 			updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
 			continue
+		case storage.ErrTooOldSample:
+			sampleTooOldCount++
+			updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
+			continue
+
 		case errMaxSeriesPerUserLimitExceeded:
 			perUserSeriesLimitCount++
 			updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
@@ -1153,6 +1161,9 @@
 	if sampleOutOfOrderCount > 0 {
 		validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
 	}
+	if sampleTooOldCount > 0 {
+		validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
+	}
 	if newValueForTimestampCount > 0 {
 		validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
 	}
@@ -1947,6 +1958,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 		MaxExemplars:                   maxExemplarsForUser,
 		HeadChunksWriteQueueSize:       i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
 		EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
+		OutOfOrderTimeWindow:           time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
+		OutOfOrderCapMax:               i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
 	}, nil)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 11819292bee..dd40c49efd6 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -341,6 +341,7 @@ func TestIngester_Push(t *testing.T) {
 		additionalMetrics   []string
 		disableActiveSeries bool
 		maxExemplars        int
+		oooTimeWindow       time.Duration
 	}{
 		"should succeed on valid series and metadata": {
 			reqs: []*cortexpb.WriteRequest{
@@ -553,7 +554,7 @@ func TestIngester_Push(t *testing.T) {
 				cortex_ingester_memory_series_removed_total{user="test"} 0
 			`,
 		},
-		"should soft fail on sample out of order": {
+		"ooo disabled, should soft fail on sample out of order": {
 			reqs: []*cortexpb.WriteRequest{
 				cortexpb.ToWriteRequest(
 					[]labels.Labels{metricLabels},
@@ -597,7 +598,7 @@ func TestIngester_Push(t *testing.T) {
 				cortex_ingester_active_series{user="test"} 1
 			`,
 		},
-		"should soft fail on sample out of bound": {
+		"ooo disabled, should soft fail on sample out of bound": {
 			reqs: []*cortexpb.WriteRequest{
 				cortexpb.ToWriteRequest(
 					[]labels.Labels{metricLabels},
@@ -641,6 +642,92 @@
 				cortex_ingester_active_series{user="test"} 1
 			`,
 		},
+		"ooo enabled, should soft fail on sample too old": {
+			reqs: []*cortexpb.WriteRequest{
+				cortexpb.ToWriteRequest(
+					[]labels.Labels{metricLabels},
+					[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
+					nil,
+					cortexpb.API),
+				cortexpb.ToWriteRequest(
+					[]labels.Labels{metricLabels},
+					[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (600 * 1000)}},
+					nil,
+					cortexpb.API),
+			},
+			oooTimeWindow: 5 * time.Minute,
+			expectedErr:   httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErr(storage.ErrTooOldSample, model.Time(1575043969-(600*1000)), cortexpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
+			expectedIngested: []cortexpb.TimeSeries{
+				{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}},
+			},
+			expectedMetrics: `
+				# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
+				# TYPE cortex_ingester_ingested_samples_total counter
+				cortex_ingester_ingested_samples_total 1
+				# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
+				# TYPE cortex_ingester_ingested_samples_failures_total counter
+				cortex_ingester_ingested_samples_failures_total 1
+				# HELP cortex_ingester_memory_users The current number of users in memory.
+				# TYPE cortex_ingester_memory_users gauge
+				cortex_ingester_memory_users 1
+				# HELP cortex_ingester_memory_series The current number of series in memory.
+				# TYPE cortex_ingester_memory_series gauge
+				cortex_ingester_memory_series 1
+				# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
+				# TYPE cortex_ingester_memory_series_created_total counter
+				cortex_ingester_memory_series_created_total{user="test"} 1
+				# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
+				# TYPE cortex_ingester_memory_series_removed_total counter
+				cortex_ingester_memory_series_removed_total{user="test"} 0
+				# HELP cortex_discarded_samples_total The total number of samples that were discarded.
+				# TYPE cortex_discarded_samples_total counter
+				cortex_discarded_samples_total{reason="sample-too-old",user="test"} 1
+				# HELP cortex_ingester_active_series Number of currently active series per user.
+				# TYPE cortex_ingester_active_series gauge
+				cortex_ingester_active_series{user="test"} 1
+			`,
+		},
+		"ooo enabled, should succeed": {
+			reqs: []*cortexpb.WriteRequest{
+				cortexpb.ToWriteRequest(
+					[]labels.Labels{metricLabels},
+					[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
+					nil,
+					cortexpb.API),
+				cortexpb.ToWriteRequest(
+					[]labels.Labels{metricLabels},
+					[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}},
+					nil,
+					cortexpb.API),
+			},
+			oooTimeWindow: 5 * time.Minute,
+			expectedIngested: []cortexpb.TimeSeries{
+				{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}, {Value: 2, TimestampMs: 1575043969}}},
+			},
+			expectedMetrics: `
+				# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
+				# TYPE cortex_ingester_ingested_samples_total counter
+				cortex_ingester_ingested_samples_total 2
+				# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
+				# TYPE cortex_ingester_ingested_samples_failures_total counter
+				cortex_ingester_ingested_samples_failures_total 0
+				# HELP cortex_ingester_memory_users The current number of users in memory.
+				# TYPE cortex_ingester_memory_users gauge
+				cortex_ingester_memory_users 1
+				# HELP cortex_ingester_memory_series The current number of series in memory.
+				# TYPE cortex_ingester_memory_series gauge
+				cortex_ingester_memory_series 1
+				# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
+				# TYPE cortex_ingester_memory_series_created_total counter
+				cortex_ingester_memory_series_created_total{user="test"} 1
+				# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
+				# TYPE cortex_ingester_memory_series_removed_total counter
+				cortex_ingester_memory_series_removed_total{user="test"} 0
+				# HELP cortex_ingester_active_series Number of currently active series per user.
+				# TYPE cortex_ingester_active_series gauge
+				cortex_ingester_active_series{user="test"} 1
+			`,
+		},
 		"should soft fail on two different sample values at the same timestamp": {
 			reqs: []*cortexpb.WriteRequest{
 				cortexpb.ToWriteRequest(
@@ -777,6 +864,7 @@
 			limits := defaultLimitsTestConfig()
 			limits.MaxExemplars = testData.maxExemplars
+			limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow)
 			i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go
index 727cd1ce931..8ebad338d44 100644
--- a/pkg/ingester/series.go
+++ b/pkg/ingester/series.go
@@ -4,4 +4,5 @@ const (
 	sampleOutOfOrder     = "sample-out-of-order"
 	newValueForTimestamp = "new-value-for-timestamp"
 	sampleOutOfBounds    = "sample-out-of-bounds"
+	sampleTooOld         = "sample-too-old"
 )
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 2e030220a3e..48753108f42 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/alecthomas/units"
 	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/wlog"
 	"github.com/thanos-io/thanos/pkg/store"
@@ -45,6 +46,7 @@ var (
 	errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
 	errInvalidWALSegmentSizeBytes   = errors.New("invalid TSDB WAL segment size bytes")
 	errInvalidStripeSize            = errors.New("invalid TSDB stripe size")
+	errInvalidOutOfOrderCapMax      = errors.New("invalid TSDB OOO chunks capacity (in samples)")
 	errEmptyBlockranges             = errors.New("empty block ranges for TSDB")
 )
@@ -145,11 +147,14 @@ type TSDBConfig struct {
 	// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
 	CloseIdleTSDBInterval time.Duration `yaml:"-"`
 
-	// Positive value enables experiemental support for exemplars. 0 or less to disable.
+	// Positive value enables experimental support for exemplars. 0 or less to disable.
 	MaxExemplars int `yaml:"max_exemplars"`
 
 	// Enable snapshotting of in-memory TSDB data on disk when shutting down.
 	MemorySnapshotOnShutdown bool `yaml:"memory_snapshot_on_shutdown"`
+
+	// OutOfOrderCapMax is the maximum capacity for OOO chunks (in samples).
+	OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
 }
 
 // RegisterFlags registers the TSDBConfig flags.
@@ -176,6 +181,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
 	f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in limits is set to zero, cortex will fallback on this value. This setting enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
 	f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
+	f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
 }
 
 // Validate the config.
@@ -212,6 +218,10 @@ func (cfg *TSDBConfig) Validate() error {
 		return errInvalidWALSegmentSizeBytes
 	}
 
+	if cfg.OutOfOrderCapMax <= 0 {
+		return errInvalidOutOfOrderCapMax
+	}
+
 	return nil
 }
diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go
index 6946f51c6e8..e7cb5dc502a 100644
--- a/pkg/storage/tsdb/config_test.go
+++ b/pkg/storage/tsdb/config_test.go
@@ -115,6 +115,12 @@ func TestConfig_Validate(t *testing.T) {
 			},
 			expectedErr: errInvalidWALSegmentSizeBytes,
 		},
+		"should fail on out of order cap max": {
+			setup: func(cfg *BlocksStorageConfig) {
+				cfg.TSDB.OutOfOrderCapMax = 0
+			},
+			expectedErr: errInvalidOutOfOrderCapMax,
+		},
 	}
 
 	for testName, testData := range tests {
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 3be42b02dcf..39e49b2bed1 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -70,6 +70,8 @@ type Limits struct {
 	MaxLocalMetadataPerMetric           int `yaml:"max_metadata_per_metric" json:"max_metadata_per_metric"`
 	MaxGlobalMetricsWithMetadataPerUser int `yaml:"max_global_metadata_per_user" json:"max_global_metadata_per_user"`
 	MaxGlobalMetadataPerMetric          int `yaml:"max_global_metadata_per_metric" json:"max_global_metadata_per_metric"`
+	// Out-of-order
+	OutOfOrderTimeWindow model.Duration `yaml:"out_of_order_time_window" json:"out_of_order_time_window"`
 
 	// Querier enforced limits.
 	MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"`
@@ -150,6 +152,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxGlobalSeriesPerUser, "ingester.max-global-series-per-user", 0, "The maximum number of active series per user, across the cluster before replication. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.")
 	f.IntVar(&l.MaxGlobalSeriesPerMetric, "ingester.max-global-series-per-metric", 0, "The maximum number of active series per metric name, across the cluster before replication. 0 to disable.")
 	f.IntVar(&l.MaxExemplars, "ingester.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. less than zero means disabled. If the value is set to zero, cortex will fallback to blocks-storage.tsdb.max-exemplars value.")
+	f.Var(&l.OutOfOrderTimeWindow, "ingester.out-of-order-time-window", "[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default.")
 	f.IntVar(&l.MaxLocalMetricsWithMetadataPerUser, "ingester.max-metadata-per-user", 8000, "The maximum number of active metrics with metadata per user, per ingester. 0 to disable.")
 	f.IntVar(&l.MaxLocalMetadataPerMetric, "ingester.max-metadata-per-metric", 10, "The maximum number of metadata per metric, per ingester. 0 to disable.")
@@ -157,7 +160,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.")
 	f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 2000000, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
 	f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable")
-	f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
+	f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
 	f.IntVar(&l.MaxFetchedDataBytesPerQuery, "querier.max-fetched-data-bytes-per-query", 0, "The maximum combined size of all data that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler for `query`, `query_range` and `series` APIs. 0 to disable.")
 	f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query) and in the querier (on the query possibly split by the query-frontend). 0 to disable.")
 	f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
@@ -389,6 +392,11 @@ func (o *Overrides) MaxGlobalSeriesPerUser(userID string) int {
 	return o.GetOverridesForUser(userID).MaxGlobalSeriesPerUser
 }
 
+// OutOfOrderTimeWindow returns the allowed time window for ingestion of out-of-order samples.
+func (o *Overrides) OutOfOrderTimeWindow(userID string) model.Duration {
+	return o.GetOverridesForUser(userID).OutOfOrderTimeWindow
+}
+
 // MaxGlobalSeriesPerMetric returns the maximum number of series allowed per metric across the cluster.
 func (o *Overrides) MaxGlobalSeriesPerMetric(userID string) int {
 	return o.GetOverridesForUser(userID).MaxGlobalSeriesPerMetric
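
For context on how these two knobs compose: `-ingester.out-of-order-time-window` is a per-tenant limit (so it can be overridden at runtime), while `-blocks-storage.tsdb.out-of-order-cap-max` is a global TSDB setting passed to Prometheus when each per-user head is created. Below is a minimal config sketch of how the feature could be enabled once this merges; the `5m` window is an illustrative value chosen for the example, not a default.

```yaml
# Sketch only: enable experimental out-of-order ingestion in Cortex.
limits:
  # Accept samples up to 5 minutes older than the newest sample of a series.
  # CLI: -ingester.out-of-order-time-window (default 0s, i.e. disabled).
  out_of_order_time_window: 5m

blocks_storage:
  tsdb:
    # Maximum number of out-of-order samples kept per OOO head chunk.
    # CLI: -blocks-storage.tsdb.out-of-order-cap-max (default 32).
    out_of_order_cap_max: 32
```

Because the window is a `Limits` field, it can also be adjusted for a single tenant through the runtime overrides file (e.g. an `out_of_order_time_window` entry under that tenant's `overrides` section), and samples falling outside the window are rejected with the new dedicated `sample-too-old` discard reason rather than `sample-out-of-order`.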