diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 85e010b56825..a723ce8c19ff 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -17,6 +17,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled tenant-rw changefeed.fast_gzip.enabled boolean true use fast gzip implementation tenant-rw changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds tenant-rw +changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed is the protected timestamp age exceeds this threshold; 0 disables expiration tenant-rw changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables tenant-rw changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. tenant-rw cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 686268b50177..e4763298ab28 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -23,6 +23,7 @@
changefeed.event_consumer_workers
integer0the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabledServerless/Dedicated/Self-Hosted
changefeed.fast_gzip.enabled
booleantrueuse fast gzip implementationServerless/Dedicated/Self-Hosted
changefeed.node_throttle_config
stringspecifies node level throttling configuration for all changefeeedsServerless/Dedicated/Self-Hosted +
changefeed.protect_timestamp.max_age
duration96h0m0sfail the changefeed is the protected timestamp age exceeds this threshold; 0 disables expirationServerless/Dedicated/Self-Hosted
changefeed.schema_feed.read_with_priority_after
duration1m0sretry with high priority if we were not able to read descriptors for too long; 0 disablesServerless/Dedicated/Self-Hosted
changefeed.sink_io_workers
integer0the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.Serverless/Dedicated/Self-Hosted
cloudstorage.azure.concurrent_upload_buffers
integer1controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an uploadServerless/Dedicated/Self-Hosted diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 9286954617c9..4c34e8c4d0a7 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -239,7 +239,6 @@ go_test( "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", - "//pkg/kv/kvserver/protectedts/ptstorage", "//pkg/roachpb", "//pkg/scheduledjobs", "//pkg/scheduledjobs/schedulebase", @@ -269,7 +268,6 @@ go_test( "//pkg/sql/execinfrapb", "//pkg/sql/flowinfra", "//pkg/sql/importer", - "//pkg/sql/isql", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", @@ -309,7 +307,6 @@ go_test( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/timeutil/pgdate", - "//pkg/util/uuid", "//pkg/workload/bank", "//pkg/workload/ledger", "//pkg/workload/workloadsql", diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 68b2d76e46d4..e9ca11b606b7 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -714,6 +714,10 @@ func createChangefeedJobRecord( return nil, err } + if ptsExpiration == 0 { + ptsExpiration = changefeedbase.MaxProtectedTimestampAge.Get(&p.ExecCfg().Settings.SV) + } + if ptsExpiration > 0 && ptsExpiration < time.Hour { // This threshold is rather arbitrary. But we want to warn users about // the potential impact of keeping this setting too low. @@ -1164,10 +1168,6 @@ func (b *changefeedResumer) handleChangefeedError( changefeedbase.OptOnError, changefeedbase.OptOnErrorPause) return b.job.NoTxn().PauseRequestedWithFunc(ctx, func(ctx context.Context, planHookState interface{}, txn isql.Txn, progress *jobspb.Progress) error { - err := b.OnPauseRequest(ctx, jobExec, txn, progress) - if err != nil { - return err - } // directly update running status to avoid the running/reverted job status check progress.RunningStatus = errorMessage log.Warningf(ctx, errorFmt, changefeedErr, changefeedbase.OptOnError, changefeedbase.OptOnErrorPause) @@ -1409,47 +1409,6 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp( } } -var _ jobs.PauseRequester = (*changefeedResumer)(nil) - -// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being -// paused, we may want to clear the protected timestamp record. -func (b *changefeedResumer) OnPauseRequest( - ctx context.Context, jobExec interface{}, txn isql.Txn, progress *jobspb.Progress, -) error { - details := b.job.Details().(jobspb.ChangefeedDetails) - - cp := progress.GetChangefeed() - execCfg := jobExec.(sql.JobExecContext).ExecCfg() - - if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect { - // Release existing pts record to avoid a single changefeed left on pause - // resulting in storage issues - if cp.ProtectedTimestampRecord != uuid.Nil { - pts := execCfg.ProtectedTimestampProvider.WithTxn(txn) - if err := pts.Release(ctx, cp.ProtectedTimestampRecord); err != nil { - log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err) - } else { - cp.ProtectedTimestampRecord = uuid.Nil - } - } - return nil - } - - if cp.ProtectedTimestampRecord == uuid.Nil { - resolved := progress.GetHighWater() - if resolved == nil { - return nil - } - pts := execCfg.ProtectedTimestampProvider.WithTxn(txn) - ptr := createProtectedTimestampRecord( - ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp, - ) - return pts.Protect(ctx, ptr) - } - - return nil -} - // getQualifiedTableName returns the database-qualified name of the table // or view represented by the provided descriptor. func getQualifiedTableName( diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e5d5bb078add..709d89486e43 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -52,7 +52,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" @@ -95,7 +94,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/dustin/go-humanize" "github.com/lib/pq" @@ -5626,84 +5624,6 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { })) } -func TestChangefeedProtectedTimestampOnPause(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - testFn := func(shouldPause bool) cdcTestFn { - return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(s.DB) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`) - - var tableID int - sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+ - `WHERE name = 'foo' AND database_name = current_database()`). - Scan(&tableID) - stmt := `CREATE CHANGEFEED FOR foo WITH resolved` - if shouldPause { - stmt += ", " + changefeedbase.OptProtectDataFromGCOnPause - } - foo := feed(t, f, stmt) - defer closeFeed(t, foo) - assertPayloads(t, foo, []string{ - `foo: [1]->{"after": {"a": 1, "b": "a"}}`, - `foo: [2]->{"after": {"a": 2, "b": "b"}}`, - `foo: [4]->{"after": {"a": 4, "b": "c"}}`, - `foo: [7]->{"after": {"a": 7, "b": "d"}}`, - `foo: [8]->{"after": {"a": 8, "b": "e"}}`, - }) - expectResolvedTimestamp(t, foo) - - // Pause the job then ensure that it has a reasonable protected timestamp. - - ctx := context.Background() - serverCfg := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig - jr := serverCfg.JobRegistry - pts := ptstorage.WithDatabase( - serverCfg.ProtectedTimestampProvider, serverCfg.DB, - ) - - feedJob := foo.(cdctest.EnterpriseTestFeed) - require.NoError(t, feedJob.Pause()) - { - j, err := jr.LoadJob(ctx, feedJob.JobID()) - require.NoError(t, err) - progress := j.Progress() - details := progress.Details.(*jobspb.Progress_Changefeed).Changefeed - if shouldPause { - require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord) - r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord) - require.NoError(t, err) - require.True(t, r.Timestamp.LessEq(*progress.GetHighWater())) - } else { - require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord) - } - } - - // Resume the job and ensure that the protected timestamp is removed once - // the changefeed has caught up. - require.NoError(t, feedJob.Resume()) - testutils.SucceedsSoon(t, func() error { - resolvedTs, _ := expectResolvedTimestamp(t, foo) - j, err := jr.LoadJob(ctx, feedJob.JobID()) - require.NoError(t, err) - details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed - r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord) - if err != nil || r.Timestamp.Less(resolvedTs) { - return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs) - } - return nil - }) - } - } - - testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) { - cdcTest(t, testFn(shouldPause), feedTestEnterpriseSinks) - }) - -} - func TestManyChangefeedsOneTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 413e824286f8..1b1991466075 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -73,34 +73,34 @@ const ( // Constants for the options. const ( - OptAvroSchemaPrefix = `avro_schema_prefix` - OptConfluentSchemaRegistry = `confluent_schema_registry` - OptCursor = `cursor` - OptCustomKeyColumn = `key_column` - OptEndTime = `end_time` - OptEnvelope = `envelope` - OptFormat = `format` - OptFullTableName = `full_table_name` - OptKeyInValue = `key_in_value` - OptTopicInValue = `topic_in_value` - OptResolvedTimestamps = `resolved` - OptMinCheckpointFrequency = `min_checkpoint_frequency` - OptUpdatedTimestamps = `updated` - OptMVCCTimestamps = `mvcc_timestamp` - OptDiff = `diff` - OptCompression = `compression` - OptSchemaChangeEvents = `schema_change_events` - OptSchemaChangePolicy = `schema_change_policy` - OptSplitColumnFamilies = `split_column_families` - OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause` - OptExpirePTSAfter = `gc_protect_expires_after` - OptWebhookAuthHeader = `webhook_auth_header` - OptWebhookClientTimeout = `webhook_client_timeout` - OptOnError = `on_error` - OptMetricsScope = `metrics_label` - OptUnordered = `unordered` - OptVirtualColumns = `virtual_columns` - OptExecutionLocality = `execution_locality` + OptAvroSchemaPrefix = `avro_schema_prefix` + OptConfluentSchemaRegistry = `confluent_schema_registry` + OptCursor = `cursor` + OptCustomKeyColumn = `key_column` + OptEndTime = `end_time` + OptEnvelope = `envelope` + OptFormat = `format` + OptFullTableName = `full_table_name` + OptKeyInValue = `key_in_value` + OptTopicInValue = `topic_in_value` + OptResolvedTimestamps = `resolved` + OptMinCheckpointFrequency = `min_checkpoint_frequency` + OptUpdatedTimestamps = `updated` + OptMVCCTimestamps = `mvcc_timestamp` + OptDiff = `diff` + OptCompression = `compression` + OptSchemaChangeEvents = `schema_change_events` + OptSchemaChangePolicy = `schema_change_policy` + OptSplitColumnFamilies = `split_column_families` + DeprecatedOptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause` + OptExpirePTSAfter = `gc_protect_expires_after` + OptWebhookAuthHeader = `webhook_auth_header` + OptWebhookClientTimeout = `webhook_client_timeout` + OptOnError = `on_error` + OptMetricsScope = `metrics_label` + OptUnordered = `unordered` + OptVirtualColumns = `virtual_columns` + OptExecutionLocality = `execution_locality` OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted` OptVirtualColumnsNull VirtualColumnVisibility = `null` @@ -309,40 +309,40 @@ var jsonOption = OptionPermittedValues{Type: OptionTypeJSON} // ChangefeedOptionExpectValues is used to parse changefeed options using // PlanHookState.TypeAsStringOpts(). var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{ - OptAvroSchemaPrefix: stringOption, - OptConfluentSchemaRegistry: stringOption, - OptCursor: timestampOption, - OptCustomKeyColumn: stringOption, - OptEndTime: timestampOption, - OptEnvelope: enum("row", "key_only", "wrapped", "deprecated_row", "bare"), - OptFormat: enum("json", "avro", "csv", "experimental_avro", "parquet"), - OptFullTableName: flagOption, - OptKeyInValue: flagOption, - OptTopicInValue: flagOption, - OptResolvedTimestamps: durationOption.thatCanBeZero().orEmptyMeans("0"), - OptMinCheckpointFrequency: durationOption.thatCanBeZero(), - OptUpdatedTimestamps: flagOption, - OptMVCCTimestamps: flagOption, - OptDiff: flagOption, - OptCompression: enum("gzip", "zstd"), - OptSchemaChangeEvents: enum("column_changes", "default"), - OptSchemaChangePolicy: enum("backfill", "nobackfill", "stop", "ignore"), - OptSplitColumnFamilies: flagOption, - OptInitialScan: enum("yes", "no", "only").orEmptyMeans("yes"), - OptNoInitialScan: flagOption, - OptInitialScanOnly: flagOption, - OptProtectDataFromGCOnPause: flagOption, - OptExpirePTSAfter: durationOption.thatCanBeZero(), - OptKafkaSinkConfig: jsonOption, - OptPubsubSinkConfig: jsonOption, - OptWebhookSinkConfig: jsonOption, - OptWebhookAuthHeader: stringOption, - OptWebhookClientTimeout: durationOption, - OptOnError: enum("pause", "fail"), - OptMetricsScope: stringOption, - OptUnordered: flagOption, - OptVirtualColumns: enum("omitted", "null"), - OptExecutionLocality: stringOption, + OptAvroSchemaPrefix: stringOption, + OptConfluentSchemaRegistry: stringOption, + OptCursor: timestampOption, + OptCustomKeyColumn: stringOption, + OptEndTime: timestampOption, + OptEnvelope: enum("row", "key_only", "wrapped", "deprecated_row", "bare"), + OptFormat: enum("json", "avro", "csv", "experimental_avro", "parquet"), + OptFullTableName: flagOption, + OptKeyInValue: flagOption, + OptTopicInValue: flagOption, + OptResolvedTimestamps: durationOption.thatCanBeZero().orEmptyMeans("0"), + OptMinCheckpointFrequency: durationOption.thatCanBeZero(), + OptUpdatedTimestamps: flagOption, + OptMVCCTimestamps: flagOption, + OptDiff: flagOption, + OptCompression: enum("gzip", "zstd"), + OptSchemaChangeEvents: enum("column_changes", "default"), + OptSchemaChangePolicy: enum("backfill", "nobackfill", "stop", "ignore"), + OptSplitColumnFamilies: flagOption, + OptInitialScan: enum("yes", "no", "only").orEmptyMeans("yes"), + OptNoInitialScan: flagOption, + OptInitialScanOnly: flagOption, + DeprecatedOptProtectDataFromGCOnPause: flagOption, + OptExpirePTSAfter: durationOption.thatCanBeZero(), + OptKafkaSinkConfig: jsonOption, + OptPubsubSinkConfig: jsonOption, + OptWebhookSinkConfig: jsonOption, + OptWebhookAuthHeader: stringOption, + OptWebhookClientTimeout: durationOption, + OptOnError: enum("pause", "fail"), + OptMetricsScope: stringOption, + OptUnordered: flagOption, + OptVirtualColumns: enum("omitted", "null"), + OptExecutionLocality: stringOption, } // CommonOptions is options common to all sinks @@ -352,7 +352,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope, OptResolvedTimestamps, OptUpdatedTimestamps, OptMVCCTimestamps, OptDiff, OptSplitColumnFamilies, OptSchemaChangeEvents, OptSchemaChangePolicy, - OptProtectDataFromGCOnPause, OptOnError, + OptOnError, OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, OptCustomKeyColumn, OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter, OptExecutionLocality, @@ -385,6 +385,9 @@ var ExternalConnectionValidOptions = unionStringSets(SQLValidOptions, KafkaValid var CaseInsensitiveOpts = makeStringSet(OptFormat, OptEnvelope, OptCompression, OptSchemaChangeEvents, OptSchemaChangePolicy, OptOnError, OptInitialScan) +// RetiredOptions are the options which are no longer active. +var RetiredOptions = makeStringSet(DeprecatedOptProtectDataFromGCOnPause) + // redactionFunc is a function applied to a string option which returns its redacted value. type redactionFunc func(string) (string, error) @@ -529,12 +532,17 @@ func (s StatementOptions) IsSet(key string) bool { } // DeprecationWarnings checks for options in forms we still support and serialize, -// but should be replaced with a new form. Currently hardcoded to just check format. +// but should be replaced with a new form. func (s StatementOptions) DeprecationWarnings() []string { if newFormat, ok := NoLongerExperimental[s.m[OptFormat]]; ok { return []string{fmt.Sprintf(`%[1]s is no longer experimental, use %[2]s=%[1]s`, newFormat, OptFormat)} } + for retiredOpt := range RetiredOptions { + if _, isSet := s.m[retiredOpt]; isSet { + return []string{fmt.Sprintf("%s option is no longer needed", retiredOpt)} + } + } return []string{} } diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 1f17af7e5ac7..35879a6c0df5 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -211,6 +211,15 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting( settings.PositiveDuration, ) +// MaxProtectedTimestampAge controls the frequency of protected timestamp record updates +var MaxProtectedTimestampAge = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.protect_timestamp.max_age", + "fail the changefeed is the protected timestamp age exceeds this threshold; 0 disables expiration", + 4*24*time.Hour, + settings.NonNegativeDuration, +).WithPublic() + // BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors var BatchReductionRetryEnabled = settings.RegisterBoolSetting( settings.TenantWritable, diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 5c15ceca655d..1d19ee8a5e30 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -309,6 +309,9 @@ func validateSinkOptions(opts map[string]string, sinkSpecificOpts map[string]str if _, ok := changefeedbase.CommonOptions[opt]; ok { continue } + if _, retired := changefeedbase.RetiredOptions[opt]; retired { + continue + } if sinkSpecificOpts != nil { if _, ok := sinkSpecificOpts[opt]; ok { continue diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 2c83d846bafc..4b29f4cd967e 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -42,7 +42,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/distsql" - "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -351,13 +350,6 @@ func (r *reportErrorResumer) OnFailOrCancel( return r.wrapped.OnFailOrCancel(ctx, execCtx, jobErr) } -// OnPauseRequest implements PauseRequester interface. -func (r *reportErrorResumer) OnPauseRequest( - ctx context.Context, execCtx interface{}, txn isql.Txn, details *jobspb.Progress, -) error { - return r.wrapped.(*changefeedResumer).OnPauseRequest(ctx, execCtx, txn, details) -} - type wrapSinkFn func(sink Sink) Sink // jobFeed indicates that the feed is an "enterprise feed" -- that is, diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go index b69b90eb2199..b4cc61606db4 100644 --- a/pkg/jobs/metricspoller/job_statistics.go +++ b/pkg/jobs/metricspoller/job_statistics.go @@ -170,12 +170,8 @@ func processJobPTSRecord( } // If MaximumPTSAge is set on the job payload, verify if PTS record - // timestamp is fresh enough. Note: we only look at paused jobs. - // If the running job wants to enforce an invariant wrt to PTS age, - // it can do so itself. This check here is a safety mechanism to detect - // paused jobs that own protected timestamp records. - if j.Status() == jobs.StatusPaused && - p.MaximumPTSAge > 0 && + // timestamp is fresh enough. + if p.MaximumPTSAge > 0 && rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) { stats.expired++ ptsExpired := errors.Newf(