Skip to content

Commit

Permalink
changefeedccl: Emit span resolved event when end time reached
Browse files Browse the repository at this point in the history
Changefeed supports a mode where the user wants to emit
all events that occurred since some time in the past (`cursor`),
and end the changefeed (`end_time) at the time in the near future.

In this mode, the rangefeed catchup scan starting from `cursor`
position could take some time -- maybe even a lot of time --
and in this case, the very first checkpoint kvfeed will observe
will be after `end_time`.  All of the events, including
checkpoints after `end_time` are skipped, as they should.

However, this meant that no changefeed checkpoint
records could be produced until entire changefeed completes.
This PR ensures that once the `end_time` is reached, we will
emit 1 "resolved event" for that span, so that changefeed
can produce span based checkpoint if needed.

Fixes #108464

Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Sep 8, 2023
1 parent 3595935 commit ca4683b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 28 deletions.
71 changes: 45 additions & 26 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randident"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -7143,47 +7144,65 @@ func TestChangefeedEndTimeWithCursor(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
endTimeReached := make(chan struct{})
knobs.FeedKnobs.EndTimeReached = func() bool {
select {
case <-endTimeReached:
return true
default:
return false
}
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")

var tsCursor string
sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp())").Scan(&tsCursor)
sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)")

fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", tsCursor, fakeEndTime)
defer closeFeed(t, feed)
// Insert 1k rows -- using separate statements to get different MVCC timestamps.
for i := 0; i < 1024; i++ {
sqlDB.Exec(t, "INSERT INTO foo VALUES ($1)", i)
}

assertPayloads(t, feed, []string{
`foo: [4]->{"after": {"a": 4}}`,
`foo: [5]->{"after": {"a": 5}}`,
`foo: [6]->{"after": {"a": 6}}`,
})
close(endTimeReached)
// Split table into multiple ranges to make things more interesting.
sqlDB.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (100), (200), (400), (800)")

knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
fooSpan := func() roachpb.Span {
fooDesc := desctestutils.TestingGetPublicTableDescriptor(
s.Server.DB(), s.Codec, "d", "foo")
return fooDesc.PrimaryIndexSpan(s.Codec)
}()

// Capture resolved events emitted during changefeed. We expect
// every range to emit resolved event with end_time timestamp.
frontier, err := span.MakeFrontier(fooSpan)
require.NoError(t, err)
knobs.FilterSpanWithMutation = func(rs *jobspb.ResolvedSpan) (bool, error) {
_, err := frontier.Forward(rs.Span, rs.Timestamp)
return false, err
}

// endTime must be after creation time (5 seconds should be enough
// to reach create changefeed statement and process it).
endTime := s.Server.Clock().Now().AddDuration(5 * time.Second)
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan",
tsCursor, eval.TimestampToDecimalDatum(endTime).String())
defer closeFeed(t, feed)

// Don't care much about the values emitted (tested elsewhere) -- all
// we want to make sure is that the feed terminates.
testFeed := feed.(cdctest.EnterpriseTestFeed)
require.NoError(t, testFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusSucceeded
}))

// After changefeed completes, verify we have seen all ranges emit resolved
// event with end_time timestamp. That is: verify frontier.Frontier() is at end_time.
expectedFrontier := endTime.Prev()
testutils.SucceedsWithin(t, func() error {
if expectedFrontier.EqOrdering(frontier.Frontier()) {
return nil
}
return errors.Newf("still waiting for frontier to reach %s, current %s",
expectedFrontier, frontier.Frontier())
}, 5*time.Second)
}

// TODO: Fix sinkless feeds not providing pre-close events if Next is called
// after the feed was closed
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func (t Type) Index() int {
}
}

// Raw returns the underlying RangeFeedEvent.
func (e *Event) Raw() *kvpb.RangeFeedEvent {
return e.ev
}

// ApproximateSize returns events approximate size in bytes.
func (e *Event) ApproximateSize() int {
if e.et == TypeFlush {
Expand Down
39 changes: 37 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,17 @@ func copyFromSourceToDestUntilTableEvent(
return nil
}

// spanFrontier returns frontier timestamp for the specified span.
spanFrontier = func(sp roachpb.Span) (sf hlc.Timestamp) {
frontier.SpanEntries(sp, func(_ roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
if sf.IsEmpty() || ts.Less(sf) {
sf = ts
}
return span.ContinueMatch
})
return sf
}

// applyScanBoundary apply the boundary that we set above.
// In most cases, a boundary isn't reached, and thus we do nothing.
// If a boundary is reached but event `e` happens before that boundary,
Expand All @@ -717,10 +728,29 @@ func copyFromSourceToDestUntilTableEvent(
if resolved.Timestamp.LessEq(boundaryResolvedTimestamp) {
return false, false, nil
}

// At this point, we know event is after boundaryResolvedTimestamp.
skipEvent = true

if _, ok := scanBoundary.(*errEndTimeReached); ok {
// We know we have end time boundary. In this case, we do not want to
// skip this event because we want to make sure we emit checkpoint at
// exactly boundaryResolvedTimestamp. This checkpoint can be used to
// produce span based changefeed checkpoints if needed.
// We only want to emit this checkpoint once, and then we can skip
// subsequent checkpoints for this span until entire frontier reaches
// boundary timestamp.
if boundaryResolvedTimestamp.Compare(spanFrontier(resolved.Span)) > 0 {
e.Raw().Checkpoint.ResolvedTS = boundaryResolvedTimestamp
skipEvent = false
}
}

if _, err := frontier.Forward(resolved.Span, boundaryResolvedTimestamp); err != nil {
return false, false, err
return true, false, err
}
return true, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp), nil

return skipEvent, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp), nil
case kvevent.TypeFlush:
// TypeFlush events have a timestamp of zero and should have already
// been processed by the timestamp check above. We include this here
Expand Down Expand Up @@ -778,8 +808,13 @@ func copyFromSourceToDestUntilTableEvent(
if scanBoundaryReached {
// All component rangefeeds are now at the boundary.
// Break out of the ctxgroup by returning the sentinel error.
// (We don't care if skipEntry is false -- scan boundary can only be
// returned for resolved event, and we don't care if we emit this event
// since exiting with scan boundary error will cause appropriate
// boundary type (EXIT) to be emitted for the entire frontier)
return scanBoundary
}

if skipEntry {
return nil
}
Expand Down

0 comments on commit ca4683b

Please sign in to comment.