From 5259c1d7430040c7709db32b81e7d7c034e3e812 Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Fri, 18 Nov 2022 09:12:25 -0500 Subject: [PATCH] changefeedccl: suppress the synthetic marker in changefeed output Informs #91607 Until recently a synthetic marker (trailing ?) in a changefeed highwater would cause a protected timestamp error, so it's likely there are no functioning integrations that use the ? and many that could break if they see it in a resolved timestamp or mvcc/updated timestamp. So let's just not output it. Release note: None --- .../changefeedccl/alter_changefeed_test.go | 2 +- pkg/ccl/changefeedccl/avro.go | 4 +-- pkg/ccl/changefeedccl/changefeed_test.go | 2 +- pkg/ccl/changefeedccl/encoder.go | 7 ++++ pkg/ccl/changefeedccl/encoder_json.go | 8 ++--- pkg/ccl/changefeedccl/helpers_test.go | 36 +++++++++++++++---- .../changefeedccl/kvfeed/physical_kv_feed.go | 7 ++++ pkg/ccl/changefeedccl/kvfeed/testing_knobs.go | 4 +++ .../scheduled_changefeed_test.go | 10 ++++++ 9 files changed, 66 insertions(+), 14 deletions(-) diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 367c0966f030..9c1a6668e1e8 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1213,7 +1213,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { } } - cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks) + cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoForcedSyntheticTimestamps) } func TestAlterChangefeedUpdateFilter(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/avro.go b/pkg/ccl/changefeedccl/avro.go index 119356c8b529..e96bc5baf39e 100644 --- a/pkg/ccl/changefeedccl/avro.go +++ b/pkg/ccl/changefeedccl/avro.go @@ -1008,7 +1008,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow( return nil, changefeedbase.WithTerminalError( errors.Errorf(`unknown metadata timestamp type: %T`, u)) } - native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) + native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts)) } } if r.opts.resolvedField { @@ -1020,7 +1020,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow( return nil, changefeedbase.WithTerminalError( errors.Errorf(`unknown metadata timestamp type: %T`, u)) } - native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) + native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts)) } } for k := range meta { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 566ae519327a..b0244275e75e 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6080,7 +6080,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { // TODO(ssd): Tenant testing disabled because of use of DB() for _, sz := range []int64{100 << 20, 100} { maxCheckpointSize = sz - cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook")) + cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"), feedTestNoForcedSyntheticTimestamps) } } diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index b3aa7a02b6e5..dd212cedac00 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -61,3 +61,10 @@ func getEncoder( return nil, errors.AssertionFailedf(`unknown format: %s`, opts.Format) } } + +// timestampToString converts an internal timestamp to the string form used in +// all encoders. This could be made more efficient. And/or it could be configurable +// to include the Synthetic flag when present, but that's unlikely to be needed. +func timestampToString(t hlc.Timestamp) string { + return t.WithSynthetic(false).AsOfSystemTime() +} diff --git a/pkg/ccl/changefeedccl/encoder_json.go b/pkg/ccl/changefeedccl/encoder_json.go index ec3057183efd..13c7d66d770c 100644 --- a/pkg/ccl/changefeedccl/encoder_json.go +++ b/pkg/ccl/changefeedccl/encoder_json.go @@ -231,13 +231,13 @@ func (e *jsonEncoder) initRawEnvelope() error { } if e.updatedField { - if err := metaBuilder.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil { + if err := metaBuilder.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil { return nil, err } } if e.mvccTimestampField { - if err := metaBuilder.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil { + if err := metaBuilder.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil { return nil, err } } @@ -324,13 +324,13 @@ func (e *jsonEncoder) initWrappedEnvelope() error { } if e.updatedField { - if err := b.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil { + if err := b.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil { return nil, err } } if e.mvccTimestampField { - if err := b.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil { + if err := b.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil { return nil, err } } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 70716810cc55..a68e6b471f6f 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -514,12 +514,13 @@ type updateArgsFn func(args *base.TestServerArgs) type updateKnobsFn func(knobs *base.TestingKnobs) type feedTestOptions struct { - useTenant bool - argsFn updateArgsFn - knobsFn updateKnobsFn - externalIODir string - allowedSinkTypes []string - disabledSinkTypes []string + useTenant bool + argsFn updateArgsFn + knobsFn updateKnobsFn + externalIODir string + allowedSinkTypes []string + disabledSinkTypes []string + disableSyntheticTimestamps bool } type feedTestOption func(opts *feedTestOptions) @@ -528,6 +529,12 @@ type feedTestOption func(opts *feedTestOptions) // from randomly running on a tenant. var feedTestNoTenants = func(opts *feedTestOptions) { opts.useTenant = false } +// feedTestNoForcedSyntheticTimestamps is a feedTestOption that will prevent +// the test from randomly forcing timestamps to be synthetic and offset five seconds into the future from +// what they would otherwise be. It doesn't prevent synthetic timestamps but they're otherwise unlikely to +// occur in tests. +var feedTestNoForcedSyntheticTimestamps = func(opts *feedTestOptions) { opts.disableSyntheticTimestamps = true } + var feedTestForceSink = func(sinkType string) feedTestOption { return feedTestRestrictSinks(sinkType) } @@ -583,6 +590,23 @@ func makeOptions(opts ...feedTestOption) feedTestOptions { for _, o := range opts { o(&options) } + if !options.disableSyntheticTimestamps && rand.Intn(2) == 0 { + // Offset all timestamps a random (but consistent per test) amount into the + // future to ensure we can handle that. Always chooses an integer number of + // seconds for easier debugging and so that 0 is a possibility. + offset := int64(rand.Intn(6)) * time.Second.Nanoseconds() + oldKnobsFn := options.knobsFn + options.knobsFn = func(knobs *base.TestingKnobs) { + if oldKnobsFn != nil { + oldKnobsFn(knobs) + } + knobs.DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs).FeedKnobs.ModifyTimestamps = func(t *hlc.Timestamp) { + t.Add(offset, 0) + t.Synthetic = true + } + } + } return options } diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 8ed3f8feceb1..14a0d7db6476 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -97,12 +97,19 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { return err } } + if p.knobs.ModifyTimestamps != nil { + e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan} + p.knobs.ModifyTimestamps(&e.Val.Value.Timestamp) + } if err := p.memBuf.Add( ctx, kvevent.MakeKVEvent(e.RangeFeedEvent), ); err != nil { return err } case *roachpb.RangeFeedCheckpoint: + if p.knobs.ModifyTimestamps != nil { + p.knobs.ModifyTimestamps(&t.ResolvedTS) + } if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { // RangeFeed happily forwards any closed timestamps it receives as // soon as there are no outstanding intents under them. diff --git a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go index dd87b86979f7..6fd5074e2fa5 100644 --- a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go +++ b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // TestingKnobs are the testing knobs for kvfeed. @@ -29,6 +30,9 @@ type TestingKnobs struct { // EndTimeReached is a callback that may return true to indicate the // feed should exit because its end time has been reached. EndTimeReached func() bool + // ModifyTimestamps is called on the timestamp for each RangefeedMessage + // before converting it into a kv event. + ModifyTimestamps func(*hlc.Timestamp) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go index 8aa43b15373e..6a136564a3fc 100644 --- a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go +++ b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go @@ -327,6 +327,16 @@ func TestCreateChangefeedScheduleIfNotExists(t *testing.T) { const selectQuery = "SELECT label FROM [SHOW SCHEDULES FOR CHANGEFEED]" + if th.cfg == nil { + t.Log("cfg") + t.FailNow() + } + + if th.cfg.InternalExecutor == nil { + t.Log("InternalExecutor") + t.FailNow() + } + rows, err := th.cfg.InternalExecutor.QueryBufferedEx( context.Background(), "check-sched", nil, sessiondata.RootUserSessionDataOverride,