From a90f15cb4ddc3da3b4626d4e3ade725bd5910c8b Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 17 May 2023 15:32:16 -0400
Subject: [PATCH] changefeedccl: Improve protected timestamp handling

Changefeeds utilize protected timestamps (PTS) to ensure that data is not
garbage collected (GC) prematurely. This subsystem went through many rounds
of changes, resulting in unintuitive and potentially dangerous behavior.
This PR updates and improves PTS handling as follows.

PR #97148 introduced the capability to cancel jobs that hold on to stale
PTS records. This PR expands this functionality to apply to all jobs -- not
just paused ones. This is necessary because, due to #90810, changefeeds
retry almost every error -- which means that if a running changefeed job
fails to make progress for a very long time, its PTS record could block
garbage collection for days, weeks, or even months.

To guard against this case, introduce a new cluster setting
`changefeed.protect_timestamp.max_age`, which defaults to a generous 4 days,
to make sure that even if the explicit changefeed option
`gc_protect_expires_after` was not specified, the changefeed will fail after
`changefeed.protect_timestamp.max_age` if no progress is made. The fail-safe
can be disabled by setting `changefeed.protect_timestamp.max_age` to 0.
Note, however, that doing so could result in stability issues if a stale PTS
record is never released.

In addition, this PR deprecates the `protect_data_from_gc_on_pause` option.
This option is no longer needed since we now employ "active protected
timestamp" management (meaning: there is always a PTS record while a
changefeed job is running), and the handling of this record is consistent
for both running and paused jobs.

Fixes #103464

Release note (enterprise change): Introduce a new
`changefeed.protect_timestamp.max_age` setting (default 4 days), which
cancels running changefeed jobs if they fail to make forward progress for
too long. This setting is used if the explicit `gc_protect_expires_after`
option was not set. In addition, deprecate the
`protect_data_from_gc_on_pause` option. This option is no longer needed
since changefeed jobs always protect data.
---
 .../settings/settings-for-tenants.txt         |   1 +
 docs/generated/settings/settings.html         |   1 +
 pkg/ccl/changefeedccl/BUILD.bazel             |   3 -
 pkg/ccl/changefeedccl/changefeed_stmt.go      |  49 +------
 pkg/ccl/changefeedccl/changefeed_test.go      |  80 -----------
 .../changefeedccl/changefeedbase/options.go   | 136 +++++++++---------
 .../changefeedccl/changefeedbase/settings.go  |   9 ++
 pkg/ccl/changefeedccl/sink.go                 |   3 +
 pkg/ccl/changefeedccl/testfeed_test.go        |   8 --
 pkg/jobs/metricspoller/job_statistics.go      |   8 +-
 10 files changed, 92 insertions(+), 206 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 85e010b56825..a723ce8c19ff 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -17,6 +17,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
 changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled tenant-rw
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation tenant-rw
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds tenant-rw
+changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration tenant-rw
 changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables tenant-rw
 changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. tenant-rw
 cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 686268b50177..e4763298ab28 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -23,6 +23,7 @@
 <tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>changefeed.fast_gzip.enabled</code></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><code>changefeed.protect_timestamp.max_age</code></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>changefeed.schema_feed.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>changefeed.sink_io_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>cloudstorage.azure.concurrent_upload_buffers</code></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 9286954617c9..4c34e8c4d0a7 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -239,7 +239,6 @@ go_test(
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/kvserverbase",
         "//pkg/kv/kvserver/protectedts",
-        "//pkg/kv/kvserver/protectedts/ptstorage",
         "//pkg/roachpb",
         "//pkg/scheduledjobs",
         "//pkg/scheduledjobs/schedulebase",
@@ -269,7 +268,6 @@ go_test(
         "//pkg/sql/execinfrapb",
         "//pkg/sql/flowinfra",
         "//pkg/sql/importer",
-        "//pkg/sql/isql",
         "//pkg/sql/parser",
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
@@ -309,7 +307,6 @@ go_test(
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/timeutil/pgdate",
-        "//pkg/util/uuid",
         "//pkg/workload/bank",
         "//pkg/workload/ledger",
         "//pkg/workload/workloadsql",
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index 68b2d76e46d4..e9ca11b606b7 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -714,6 +714,10 @@ func createChangefeedJobRecord(
 		return nil, err
 	}
 
+	if ptsExpiration == 0 {
+		ptsExpiration = changefeedbase.MaxProtectedTimestampAge.Get(&p.ExecCfg().Settings.SV)
+	}
+
 	if ptsExpiration > 0 && ptsExpiration < time.Hour {
 		// This threshold is rather arbitrary. But we want to warn users about
 		// the potential impact of keeping this setting too low.
@@ -1164,10 +1168,6 @@ func (b *changefeedResumer) handleChangefeedError(
 			changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
 		return b.job.NoTxn().PauseRequestedWithFunc(ctx, func(ctx context.Context,
 			planHookState interface{}, txn isql.Txn, progress *jobspb.Progress) error {
-			err := b.OnPauseRequest(ctx, jobExec, txn, progress)
-			if err != nil {
-				return err
-			}
 			// directly update running status to avoid the running/reverted job status check
 			progress.RunningStatus = errorMessage
 			log.Warningf(ctx, errorFmt, changefeedErr, changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
@@ -1409,47 +1409,6 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(
 	}
 }
 
-var _ jobs.PauseRequester = (*changefeedResumer)(nil)
-
-// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
-// paused, we may want to clear the protected timestamp record.
-func (b *changefeedResumer) OnPauseRequest(
-	ctx context.Context, jobExec interface{}, txn isql.Txn, progress *jobspb.Progress,
-) error {
-	details := b.job.Details().(jobspb.ChangefeedDetails)
-
-	cp := progress.GetChangefeed()
-	execCfg := jobExec.(sql.JobExecContext).ExecCfg()
-
-	if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
-		// Release existing pts record to avoid a single changefeed left on pause
-		// resulting in storage issues
-		if cp.ProtectedTimestampRecord != uuid.Nil {
-			pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
-			if err := pts.Release(ctx, cp.ProtectedTimestampRecord); err != nil {
-				log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err)
-			} else {
-				cp.ProtectedTimestampRecord = uuid.Nil
-			}
-		}
-		return nil
-	}
-
-	if cp.ProtectedTimestampRecord == uuid.Nil {
-		resolved := progress.GetHighWater()
-		if resolved == nil {
-			return nil
-		}
-		pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
-		ptr := createProtectedTimestampRecord(
-			ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
-		)
-		return pts.Protect(ctx, ptr)
-	}
-
-	return nil
-}
-
 // getQualifiedTableName returns the database-qualified name of the table
 // or view represented by the provided descriptor.
 func getQualifiedTableName(
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index e5d5bb078add..709d89486e43 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -52,7 +52,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server"
@@ -95,7 +94,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
-	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/dustin/go-humanize"
 	"github.com/lib/pq"
@@ -5626,84 +5624,6 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
 	}))
 }
 
-func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	testFn := func(shouldPause bool) cdcTestFn {
-		return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
-			sqlDB := sqlutils.MakeSQLRunner(s.DB)
-			sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
-			sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`)
-
-			var tableID int
-			sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+
-				`WHERE name = 'foo' AND database_name = current_database()`).
-				Scan(&tableID)
-			stmt := `CREATE CHANGEFEED FOR foo WITH resolved`
-			if shouldPause {
-				stmt += ", " + changefeedbase.OptProtectDataFromGCOnPause
-			}
-			foo := feed(t, f, stmt)
-			defer closeFeed(t, foo)
-			assertPayloads(t, foo, []string{
-				`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
-				`foo: [2]->{"after": {"a": 2, "b": "b"}}`,
-				`foo: [4]->{"after": {"a": 4, "b": "c"}}`,
-				`foo: [7]->{"after": {"a": 7, "b": "d"}}`,
-				`foo: [8]->{"after": {"a": 8, "b": "e"}}`,
-			})
-			expectResolvedTimestamp(t, foo)
-
-			// Pause the job then ensure that it has a reasonable protected timestamp.
-
-			ctx := context.Background()
-			serverCfg := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig
-			jr := serverCfg.JobRegistry
-			pts := ptstorage.WithDatabase(
-				serverCfg.ProtectedTimestampProvider, serverCfg.DB,
-			)
-
-			feedJob := foo.(cdctest.EnterpriseTestFeed)
-			require.NoError(t, feedJob.Pause())
-			{
-				j, err := jr.LoadJob(ctx, feedJob.JobID())
-				require.NoError(t, err)
-				progress := j.Progress()
-				details := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
-				if shouldPause {
-					require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord)
-					r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
-					require.NoError(t, err)
-					require.True(t, r.Timestamp.LessEq(*progress.GetHighWater()))
-				} else {
-					require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord)
-				}
-			}
-
-			// Resume the job and ensure that the protected timestamp is removed once
-			// the changefeed has caught up.
-			require.NoError(t, feedJob.Resume())
-			testutils.SucceedsSoon(t, func() error {
-				resolvedTs, _ := expectResolvedTimestamp(t, foo)
-				j, err := jr.LoadJob(ctx, feedJob.JobID())
-				require.NoError(t, err)
-				details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed
-				r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
-				if err != nil || r.Timestamp.Less(resolvedTs) {
-					return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs)
-				}
-				return nil
-			})
-		}
-	}
-
-	testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) {
-		cdcTest(t, testFn(shouldPause), feedTestEnterpriseSinks)
-	})
-
-}
-
 func TestManyChangefeedsOneTable(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 413e824286f8..1b1991466075 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -73,34 +73,34 @@ const (
 // Constants for the options.
 const (
-	OptAvroSchemaPrefix         = `avro_schema_prefix`
-	OptConfluentSchemaRegistry  = `confluent_schema_registry`
-	OptCursor                   = `cursor`
-	OptCustomKeyColumn          = `key_column`
-	OptEndTime                  = `end_time`
-	OptEnvelope                 = `envelope`
-	OptFormat                   = `format`
-	OptFullTableName            = `full_table_name`
-	OptKeyInValue               = `key_in_value`
-	OptTopicInValue             = `topic_in_value`
-	OptResolvedTimestamps       = `resolved`
-	OptMinCheckpointFrequency   = `min_checkpoint_frequency`
-	OptUpdatedTimestamps        = `updated`
-	OptMVCCTimestamps           = `mvcc_timestamp`
-	OptDiff                     = `diff`
-	OptCompression              = `compression`
-	OptSchemaChangeEvents       = `schema_change_events`
-	OptSchemaChangePolicy       = `schema_change_policy`
-	OptSplitColumnFamilies      = `split_column_families`
-	OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
-	OptExpirePTSAfter           = `gc_protect_expires_after`
-	OptWebhookAuthHeader        = `webhook_auth_header`
-	OptWebhookClientTimeout     = `webhook_client_timeout`
-	OptOnError                  = `on_error`
-	OptMetricsScope             = `metrics_label`
-	OptUnordered                = `unordered`
-	OptVirtualColumns           = `virtual_columns`
-	OptExecutionLocality        = `execution_locality`
+	OptAvroSchemaPrefix                   = `avro_schema_prefix`
+	OptConfluentSchemaRegistry            = `confluent_schema_registry`
+	OptCursor                             = `cursor`
+	OptCustomKeyColumn                    = `key_column`
+	OptEndTime                            = `end_time`
+	OptEnvelope                           = `envelope`
+	OptFormat                             = `format`
+	OptFullTableName                      = `full_table_name`
+	OptKeyInValue                         = `key_in_value`
+	OptTopicInValue                       = `topic_in_value`
+	OptResolvedTimestamps                 = `resolved`
+	OptMinCheckpointFrequency             = `min_checkpoint_frequency`
+	OptUpdatedTimestamps                  = `updated`
+	OptMVCCTimestamps                     = `mvcc_timestamp`
+	OptDiff                               = `diff`
+	OptCompression                        = `compression`
+	OptSchemaChangeEvents                 = `schema_change_events`
+	OptSchemaChangePolicy                 = `schema_change_policy`
+	OptSplitColumnFamilies                = `split_column_families`
+	DeprecatedOptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
+	OptExpirePTSAfter                     = `gc_protect_expires_after`
+	OptWebhookAuthHeader                  = `webhook_auth_header`
+	OptWebhookClientTimeout               = `webhook_client_timeout`
+	OptOnError                            = `on_error`
+	OptMetricsScope                       = `metrics_label`
+	OptUnordered                          = `unordered`
+	OptVirtualColumns                     = `virtual_columns`
+	OptExecutionLocality                  = `execution_locality`
 
 	OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
 	OptVirtualColumnsNull    VirtualColumnVisibility = `null`
@@ -309,40 +309,40 @@ var jsonOption = OptionPermittedValues{Type: OptionTypeJSON}
 
 // ChangefeedOptionExpectValues is used to parse changefeed options using
 // PlanHookState.TypeAsStringOpts().
 var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
-	OptAvroSchemaPrefix:         stringOption,
-	OptConfluentSchemaRegistry:  stringOption,
-	OptCursor:                   timestampOption,
-	OptCustomKeyColumn:          stringOption,
-	OptEndTime:                  timestampOption,
-	OptEnvelope:                 enum("row", "key_only", "wrapped", "deprecated_row", "bare"),
-	OptFormat:                   enum("json", "avro", "csv", "experimental_avro", "parquet"),
-	OptFullTableName:            flagOption,
-	OptKeyInValue:               flagOption,
-	OptTopicInValue:             flagOption,
-	OptResolvedTimestamps:       durationOption.thatCanBeZero().orEmptyMeans("0"),
-	OptMinCheckpointFrequency:   durationOption.thatCanBeZero(),
-	OptUpdatedTimestamps:        flagOption,
-	OptMVCCTimestamps:           flagOption,
-	OptDiff:                     flagOption,
-	OptCompression:              enum("gzip", "zstd"),
-	OptSchemaChangeEvents:       enum("column_changes", "default"),
-	OptSchemaChangePolicy:       enum("backfill", "nobackfill", "stop", "ignore"),
-	OptSplitColumnFamilies:      flagOption,
-	OptInitialScan:              enum("yes", "no", "only").orEmptyMeans("yes"),
-	OptNoInitialScan:            flagOption,
-	OptInitialScanOnly:          flagOption,
-	OptProtectDataFromGCOnPause: flagOption,
-	OptExpirePTSAfter:           durationOption.thatCanBeZero(),
-	OptKafkaSinkConfig:          jsonOption,
-	OptPubsubSinkConfig:         jsonOption,
-	OptWebhookSinkConfig:        jsonOption,
-	OptWebhookAuthHeader:        stringOption,
-	OptWebhookClientTimeout:     durationOption,
-	OptOnError:                  enum("pause", "fail"),
-	OptMetricsScope:             stringOption,
-	OptUnordered:                flagOption,
-	OptVirtualColumns:           enum("omitted", "null"),
-	OptExecutionLocality:        stringOption,
+	OptAvroSchemaPrefix:                   stringOption,
+	OptConfluentSchemaRegistry:            stringOption,
+	OptCursor:                             timestampOption,
+	OptCustomKeyColumn:                    stringOption,
+	OptEndTime:                            timestampOption,
+	OptEnvelope:                           enum("row", "key_only", "wrapped", "deprecated_row", "bare"),
+	OptFormat:                             enum("json", "avro", "csv", "experimental_avro", "parquet"),
+	OptFullTableName:                      flagOption,
+	OptKeyInValue:                         flagOption,
+	OptTopicInValue:                       flagOption,
+	OptResolvedTimestamps:                 durationOption.thatCanBeZero().orEmptyMeans("0"),
+	OptMinCheckpointFrequency:             durationOption.thatCanBeZero(),
+	OptUpdatedTimestamps:                  flagOption,
+	OptMVCCTimestamps:                     flagOption,
+	OptDiff:                               flagOption,
+	OptCompression:                        enum("gzip", "zstd"),
+	OptSchemaChangeEvents:                 enum("column_changes", "default"),
+	OptSchemaChangePolicy:                 enum("backfill", "nobackfill", "stop", "ignore"),
+	OptSplitColumnFamilies:                flagOption,
+	OptInitialScan:                        enum("yes", "no", "only").orEmptyMeans("yes"),
+	OptNoInitialScan:                      flagOption,
+	OptInitialScanOnly:                    flagOption,
+	DeprecatedOptProtectDataFromGCOnPause: flagOption,
+	OptExpirePTSAfter:                     durationOption.thatCanBeZero(),
+	OptKafkaSinkConfig:                    jsonOption,
+	OptPubsubSinkConfig:                   jsonOption,
+	OptWebhookSinkConfig:                  jsonOption,
+	OptWebhookAuthHeader:                  stringOption,
+	OptWebhookClientTimeout:               durationOption,
+	OptOnError:                            enum("pause", "fail"),
+	OptMetricsScope:                       stringOption,
+	OptUnordered:                          flagOption,
+	OptVirtualColumns:                     enum("omitted", "null"),
+	OptExecutionLocality:                  stringOption,
 }
 
 // CommonOptions is options common to all sinks
@@ -352,7 +352,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime,
 	OptEnvelope, OptResolvedTimestamps, OptUpdatedTimestamps,
 	OptMVCCTimestamps, OptDiff, OptSplitColumnFamilies,
 	OptSchemaChangeEvents, OptSchemaChangePolicy,
-	OptProtectDataFromGCOnPause, OptOnError,
+	OptOnError,
 	OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, OptCustomKeyColumn,
 	OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
 	OptExecutionLocality,
@@ -385,6 +385,9 @@ var ExternalConnectionValidOptions = unionStringSets(SQLValidOptions, KafkaValid
 var CaseInsensitiveOpts = makeStringSet(OptFormat, OptEnvelope, OptCompression, OptSchemaChangeEvents,
 	OptSchemaChangePolicy, OptOnError, OptInitialScan)
 
+// RetiredOptions are the options which are no longer active.
+var RetiredOptions = makeStringSet(DeprecatedOptProtectDataFromGCOnPause)
+
 // redactionFunc is a function applied to a string option which returns its redacted value.
 type redactionFunc func(string) (string, error)
 
@@ -529,12 +532,17 @@ func (s StatementOptions) IsSet(key string) bool {
 }
 
 // DeprecationWarnings checks for options in forms we still support and serialize,
-// but should be replaced with a new form. Currently hardcoded to just check format.
+// but should be replaced with a new form.
 func (s StatementOptions) DeprecationWarnings() []string {
 	if newFormat, ok := NoLongerExperimental[s.m[OptFormat]]; ok {
 		return []string{fmt.Sprintf(`%[1]s is no longer experimental, use %[2]s=%[1]s`,
 			newFormat, OptFormat)}
 	}
+	for retiredOpt := range RetiredOptions {
+		if _, isSet := s.m[retiredOpt]; isSet {
+			return []string{fmt.Sprintf("%s option is no longer needed", retiredOpt)}
+		}
+	}
 
 	return []string{}
 }
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index 1f17af7e5ac7..35879a6c0df5 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -211,6 +211,15 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting(
 	settings.PositiveDuration,
 )
 
+// MaxProtectedTimestampAge is the maximum allowed age of a changefeed's protected timestamp record.
+var MaxProtectedTimestampAge = settings.RegisterDurationSetting(
+	settings.TenantWritable,
+	"changefeed.protect_timestamp.max_age",
+	"fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration",
+	4*24*time.Hour,
+	settings.NonNegativeDuration,
+).WithPublic()
+
 // BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors
 var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
 	settings.TenantWritable,
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index 5c15ceca655d..1d19ee8a5e30 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -309,6 +309,9 @@ func validateSinkOptions(opts map[string]string, sinkSpecificOpts map[string]str
 		if _, ok := changefeedbase.CommonOptions[opt]; ok {
 			continue
 		}
+		if _, retired := changefeedbase.RetiredOptions[opt]; retired {
+			continue
+		}
 		if sinkSpecificOpts != nil {
 			if _, ok := sinkSpecificOpts[opt]; ok {
 				continue
diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go
index 2c83d846bafc..4b29f4cd967e 100644
--- a/pkg/ccl/changefeedccl/testfeed_test.go
+++ b/pkg/ccl/changefeedccl/testfeed_test.go
@@ -42,7 +42,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsql"
-	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -351,13 +350,6 @@ func (r *reportErrorResumer) OnFailOrCancel(
 	return r.wrapped.OnFailOrCancel(ctx, execCtx, jobErr)
 }
 
-// OnPauseRequest implements PauseRequester interface.
-func (r *reportErrorResumer) OnPauseRequest(
-	ctx context.Context, execCtx interface{}, txn isql.Txn, details *jobspb.Progress,
-) error {
-	return r.wrapped.(*changefeedResumer).OnPauseRequest(ctx, execCtx, txn, details)
-}
-
 type wrapSinkFn func(sink Sink) Sink
 
 // jobFeed indicates that the feed is an "enterprise feed" -- that is,
diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go
index b69b90eb2199..b4cc61606db4 100644
--- a/pkg/jobs/metricspoller/job_statistics.go
+++ b/pkg/jobs/metricspoller/job_statistics.go
@@ -170,12 +170,8 @@ func processJobPTSRecord(
 	}
 
 	// If MaximumPTSAge is set on the job payload, verify if PTS record
-	// timestamp is fresh enough. Note: we only look at paused jobs.
-	// If the running job wants to enforce an invariant wrt to PTS age,
-	// it can do so itself. This check here is a safety mechanism to detect
-	// paused jobs that own protected timestamp records.
-	if j.Status() == jobs.StatusPaused &&
-		p.MaximumPTSAge > 0 &&
+	// timestamp is fresh enough.
+	if p.MaximumPTSAge > 0 &&
 		rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
 		stats.expired++
 		ptsExpired := errors.Newf(
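
Usage sketch (illustrative only, not part of the patch): the SQL below shows how the new fail-safe is expected to interact with the existing `gc_protect_expires_after` option. The table name `foo` and the Kafka sink URI are placeholders.

    -- An explicitly set, nonzero gc_protect_expires_after takes precedence
    -- over the cluster setting for this changefeed.
    CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://host:9092' WITH gc_protect_expires_after = '24h';

    -- Changefeeds without the option fall back to the cluster-wide
    -- fail-safe, which defaults to 96h.
    SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '120h';

    -- Setting the max age to 0 disables the fail-safe; a stalled changefeed's
    -- PTS record could then block garbage collection indefinitely.
    SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '0s';

    -- The deprecated option is still accepted, but per DeprecationWarnings it
    -- now only produces "protect_data_from_gc_on_pause option is no longer needed".
    CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://host:9092' WITH protect_data_from_gc_on_pause;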