Skip to content

Commit

Permalink
Merge #119959
Browse files Browse the repository at this point in the history
119959: kvfeed: refactor and unit test copyFromSourceToDestUntilTableEvent r=rharding6373 a=andyyang890

This patch refactors `copyFromSourceToDestUntilTableEvent` and adds
comments and a unit test.

Epic: None

Release note: None

Co-authored-by: Andy Yang <[email protected]>
  • Loading branch information
craig[bot] and andyyang890 committed Mar 7, 2024
2 parents f00dc0e + 1afd0d2 commit 4806184
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 73 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ go_test(
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/span",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//slices",
],
)
148 changes: 75 additions & 73 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
// recreate the rangefeeds.
err = g.Wait()
if err == nil {
return errors.AssertionFailedf("feed exited with no error and no scan boundary")
return errors.AssertionFailedf("feed exited with no error and no copy boundary")
} else if tErr := (*errTableEventReached)(nil); errors.As(err, &tErr) {
// TODO(ajwerner): iterate the spans and add a Resolved timestamp.
// We'll need to do this to ensure that a resolved timestamp propagates
Expand All @@ -602,19 +602,28 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
}
}

type errBoundaryReached interface {
// copyBoundary is used within copyFromSourceToDestUntilTableEvent
// to encapsulate the timestamp at which we should stop copying and an
// error explaining the reason.
type copyBoundary interface {
error
Timestamp() hlc.Timestamp
}

var _ copyBoundary = (*errTableEventReached)(nil)
var _ copyBoundary = (*errEndTimeReached)(nil)

// errTableEventReached contains the earliest table event we receive, which
// contains the timestamp at which we should stop copying.
type errTableEventReached struct {
schemafeed.TableEvent
}

func (e *errTableEventReached) Error() string {
return "scan boundary reached: " + e.String()
return "table event reached: " + e.String()
}

// errEndTimeReached contains the end timestamp at which we should stop copying.
type errEndTimeReached struct {
endTime hlc.Timestamp
}
Expand All @@ -627,73 +636,66 @@ func (e *errEndTimeReached) Timestamp() hlc.Timestamp {
return e.endTime
}

// errUnknownEvent indicates we should stop copying because we encountered an unknown event type.
type errUnknownEvent struct {
kvevent.Event
}

var _ errBoundaryReached = (*errTableEventReached)(nil)
var _ errBoundaryReached = (*errEndTimeReached)(nil)

func (e *errUnknownEvent) Error() string {
return "unknown event type"
return "unknown event type: " + e.String()
}

// copyFromSourceToDestUntilTableEvents will pull read entries from source and
// publish them to the destination if there is no table event from the SchemaFeed. If a
// tableEvent occurs then the function will return once all of the spans have
// been resolved up to the event. The first such event will be returned as
// *errBoundaryReached. A nil error will never be returned.
// copyFromSourceToDestUntilTableEvent will copy events from the source to the
// dest until a copy boundary is reached (i.e. the table event is encountered or
// the end time (if specified) is reached). Once this happens, the function will
// return after all of the spans have been resolved up to the copy boundary time.
// The frontier is forwarded for the relevant span whenever a resolved event is
// copied. A non-nil error containing details about why the copying stopped will
// always be returned.
func copyFromSourceToDestUntilTableEvent(
ctx context.Context,
dest kvevent.Writer,
source kvevent.Reader,
frontier span.Frontier,
tables schemafeed.SchemaFeed,
schemaFeed schemafeed.SchemaFeed,
endTime hlc.Timestamp,
knobs TestingKnobs,
) error {
var (
scanBoundary errBoundaryReached
endTimeIsSet = !endTime.IsEmpty()

// checkForScanBoundary takes in a new event's timestamp (event generated
// from rangefeed), and asks "Is some type of 'boundary' reached
// at 'ts'?"
// Here a boundary is reached either
// - table event(s) occurred at timestamp at or before `ts`, or
// - endTime reached at or before `ts`.
checkForScanBoundary = func(ts hlc.Timestamp) error {
// If the scanBoundary is not nil, it either means that there is a table
// event boundary set or a boundary for the end time. If the boundary is
// for the end time, we should keep looking for table events.
isEndTimeBoundary := false
if endTimeIsSet {
_, isEndTimeBoundary = scanBoundary.(*errEndTimeReached)
}
// Initially, the only copy boundary is the end time if one is specified.
// Once we discover a table event (which is before the end time), that will
// become the new boundary.
var boundary copyBoundary
if endTime.IsSet() {
boundary = &errEndTimeReached{
endTime: endTime,
}
}

if scanBoundary != nil && !isEndTimeBoundary {
var (
// checkForTableEvent takes in a new event's timestamp (event generated
// from rangefeed) and checks if a table event was encountered at or before
// said timestamp. If so, it replaces the copy boundary with the table event.
checkForTableEvent = func(ts hlc.Timestamp) error {
// There's no need to check for table events again if we already found one
// since that should already be the earliest one.
if _, ok := boundary.(*errTableEventReached); ok {
return nil
}
nextEvents, err := tables.Peek(ctx, ts)

nextEvents, err := schemaFeed.Peek(ctx, ts)
if err != nil {
return err
}

// If there are any table events that occur, we will set the scan boundary
// to this table event. However, if the end time is not empty, we will set
// the scan boundary to the specified end time. Hence, we give a higher
// precedence to table events.
if len(nextEvents) > 0 {
scanBoundary = &errTableEventReached{nextEvents[0]}
} else if endTimeIsSet && scanBoundary == nil {
scanBoundary = &errEndTimeReached{
endTime: endTime,
}
boundary = &errTableEventReached{nextEvents[0]}
}

return nil
}

// spanFrontier returns frontier timestamp for the specified span.
// spanFrontier returns the frontier timestamp for the specified span by
// finding the minimum timestamp of its subspans in the frontier.
spanFrontier = func(sp roachpb.Span) (sf hlc.Timestamp) {
frontier.SpanEntries(sp, func(_ roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
if sf.IsEmpty() || ts.Less(sf) {
Expand All @@ -704,28 +706,30 @@ func copyFromSourceToDestUntilTableEvent(
return sf
}

// applyScanBoundary apply the boundary that we set above.
// In most cases, a boundary isn't reached, and thus we do nothing.
// If a boundary is reached but event `e` happens before that boundary,
// then we let the event proceed.
// checkCopyBoundary checks the event against the current copy boundary
// to determine if we should skip the event and/or whether we can stop copying.
// We can stop copying once the frontier has reached boundary.Timestamp().Prev().
// In most cases, a boundary does not exist, and thus we do nothing.
// If a boundary has been discovered, but the event happens before that boundary,
// we let the event proceed.
// Otherwise (if `e.ts` >= `boundary.ts`), we will act as follows:
// - KV event: do nothing (we shouldn't emit this event)
// - Resolved event: advance this span to `boundary.ts` in the frontier
applyScanBoundary = func(e kvevent.Event) (skipEvent, reachedBoundary bool, err error) {
if scanBoundary == nil {
checkCopyBoundary = func(e kvevent.Event) (skipEvent, stopCopying bool, err error) {
if boundary == nil {
return false, false, nil
}
if knobs.EndTimeReached != nil && knobs.EndTimeReached() {
return true, true, nil
}
if e.Timestamp().Less(scanBoundary.Timestamp()) {
if e.Timestamp().Less(boundary.Timestamp()) {
return false, false, nil
}
switch e.Type() {
case kvevent.TypeKV:
return true, false, nil
case kvevent.TypeResolved:
boundaryResolvedTimestamp := scanBoundary.Timestamp().Prev()
boundaryResolvedTimestamp := boundary.Timestamp().Prev()
resolved := e.Resolved()
if resolved.Timestamp.LessEq(boundaryResolvedTimestamp) {
return false, false, nil
Expand All @@ -734,8 +738,8 @@ func copyFromSourceToDestUntilTableEvent(
// At this point, we know event is after boundaryResolvedTimestamp.
skipEvent = true

if _, ok := scanBoundary.(*errEndTimeReached); ok {
// We know we have end time boundary. In this case, we do not want to
if _, ok := boundary.(*errEndTimeReached); ok {
// We know we've hit the end time boundary. In this case, we do not want to
// skip this event because we want to make sure we emit checkpoint at
// exactly boundaryResolvedTimestamp. This checkpoint can be used to
// produce span based changefeed checkpoints if needed.
Expand All @@ -749,7 +753,7 @@ func copyFromSourceToDestUntilTableEvent(
}

if _, err := frontier.Forward(resolved.Span, boundaryResolvedTimestamp); err != nil {
return true, false, err
return false, false, err
}

return skipEvent, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp), nil
Expand All @@ -758,21 +762,20 @@ func copyFromSourceToDestUntilTableEvent(
// been processed by the timestamp check above. We include this here
// for completeness.
return false, false, nil

default:
return false, false, &errUnknownEvent{e}
}
}

// addEntry simply writes to `dest`.
addEntry = func(e kvevent.Event) error {
// writeToDest writes an event to the dest.
writeToDest = func(e kvevent.Event) error {
switch e.Type() {
case kvevent.TypeKV, kvevent.TypeFlush:
return dest.Add(ctx, e)
case kvevent.TypeResolved:
// TODO(ajwerner): technically this doesn't need to happen for most
// events - we just need to make sure we forward for events which are
// at scanBoundary.Prev(). We may not yet know about that scanBoundary.
// at boundary.Prev(). We may not yet know about that boundary.
// The logic currently doesn't make this clean.
resolved := e.Resolved()
if _, err := frontier.Forward(resolved.Span, resolved.Timestamp); err != nil {
Expand All @@ -784,51 +787,50 @@ func copyFromSourceToDestUntilTableEvent(
}
}

// copyEvent copies `e` (read from rangefeed) and writes to `dest`,
// until a boundary is detected and reached (meaning all watched spans
// in the frontier have advanced to `boundary.ts.Prev()`, and it's ready for
// either EXIT or another SCAN.
copyEvent = func(e kvevent.Event) error {
if err := checkForScanBoundary(e.Timestamp()); err != nil {
// checkAndCopyEvent checks to see if a new copy boundary exists and
// whether the event should be copied. If so, it writes the event to dest.
checkAndCopyEvent = func(e kvevent.Event) error {
if err := checkForTableEvent(e.Timestamp()); err != nil {
return err
}
skipEntry, scanBoundaryReached, err := applyScanBoundary(e)
skipEntry, stopCopying, err := checkCopyBoundary(e)
if err != nil {
return err
}

if skipEntry || scanBoundaryReached {
if skipEntry || stopCopying {
// We will skip this entry or outright terminate kvfeed (if boundary reached).
// Regardless of the reason, we must release this event memory allocation
// since other ranges might not have reached scan boundary yet.
// since other ranges might not have reached copy boundary yet.
// Failure to release this event allocation may prevent other events from being
// enqueued in the blocking buffer due to memory limit.
a := e.DetachAlloc()
a.Release(ctx)
}

if scanBoundaryReached {
if stopCopying {
// All component rangefeeds are now at the boundary.
// Break out of the ctxgroup by returning the sentinel error.
// (We don't care if skipEntry is false -- scan boundary can only be
// (We don't care if skipEntry is false -- copy boundary can only be
// returned for resolved event, and we don't care if we emit this event
// since exiting with scan boundary error will cause appropriate
// since exiting with copy boundary error will cause appropriate
// boundary type (EXIT) to be emitted for the entire frontier)
return scanBoundary
return boundary
}

if skipEntry {
return nil
}
return addEntry(e)
return writeToDest(e)
}
)

for {
e, err := source.Get(ctx)
if err != nil {
return err
}
if err := copyEvent(e); err != nil {
if err := checkAndCopyEvent(e); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 4806184

Please sign in to comment.