diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index e9fc4b11e748..d9d4daa57d42 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -581,7 +581,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) { } // ensure that we do not emit a resolved timestamp - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { return true } @@ -616,7 +616,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) { ) // allow the changefeed to emit resolved events now - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { return false } @@ -973,14 +973,14 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { s.SystemServer.DB(), s.Codec, "d", "foo") tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) - // ShouldSkipResolved should ensure that once the backfill begins, the following resolved events + // FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events // that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some // but not all of the time, which results in a checkpoint eventually being created haveGaps := false var backfillTimestamp hlc.Timestamp var initialCheckpoint roachpb.SpanGroup var foundCheckpoint int32 - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { // Stop resolving anything after checkpoint set to avoid eventually resolving the full span if initialCheckpoint.Len() > 0 { return true @@ -1077,7 +1077,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { // Emit resolved events for the majority of spans. Be extra paranoid and ensure that // we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing). haveGaps := false - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { rndMu.Lock() defer rndMu.Unlock() @@ -1159,7 +1159,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { // Collect spans we attempt to resolve after when we resume. var resolvedFoo []roachpb.Span - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { if !r.Span.Equal(fooTableSpan) { resolvedFoo = append(resolvedFoo, r.Span) } diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 1982c82c0710..08a7f06d1d33 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -255,14 +255,14 @@ func createBenchmarkChangefeed( if err != nil { return nil, nil, err } - tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) { + tickFn := func(ctx context.Context) (jobspb.ResolvedSpan, error) { event, err := buf.Get(ctx) if err != nil { - return nil, err + return jobspb.ResolvedSpan{}, err } if event.Type() == kvevent.TypeKV { if err := eventConsumer.ConsumeEvent(ctx, event); err != nil { - return nil, err + return jobspb.ResolvedSpan{}, err } } return event.Resolved(), nil diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 1d423312bd2a..eac7ee4217c9 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -542,7 +542,7 @@ func (ca *changeAggregator) tick() error { a := event.DetachAlloc() a.Release(ca.Ctx) resolved := event.Resolved() - if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) { + if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: @@ -555,8 +555,8 @@ func (ca *changeAggregator) tick() error { // noteResolvedSpan periodically flushes Frontier progress from the current // changeAggregator node to the changeFrontier node to allow the changeFrontier // to persist the overall changefeed's progress -func (ca *changeAggregator) noteResolvedSpan(resolved *jobspb.ResolvedSpan) error { - advanced, err := ca.frontier.ForwardResolvedSpan(*resolved) +func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error { + advanced, err := ca.frontier.ForwardResolvedSpan(resolved) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 85fae2a501f7..90ffb6f14939 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1636,14 +1636,14 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { s.SystemServer.DB(), s.Codec, "d", "foo") tableSpan := fooDesc.PrimaryIndexSpan(s.Codec) - // ShouldSkipResolved should ensure that once the backfill begins, the following resolved events + // FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events // that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some // but not all of the time, which results in a checkpoint eventually being created haveGaps := false var backfillTimestamp hlc.Timestamp var initialCheckpoint roachpb.SpanGroup var foundCheckpoint int32 - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { // Stop resolving anything after checkpoint set to avoid eventually resolving the full span if initialCheckpoint.Len() > 0 { return true @@ -1706,7 +1706,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { var secondCheckpoint roachpb.SpanGroup foundCheckpoint = 0 haveGaps = false - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { // Stop resolving anything after second checkpoint set to avoid backfill completion if secondCheckpoint.Len() > 0 { return true @@ -1756,7 +1756,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { // Collect spans we attempt to resolve after when we resume. var resolved []roachpb.Span - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { resolved = append(resolved, r.Span) return false } @@ -5659,7 +5659,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { // Emit resolved events for majority of spans. Be extra paranoid and ensure that // we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing). haveGaps := false - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { if r.Span.Equal(tableSpan) { // Do not emit resolved events for the entire table span. // We "simulate" large table by splitting single table span into many parts, so @@ -5742,7 +5742,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { // Collect spans we attempt to resolve after when we resume. var resolved []roachpb.Span - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { if !r.Span.Equal(tableSpan) { resolved = append(resolved, r.Span) } @@ -6865,7 +6865,7 @@ func TestChangefeedFlushesSinkToReleaseMemory(t *testing.T) { // an effect of never advancing the frontier, and thus never flushing // the sink due to frontier advancement. The only time we flush the sink // is if the memory pressure causes flush request to be delivered. - knobs.ShouldSkipResolved = func(_ *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(_ jobspb.ResolvedSpan) bool { return true } diff --git a/pkg/ccl/changefeedccl/kvevent/event.go b/pkg/ccl/changefeedccl/kvevent/event.go index 81fa1b2eae91..7dd0369581d5 100644 --- a/pkg/ccl/changefeedccl/kvevent/event.go +++ b/pkg/ccl/changefeedccl/kvevent/event.go @@ -92,7 +92,7 @@ type Event struct { kv roachpb.KeyValue prevVal roachpb.Value flush bool - resolved *jobspb.ResolvedSpan + resolved jobspb.ResolvedSpan backfillTimestamp hlc.Timestamp bufferAddTimestamp time.Time approxSize int @@ -104,12 +104,14 @@ func (b *Event) Type() Type { if b.kv.Key != nil { return TypeKV } - if b.resolved != nil { + if b.resolved.Span.Key != nil { return TypeResolved } if b.flush { return TypeFlush } + //fmt.Printf("AAAA %t %t", b.resolved.Timestamp.IsSet(), b.resolved.Timestamp.IsEmpty()) + //panic("BBB") return TypeUnknown } @@ -132,7 +134,7 @@ func (b *Event) PrevValue() roachpb.Value { // Resolved will be non-nil if this is a resolved timestamp event (i.e. IsKV() // returns false). -func (b *Event) Resolved() *jobspb.ResolvedSpan { +func (b *Event) Resolved() jobspb.ResolvedSpan { return b.resolved } @@ -202,14 +204,15 @@ func (b *Event) DetachAlloc() Alloc { func MakeResolvedEvent( span roachpb.Span, ts hlc.Timestamp, boundaryType jobspb.ResolvedSpan_BoundaryType, ) Event { - return Event{ - resolved: &jobspb.ResolvedSpan{ + e := Event{ + resolved: jobspb.ResolvedSpan{ Span: span, Timestamp: ts, BoundaryType: boundaryType, }, - approxSize: span.Size() + ts.Size() + 4, } + e.approxSize = e.resolved.Size() + 1 + return e } // MakeKVEvent returns KV event. diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go index 9ec8f58449a5..b5360e074d2c 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go @@ -33,7 +33,7 @@ type recordResolvedWriter struct { func (r *recordResolvedWriter) Add(ctx context.Context, e kvevent.Event) error { if e.Type() == kvevent.TypeResolved { - r.resolved = append(r.resolved, *e.Resolved()) + r.resolved = append(r.resolved, e.Resolved()) } return nil } diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index 4c4061a6fd7f..cafb5c04986a 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -31,9 +31,9 @@ type TestingKnobs struct { // It allows the tests to muck with the Sink, and even return altogether different // implementation. WrapSink func(s Sink, jobID jobspb.JobID) Sink - // ShouldSkipResolved is a filter returning true if the resolved span event should - // be skipped. - ShouldSkipResolved func(resolved *jobspb.ResolvedSpan) bool + // FilterSpanWithMutation is a filter returning true if the resolved span event should + // be skipped. This method takes a pointer in case resolved spans need to be mutated. + FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool // FeedKnobs are kvfeed testing knobs. FeedKnobs kvfeed.TestingKnobs // NullSinkIsExternalIOAccounted controls whether we record