Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Nov 20, 2022
1 parent a5b5b9b commit 5acf328
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 6 deletions.
6 changes: 5 additions & 1 deletion docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,8 @@ Currently experimental features are:
- Enabled via `-compactor.sharding-enabled=true`, `-compactor.sharding-strategy=shuffle-sharding`, and `-compactor.tenant-shard-size` set to a value larger than 0.
- Vertical sharding at query frontend for range/instant queries
- `-frontend.query-vertical-shard-size` (int) CLI flag
- `query_vertical_shard_size` (int) field in runtime config file
- `query_vertical_shard_size` (int) field in runtime config file
- Out of order samples support
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
- `-ingester.out-of-order-time-window` (duration) CLI flag
- `out_of_order_time_window` (duration) field in runtime config file
12 changes: 11 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2949,7 +2949,15 @@ func TestDistributorValidation(t *testing.T) {
Value: 1,
}},
},

// Test validation fails for very old samples.
{
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
samples: []cortexpb.Sample{{
TimestampMs: int64(past),
Value: 2,
}},
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
},
// Test validation fails for samples from the future.
{
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
Expand Down Expand Up @@ -2997,6 +3005,8 @@ func TestDistributorValidation(t *testing.T) {
flagext.DefaultValues(&limits)

limits.CreationGracePeriod = model.Duration(2 * time.Hour)
limits.RejectOldSamples = true
limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour)
limits.MaxLabelNamesPerSeries = 2

ds, _, _, _ := prepare(t, prepConfig{
Expand Down
9 changes: 6 additions & 3 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package tsdb

import (
"flag"
"fmt"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -51,6 +50,7 @@ var (
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
)

Expand Down Expand Up @@ -153,7 +153,6 @@ type TSDBConfig struct {
MaxExemplars int `yaml:"max_exemplars"`

// OutOfOrderCapMax is the maximum capacity for OOO chunks (in samples).
// If it is <=0, the default value is assumed.
OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
}

Expand All @@ -179,7 +178,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, fmt.Sprintf("[EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in samples). If set to <=0, default value %d is assumed.", tsdb.DefaultOutOfOrderCapMax))
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
}

// Validate the config.
Expand Down Expand Up @@ -216,6 +215,10 @@ func (cfg *TSDBConfig) Validate() error {
return errInvalidWALSegmentSizeBytes
}

if cfg.OutOfOrderCapMax <= 0 {
return errInvalidOutOfOrderCapMax
}

return nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ func (e *sampleValidationError) Error() string {
return fmt.Sprintf(e.message, e.timestamp, e.metricName)
}

// newSampleTimestampTooOldError builds the ValidationError reported for a
// sample whose timestamp is older than the accepted age.
//
// The message is a format string that sampleValidationError.Error fills in
// with the timestamp first and the metric name second; the metric name is
// rendered quoted via %.200q.
func newSampleTimestampTooOldError(metricName string, timestamp int64) ValidationError {
	tooOld := &sampleValidationError{
		message: "timestamp too old: %d metric: %.200q",
	}
	tooOld.timestamp = timestamp
	tooOld.metricName = metricName
	return tooOld
}

func newSampleTimestampTooNewError(metricName string, timestamp int64) ValidationError {
return &sampleValidationError{
message: "timestamp too new: %d metric: %.200q",
Expand Down
18 changes: 17 additions & 1 deletion pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Limits struct {
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
MaxLabelsSizeBytes int `yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"`
MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"`
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"`
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
Expand Down Expand Up @@ -130,6 +132,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
f.IntVar(&l.MaxLabelsSizeBytes, "validation.max-labels-size-bytes", 0, "Maximum combined size in bytes of all labels and label values accepted for a series. 0 to disable the limit.")
f.IntVar(&l.MaxMetadataLength, "validation.max-metadata-length", 1024, "Maximum length accepted for metric metadata. Metadata refers to Metric Name, HELP and UNIT.")
f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Deprecated (use ingester.out-of-order-time-window instead): Reject old samples.")
_ = l.RejectOldSamplesMaxAge.Set("14d")
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Deprecated (use ingester.out-of-order-time-window instead): Maximum accepted sample age before rejecting.")
_ = l.CreationGracePeriod.Set("10m")
f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
Expand All @@ -148,7 +153,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.")
f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 2000000, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable")
f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
f.IntVar(&l.MaxFetchedDataBytesPerQuery, "querier.max-fetched-data-bytes-per-query", 0, "The maximum combined size of all data that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler for `query`, `query_range` and `series` APIs. 0 to disable.")
f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query) and in the querier (on the query possibly split by the query-frontend). 0 to disable.")
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until <lookback> duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
Expand Down Expand Up @@ -341,6 +346,17 @@ func (o *Overrides) MaxMetadataLength(userID string) int {
return o.GetOverridesForUser(userID).MaxMetadataLength
}

// RejectOldSamples reports whether samples older than a certain age should be
// rejected for the given user, as read from that user's limit overrides.
func (o *Overrides) RejectOldSamples(userID string) bool {
	userLimits := o.GetOverridesForUser(userID)
	return userLimits.RejectOldSamples
}

// RejectOldSamplesMaxAge returns, for the given user, the age at which
// samples should be rejected, converted from the configured limit to a
// time.Duration.
func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration {
	maxAge := o.GetOverridesForUser(userID).RejectOldSamplesMaxAge
	return time.Duration(maxAge)
}

// CreationGracePeriod is misnamed, and actually returns how far into the future
// we should accept samples.
func (o *Overrides) CreationGracePeriod(userID string) time.Duration {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (

missingMetricName = "missing_metric_name"
invalidMetricName = "metric_name_invalid"
greaterThanMaxSampleAge = "greater_than_max_sample_age"
maxLabelNamesPerSeries = "max_label_names_per_series"
tooFarInFuture = "too_far_in_future"
invalidLabel = "label_invalid"
Expand Down Expand Up @@ -105,6 +106,11 @@ func init() {
func ValidateSample(limits *Limits, userID string, ls []cortexpb.LabelAdapter, s cortexpb.Sample) ValidationError {
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)

if limits.RejectOldSamples && model.Time(s.TimestampMs) < model.Now().Add(-time.Duration(limits.RejectOldSamplesMaxAge)) {
DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc()
return newSampleTimestampTooOldError(unsafeMetricName, s.TimestampMs)
}

if model.Time(s.TimestampMs) > model.Now().Add(time.Duration(limits.CreationGracePeriod)) {
DiscardedSamples.WithLabelValues(tooFarInFuture, userID).Inc()
return newSampleTimestampTooNewError(unsafeMetricName, s.TimestampMs)
Expand Down

0 comments on commit 5acf328

Please sign in to comment.