Merge #71436
71436: changefeedccl: introduce 'min_checkpoint_frequency' to adjust sink flush frequency r=gh-casper a=gh-casper

Previously, for latency-sensitive sinks like cloud storage, users who wanted better throughput tuned the flush frequency by setting 'resolved=X'. However, 'resolved' has a different meaning: it controls the minimum time the changefeed waits before emitting a resolved timestamp event.

This patch disentangles the 'resolved' option from that purpose.

Fixes: #69161

Release note (sql change): A new option, 'min_checkpoint_frequency', has been added to the CREATE CHANGEFEED statement. When this option is set, the changefeed waits at least the specified duration before flushing to the external sink. When set to the empty string, the changefeed flushes whenever the local frontier advances. When not set, it defaults to 30 seconds. This option helps high-latency sinks control their flush frequency and so achieve better throughput.
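
For concreteness, a minimal sketch of creating such a changefeed from a Go client. The connection string, sink URI, table name, and durations below are placeholders, not part of this change, and the lib/pq driver is only one possible choice:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; any compatible driver works
)

func main() {
	// Placeholder connection string for a local CockroachDB node.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 'min_checkpoint_frequency' bounds how often the changefeed flushes the
	// sink and checkpoints, while 'resolved' independently controls how often
	// resolved timestamp events are emitted. The sink URI is a placeholder.
	if _, err := db.Exec(
		`CREATE CHANGEFEED FOR TABLE foo
		    INTO 'experimental-s3://bucket/path?AUTH=implicit'
		    WITH resolved='10s', min_checkpoint_frequency='30s'`,
	); err != nil {
		log.Fatal(err)
	}
}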

Co-authored-by: Casper <[email protected]>
craig[bot] and gh-casper committed Nov 15, 2021
2 parents eeec6ff + 0f8f384 commit c968f1f
Showing 8 changed files with 72 additions and 37 deletions.
25 changes: 12 additions & 13 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -163,30 +163,29 @@ func newChangeAggregatorProcessor(
 		return nil, err
 	}
 
-	// If the resolved timestamp frequency is specified, use it as a rough
-	// approximation of how latency-sensitive the changefeed user is. If it's
-	// not, fall back to a default of 5s
+	// MinCheckpointFrequency controls how frequently the changeAggregator flushes the sink
+	// and checkpoints the local frontier to changeFrontier. It is used as a rough
+	// approximation of how latency-sensitive the changefeed user is. For a high latency
+	// user, such as cloud storage sink where flushes can take much longer, it is often set
+	// as the sink's flush frequency so as not to negate the sink's batch config.
 	//
-	// With timeBetweenFlushes and changefeedPollInterval both set to 1s, TPCC
-	// was seeing about 100x more time spent emitting than flushing when tested
-	// with low-latency sinks like Kafka. However when using cloud-storage
-	// sinks, flushes can take much longer and trying to flush too often can
-	// thus end up spending too much time flushing and not enough in emitting to
-	// keep up with the feed. If a user does not specify a 'resolved' time, we
-	// instead default to 5s, which is hopefully long enough to account for most
-	// possible sink latencies we could see without falling behind.
+	// If a user does not specify a 'min_checkpoint_frequency' duration, we instead default
+	// to 30s, which is hopefully long enough to account for most possible sink latencies we
+	// could see without falling too behind.
 	//
 	// NB: As long as we periodically get new span-level resolved timestamps
 	// from the poller (which should always happen, even if the watched data is
 	// not changing), then this is sufficient and we don't have to do anything
 	// fancy with timers.
-	if r, ok := ca.spec.Feed.Opts[changefeedbase.OptResolvedTimestamps]; ok && r != `` {
+	if r, ok := ca.spec.Feed.Opts[changefeedbase.OptMinCheckpointFrequency]; ok && r != `` {
 		ca.flushFrequency, err = time.ParseDuration(r)
 		if err != nil {
 			return nil, err
 		}
+	} else if r == `` {
+		ca.flushFrequency = 0
 	} else {
-		ca.flushFrequency = changefeedbase.DefaultFlushFrequency
+		ca.flushFrequency = changefeedbase.DefaultMinCheckpointFrequency
 	}
 	return ca, nil
 }
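
The semantics parsed above (an empty 'min_checkpoint_frequency' flushes on every local frontier advancement, while an unset option falls back to DefaultMinCheckpointFrequency) can be pictured with a small gate. This is an illustrative sketch only, not the processor's actual flush loop; flushGate, sink, and printSink are invented stand-ins:

package main

import (
	"context"
	"fmt"
	"time"
)

// sink stands in for the changefeed Sink interface; only Flush matters here.
type sink interface {
	Flush(ctx context.Context) error
}

type printSink struct{}

func (printSink) Flush(ctx context.Context) error {
	fmt.Println("flushed at", time.Now().Format(time.RFC3339Nano))
	return nil
}

// flushGate mirrors the configuration parsed above: a zero flushFrequency
// flushes on every local frontier advancement, while a positive one flushes
// at most once per window.
type flushGate struct {
	flushFrequency time.Duration
	lastFlush      time.Time
}

func (g *flushGate) maybeFlush(ctx context.Context, s sink) error {
	if g.flushFrequency > 0 && time.Since(g.lastFlush) < g.flushFrequency {
		return nil // too soon; wait for a later frontier advancement
	}
	if err := s.Flush(ctx); err != nil {
		return err
	}
	g.lastFlush = time.Now()
	return nil
}

func main() {
	g := flushGate{flushFrequency: 30 * time.Millisecond}
	ctx := context.Background()
	// Simulate frontier advancements every 10ms; only roughly every third
	// advancement actually flushes.
	for i := 0; i < 10; i++ {
		_ = g.maybeFlush(ctx, printSink{})
		time.Sleep(10 * time.Millisecond)
	}
}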
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -514,6 +514,14 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
 			}
 		}
 	}
+	{
+		const opt = changefeedbase.OptMinCheckpointFrequency
+		if o, ok := details.Opts[opt]; ok && o != `` {
+			if err := validateNonNegativeDuration(opt, o); err != nil {
+				return jobspb.ChangefeedDetails{}, err
+			}
+		}
+	}
 	{
 		const opt = changefeedbase.OptSchemaChangeEvents
 		switch v := changefeedbase.SchemaChangeEventClass(details.Opts[opt]); v {
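
The validateNonNegativeDuration helper invoked above is pre-existing and its body does not appear in this diff. A plausible shape, written here purely as an assumption for readers unfamiliar with it:

package changefeedccl

import (
	"fmt"
	"time"
)

// Hypothetical stand-in for the validateNonNegativeDuration helper used in
// the hunk above; the real helper lives elsewhere in this package and may
// differ in error type and wording.
func validateNonNegativeDuration(optName string, value string) error {
	d, err := time.ParseDuration(value)
	if err != nil {
		return err
	}
	if d < 0 {
		return fmt.Errorf("option %s must be a non-negative duration, got %s", optName, value)
	}
	return nil
}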
43 changes: 33 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -538,7 +538,7 @@ func TestChangefeedTimestamps(t *testing.T) {
 		// Check that we eventually get a resolved timestamp greater than ts1.
 		parsed := parseTimeToHLC(t, ts1)
 		for {
-			if resolved := expectResolvedTimestamp(t, foo); parsed.Less(resolved) {
+			if resolved, _ := expectResolvedTimestamp(t, foo); parsed.Less(resolved) {
 				break
 			}
 		}
@@ -585,33 +585,56 @@ func TestChangefeedResolvedFrequency(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	var resolvedFreq, checkpointFreq time.Duration
+	durationString := func(duration time.Duration) string {
+		if duration == 0 {
+			return ""
+		}
+		return duration.String()
+	}
 	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(db)
 		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
 
-		const freq = 10 * time.Millisecond
-		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved=$1`, freq.String())
+		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved=$1, min_checkpoint_frequency=$2`,
+			durationString(resolvedFreq), durationString(checkpointFreq))
 		defer closeFeed(t, foo)
 
 		// We get each resolved timestamp notification once in each partition.
 		// Grab the first `2 * #partitions`, sort because we might get all from
 		// one partition first, and compare the first and last.
 		resolved := make([]hlc.Timestamp, 2*len(foo.Partitions()))
+		receivedPerPartition := make(map[string]time.Time)
+		var partition string
 		for i := range resolved {
-			resolved[i] = expectResolvedTimestamp(t, foo)
+			resolved[i], partition = expectResolvedTimestamp(t, foo)
+			now := timeutil.Now()
+			if lastReceived, ok := receivedPerPartition[partition]; ok {
+				if d := now.Sub(lastReceived); d < checkpointFreq {
+					t.Errorf(`expected %s between two resolved timestamp emission time within same partition, but got %s`, checkpointFreq, d)
+				}
+			}
+			receivedPerPartition[partition] = now
 		}
 		sort.Slice(resolved, func(i, j int) bool { return resolved[i].Less(resolved[j]) })
 		first, last := resolved[0], resolved[len(resolved)-1]
 
-		if d := last.GoTime().Sub(first.GoTime()); d < freq {
-			t.Errorf(`expected %s between resolved timestamps, but got %s`, freq, d)
+		if d := last.GoTime().Sub(first.GoTime()); d < resolvedFreq {
+			t.Errorf(`expected %s between resolved timestamps, but got %s`, resolvedFreq, d)
 		}
 	}
 
-	t.Run(`sinkless`, sinklessTest(testFn))
-	t.Run(`enterprise`, enterpriseTest(testFn))
-	t.Run(`kafka`, kafkaTest(testFn))
-	t.Run(`webhook`, webhookTest(testFn))
+	runTests := func(testResolvedFreq, testCheckpointFreq time.Duration) {
+		resolvedFreq, checkpointFreq = testResolvedFreq, testCheckpointFreq
+		t.Run(`sinkless`, sinklessTest(testFn))
+		t.Run(`enterprise`, enterpriseTest(testFn))
+		t.Run(`kafka`, kafkaTest(testFn))
+		t.Run(`webhook`, webhookTest(testFn))
+	}
+
+	runTests(1*time.Second, 0)
+	runTests(0, 1*time.Second)
+	runTests(600*time.Millisecond, 1*time.Second)
 }
 
 // Test how Changefeeds react to schema changes that do not require a backfill
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -38,6 +38,7 @@
 	OptKeyInValue = `key_in_value`
 	OptTopicInValue = `topic_in_value`
 	OptResolvedTimestamps = `resolved`
+	OptMinCheckpointFrequency = `min_checkpoint_frequency`
 	OptUpdatedTimestamps = `updated`
 	OptMVCCTimestamps = `mvcc_timestamp`
 	OptDiff = `diff`
@@ -152,6 +153,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
 	OptKeyInValue: sql.KVStringOptRequireNoValue,
 	OptTopicInValue: sql.KVStringOptRequireNoValue,
 	OptResolvedTimestamps: sql.KVStringOptAny,
+	OptMinCheckpointFrequency: sql.KVStringOptRequireValue,
 	OptUpdatedTimestamps: sql.KVStringOptRequireNoValue,
 	OptMVCCTimestamps: sql.KVStringOptRequireNoValue,
 	OptDiff: sql.KVStringOptRequireNoValue,
@@ -184,7 +186,8 @@ var CommonOptions = makeStringSet(OptCursor, OptEnvelope,
 	OptMVCCTimestamps, OptDiff,
 	OptSchemaChangeEvents, OptSchemaChangePolicy,
 	OptProtectDataFromGCOnPause, OptOnError,
-	OptInitialScan, OptNoInitialScan)
+	OptInitialScan, OptNoInitialScan,
+	OptMinCheckpointFrequency)
 
 // SQLValidOptions is options exclusive to SQL sink
 var SQLValidOptions map[string]struct{} = nil
14 changes: 7 additions & 7 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -28,16 +28,16 @@ var TableDescriptorPollInterval = settings.RegisterDurationSetting(
 	settings.NonNegativeDuration,
 )
 
-// DefaultFlushFrequency is the default frequency to flush sink.
+// DefaultMinCheckpointFrequency is the default frequency to flush sink.
 // See comment in newChangeAggregatorProcessor for explanation on the value.
-var DefaultFlushFrequency = 5 * time.Second
+var DefaultMinCheckpointFrequency = 30 * time.Second
 
-// TestingSetDefaultFlushFrequency changes defaultFlushFrequency for tests.
+// TestingSetDefaultMinCheckpointFrequency changes DefaultMinCheckpointFrequency for tests.
 // Returns function to restore flush frequency to its original value.
-func TestingSetDefaultFlushFrequency(f time.Duration) func() {
-	old := DefaultFlushFrequency
-	DefaultFlushFrequency = f
-	return func() { DefaultFlushFrequency = old }
+func TestingSetDefaultMinCheckpointFrequency(f time.Duration) func() {
+	old := DefaultMinCheckpointFrequency
+	DefaultMinCheckpointFrequency = f
+	return func() { DefaultMinCheckpointFrequency = old }
 }
 
 // PerChangefeedMemLimit controls how much data can be buffered by
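
Typical call sites for the renamed testing hook appear later in this diff. When the reset should simply run at test exit, the returned closure can be deferred in one line (test context assumed):

// Lower the default so checkpoints (and therefore sink flushes) happen
// quickly in tests, restoring the original default when the test ends.
defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond)()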
10 changes: 6 additions & 4 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -250,15 +250,17 @@ func parseTimeToHLC(t testing.TB, s string) hlc.Timestamp {
 	return ts
 }
 
-func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) hlc.Timestamp {
+// Expect to receive a resolved timestamp and the partition it belongs to from
+// a test changefeed.
+func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) (hlc.Timestamp, string) {
 	t.Helper()
 	m, err := f.Next()
 	if err != nil {
 		t.Fatal(err)
 	} else if m == nil {
 		t.Fatal(`expected message`)
 	}
-	return extractResolvedTimestamp(t, m)
+	return extractResolvedTimestamp(t, m), m.Partition
 }
 
 func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Timestamp {
@@ -343,7 +345,7 @@ func startTestFullServer(
 	}
 
 	ctx := context.Background()
-	resetFlushFrequency := changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)
+	resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)
 	s, db, _ := serverutils.StartServer(t, args)
 
 	cleanup := func() {
@@ -381,7 +383,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB
 		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
 	}
 
-	resetFlushFrequency := changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)
+	resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)
 	cluster, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
 		t, 3 /* numServers */, knobs,
 		multiregionccltestutils.WithUseDatabase("d"),
@@ -59,7 +59,7 @@ func TestTenantStreaming(t *testing.T) {
 	sourceSQL := sqlutils.MakeSQLRunner(tenantConn)
 
 	// Make changefeeds run faster.
-	resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Millisecond)
+	resetFreq := changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond)
 	defer resetFreq()
 	// Set required cluster settings.
 	_, err := sourceDB.Exec(`
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingtest/replication_helpers.go
@@ -190,7 +190,7 @@ func NewReplicationHelper(
 	s, db, _ := serverutils.StartServer(t, serverArgs)
 
 	// Make changefeeds run faster.
-	resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Millisecond)
+	resetFreq := changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond)
 
 	// Set required cluster settings.
 	_, err := db.Exec(`
