From 0f8f384933f3852608e3d0c495df397057329c43 Mon Sep 17 00:00:00 2001 From: Casper Date: Mon, 11 Oct 2021 20:29:32 -0400 Subject: [PATCH] changefeedccl: introduce 'min_checkpoint_frequency' to adjust flush frequency to sinks Previously, for latency-sensitive sinks like cloud storage, users want to achieve better throughput by adjusting flush frequencies and use 'resolved=X' for this purpose. However, 'resolved' has a different meaning, controlling what time we should at least wait to emit a resolved timestamp event. This patch disentangles 'resolved' option from this purpose. Fixes: cockroachdb#60481 Release note: none. --- .../changefeedccl/changefeed_processors.go | 25 ++++++----- pkg/ccl/changefeedccl/changefeed_stmt.go | 8 ++++ pkg/ccl/changefeedccl/changefeed_test.go | 43 ++++++++++++++----- .../changefeedccl/changefeedbase/options.go | 5 ++- .../changefeedccl/changefeedbase/settings.go | 14 +++--- pkg/ccl/changefeedccl/helpers_test.go | 10 +++-- .../streamingest/stream_ingestion_job_test.go | 2 +- .../streamingtest/replication_helpers.go | 2 +- 8 files changed, 72 insertions(+), 37 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index ddb6c2061a2a..84d852707c30 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -163,30 +163,29 @@ func newChangeAggregatorProcessor( return nil, err } - // If the resolved timestamp frequency is specified, use it as a rough - // approximation of how latency-sensitive the changefeed user is. If it's - // not, fall back to a default of 5s + // MinCheckpointFrequency controls how frequently the changeAggregator flushes the sink + // and checkpoints the local frontier to changeFrontier. It is used as a rough + // approximation of how latency-sensitive the changefeed user is. 
For a high latency
+ user, such as a cloud storage sink where flushes can take much longer, it is often set
+ as the sink's flush frequency so as not to negate the sink's batch config.
 //
-// With timeBetweenFlushes and changefeedPollInterval both set to 1s, TPCC
-// was seeing about 100x more time spent emitting than flushing when tested
-// with low-latency sinks like Kafka. However when using cloud-storage
-// sinks, flushes can take much longer and trying to flush too often can
-// thus end up spending too much time flushing and not enough in emitting to
-// keep up with the feed. If a user does not specify a 'resolved' time, we
-// instead default to 5s, which is hopefully long enough to account for most
-// possible sink latencies we could see without falling behind.
+// If a user does not specify a 'min_checkpoint_frequency' duration, we instead default
+// to 30s, which is hopefully long enough to account for most possible sink latencies we
+// could see without falling too far behind.
 //
 // NB: As long as we periodically get new span-level resolved timestamps
 // from the poller (which should always happen, even if the watched data is
 // not changing), then this is sufficient and we don't have to do anything
 // fancy with timers.
