changefeedccl: introduce 'min_checkpoint_frequency' to adjust flush frequency to sinks

Previously, users of high-latency sinks like cloud storage wanted to achieve better throughput by adjusting the flush frequency, and used 'resolved=X' for this purpose. However, 'resolved' has a different meaning: it controls the minimum amount of time to wait before emitting a resolved timestamp event.

This patch disentangles the 'resolved' option from that purpose by introducing a separate 'min_checkpoint_frequency' option that controls how frequently the changefeed flushes to the sink.

Fixes: #60481
Release note: none.
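
For illustration, here is how a user might combine the two options once they are disentangled: 'resolved' continues to govern how often resolved timestamp events are emitted, while 'min_checkpoint_frequency' bounds how often the aggregator flushes and checkpoints. This is a sketch only; the connection string, table name, sink URI, and durations below are hypothetical and not taken from this commit.

// Sketch only: create a changefeed against a cloud storage sink where the
// aggregator may flush/checkpoint as often as every second, while resolved
// timestamp events are emitted at most once every 10 seconds.
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Table name and sink URI are placeholders.
	_, err = db.Exec(`CREATE CHANGEFEED FOR TABLE foo
		INTO 'gs://my-bucket/changefeed?AUTH=implicit'
		WITH resolved='10s', min_checkpoint_frequency='1s'`)
	if err != nil {
		log.Fatal(err)
	}
}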
gh-casper committed Nov 6, 2021
1 parent 9dde19c commit 0f8f384
Showing 8 changed files with 72 additions and 37 deletions.
25 changes: 12 additions & 13 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -163,30 +163,29 @@ func newChangeAggregatorProcessor(
return nil, err
}

// If the resolved timestamp frequency is specified, use it as a rough
// approximation of how latency-sensitive the changefeed user is. If it's
// not, fall back to a default of 5s
// MinCheckpointFrequency controls how frequently the changeAggregator flushes the sink
// and checkpoints the local frontier to changeFrontier. It is used as a rough
// approximation of how latency-sensitive the changefeed user is. For a high-latency
// user, such as a cloud storage sink where flushes can take much longer, it is often set
// as the sink's flush frequency so as not to negate the sink's batch config.
//
// With timeBetweenFlushes and changefeedPollInterval both set to 1s, TPCC
// was seeing about 100x more time spent emitting than flushing when tested
// with low-latency sinks like Kafka. However when using cloud-storage
// sinks, flushes can take much longer and trying to flush too often can
// thus end up spending too much time flushing and not enough in emitting to
// keep up with the feed. If a user does not specify a 'resolved' time, we
// instead default to 5s, which is hopefully long enough to account for most
// possible sink latencies we could see without falling behind.
// If a user does not specify a 'min_checkpoint_frequency' duration, we instead default
// to 30s, which is hopefully long enough to account for most possible sink latencies we
// could see without falling too far behind.
//
// NB: As long as we periodically get new span-level resolved timestamps
// from the poller (which should always happen, even if the watched data is
// not changing), then this is sufficient and we don't have to do anything
// fancy with timers.
if r, ok := ca.spec.Feed.Opts[changefeedbase.OptResolvedTimestamps]; ok && r != `` {
if r, ok := ca.spec.Feed.Opts[changefeedbase.OptMinCheckpointFrequency]; ok && r != `` {
ca.flushFrequency, err = time.ParseDuration(r)
if err != nil {
return nil, err
}
} else if r == `` {
ca.flushFrequency = 0
} else {
ca.flushFrequency = changefeedbase.DefaultFlushFrequency
ca.flushFrequency = changefeedbase.DefaultMinCheckpointFrequency
}
return ca, nil
}
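
Because the option value is parsed with Go's time.ParseDuration (see the hunk above), any standard Go duration string is accepted. A small, self-contained sketch of plausible values a user might pass; the values themselves are illustrative:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Candidate settings for min_checkpoint_frequency; each is parsed the
	// same way the changeAggregator parses the option value.
	for _, v := range []string{"100ms", "5s", "30s", "1m30s"} {
		d, err := time.ParseDuration(v)
		if err != nil {
			fmt.Println("invalid duration:", v, err)
			continue
		}
		fmt.Printf("%s parses to %v\n", v, d)
	}
}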
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -514,6 +514,14 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
}
}
}
{
const opt = changefeedbase.OptMinCheckpointFrequency
if o, ok := details.Opts[opt]; ok && o != `` {
if err := validateNonNegativeDuration(opt, o); err != nil {
return jobspb.ChangefeedDetails{}, err
}
}
}
{
const opt = changefeedbase.OptSchemaChangeEvents
switch v := changefeedbase.SchemaChangeEventClass(details.Opts[opt]); v {
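
validateNonNegativeDuration is called above but is not part of this diff. As a reading aid, here is a minimal sketch of what such a check could look like, assuming it simply parses the value and rejects negative durations; the actual implementation may differ.

package changefeedccl

import (
	"time"

	"github.com/cockroachdb/errors"
)

// validateNonNegativeDuration is a sketch of the assumed helper: parse the
// option value as a duration and reject negative values with an error that
// names the offending option.
func validateNonNegativeDuration(optName string, value string) error {
	d, err := time.ParseDuration(value)
	if err != nil {
		return err
	}
	if d < 0 {
		return errors.Errorf("negative durations are not accepted: %s='%s'", optName, value)
	}
	return nil
}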
43 changes: 33 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -538,7 +538,7 @@ func TestChangefeedTimestamps(t *testing.T) {
// Check that we eventually get a resolved timestamp greater than ts1.
parsed := parseTimeToHLC(t, ts1)
for {
if resolved := expectResolvedTimestamp(t, foo); parsed.Less(resolved) {
if resolved, _ := expectResolvedTimestamp(t, foo); parsed.Less(resolved) {
break
}
}
@@ -585,33 +585,56 @@ func TestChangefeedResolvedFrequency(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var resolvedFreq, checkpointFreq time.Duration
durationString := func(duration time.Duration) string {
if duration == 0 {
return ""
}
return duration.String()
}
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)

const freq = 10 * time.Millisecond
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved=$1`, freq.String())
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved=$1, min_checkpoint_frequency=$2`,
durationString(resolvedFreq), durationString(checkpointFreq))
defer closeFeed(t, foo)

// We get each resolved timestamp notification once in each partition.
// Grab the first `2 * #partitions`, sort because we might get all from
// one partition first, and compare the first and last.
resolved := make([]hlc.Timestamp, 2*len(foo.Partitions()))
receivedPerPartition := make(map[string]time.Time)
var partition string
for i := range resolved {
resolved[i] = expectResolvedTimestamp(t, foo)
resolved[i], partition = expectResolvedTimestamp(t, foo)
now := timeutil.Now()
if lastReceived, ok := receivedPerPartition[partition]; ok {
if d := now.Sub(lastReceived); d < checkpointFreq {
t.Errorf(`expected at least %s between resolved timestamp emissions within the same partition, but got %s`, checkpointFreq, d)
}
}
receivedPerPartition[partition] = now
}
sort.Slice(resolved, func(i, j int) bool { return resolved[i].Less(resolved[j]) })
first, last := resolved[0], resolved[len(resolved)-1]

if d := last.GoTime().Sub(first.GoTime()); d < freq {
t.Errorf(`expected %s between resolved timestamps, but got %s`, freq, d)
if d := last.GoTime().Sub(first.GoTime()); d < resolvedFreq {
t.Errorf(`expected %s between resolved timestamps, but got %s`, resolvedFreq, d)
}
}

t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
runTests := func(testResolvedFreq, testCheckpointFreq time.Duration) {
resolvedFreq, checkpointFreq = testResolvedFreq, testCheckpointFreq
t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
}

runTests(1*time.Second, 0)
runTests(0, 1*time.Second)
runTests(600*time.Millisecond, 1*time.Second)
}

// Test how Changefeeds react to schema changes that do not require a backfill
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -38,6 +38,7 @@ const (
OptKeyInValue = `key_in_value`
OptTopicInValue = `topic_in_value`
OptResolvedTimestamps = `resolved`
OptMinCheckpointFrequency = `min_checkpoint_frequency`
OptUpdatedTimestamps = `updated`
OptMVCCTimestamps = `mvcc_timestamp`
OptDiff = `diff`
@@ -152,6 +153,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptKeyInValue: sql.KVStringOptRequireNoValue,
OptTopicInValue: sql.KVStringOptRequireNoValue,
OptResolvedTimestamps: sql.KVStringOptAny,
OptMinCheckpointFrequency: sql.KVStringOptRequireValue,
OptUpdatedTimestamps: sql.KVStringOptRequireNoValue,
OptMVCCTimestamps: sql.KVStringOptRequireNoValue,
OptDiff: sql.KVStringOptRequireNoValue,
@@ -184,7 +186,8 @@ var CommonOptions = makeStringSet(OptCursor, OptEnvelope,
OptMVCCTimestamps, OptDiff,
OptSchemaChangeEvents, OptSchemaChangePolicy,
OptProtectDataFromGCOnPause, OptOnError,
OptInitialScan, OptNoInitialScan)
OptInitialScan, OptNoInitialScan,
OptMinCheckpointFrequency)

// SQLValidOptions is options exclusive to SQL sink
var SQLValidOptions map[string]struct{} = nil
14 changes: 7 additions & 7 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -28,16 +28,16 @@ var TableDescriptorPollInterval = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
)

// DefaultFlushFrequency is the default frequency to flush sink.
// DefaultMinCheckpointFrequency is the default frequency to flush the sink.
// See comment in newChangeAggregatorProcessor for explanation on the value.
var DefaultFlushFrequency = 5 * time.Second
var DefaultMinCheckpointFrequency = 30 * time.Second

// TestingSetDefaultFlushFrequency changes defaultFlushFrequency for tests.
// TestingSetDefaultMinCheckpointFrequency changes DefaultMinCheckpointFrequency for tests.
// Returns a function to restore the flush frequency to its original value.
func TestingSetDefaultFlushFrequency(f time.Duration) func() {
old := DefaultFlushFrequency
DefaultFlushFrequency = f
return func() { DefaultFlushFrequency = old }
func TestingSetDefaultMinCheckpointFrequency(f time.Duration) func() {
old := DefaultMinCheckpointFrequency
DefaultMinCheckpointFrequency = f
return func() { DefaultMinCheckpointFrequency = old }
}

// PerChangefeedMemLimit controls how much data can be buffered by
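
The setter deliberately returns a restore closure rather than exposing a separate reset function, which keeps an override scoped to the test that created it. A hypothetical caller is sketched below; the test name and duration are illustrative, and the commit's own callers appear in helpers_test.go further down.

package changefeedccl_test

import (
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
)

// TestWithFastCheckpoints shows the save-and-restore idiom: deferring the
// returned closure puts DefaultMinCheckpointFrequency back once the test ends.
func TestWithFastCheckpoints(t *testing.T) {
	defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(10 * time.Millisecond)()

	// ... start a test server and create changefeeds here ...
}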
10 changes: 6 additions & 4 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -250,15 +250,17 @@ func parseTimeToHLC(t testing.TB, s string) hlc.Timestamp {
return ts
}

func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) hlc.Timestamp {
// Expect to receive a resolved timestamp and the partition it belongs to from
// a test changefeed.
func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) (hlc.Timestamp, string) {
t.Helper()
m, err := f.Next()
if err != nil {
t.Fatal(err)
} else if m == nil {
t.Fatal(`expected message`)
}
return extractResolvedTimestamp(t, m)
return extractResolvedTimestamp(t, m), m.Partition
}

func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Timestamp {
@@ -343,7 +345,7 @@ func startTestFullServer(
}

ctx := context.Background()
resetFlushFrequency := changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)
resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)
s, db, _ := serverutils.StartServer(t, args)

cleanup := func() {
@@ -381,7 +383,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}

resetFlushFrequency := changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)
resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)
cluster, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, knobs,
multiregionccltestutils.WithUseDatabase("d"),
@@ -59,7 +59,7 @@ func TestTenantStreaming(t *testing.T) {
sourceSQL := sqlutils.MakeSQLRunner(tenantConn)

// Make changefeeds run faster.
resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Millisecond)
resetFreq := changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond)
defer resetFreq()
// Set required cluster settings.
_, err := sourceDB.Exec(`
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingtest/replication_helpers.go
@@ -188,7 +188,7 @@ func NewReplicationHelper(t *testing.T) (*ReplicationHelper, func()) {
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})

// Make changefeeds run faster.
resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Millisecond)
resetFreq := changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond)

// Set required cluster settings.
_, err := db.Exec(`
