Skip to content

Commit

Permalink
changefeedccl: Improve protected timestamp handling
Browse files Browse the repository at this point in the history
Changefeeds utilize protected timestamp (PTS) in order to ensure
that the data is not garbage collected (GC) prematurely.

This subsystem underwent through many rounds of changes, resulting
in an unintuitive, and potentially dangerous behavior.

This PR updates and improves PTS handling as follows.
PR #97148 introduce capability to cancel jobs that hold on to stale
PTS records.  This PR expands this functionality to apply to all
jobs -- not just paused jobs.

This is necessary because due to #90810, changefeeds will retry almost
every error -- and that means that if a running changefeed jobs fails
to make progress for very long time, it is possible that a PTS record
will protect GC collection for many days, weeks, or even months.

To guard against this case, introduce a new cluster setting
`changefeed.protect_timestamp.max_age`, which defaults to generous 4
days, to make sure that even if the explicit changefeed option
`gc_protect_expires_after` was not specified, the changefeed will fail
after `changefeed.protect_timestamp.max_age` if no progress is made.
This setting only applies to newly created changefeeds.
Use `ALTER CHANGEFEED` statement to set `gc_protect_expires_after` option
for existing changefeeds to enable PTS expiration.

The fail safe can be disabled by setting
`changefeed.protect_timestamp.max_age` to 0; Note, however, that doing
so could result in stability issues once stale PTS record released.

In addition, this PR deprecates `protect_data_from_gc_on_pause` option.
This option is not needed since we now employ "active protected
timestamp" management (meaning: there is always a PTS record when
running changefeed jobs), and the handling of this record is consistent
for both running and paused jobs.

Fixes #103464

Release note (enterprise change): Introduce a new
`changefeed.protect_timestamp.max_age` setting (default 4 days), which
will cancel running changefeed jobs if they fail to make forward
progress for much time.  This setting is used if the explicit
`gc_protect_expires_after` option was not set.  In addition, deprecate
`protect_data_from_gc_on_pause` option.  This option is no longer needed
since changefeed jobs always protect data.
  • Loading branch information
Yevgeniy Miretskiy committed May 19, 2023
1 parent b02f647 commit c637746
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 214 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled tenant-rw
changefeed.fast_gzip.enabled boolean true use fast gzip implementation tenant-rw
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds tenant-rw
changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration tenant-rw
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables tenant-rw
changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. tenant-rw
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload tenant-rw
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-protect-timestamp-max-age" class="anchored"><code>changefeed.protect_timestamp.max_age</code></div></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-sink-io-workers" class="anchored"><code>changefeed.sink_io_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers used by changefeeds when sending requests to the sink (currently webhook only): &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-cloudstorage-azure-concurrent-upload-buffers" class="anchored"><code>cloudstorage.azure.concurrent_upload_buffers</code></div></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
Expand Down Expand Up @@ -269,7 +268,6 @@ go_test(
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand Down Expand Up @@ -309,7 +307,6 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
"//pkg/workload/bank",
"//pkg/workload/ledger",
"//pkg/workload/workloadsql",
Expand Down
72 changes: 19 additions & 53 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,17 +714,28 @@ func createChangefeedJobRecord(
return nil, err
}

useDefaultExpiration := ptsExpiration == 0
if useDefaultExpiration {
ptsExpiration = changefeedbase.MaxProtectedTimestampAge.Get(&p.ExecCfg().Settings.SV)
}

if ptsExpiration > 0 && ptsExpiration < time.Hour {
// This threshold is rather arbitrary. But we want to warn users about
// the potential impact of keeping this setting too low.
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. Having a low
value for this option should not have adverse effect as long as changefeed
is running. However, should the changefeed be paused, it will need to be
resumed before expiration time. The value of this setting should reflect
how much time the changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.
`, ptsExpiration, changefeedbase.OptExpirePTSAfter, ptsExpiration))
const explainer = `Having a low protected timestamp expiration value should not have adverse effect
as long as changefeed is running. However, should the changefeed be paused, it
will need to be resumed before expiration time. The value of this setting should
reflect how much time he changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.`
if useDefaultExpiration {
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed.protect_timestamp.max_age setting might be too low. %s`,
ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer))
} else {
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. %s`,
ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer))
}
}

jr := &jobs.Record{
Expand Down Expand Up @@ -1164,10 +1175,6 @@ func (b *changefeedResumer) handleChangefeedError(
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return b.job.NoTxn().PauseRequestedWithFunc(ctx, func(ctx context.Context,
planHookState interface{}, txn isql.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
return err
}
// directly update running status to avoid the running/reverted job status check
progress.RunningStatus = errorMessage
log.Warningf(ctx, errorFmt, changefeedErr, changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
Expand Down Expand Up @@ -1409,47 +1416,6 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(
}
}

var _ jobs.PauseRequester = (*changefeedResumer)(nil)

// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
// paused, we may want to clear the protected timestamp record.
func (b *changefeedResumer) OnPauseRequest(
ctx context.Context, jobExec interface{}, txn isql.Txn, progress *jobspb.Progress,
) error {
details := b.job.Details().(jobspb.ChangefeedDetails)

cp := progress.GetChangefeed()
execCfg := jobExec.(sql.JobExecContext).ExecCfg()

if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
// Release existing pts record to avoid a single changefeed left on pause
// resulting in storage issues
if cp.ProtectedTimestampRecord != uuid.Nil {
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
if err := pts.Release(ctx, cp.ProtectedTimestampRecord); err != nil {
log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err)
} else {
cp.ProtectedTimestampRecord = uuid.Nil
}
}
return nil
}

if cp.ProtectedTimestampRecord == uuid.Nil {
resolved := progress.GetHighWater()
if resolved == nil {
return nil
}
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
ptr := createProtectedTimestampRecord(
ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
)
return pts.Protect(ctx, ptr)
}

return nil
}

// getQualifiedTableName returns the database-qualified name of the table
// or view represented by the provided descriptor.
func getQualifiedTableName(
Expand Down
80 changes: 0 additions & 80 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -95,7 +94,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/lib/pq"
Expand Down Expand Up @@ -5626,84 +5624,6 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
}))
}

func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(shouldPause bool) cdcTestFn {
return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`)

