Skip to content

Commit

Permalink
changefeedccl: remove handling of synthetic timestamps
Browse files Browse the repository at this point in the history
Informs #101938.

This commit is essentially a revert of 5259c1d. It removes the handling
of synthetic timestamps from the changefeedccl package.

This flag has been deprecated since v22.2 and is no longer consulted in
uncertainty interval checks or by transaction commit-wait. It will never
makes its way into the changefeedccl package once it is removed from the
proto definition.

Release note: None
  • Loading branch information
nvanbenschoten committed Jan 9, 2024
1 parent 9e1d700 commit 1b4f26e
Show file tree
Hide file tree
Showing 9 changed files with 10 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}
}

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection, feedTestNoForcedSyntheticTimestamps)
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
}

func TestAlterChangefeedInitialScan(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
if r.opts.resolvedField {
Expand All @@ -1040,7 +1040,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
for k := range meta {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6894,7 +6894,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
// TODO(ssd): Tenant testing disabled because of use of DB()
for _, sz := range []int64{100 << 20, 100} {
maxCheckpointSize = sz
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"), feedTestNoForcedSyntheticTimestamps)
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"))
}
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,3 @@ func getEncoder(
return nil, errors.AssertionFailedf(`unknown format: %s`, opts.Format)
}
}

// timestampToString converts an internal timestamp to the string form used in
// all encoders. This could be made more efficient. And/or it could be configurable
// to include the Synthetic flag when present, but that's unlikely to be needed.
func timestampToString(t hlc.Timestamp) string {
return t.WithSynthetic(false).AsOfSystemTime()
}
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ func (e *jsonEncoder) initRawEnvelope() error {
}

if e.updatedField {
if err := metaBuilder.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
if err := metaBuilder.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -355,13 +355,13 @@ func (e *jsonEncoder) initWrappedEnvelope() error {
}

if e.updatedField {
if err := b.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
if err := b.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := b.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
if err := b.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
return nil, err
}
}
Expand Down
31 changes: 0 additions & 31 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ type feedTestOptions struct {
externalIODir string
allowedSinkTypes []string
disabledSinkTypes []string
disableSyntheticTimestamps bool
settings *cluster.Settings
}

Expand All @@ -570,12 +569,6 @@ var feedTestNoExternalConnection = func(opts *feedTestOptions) { opts.forceNoExt
// has privileges to create changefeeds on tables in the default database `d` only.
var feedTestUseRootUserConnection = func(opts *feedTestOptions) { opts.forceRootUserConnection = true }

// feedTestNoForcedSyntheticTimestamps is a feedTestOption that will prevent
// the test from randomly forcing timestamps to be synthetic and offset five seconds into the future from
// what they would otherwise be. It doesn't prevent synthetic timestamps but they're otherwise unlikely to
// occur in tests.
var feedTestNoForcedSyntheticTimestamps = func(opts *feedTestOptions) { opts.disableSyntheticTimestamps = true }

var feedTestForceSink = func(sinkType string) feedTestOption {
return feedTestRestrictSinks(sinkType)
}
Expand Down Expand Up @@ -631,30 +624,6 @@ func makeOptions(opts ...feedTestOption) feedTestOptions {
for _, o := range opts {
o(&options)
}
if !options.disableSyntheticTimestamps && rand.Intn(2) == 0 {
// Offset all timestamps a random (but consistent per test) amount into the
// future to ensure we can handle that. Always chooses an integer number of
// seconds for easier debugging and so that 0 is a possibility.
offset := int64(rand.Intn(6)) * time.Second.Nanoseconds()
// TODO(#105053): Remove this line
_ = offset
oldKnobsFn := options.knobsFn
options.knobsFn = func(knobs *base.TestingKnobs) {
if oldKnobsFn != nil {
oldKnobsFn(knobs)
}
knobs.DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs).FeedKnobs.ModifyTimestamps = func(t *hlc.Timestamp) {
// NOTE(ricky): This line of code should be uncommented.
// It used to be just t.Add(offset, 0), but t.Add() has no side
// effects so this was a no-op. *t = t.Add(offset, 0) is correct,
// but causes test failures.
// TODO(#105053): Uncomment and fix test failures
//*t = t.Add(offset, 0)
t.Synthetic = true
}
}
}
return options
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
return err
}
}
if p.knobs.ModifyTimestamps != nil {
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Val.Value.Timestamp)
}
if err := p.memBuf.Add(
ctx, kvevent.MakeKVEvent(e.RangeFeedEvent),
); err != nil {
return err
}
case *kvpb.RangeFeedCheckpoint:
if p.knobs.ModifyTimestamps != nil {
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Checkpoint.ResolvedTS)
}
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
// RangeFeed happily forwards any closed timestamps it receives as
// soon as there are no outstanding intents under them.
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// TestingKnobs are the testing knobs for kvfeed.
Expand All @@ -30,9 +29,6 @@ type TestingKnobs struct {
// EndTimeReached is a callback that may return true to indicate the
// feed should exit because its end time has been reached.
EndTimeReached func() bool
// ModifyTimestamps is called on the timestamp for each RangefeedMessage
// before converting it into a kv event.
ModifyTimestamps func(*hlc.Timestamp)
// RangefeedOptions lets the kvfeed override rangefeed settings.
RangefeedOptions []kvcoord.RangeFeedOption
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ func (w *parquetWriter) populateDatums(
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if w.encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
datums = append(datums, tree.NewDString(updated.AsOfSystemTime()))
}
if w.encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
datums = append(datums, tree.NewDString(mvcc.AsOfSystemTime()))
}
if w.encodingOpts.Diff {
if prevRow.IsDeleted() {
Expand Down

0 comments on commit 1b4f26e

Please sign in to comment.