Skip to content

Commit

Permalink
support out of order samples ingestion feature
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Mar 22, 2023
1 parent f23d591 commit e7be020
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rulers.
* [FEATURE] Alertmanager: Add support for time_intervals. #5102
* [FEATURE] Added `snappy-block` as an option for grpc compression #5215
* [FEATURE] Enable experimental out-of-order samples support. Added 2 new configs `ingester.out_of_order_time_window` and `blocks-storage.tsdb.out_of_order_cap_max`. #4964
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1227,4 +1227,9 @@ blocks_storage:
# down.
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
[memory_snapshot_on_shutdown: <boolean> | default = false]

# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]
```
5 changes: 5 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1304,4 +1304,9 @@ blocks_storage:
# down.
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
[memory_snapshot_on_shutdown: <boolean> | default = false]

# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]
```
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,11 @@ tsdb:
# down.
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
[memory_snapshot_on_shutdown: <boolean> | default = false]

# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]
```
### `compactor_config`
Expand Down
6 changes: 5 additions & 1 deletion docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,8 @@ Currently experimental features are:
- `-frontend.query-vertical-shard-size` (int) CLI flag
- `query_vertical_shard_size` (int) field in runtime config file
- Snapshotting of in-memory TSDB on disk during shutdown
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
- Out of order samples support
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
- `-ingester.out-of-order-time-window` (duration) CLI flag
- `out_of_order_time_window` (duration) field in runtime config file
1 change: 0 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3036,7 +3036,6 @@ func TestDistributorValidation(t *testing.T) {
}},
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
},

// Test validation fails for samples from the future.
{
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
Expand Down
17 changes: 15 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,11 +842,13 @@ func (i *Ingester) updateUserTSDBConfigs() {
ExemplarsConfig: &config.ExemplarsConfig{
MaxExemplars: i.getMaxExemplars(userID),
},
TSDBConfig: &config.TSDBConfig{
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
},
},
}

// This method currently updates the MaxExemplars and OutOfOrderTimeWindow. Invoking this method
// with a 0 value of OutOfOrderTimeWindow simply updates Max Exemplars.
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow.
err := userDB.db.ApplyConfig(cfg)
if err != nil {
level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
Expand Down Expand Up @@ -995,6 +997,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
Expand Down Expand Up @@ -1063,6 +1066,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrTooOldSample:
sampleTooOldCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errMaxSeriesPerUserLimitExceeded:
perUserSeriesLimitCount++
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
Expand Down Expand Up @@ -1153,6 +1161,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if sampleOutOfOrderCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
}
if sampleTooOldCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
}
if newValueForTimestampCount > 0 {
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
}
Expand Down Expand Up @@ -1947,6 +1958,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
MaxExemplars: maxExemplarsForUser,
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
}, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
Expand Down
92 changes: 90 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func TestIngester_Push(t *testing.T) {
additionalMetrics []string
disableActiveSeries bool
maxExemplars int
oooTimeWindow time.Duration
}{
"should succeed on valid series and metadata": {
reqs: []*cortexpb.WriteRequest{
Expand Down Expand Up @@ -553,7 +554,7 @@ func TestIngester_Push(t *testing.T) {
cortex_ingester_memory_series_removed_total{user="test"} 0
`,
},
"should soft fail on sample out of order": {
"ooo disabled, should soft fail on sample out of order": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
Expand Down Expand Up @@ -597,7 +598,7 @@ func TestIngester_Push(t *testing.T) {
cortex_ingester_active_series{user="test"} 1
`,
},
"should soft fail on sample out of bound": {
"ooo disabled, should soft fail on sample out of bound": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
Expand Down Expand Up @@ -641,6 +642,92 @@ func TestIngester_Push(t *testing.T) {
cortex_ingester_active_series{user="test"} 1
`,
},
"ooo enabled, should soft fail on sample too old": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
nil,
cortexpb.API),
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (600 * 1000)}},
nil,
cortexpb.API),
},
oooTimeWindow: 5 * time.Minute,
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErr(storage.ErrTooOldSample, model.Time(1575043969-(600*1000)), cortexpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedIngested: []cortexpb.TimeSeries{
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}},
},
expectedMetrics: `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 1
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 1
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="sample-too-old",user="test"} 1
# HELP cortex_ingester_active_series Number of currently active series per user.
# TYPE cortex_ingester_active_series gauge
cortex_ingester_active_series{user="test"} 1
`,
},
"ooo enabled, should succeed": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
nil,
cortexpb.API),
cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}},
nil,
cortexpb.API),
},
oooTimeWindow: 5 * time.Minute,
expectedIngested: []cortexpb.TimeSeries{
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}, {Value: 2, TimestampMs: 1575043969}}},
},
expectedMetrics: `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 2
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
# HELP cortex_ingester_active_series Number of currently active series per user.
# TYPE cortex_ingester_active_series gauge
cortex_ingester_active_series{user="test"} 1
`,
},
"should soft fail on two different sample values at the same timestamp": {
reqs: []*cortexpb.WriteRequest{
cortexpb.ToWriteRequest(
Expand Down Expand Up @@ -777,6 +864,7 @@ func TestIngester_Push(t *testing.T) {

limits := defaultLimitsTestConfig()
limits.MaxExemplars = testData.maxExemplars
limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow)
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ const (
sampleOutOfOrder = "sample-out-of-order"
newValueForTimestamp = "new-value-for-timestamp"
sampleOutOfBounds = "sample-out-of-bounds"
sampleTooOld = "sample-too-old"
)
12 changes: 11 additions & 1 deletion pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/alecthomas/units"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/thanos-io/thanos/pkg/store"
Expand Down Expand Up @@ -45,6 +46,7 @@ var (
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
)

Expand Down Expand Up @@ -145,11 +147,14 @@ type TSDBConfig struct {
// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
CloseIdleTSDBInterval time.Duration `yaml:"-"`

// Positive value enables experiemental support for exemplars. 0 or less to disable.
// Positive value enables experimental support for exemplars. 0 or less to disable.
MaxExemplars int `yaml:"max_exemplars"`

// Enable snapshotting of in-memory TSDB data on disk when shutting down.
MemorySnapshotOnShutdown bool `yaml:"memory_snapshot_on_shutdown"`

// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
}

// RegisterFlags registers the TSDBConfig flags.
Expand All @@ -176,6 +181,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in limits is set to zero, cortex will fallback on this value. This setting enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
}

// Validate the config.
Expand Down Expand Up @@ -212,6 +218,10 @@ func (cfg *TSDBConfig) Validate() error {
return errInvalidWALSegmentSizeBytes
}

if cfg.OutOfOrderCapMax <= 0 {
return errInvalidOutOfOrderCapMax
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: errInvalidWALSegmentSizeBytes,
},
"should fail on out of order cap max": {
setup: func(cfg *BlocksStorageConfig) {
cfg.TSDB.OutOfOrderCapMax = 0
},
expectedErr: errInvalidOutOfOrderCapMax,
},
}

for testName, testData := range tests {
Expand Down
Loading

0 comments on commit e7be020

Please sign in to comment.