Support out of order samples ingestion #4964

Merged · 1 commit · Mar 23, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
* [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rules.
* [FEATURE] Alertmanager: Add support for time_intervals. #5102
* [FEATURE] Added `snappy-block` as an option for grpc compression #5215
* [FEATURE] Enable experimental out-of-order samples support. Added 2 new configs `ingester.out_of_order_time_window` and `blocks-storage.tsdb.out_of_order_cap_max`. #4964
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and changed their defaults to 0. If both the old flag and the new flag are specified, the old flag's queue size is picked. #5005
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
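The changelog entry above introduces two settings that work together: a per-tenant time window and a global per-chunk capacity. A minimal configuration sketch — values are illustrative, the surrounding config is abbreviated, and key placement assumes the standard Cortex YAML layout:

```yaml
limits:
  # Per-tenant: accept samples up to 5m behind the newest sample of a
  # series. The default of 0s keeps out-of-order ingestion disabled.
  out_of_order_time_window: 5m

blocks_storage:
  tsdb:
    # Global: maximum number of out-of-order samples per chunk (default 32).
    out_of_order_cap_max: 32
```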
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
@@ -1227,4 +1227,9 @@ blocks_storage:
# down.
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
[memory_snapshot_on_shutdown: <boolean> | default = false]

# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]
@yeya24 (Contributor, Author) commented on Nov 14, 2022:
It feels weird to me. Shall we extract the TSDB configs from the blocks storage section? Having them in the querier and store-gateway docs is strange.

A contributor replied:
I think the querier component doesn't care about this field. Is this due to the use of the automatic documentation template?

@yeya24 replied:

Yeah, I think the querier and store gateway use some configs from blocks storage, but the TSDB configs shouldn't be relevant to them.

The contributor replied:
If out-of-order ingestion is enabled, users should not have to care about this configuration (the configuration burden is too heavy); at most there should be a maximum capacity limit, with the default being the largest.

@yeya24 replied:

If you are talking about the OOO capacity, I don't think this change would introduce an additional burden for them.

This is a global value, not per tenant.

```
5 changes: 5 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -1304,4 +1304,9 @@ blocks_storage:
# down.
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
[memory_snapshot_on_shutdown: <boolean> | default = false]

# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]
```
14 changes: 12 additions & 2 deletions docs/configuration/config-file-reference.md
@@ -1735,6 +1735,11 @@ tsdb:
# down.
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
[memory_snapshot_on_shutdown: <boolean> | default = false]

# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]
```

### `compactor_config`
@@ -2852,6 +2857,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-global-metadata-per-metric
[max_global_metadata_per_metric: <int> | default = 0]

# [Experimental] Configures the allowed time window for ingestion of
# out-of-order samples. Disabled (0s) by default.
# CLI flag: -ingester.out-of-order-time-window
[out_of_order_time_window: <duration> | default = 0s]

# Maximum number of chunks that can be fetched in a single query from ingesters
# and long-term storage. This limit is enforced in the querier, ruler and
# store-gateway. 0 to disable.
@@ -2864,8 +2874,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -querier.max-fetched-series-per-query
[max_fetched_series_per_query: <int> | default = 0]

- # Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size
- # of all chunks in bytes that a query can fetch from each ingester and storage.
+ # Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of
+ # all chunks in bytes that a query can fetch from each ingester and storage.
# This limit is enforced in the querier, ruler and store-gateway. 0 to disable.
# CLI flag: -querier.max-fetched-chunk-bytes-per-query
[max_fetched_chunk_bytes_per_query: <int> | default = 0]
6 changes: 5 additions & 1 deletion docs/configuration/v1-guarantees.md
@@ -100,4 +100,8 @@ Currently experimental features are:
- `-frontend.query-vertical-shard-size` (int) CLI flag
- `query_vertical_shard_size` (int) field in runtime config file
- Snapshotting of in-memory TSDB on disk during shutdown
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
- Out of order samples support
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
- `-ingester.out-of-order-time-window` (duration) CLI flag
- `out_of_order_time_window` (duration) field in runtime config file
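Because `out_of_order_time_window` is also a runtime-config field (as listed above), it can be changed per tenant without a restart. A sketch assuming the standard `overrides` layout of the Cortex runtime config file; the tenant name is hypothetical:

```yaml
overrides:
  tenant-a:
    # Accept samples up to 1h old for this tenant only.
    out_of_order_time_window: 1h
```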
1 change: 0 additions & 1 deletion pkg/distributor/distributor_test.go
@@ -3036,7 +3036,6 @@ func TestDistributorValidation(t *testing.T) {
}},
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
},

// Test validation fails for samples from the future.
{
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
17 changes: 15 additions & 2 deletions pkg/ingester/ingester.go
@@ -842,11 +842,13 @@ func (i *Ingester) updateUserTSDBConfigs() {
ExemplarsConfig: &config.ExemplarsConfig{
MaxExemplars: i.getMaxExemplars(userID),
},
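// The per-tenant window is tracked as a model.Duration limit; Prometheus TSDB expects it here in milliseconds.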
TSDBConfig: &config.TSDBConfig{
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
},
},
}

- // This method currently updates the MaxExemplars and OutOfOrderTimeWindow. Invoking this method
- // with a 0 value of OutOfOrderTimeWindow simply updates Max Exemplars.
+ // This method currently updates the MaxExemplars and OutOfOrderTimeWindow.
err := userDB.db.ApplyConfig(cfg)
if err != nil {
level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
@@ -995,6 +997,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
@@ -1063,6 +1066,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrTooOldSample:
sampleTooOldCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errMaxSeriesPerUserLimitExceeded:
perUserSeriesLimitCount++
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
@@ -1153,6 +1161,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if sampleOutOfOrderCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
}
if sampleTooOldCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
}
if newValueForTimestampCount > 0 {
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
}
@@ -1947,6 +1958,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
MaxExemplars: maxExemplarsForUser,
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
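// Experimental out-of-order settings: the per-tenant time window (converted to milliseconds) and the global per-chunk sample cap.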
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
}, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
92 changes: 90 additions & 2 deletions pkg/ingester/ingester_test.go
@@ -341,6 +341,7 @@ func TestIngester_Push(t *testing.T) {
additionalMetrics []string
disableActiveSeries bool
maxExemplars int
oooTimeWindow time.Duration
}{
"should succeed on valid series and metadata": {
reqs: []*cortexpb.WriteRequest{
@@ -553,7 +554,7 @@
cortex_ingester_memory_series_removed_total{user="test"} 0
`,
},
"should soft fail on sample out of order": {
"ooo disabled, should soft fail on sample out of order": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
@@ -597,7 +598,7 @@
cortex_ingester_active_series{user="test"} 1
`,
},
"should soft fail on sample out of bound": {
"ooo disabled, should soft fail on sample out of bound": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
@@ -641,6 +642,92 @@
cortex_ingester_active_series{user="test"} 1
`,
},
"ooo enabled, should soft fail on sample too old": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
nil,
cortexpb.API),
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (600 * 1000)}},
nil,
cortexpb.API),
},
oooTimeWindow: 5 * time.Minute,
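// The second sample is 600s (10 minutes) older than the first, outside the 5m window, so it is rejected as too old.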
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErr(storage.ErrTooOldSample, model.Time(1575043969-(600*1000)), cortexpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedIngested: []cortexpb.TimeSeries{
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}},
},
expectedMetrics: `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 1
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 1
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="sample-too-old",user="test"} 1
# HELP cortex_ingester_active_series Number of currently active series per user.
# TYPE cortex_ingester_active_series gauge
cortex_ingester_active_series{user="test"} 1
`,
},
"ooo enabled, should succeed": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
nil,
cortexpb.API),
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}},
nil,
cortexpb.API),
},
oooTimeWindow: 5 * time.Minute,
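// The second sample is only 60s (1 minute) older than the first, inside the 5m window, so it is ingested out of order.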
expectedIngested: []cortexpb.TimeSeries{
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}, {Value: 2, TimestampMs: 1575043969}}},
},
expectedMetrics: `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 2
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
# HELP cortex_ingester_active_series Number of currently active series per user.
# TYPE cortex_ingester_active_series gauge
cortex_ingester_active_series{user="test"} 1
`,
},
"should soft fail on two different sample values at the same timestamp": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
@@ -777,6 +864,7 @@

limits := defaultLimitsTestConfig()
limits.MaxExemplars = testData.maxExemplars
limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow)
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
1 change: 1 addition & 0 deletions pkg/ingester/series.go
@@ -4,4 +4,5 @@ const (
sampleOutOfOrder = "sample-out-of-order"
newValueForTimestamp = "new-value-for-timestamp"
sampleOutOfBounds = "sample-out-of-bounds"
sampleTooOld = "sample-too-old"
)
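Rejections under the new `sample-too-old` reason are counted in `cortex_discarded_samples_total`, as the ingester changes above show. A sketch of a Prometheus recording rule for spotting tenants that exceed their window — the rule group and record names are made up:

```yaml
groups:
  - name: cortex-out-of-order
    rules:
      # Per-tenant rate of samples rejected for falling outside the
      # configured out-of-order time window.
      - record: user:cortex_discarded_samples_too_old:rate5m
        expr: sum by (user) (rate(cortex_discarded_samples_total{reason="sample-too-old"}[5m]))
```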
12 changes: 11 additions & 1 deletion pkg/storage/tsdb/config.go
@@ -8,6 +8,7 @@ import (

"github.com/alecthomas/units"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/thanos-io/thanos/pkg/store"
@@ -45,6 +46,7 @@
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
)

@@ -145,11 +147,14 @@ type TSDBConfig struct {
// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
CloseIdleTSDBInterval time.Duration `yaml:"-"`

- // Positive value enables experiemental support for exemplars. 0 or less to disable.
+ // Positive value enables experimental support for exemplars. 0 or less to disable.
MaxExemplars int `yaml:"max_exemplars"`

// Enable snapshotting of in-memory TSDB data on disk when shutting down.
MemorySnapshotOnShutdown bool `yaml:"memory_snapshot_on_shutdown"`

// OutOfOrderCapMax is the maximum capacity for OOO chunks (in samples).
OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
}

// RegisterFlags registers the TSDBConfig flags.
@@ -176,6 +181,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in limits is set to zero, cortex will fallback on this value. This setting enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
}

// Validate the config.
@@ -212,6 +218,10 @@ func (cfg *TSDBConfig) Validate() error {
return errInvalidWALSegmentSizeBytes
}

if cfg.OutOfOrderCapMax <= 0 {
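// A non-positive cap would be invalid; the flag defaults to tsdb.DefaultOutOfOrderCapMax (32 samples).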
return errInvalidOutOfOrderCapMax
}

return nil
}

6 changes: 6 additions & 0 deletions pkg/storage/tsdb/config_test.go
@@ -115,6 +115,12 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: errInvalidWALSegmentSizeBytes,
},
"should fail on out of order cap max": {
setup: func(cfg *BlocksStorageConfig) {
cfg.TSDB.OutOfOrderCapMax = 0
},
expectedErr: errInvalidOutOfOrderCapMax,
},
}

for testName, testData := range tests {