diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index f3694122b228..0eb4c790cdeb 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1441,7 +1441,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { } } - cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection, feedTestNoForcedSyntheticTimestamps) + cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection) } func TestAlterChangefeedInitialScan(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/avro.go b/pkg/ccl/changefeedccl/avro.go index 84f6a73635fa..9a5203c499a2 100644 --- a/pkg/ccl/changefeedccl/avro.go +++ b/pkg/ccl/changefeedccl/avro.go @@ -1028,7 +1028,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow( return nil, changefeedbase.WithTerminalError( errors.Errorf(`unknown metadata timestamp type: %T`, u)) } - native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts)) + native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } } if r.opts.resolvedField { @@ -1040,7 +1040,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow( return nil, changefeedbase.WithTerminalError( errors.Errorf(`unknown metadata timestamp type: %T`, u)) } - native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts)) + native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } } for k := range meta { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index bbb146db773d..f71468fbc930 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6894,7 +6894,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { // TODO(ssd): Tenant testing disabled because of use of DB() for _, sz := range []int64{100 << 20, 100} { maxCheckpointSize = sz - cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"), feedTestNoForcedSyntheticTimestamps) + cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook")) } } diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 195cf289776d..00f474854733 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -65,10 +65,3 @@ func getEncoder( return nil, errors.AssertionFailedf(`unknown format: %s`, opts.Format) } } - -// timestampToString converts an internal timestamp to the string form used in -// all encoders. This could be made more efficient. And/or it could be configurable -// to include the Synthetic flag when present, but that's unlikely to be needed. -func timestampToString(t hlc.Timestamp) string { - return t.WithSynthetic(false).AsOfSystemTime() -} diff --git a/pkg/ccl/changefeedccl/encoder_json.go b/pkg/ccl/changefeedccl/encoder_json.go index 4e84cb1c6b9a..db2e30826c46 100644 --- a/pkg/ccl/changefeedccl/encoder_json.go +++ b/pkg/ccl/changefeedccl/encoder_json.go @@ -261,13 +261,13 @@ func (e *jsonEncoder) initRawEnvelope() error { } if e.updatedField { - if err := metaBuilder.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil { + if err := metaBuilder.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil { return nil, err } } if e.mvccTimestampField { - if err := metaBuilder.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil { + if err := metaBuilder.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil { return nil, err } } @@ -355,13 +355,13 @@ func (e *jsonEncoder) initWrappedEnvelope() error { } if e.updatedField { - if err := b.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil { + if err := b.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil { return nil, err } } if e.mvccTimestampField { - if err := b.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil { + if err := b.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil { return nil, err } } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 75fc46f228b8..559302efd9cb 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -549,7 +549,6 @@ type feedTestOptions struct { externalIODir string allowedSinkTypes []string disabledSinkTypes []string - disableSyntheticTimestamps bool settings *cluster.Settings } @@ -570,12 +569,6 @@ var feedTestNoExternalConnection = func(opts *feedTestOptions) { opts.forceNoExt // has privileges to create changefeeds on tables in the default database `d` only. var feedTestUseRootUserConnection = func(opts *feedTestOptions) { opts.forceRootUserConnection = true } -// feedTestNoForcedSyntheticTimestamps is a feedTestOption that will prevent -// the test from randomly forcing timestamps to be synthetic and offset five seconds into the future from -// what they would otherwise be. It doesn't prevent synthetic timestamps but they're otherwise unlikely to -// occur in tests. -var feedTestNoForcedSyntheticTimestamps = func(opts *feedTestOptions) { opts.disableSyntheticTimestamps = true } - var feedTestForceSink = func(sinkType string) feedTestOption { return feedTestRestrictSinks(sinkType) } @@ -631,30 +624,6 @@ func makeOptions(opts ...feedTestOption) feedTestOptions { for _, o := range opts { o(&options) } - if !options.disableSyntheticTimestamps && rand.Intn(2) == 0 { - // Offset all timestamps a random (but consistent per test) amount into the - // future to ensure we can handle that. Always chooses an integer number of - // seconds for easier debugging and so that 0 is a possibility. - offset := int64(rand.Intn(6)) * time.Second.Nanoseconds() - // TODO(#105053): Remove this line - _ = offset - oldKnobsFn := options.knobsFn - options.knobsFn = func(knobs *base.TestingKnobs) { - if oldKnobsFn != nil { - oldKnobsFn(knobs) - } - knobs.DistSQL.(*execinfra.TestingKnobs). - Changefeed.(*TestingKnobs).FeedKnobs.ModifyTimestamps = func(t *hlc.Timestamp) { - // NOTE(ricky): This line of code should be uncommented. - // It used to be just t.Add(offset, 0), but t.Add() has no side - // effects so this was a no-op. *t = t.Add(offset, 0) is correct, - // but causes test failures. - // TODO(#105053): Uncomment and fix test failures - //*t = t.Add(offset, 0) - t.Synthetic = true - } - } - } return options } diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 7c5b23032055..be0c1b325cba 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -109,20 +109,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { return err } } - if p.knobs.ModifyTimestamps != nil { - e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan} - p.knobs.ModifyTimestamps(&e.Val.Value.Timestamp) - } if err := p.memBuf.Add( ctx, kvevent.MakeKVEvent(e.RangeFeedEvent), ); err != nil { return err } case *kvpb.RangeFeedCheckpoint: - if p.knobs.ModifyTimestamps != nil { - e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan} - p.knobs.ModifyTimestamps(&e.Checkpoint.ResolvedTS) - } if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { // RangeFeed happily forwards any closed timestamps it receives as // soon as there are no outstanding intents under them. diff --git a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go index 541c3a34c23b..42a5cbc6592b 100644 --- a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go +++ b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go @@ -12,7 +12,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // TestingKnobs are the testing knobs for kvfeed. @@ -30,9 +29,6 @@ type TestingKnobs struct { // EndTimeReached is a callback that may return true to indicate the // feed should exit because its end time has been reached. EndTimeReached func() bool - // ModifyTimestamps is called on the timestamp for each RangefeedMessage - // before converting it into a kv event. - ModifyTimestamps func(*hlc.Timestamp) // RangefeedOptions lets the kvfeed override rangefeed settings. RangefeedOptions []kvcoord.RangeFeedOption } diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 87e41aad38f7..7f43df9e97f1 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -172,10 +172,10 @@ func (w *parquetWriter) populateDatums( datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString()) if w.encodingOpts.UpdatedTimestamps { - datums = append(datums, tree.NewDString(timestampToString(updated))) + datums = append(datums, tree.NewDString(updated.AsOfSystemTime())) } if w.encodingOpts.MVCCTimestamps { - datums = append(datums, tree.NewDString(timestampToString(mvcc))) + datums = append(datums, tree.NewDString(mvcc.AsOfSystemTime())) } if w.encodingOpts.Diff { if prevRow.IsDeleted() {