var tableID int
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+
`WHERE name = 'foo' AND database_name = current_database()`).
Scan(&tableID)
stmt := `CREATE CHANGEFEED FOR foo WITH resolved`
if shouldPause {
stmt += ", " + changefeedbase.OptProtectDataFromGCOnPause
}
foo := feed(t, f, stmt)
defer closeFeed(t, foo)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
`foo: [2]->{"after": {"a": 2, "b": "b"}}`,
`foo: [4]->{"after": {"a": 4, "b": "c"}}`,
`foo: [7]->{"after": {"a": 7, "b": "d"}}`,
`foo: [8]->{"after": {"a": 8, "b": "e"}}`,
})
expectResolvedTimestamp(t, foo)

// Pause the job then ensure that it has a reasonable protected timestamp.

ctx := context.Background()
serverCfg := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig
jr := serverCfg.JobRegistry
pts := ptstorage.WithDatabase(
serverCfg.ProtectedTimestampProvider, serverCfg.DB,
)

feedJob := foo.(cdctest.EnterpriseTestFeed)
require.NoError(t, feedJob.Pause())
{
j, err := jr.LoadJob(ctx, feedJob.JobID())
require.NoError(t, err)
progress := j.Progress()
details := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
if shouldPause {
require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord)
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
require.NoError(t, err)
require.True(t, r.Timestamp.LessEq(*progress.GetHighWater()))
} else {
require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord)
}
}

// Resume the job and ensure that the protected timestamp is removed once
// the changefeed has caught up.
require.NoError(t, feedJob.Resume())
testutils.SucceedsSoon(t, func() error {
resolvedTs, _ := expectResolvedTimestamp(t, foo)
j, err := jr.LoadJob(ctx, feedJob.JobID())
require.NoError(t, err)
details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
if err != nil || r.Timestamp.Less(resolvedTs) {
return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs)
}
return nil
})
}
}

testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) {
cdcTest(t, testFn(shouldPause), feedTestEnterpriseSinks)
})

}

func TestManyChangefeedsOneTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading

0 comments on commit c637746

Please sign in to comment.