Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
103528: server: Revert server.shutdown.jobs_wait to 0 r=miretskiy a=miretskiy

Revert default setting for `server.shutdown.jobs_wait` to 0
to ensure that shutdown dows not wait for active jobs.

Issues: none
Epic: None

Release note: None

103539: changefeedccl: Improve protected timestamp handling r=miretskiy a=miretskiy

Changefeeds utilize protected timestamp (PTS) in order to ensure that the data is not garbage collected (GC) prematurely.

This subsystem underwent through many rounds of changes, resulting in an unintuitive, and potentially dangerous behavior.

This PR updates and improves PTS handling as follows. PR #97148 introduce capability to cancel jobs that hold on to stale PTS records.  This PR expands this functionality to apply to all jobs -- not just paused jobs.

This is necessary because due to #90810, changefeeds will retry almost every error -- and that means that if a running changefeed jobs fails to make progress for very long time, it is possible that a PTS record will protect GC collection for many days, weeks, or even months.

To guard against this case, introduce a new cluster setting `changefeed.protect_timestamp.max_age`, which defaults to generous 4 days, to make sure that even if the explicit changefeed option `gc_protect_expires_after` was not specified, the changefeed will fail after `changefeed.protect_timestamp.max_age` if no progress is made.

The fail safe can be disabled by setting
`changefeed.protect_timestamp.max_age` to 0; Note, however, that doing so could result in stability issues once stale PTS record released.

In addition, this PR deprecates `protect_data_from_gc_on_pause` option. This option is not needed since we now employ "active protected timestamp" management (meaning: there is always a PTS record when running changefeed jobs), and the handling of this record is consistent for both running and paused jobs.

Fixes #103464

Release note (enterprise change): Introduce a new
`changefeed.protect_timestamp.max_age` setting (default 4 days), which will cancel running changefeed jobs if they fail to make forward progress for much time.  This setting is used if the explicit `gc_protect_expires_after` option was not set.  In addition, deprecate `protect_data_from_gc_on_pause` option.  This option is no longer needed since changefeed jobs always protect data.

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed May 19, 2023
3 parents 3c42bf6 + aaf1ca4 + c637746 commit 0bd4172
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 217 deletions.
3 changes: 2 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled tenant-rw
changefeed.fast_gzip.enabled boolean true use fast gzip implementation tenant-rw
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds tenant-rw
changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration tenant-rw
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables tenant-rw
changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. tenant-rw
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload tenant-rw
Expand Down Expand Up @@ -74,7 +75,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with
server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw
server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.jobs_wait duration 0s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
Expand Down
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-protect-timestamp-max-age" class="anchored"><code>changefeed.protect_timestamp.max_age</code></div></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-sink-io-workers" class="anchored"><code>changefeed.sink_io_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers used by changefeeds when sending requests to the sink (currently webhook only): &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-cloudstorage-azure-concurrent-upload-buffers" class="anchored"><code>cloudstorage.azure.concurrent_upload_buffers</code></div></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down Expand Up @@ -104,7 +105,7 @@
<tr><td><div id="setting-server-secondary-tenants-redact-trace-enabled" class="anchored"><code>server.secondary_tenants.redact_trace.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-connection-wait" class="anchored"><code>server.shutdown.connection_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-drain-wait" class="anchored"><code>server.shutdown.drain_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-lease-transfer-wait" class="anchored"><code>server.shutdown.lease_transfer_wait</code></div></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-query-wait" class="anchored"><code>server.shutdown.query_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-time-until-store-dead" class="anchored"><code>server.time_until_store_dead</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
Expand Down Expand Up @@ -265,7 +264,6 @@ go_test(
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand Down Expand Up @@ -305,7 +303,6 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
"//pkg/workload/bank",
"//pkg/workload/ledger",
"//pkg/workload/workloadsql",
Expand Down
72 changes: 19 additions & 53 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,17 +714,28 @@ func createChangefeedJobRecord(
return nil, err
}

useDefaultExpiration := ptsExpiration == 0
if useDefaultExpiration {
ptsExpiration = changefeedbase.MaxProtectedTimestampAge.Get(&p.ExecCfg().Settings.SV)
}

if ptsExpiration > 0 && ptsExpiration < time.Hour {
// This threshold is rather arbitrary. But we want to warn users about
// the potential impact of keeping this setting too low.
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. Having a low
value for this option should not have adverse effect as long as changefeed
is running. However, should the changefeed be paused, it will need to be
resumed before expiration time. The value of this setting should reflect
how much time the changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.
`, ptsExpiration, changefeedbase.OptExpirePTSAfter, ptsExpiration))
const explainer = `Having a low protected timestamp expiration value should not have adverse effect
as long as changefeed is running. However, should the changefeed be paused, it
will need to be resumed before expiration time. The value of this setting should
reflect how much time he changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.`
if useDefaultExpiration {
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed.protect_timestamp.max_age setting might be too low. %s`,
ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer))
} else {
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. %s`,
ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer))
}
}

jr := &jobs.Record{
Expand Down Expand Up @@ -1164,10 +1175,6 @@ func (b *changefeedResumer) handleChangefeedError(
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return b.job.NoTxn().PauseRequestedWithFunc(ctx, func(ctx context.Context,
planHookState interface{}, txn isql.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
return err
}
// directly update running status to avoid the running/reverted job status check
progress.RunningStatus = errorMessage
log.Warningf(ctx, errorFmt, changefeedErr, changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
Expand Down Expand Up @@ -1409,47 +1416,6 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(
}
}

var _ jobs.PauseRequester = (*changefeedResumer)(nil)

// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
// paused, we may want to clear the protected timestamp record.
func (b *changefeedResumer) OnPauseRequest(
ctx context.Context, jobExec interface{}, txn isql.Txn, progress *jobspb.Progress,
) error {
details := b.job.Details().(jobspb.ChangefeedDetails)

cp := progress.GetChangefeed()
execCfg := jobExec.(sql.JobExecContext).ExecCfg()

if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
// Release existing pts record to avoid a single changefeed left on pause
// resulting in storage issues
if cp.ProtectedTimestampRecord != uuid.Nil {
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
if err := pts.Release(ctx, cp.ProtectedTimestampRecord); err != nil {
log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err)
} else {
cp.ProtectedTimestampRecord = uuid.Nil
}
}
return nil
}

if cp.ProtectedTimestampRecord == uuid.Nil {
resolved := progress.GetHighWater()
if resolved == nil {
return nil
}
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
ptr := createProtectedTimestampRecord(
ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
)
return pts.Protect(ctx, ptr)
}

return nil
}

// getQualifiedTableName returns the database-qualified name of the table
// or view represented by the provided descriptor.
func getQualifiedTableName(
Expand Down
80 changes: 0 additions & 80 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -95,7 +94,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/lib/pq"
Expand Down Expand Up @@ -5626,84 +5624,6 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
}))
}

func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(shouldPause bool) cdcTestFn {
return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`)