- if r, ok := ca.spec.Feed.Opts[changefeedbase.OptResolvedTimestamps]; ok && r != `` { + if r, ok := ca.spec.Feed.Opts[changefeedbase.OptMinCheckpointFrequency]; ok && r != `` { ca.flushFrequency, err = time.ParseDuration(r) if err != nil { return nil, err } + } else if r == `` { + ca.flushFrequency = 0 } else { - ca.flushFrequency = changefeedbase.DefaultFlushFrequency + ca.flushFrequency = changefeedbase.DefaultMinCheckpointFrequency } return ca, nil } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 624b96a9094e..d9fc3642166c 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -514,6 +514,14 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails } } } + { + const opt = changefeedbase.OptMinCheckpointFrequency + if o, ok := details.Opts[opt]; ok && o != `` { + if err := validateNonNegativeDuration(opt, o); err != nil { + return jobspb.ChangefeedDetails{}, err + } + } + } { const opt = changefeedbase.OptSchemaChangeEvents switch v := changefeedbase.SchemaChangeEventClass(details.Opts[opt]); v { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 2c436edc014c..b32ee92a26ae 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -538,7 +538,7 @@ func TestChangefeedTimestamps(t *testing.T) { // Check that we eventually get a resolved timestamp greater than ts1. 
parsed := parseTimeToHLC(t, ts1) for { - if resolved := expectResolvedTimestamp(t, foo); parsed.Less(resolved) { + if resolved, _ := expectResolvedTimestamp(t, foo); parsed.Less(resolved) { break } } @@ -585,33 +585,56 @@ func TestChangefeedResolvedFrequency(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + var resolvedFreq, checkpointFreq time.Duration + durationString := func(duration time.Duration) string { + if duration == 0 { + return "" + } + return duration.String() + } testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) - const freq = 10 * time.Millisecond - foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved=$1`, freq.String()) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved=$1, min_checkpoint_frequency=$2`, + durationString(resolvedFreq), durationString(checkpointFreq)) defer closeFeed(t, foo) // We get each resolved timestamp notification once in each partition. // Grab the first `2 * #partitions`, sort because we might get all from // one partition first, and compare the first and last. 
resolved := make([]hlc.Timestamp, 2*len(foo.Partitions())) + receivedPerPartition := make(map[string]time.Time) + var partition string for i := range resolved { - resolved[i] = expectResolvedTimestamp(t, foo) + resolved[i], partition = expectResolvedTimestamp(t, foo) + now := timeutil.Now() + if lastReceived, ok := receivedPerPartition[partition]; ok { + if d := now.Sub(lastReceived); d < checkpointFreq { + t.Errorf(`expected %s between two resolved timestamp emission time within same partition, but got %s`, checkpointFreq, d) + } + } + receivedPerPartition[partition] = now } sort.Slice(resolved, func(i, j int) bool { return resolved[i].Less(resolved[j]) }) first, last := resolved[0], resolved[len(resolved)-1] - if d := last.GoTime().Sub(first.GoTime()); d < freq { - t.Errorf(`expected %s between resolved timestamps, but got %s`, freq, d) + if d := last.GoTime().Sub(first.GoTime()); d < resolvedFreq { + t.Errorf(`expected %s between resolved timestamps, but got %s`, resolvedFreq, d) } } - t.Run(`sinkless`, sinklessTest(testFn)) - t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`kafka`, kafkaTest(testFn)) - t.Run(`webhook`, webhookTest(testFn)) + runTests := func(testResolvedFreq, testCheckpointFreq time.Duration) { + resolvedFreq, checkpointFreq = testResolvedFreq, testCheckpointFreq + t.Run(`sinkless`, sinklessTest(testFn)) + t.Run(`enterprise`, enterpriseTest(testFn)) + t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) + } + + runTests(1*time.Second, 0) + runTests(0, 1*time.Second) + runTests(600*time.Millisecond, 1*time.Second) } // Test how Changefeeds react to schema changes that do not require a backfill diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 1f6b474229fe..ade240119271 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -38,6 +38,7 @@ const ( OptKeyInValue = `key_in_value` OptTopicInValue = 
`topic_in_value` OptResolvedTimestamps = `resolved` + OptMinCheckpointFrequency = `min_checkpoint_frequency` OptUpdatedTimestamps = `updated` OptMVCCTimestamps = `mvcc_timestamp` OptDiff = `diff` @@ -152,6 +153,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{ OptKeyInValue: sql.KVStringOptRequireNoValue, OptTopicInValue: sql.KVStringOptRequireNoValue, OptResolvedTimestamps: sql.KVStringOptAny, + OptMinCheckpointFrequency: sql.KVStringOptRequireValue, OptUpdatedTimestamps: sql.KVStringOptRequireNoValue, OptMVCCTimestamps: sql.KVStringOptRequireNoValue, OptDiff: sql.KVStringOptRequireNoValue, @@ -184,7 +186,8 @@ var CommonOptions = makeStringSet(OptCursor, OptEnvelope, OptMVCCTimestamps, OptDiff, OptSchemaChangeEvents, OptSchemaChangePolicy, OptProtectDataFromGCOnPause, OptOnError, - OptInitialScan, OptNoInitialScan) + OptInitialScan, OptNoInitialScan, + OptMinCheckpointFrequency) // SQLValidOptions is options exclusive to SQL sink var SQLValidOptions map[string]struct{} = nil diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index b66e3e67704c..e0aa0b08d0f4 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -28,16 +28,16 @@ var TableDescriptorPollInterval = settings.RegisterDurationSetting( settings.NonNegativeDuration, ) -// DefaultFlushFrequency is the default frequency to flush sink. +// DefaultMinCheckpointFrequency is the default frequency to flush sink. // See comment in newChangeAggregatorProcessor for explanation on the value. -var DefaultFlushFrequency = 5 * time.Second +var DefaultMinCheckpointFrequency = 30 * time.Second -// TestingSetDefaultFlushFrequency changes defaultFlushFrequency for tests. +// TestingSetDefaultMinCheckpointFrequency changes DefaultMinCheckpointFrequency for tests. // Returns function to restore flush frequency to its original value. 
-func TestingSetDefaultFlushFrequency(f time.Duration) func() { - old := DefaultFlushFrequency - DefaultFlushFrequency = f - return func() { DefaultFlushFrequency = old } +func TestingSetDefaultMinCheckpointFrequency(f time.Duration) func() { + old := DefaultMinCheckpointFrequency + DefaultMinCheckpointFrequency = f + return func() { DefaultMinCheckpointFrequency = old } } // PerChangefeedMemLimit controls how much data can be buffered by diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 2b0f3f89d08c..59e783a9868b 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -250,7 +250,9 @@ func parseTimeToHLC(t testing.TB, s string) hlc.Timestamp { return ts } -func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) hlc.Timestamp { +// Expect to receive a resolved timestamp and the partition it belongs to from +// a test changefeed. +func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) (hlc.Timestamp, string) { t.Helper() m, err := f.Next() if err != nil { @@ -258,7 +260,7 @@ func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) hlc.Timestamp { } else if m == nil { t.Fatal(`expected message`) } - return extractResolvedTimestamp(t, m) + return extractResolvedTimestamp(t, m), m.Partition } func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Timestamp { @@ -343,7 +345,7 @@ func startTestFullServer( } ctx := context.Background() - resetFlushFrequency := changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency) + resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) s, db, _ := serverutils.StartServer(t, args) cleanup := func() { @@ -381,7 +383,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } - resetFlushFrequency := 
changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency) + resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) cluster, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( t, 3 /* numServers */, knobs, multiregionccltestutils.WithUseDatabase("d"), diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 7ef5ce220064..848e3fa336d0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -59,7 +59,7 @@ func TestTenantStreaming(t *testing.T) { sourceSQL := sqlutils.MakeSQLRunner(tenantConn) // Make changefeeds run faster. - resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Millisecond) + resetFreq := changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond) defer resetFreq() // Set required cluster settings. _, err := sourceDB.Exec(` diff --git a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go index dc0f160c95cb..ccfff5f53485 100644 --- a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go +++ b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go @@ -188,7 +188,7 @@ func NewReplicationHelper(t *testing.T) (*ReplicationHelper, func()) { s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) // Make changefeeds run faster. - resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Millisecond) + resetFreq := changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond) // Set required cluster settings. _, err := db.Exec(`