Skip to content

Commit

Permalink
Merge pull request cockroachdb#92123 from HonoreDB/cdc_test_synthetic…
Browse files Browse the repository at this point in the history
…_timestamps

changefeedccl: suppress the synthetic marker in changefeed output
  • Loading branch information
HonoreDB authored Jan 12, 2023
2 parents 241f613 + 5259c1d commit f62b59f
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}
}

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoForcedSyntheticTimestamps)
}

func TestAlterChangefeedUpdateFilter(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
}
}
if r.opts.resolvedField {
Expand All @@ -1020,7 +1020,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
}
}
for k := range meta {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6111,7 +6111,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
// TODO(ssd): Tenant testing disabled because of use of DB()
for _, sz := range []int64{100 << 20, 100} {
maxCheckpointSize = sz
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"))
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"), feedTestNoForcedSyntheticTimestamps)
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,10 @@ func getEncoder(
return nil, errors.AssertionFailedf(`unknown format: %s`, opts.Format)
}
}

// timestampToString converts an internal timestamp to the string form used in
// all encoders. This could be made more efficient. And/or it could be configurable
// to include the Synthetic flag when present, but that's unlikely to be needed.
func timestampToString(t hlc.Timestamp) string {
return t.WithSynthetic(false).AsOfSystemTime()
}
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ func (e *jsonEncoder) initRawEnvelope() error {
}

if e.updatedField {
if err := metaBuilder.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
if err := metaBuilder.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -324,13 +324,13 @@ func (e *jsonEncoder) initWrappedEnvelope() error {
}

if e.updatedField {
if err := b.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
if err := b.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := b.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
if err := b.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
return nil, err
}
}
Expand Down
36 changes: 30 additions & 6 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,13 @@ type updateArgsFn func(args *base.TestServerArgs)
type updateKnobsFn func(knobs *base.TestingKnobs)

type feedTestOptions struct {
useTenant bool
argsFn updateArgsFn
knobsFn updateKnobsFn
externalIODir string
allowedSinkTypes []string
disabledSinkTypes []string
useTenant bool
argsFn updateArgsFn
knobsFn updateKnobsFn
externalIODir string
allowedSinkTypes []string
disabledSinkTypes []string
disableSyntheticTimestamps bool
}

type feedTestOption func(opts *feedTestOptions)
Expand All @@ -528,6 +529,12 @@ type feedTestOption func(opts *feedTestOptions)
// from randomly running on a tenant.
var feedTestNoTenants = func(opts *feedTestOptions) { opts.useTenant = false }

// feedTestNoForcedSyntheticTimestamps is a feedTestOption that will prevent
// the test from randomly forcing timestamps to be synthetic and offset five seconds into the future from
// what they would otherwise be. It doesn't prevent synthetic timestamps but they're otherwise unlikely to
// occur in tests.
var feedTestNoForcedSyntheticTimestamps = func(opts *feedTestOptions) { opts.disableSyntheticTimestamps = true }

var feedTestForceSink = func(sinkType string) feedTestOption {
return feedTestRestrictSinks(sinkType)
}
Expand Down Expand Up @@ -583,6 +590,23 @@ func makeOptions(opts ...feedTestOption) feedTestOptions {
for _, o := range opts {
o(&options)
}
if !options.disableSyntheticTimestamps && rand.Intn(2) == 0 {
// Offset all timestamps a random (but consistent per test) amount into the
// future to ensure we can handle that. Always chooses an integer number of
// seconds for easier debugging and so that 0 is a possibility.
offset := int64(rand.Intn(6)) * time.Second.Nanoseconds()
oldKnobsFn := options.knobsFn
options.knobsFn = func(knobs *base.TestingKnobs) {
if oldKnobsFn != nil {
oldKnobsFn(knobs)
}
knobs.DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs).FeedKnobs.ModifyTimestamps = func(t *hlc.Timestamp) {
t.Add(offset, 0)
t.Synthetic = true
}
}
}
return options
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,19 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
return err
}
}
if p.knobs.ModifyTimestamps != nil {
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Val.Value.Timestamp)
}
if err := p.memBuf.Add(
ctx, kvevent.MakeKVEvent(e.RangeFeedEvent),
); err != nil {
return err
}
case *roachpb.RangeFeedCheckpoint:
if p.knobs.ModifyTimestamps != nil {
p.knobs.ModifyTimestamps(&t.ResolvedTS)
}
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
// RangeFeed happily forwards any closed timestamps it receives as
// soon as there are no outstanding intents under them.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// TestingKnobs are the testing knobs for kvfeed.
Expand All @@ -29,6 +30,9 @@ type TestingKnobs struct {
// EndTimeReached is a callback that may return true to indicate the
// feed should exit because its end time has been reached.
EndTimeReached func() bool
// ModifyTimestamps is called on the timestamp for each RangefeedMessage
// before converting it into a kv event.
ModifyTimestamps func(*hlc.Timestamp)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/scheduled_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,16 @@ func TestCreateChangefeedScheduleIfNotExists(t *testing.T) {

const selectQuery = "SELECT label FROM [SHOW SCHEDULES FOR CHANGEFEED]"

if th.cfg == nil {
t.Log("cfg")
t.FailNow()
}

if th.cfg.InternalExecutor == nil {
t.Log("InternalExecutor")
t.FailNow()
}

rows, err := th.cfg.InternalExecutor.QueryBufferedEx(
context.Background(), "check-sched", nil,
sessiondata.RootUserSessionDataOverride,
Expand Down

0 comments on commit f62b59f

Please sign in to comment.