diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index d1ca0fd77f4b..86e5223b5fd5 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -85,6 +85,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randident" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -7143,47 +7144,65 @@ func TestChangefeedEndTimeWithCursor(t *testing.T) { defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - knobs := s.TestingKnobs. - DistSQL.(*execinfra.TestingKnobs). - Changefeed.(*TestingKnobs) - endTimeReached := make(chan struct{}) - knobs.FeedKnobs.EndTimeReached = func() bool { - select { - case <-endTimeReached: - return true - default: - return false - } - } - sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)") - sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)") var tsCursor string sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp())").Scan(&tsCursor) - sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)") - fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime() - feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", tsCursor, fakeEndTime) - defer closeFeed(t, feed) + // Insert 1k rows -- using separate statements to get different MVCC timestamps. + for i := 0; i < 1024; i++ { + sqlDB.Exec(t, "INSERT INTO foo VALUES ($1)", i) + } - assertPayloads(t, feed, []string{ - `foo: [4]->{"after": {"a": 4}}`, - `foo: [5]->{"after": {"a": 5}}`, - `foo: [6]->{"after": {"a": 6}}`, - }) - close(endTimeReached) + // Split table into multiple ranges to make things more interesting. + sqlDB.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (100), (200), (400), (800)") + + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + fooSpan := func() roachpb.Span { + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + s.Server.DB(), s.Codec, "d", "foo") + return fooDesc.PrimaryIndexSpan(s.Codec) + }() + // Capture resolved events emitted during changefeed. We expect + // every range to emit resolved event with end_time timestamp. + frontier, err := span.MakeFrontier(fooSpan) + require.NoError(t, err) + knobs.FilterSpanWithMutation = func(rs *jobspb.ResolvedSpan) (bool, error) { + _, err := frontier.Forward(rs.Span, rs.Timestamp) + return false, err + } + + // endTime must be after creation time (5 seconds should be enough + // to reach create changefeed statement and process it). + endTime := s.Server.Clock().Now().AddDuration(5 * time.Second) + feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", + tsCursor, eval.TimestampToDecimalDatum(endTime).String()) + defer closeFeed(t, feed) + + // Don't care much about the values emitted (tested elsewhere) -- all + // we want to make sure is that the feed terminates. testFeed := feed.(cdctest.EnterpriseTestFeed) require.NoError(t, testFeed.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusSucceeded })) + + // After changefeed completes, verify we have seen all ranges emit resolved + // event with end_time timestamp. That is: verify frontier.Frontier() is at end_time. + expectedFrontier := endTime.Prev() + testutils.SucceedsWithin(t, func() error { + if expectedFrontier.EqOrdering(frontier.Frontier()) { + return nil + } + return errors.Newf("still waiting for frontier to reach %s, current %s", + expectedFrontier, frontier.Frontier()) + }, 5*time.Second) } - // TODO: Fix sinkless feeds not providing pre-close events if Next is called - // after the feed was closed cdcTest(t, testFn, feedTestEnterpriseSinks) } diff --git a/pkg/ccl/changefeedccl/kvevent/event.go b/pkg/ccl/changefeedccl/kvevent/event.go index e18de9917899..0e2c3d731940 100644 --- a/pkg/ccl/changefeedccl/kvevent/event.go +++ b/pkg/ccl/changefeedccl/kvevent/event.go @@ -136,6 +136,11 @@ func (t Type) Index() int { } } +// Raw returns the underlying RangeFeedEvent. +func (e *Event) Raw() *kvpb.RangeFeedEvent { + return e.ev +} + // ApproximateSize returns events approximate size in bytes. func (e *Event) ApproximateSize() int { if e.et == TypeFlush { diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 4e8d12ea4f95..77205a2cd8f0 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -691,6 +691,17 @@ func copyFromSourceToDestUntilTableEvent( return nil } + // spanFrontier returns frontier timestamp for the specified span. + spanFrontier = func(sp roachpb.Span) (sf hlc.Timestamp) { + frontier.SpanEntries(sp, func(_ roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { + if sf.IsEmpty() || ts.Less(sf) { + sf = ts + } + return span.ContinueMatch + }) + return sf + } + // applyScanBoundary apply the boundary that we set above. // In most cases, a boundary isn't reached, and thus we do nothing. // If a boundary is reached but event `e` happens before that boundary, @@ -717,10 +728,29 @@ func copyFromSourceToDestUntilTableEvent( if resolved.Timestamp.LessEq(boundaryResolvedTimestamp) { return false, false, nil } + + // At this point, we know event is after boundaryResolvedTimestamp. + skipEvent = true + + if _, ok := scanBoundary.(*errEndTimeReached); ok { + // We know we have end time boundary. In this case, we do not want to + // skip this event because we want to make sure we emit checkpoint at + // exactly boundaryResolvedTimestamp. This checkpoint can be used to + // produce span based changefeed checkpoints if needed. + // We only want to emit this checkpoint once, and then we can skip + // subsequent checkpoints for this span until entire frontier reaches + // boundary timestamp. + if boundaryResolvedTimestamp.Compare(spanFrontier(resolved.Span)) > 0 { + e.Raw().Checkpoint.ResolvedTS = boundaryResolvedTimestamp + skipEvent = false + } + } + if _, err := frontier.Forward(resolved.Span, boundaryResolvedTimestamp); err != nil { - return false, false, err + return true, false, err } - return true, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp), nil + + return skipEvent, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp), nil case kvevent.TypeFlush: // TypeFlush events have a timestamp of zero and should have already // been processed by the timestamp check above. We include this here @@ -778,8 +808,13 @@ func copyFromSourceToDestUntilTableEvent( if scanBoundaryReached { // All component rangefeeds are now at the boundary. // Break out of the ctxgroup by returning the sentinel error. + // (We don't care if skipEntry is false -- scan boundary can only be + // returned for resolved event, and we don't care if we emit this event + // since exiting with scan boundary error will cause appropriate + // boundary type (EXIT) to be emitted for the entire frontier) return scanBoundary } + if skipEntry { return nil }