diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index 2dc2db88bf3..dddaeb93afa 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -98,4 +98,8 @@ Currently experimental features are:
   - Enabled via `-compactor.sharding-enabled=true`, `-compactor.sharding-strategy=shuffle-sharding`, and `-compactor.tenant-shard-size` set to a value larger than 0.
 - Vertical sharding at query frontend for range/instant queries
   - `-frontend.query-vertical-shard-size` (int) CLI flag
-  - `query_vertical_shard_size` (int) field in runtime config file
\ No newline at end of file
+  - `query_vertical_shard_size` (int) field in runtime config file
+- Out of order samples support
+  - `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
+  - `-ingester.out-of-order-time-window` (duration) CLI flag
+  - `out_of_order_time_window` (duration) field in runtime config file
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index b078df5b2d6..29436c813fc 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -2949,7 +2949,15 @@ func TestDistributorValidation(t *testing.T) {
 				Value:       1,
 			}},
 		},
-
+		// Test validation fails for very old samples.
+		{
+			labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
+			samples: []cortexpb.Sample{{
+				TimestampMs: int64(past),
+				Value:       2,
+			}},
+			err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
+		},
 		// Test validation fails for samples from the future.
 		{
 			labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
@@ -2997,6 +3005,8 @@ func TestDistributorValidation(t *testing.T) {
 		flagext.DefaultValues(&limits)
 		limits.CreationGracePeriod = model.Duration(2 * time.Hour)
+		limits.RejectOldSamples = true
+		limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour)
 		limits.MaxLabelNamesPerSeries = 2
 
 		ds, _, _, _ := prepare(t, prepConfig{
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 19a5d73310b..85988e03244 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -2,7 +2,6 @@ package tsdb
 
 import (
 	"flag"
-	"fmt"
 	"path/filepath"
 	"strings"
 	"time"
@@ -51,6 +50,7 @@ var (
 	errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
 	errInvalidWALSegmentSizeBytes   = errors.New("invalid TSDB WAL segment size bytes")
 	errInvalidStripeSize            = errors.New("invalid TSDB stripe size")
+	errInvalidOutOfOrderCapMax      = errors.New("invalid TSDB OOO chunks capacity (in samples)")
 	errEmptyBlockranges             = errors.New("empty block ranges for TSDB")
 )
 
@@ -153,7 +153,6 @@ type TSDBConfig struct {
 	MaxExemplars int `yaml:"max_exemplars"`
 
 	// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
-	// If it is <=0, the default value is assumed.
 	OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
 }
 
@@ -179,7 +178,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
 	f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
 	f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
-	f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, fmt.Sprintf("[EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in samples). If set to <=0, default value %d is assumed.", tsdb.DefaultOutOfOrderCapMax))
+	f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
 }
 
 // Validate the config.
@@ -216,6 +215,10 @@ func (cfg *TSDBConfig) Validate() error {
 		return errInvalidWALSegmentSizeBytes
 	}
 
+	if cfg.OutOfOrderCapMax <= 0 {
+		return errInvalidOutOfOrderCapMax
+	}
+
 	return nil
 }
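For context only (not part of the diff): a minimal, self-contained Go sketch of the behavioral change in TSDBConfig.Validate. Previously a value <= 0 fell back to the TSDB default; with this change it is rejected at validation time. The validateOutOfOrderCapMax helper below is hypothetical and simply mirrors the new guard.

package main

import (
	"errors"
	"fmt"
)

// Mirrors the sentinel error and the "<= 0" guard added to pkg/storage/tsdb/config.go.
var errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")

// validateOutOfOrderCapMax is a hypothetical stand-in for the new check in (*TSDBConfig).Validate.
func validateOutOfOrderCapMax(capMax int64) error {
	if capMax <= 0 {
		return errInvalidOutOfOrderCapMax
	}
	return nil
}

func main() {
	fmt.Println(validateOutOfOrderCapMax(0))  // now an error: the capacity must be positive
	fmt.Println(validateOutOfOrderCapMax(32)) // <nil>: any positive per-chunk capacity is accepted
}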
diff --git a/pkg/util/validation/errors.go b/pkg/util/validation/errors.go
index a6a24178094..483bf6acc5e 100644
--- a/pkg/util/validation/errors.go
+++ b/pkg/util/validation/errors.go
@@ -168,6 +168,14 @@ func (e *sampleValidationError) Error() string {
 	return fmt.Sprintf(e.message, e.timestamp, e.metricName)
 }
 
+func newSampleTimestampTooOldError(metricName string, timestamp int64) ValidationError {
+	return &sampleValidationError{
+		message:    "timestamp too old: %d metric: %.200q",
+		metricName: metricName,
+		timestamp:  timestamp,
+	}
+}
+
 func newSampleTimestampTooNewError(metricName string, timestamp int64) ValidationError {
 	return &sampleValidationError{
 		message:    "timestamp too new: %d metric: %.200q",
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 90aa21bcddb..148d2a7a612 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -48,6 +48,8 @@ type Limits struct {
 	MaxLabelNamesPerSeries    int            `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
 	MaxLabelsSizeBytes        int            `yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"`
 	MaxMetadataLength         int            `yaml:"max_metadata_length" json:"max_metadata_length"`
+	RejectOldSamples          bool           `yaml:"reject_old_samples" json:"reject_old_samples"`
+	RejectOldSamplesMaxAge    model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
 	CreationGracePeriod       model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
 	EnforceMetadataMetricName bool           `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"`
 	EnforceMetricName         bool           `yaml:"enforce_metric_name" json:"enforce_metric_name"`
@@ -130,6 +132,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
 	f.IntVar(&l.MaxLabelsSizeBytes, "validation.max-labels-size-bytes", 0, "Maximum combined size in bytes of all labels and label values accepted for a series. 0 to disable the limit.")
 	f.IntVar(&l.MaxMetadataLength, "validation.max-metadata-length", 1024, "Maximum length accepted for metric metadata. Metadata refers to Metric Name, HELP and UNIT.")
+	f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Deprecated (use ingester.out-of-order-time-window instead): Reject old samples.")
+	_ = l.RejectOldSamplesMaxAge.Set("14d")
+	f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Deprecated (use ingester.out-of-order-time-window instead): Maximum accepted sample age before rejecting.")
 	_ = l.CreationGracePeriod.Set("10m")
 	f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
 	f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
@@ -148,7 +153,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.")
 	f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 2000000, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
 	f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable")
-	f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
+	f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
 	f.IntVar(&l.MaxFetchedDataBytesPerQuery, "querier.max-fetched-data-bytes-per-query", 0, "The maximum combined size of all data that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler for `query`, `query_range` and `series` APIs. 0 to disable.")
 	f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query) and in the querier (on the query possibly split by the query-frontend). 0 to disable.")
 	f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
@@ -341,6 +346,17 @@ func (o *Overrides) MaxMetadataLength(userID string) int {
 	return o.GetOverridesForUser(userID).MaxMetadataLength
 }
 
+// RejectOldSamples returns true when we should reject samples older than certain
+// age.
+func (o *Overrides) RejectOldSamples(userID string) bool {
+	return o.GetOverridesForUser(userID).RejectOldSamples
+}
+
+// RejectOldSamplesMaxAge returns the age at which samples should be rejected.
+func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration {
+	return time.Duration(o.GetOverridesForUser(userID).RejectOldSamplesMaxAge)
+}
+
 // CreationGracePeriod is misnamed, and actually returns how far into the future
 // we should accept samples.
 func (o *Overrides) CreationGracePeriod(userID string) time.Duration {
diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go
index 5de9454e22b..0985e7db75c 100644
--- a/pkg/util/validation/validate.go
+++ b/pkg/util/validation/validate.go
@@ -36,6 +36,7 @@ const (
 	missingMetricName = "missing_metric_name"
 	invalidMetricName = "metric_name_invalid"
 
+	greaterThanMaxSampleAge = "greater_than_max_sample_age"
 	maxLabelNamesPerSeries  = "max_label_names_per_series"
 	tooFarInFuture          = "too_far_in_future"
 	invalidLabel            = "label_invalid"
@@ -105,6 +106,11 @@ func init() {
 func ValidateSample(limits *Limits, userID string, ls []cortexpb.LabelAdapter, s cortexpb.Sample) ValidationError {
 	unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)
 
+	if limits.RejectOldSamples && model.Time(s.TimestampMs) < model.Now().Add(-time.Duration(limits.RejectOldSamplesMaxAge)) {
+		DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc()
+		return newSampleTimestampTooOldError(unsafeMetricName, s.TimestampMs)
+	}
+
 	if model.Time(s.TimestampMs) > model.Now().Add(time.Duration(limits.CreationGracePeriod)) {
 		DiscardedSamples.WithLabelValues(tooFarInFuture, userID).Inc()
 		return newSampleTimestampTooNewError(unsafeMetricName, s.TimestampMs)
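For reference only (not part of the diff): a self-contained Go sketch of the cutoff arithmetic the new ValidateSample branch applies. The rejectTooOld helper is hypothetical, but the comparison mirrors the added check: a sample is discarded when RejectOldSamples is enabled and its timestamp is older than now minus RejectOldSamplesMaxAge. The 24h window matches the value the updated distributor test configures.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

// rejectTooOld is a hypothetical helper mirroring the new branch in ValidateSample:
// reject when the limit is enabled and ts is before (now - maxAge).
func rejectTooOld(rejectOldSamples bool, maxAge time.Duration, ts model.Time) bool {
	return rejectOldSamples && ts < model.Now().Add(-maxAge)
}

func main() {
	maxAge := 24 * time.Hour

	fmt.Println(rejectTooOld(true, maxAge, model.Now().Add(-25*time.Hour)))  // true: older than the window, rejected
	fmt.Println(rejectTooOld(true, maxAge, model.Now().Add(-time.Hour)))     // false: inside the window, accepted
	fmt.Println(rejectTooOld(false, maxAge, model.Now().Add(-25*time.Hour))) // false: limit disabled, accepted
}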