var tableID int
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+
`WHERE name = 'foo' AND database_name = current_database()`).
Scan(&tableID)
stmt := `CREATE CHANGEFEED FOR foo WITH resolved`
if shouldPause {
stmt += ", " + changefeedbase.OptProtectDataFromGCOnPause
}
foo := feed(t, f, stmt)
defer closeFeed(t, foo)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
`foo: [2]->{"after": {"a": 2, "b": "b"}}`,
`foo: [4]->{"after": {"a": 4, "b": "c"}}`,
`foo: [7]->{"after": {"a": 7, "b": "d"}}`,
`foo: [8]->{"after": {"a": 8, "b": "e"}}`,
})
expectResolvedTimestamp(t, foo)

// Pause the job then ensure that it has a reasonable protected timestamp.

ctx := context.Background()
serverCfg := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig
jr := serverCfg.JobRegistry
pts := ptstorage.WithDatabase(
serverCfg.ProtectedTimestampProvider, serverCfg.DB,
)

feedJob := foo.(cdctest.EnterpriseTestFeed)
require.NoError(t, feedJob.Pause())
{
j, err := jr.LoadJob(ctx, feedJob.JobID())
require.NoError(t, err)
progress := j.Progress()
details := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
if shouldPause {
require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord)
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
require.NoError(t, err)
require.True(t, r.Timestamp.LessEq(*progress.GetHighWater()))
} else {
require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord)
}
}

// Resume the job and ensure that the protected timestamp is removed once
// the changefeed has caught up.
require.NoError(t, feedJob.Resume())
testutils.SucceedsSoon(t, func() error {
resolvedTs, _ := expectResolvedTimestamp(t, foo)
j, err := jr.LoadJob(ctx, feedJob.JobID())
require.NoError(t, err)
details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
if err != nil || r.Timestamp.Less(resolvedTs) {
return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs)
}
return nil
})
}
}

testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) {
cdcTest(t, testFn(shouldPause), feedTestEnterpriseSinks)
})

}

func TestManyChangefeedsOneTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading

0 comments on commit 0bd4172

Please sign in to comment.