Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
73487: kvserver: emit `AddSSTable` events via rangefeeds r=miretskiy a=erikgrinaker

**testutils: add sstutil package**

This patch adds a `testutils/sstutil` package with SST-related test
utilities.

Release note: None

**kvserver: disconnect rangefeeds on MVCC history mutations**

This patch adds a field `MVCCHistoryMutation` to `ReplicatedEvalResult`,
which disconnects any rangefeeds that overlap with the command span.
Callers are expected to ensure there are no rangefeeds over such spans,
but if they fail to do so it is better to error out rather than silently
pretend like nothing happened. The field is set for `AddSSTable`,
`ClearRange`, and `RevertRange` requests.

This error is exposed via the `OnInternalError` callback for
`rangefeed.Factory`-based rangefeed clients. However, if this callback
is not set, these clients will silently stop processing events. This is
unfortunate, but follows from the API design. Most callers do not appear
to set such a callback, and it is left for the owning teams to update
the usage of the library with appropriate error handling. Furthermore,
this error detection requires knowledge about the new
`MVCCHistoryMutationError` error type, so older nodes in mixed-version
clusters will simply treat this as a retryable error.

Touches cockroachdb#70434.

Release note: None

**kvserver: emit AddSSTable events via rangefeeds**

This patch emits `AddSSTable` events across rangefeeds when the
ingestion was done with `WriteAtRequestTimestamp` enabled (since this
ensures they respect the closed timestamp). This setting is introduced
with 22.1, and callers must check the `MVCCAddSStable` version gate
before using it, so by extension this event is only emitted once the
entire cluster runs 22.1.

Clients built via `rangefeed.Factory` have a new `WithOnSSTable` option
that can be used to register a callback for these events. If such a
callback is not set, the rangefeed will run a catchup scan that includes
the values written by the `AddSSTable` request.

The entire SST is emitted in binary form, regardless of how the
registration span overlaps it -- it is up to callers to prune the SST
contents as appropriate. Previous values are not included for keys
replaced by the SST.

Resolves cockroachdb#70434.

Release note: None

---

Initial draft. A few outstanding questions:

1. ~~Changefeeds will error on these SST events. We currently do not expect to ingest SSTs into online tables, and thus we do not expect changefeeds to see these events. Now that we have MVCC-compliant `AddSSTable` we _could_ begin to use `AddSSTable` into online tables, and thus the changefeeds would need to handle this (note here that the SST events do not contain previous value diffs), but I figure we can cross that bridge when we get there. Does this make sense?~~

2. ~~Internal consumers that subscribe via `rangefeed.Factory` must register an `OnSSTable` handler, otherwise they will not be notified about such events. I'm not sure if we ever expect to ingest SSTs into e.g. online system spans and such, but we may want to consider sending such subscribers an error if they haven't registered `OnSSTable` rather than silently ignoring them.~~

3. These events are likely to be much larger than regular key/value events, and thus there is a greater risk of excessive memory usage since the buffer size per replica is 4096 events. The SSTs are already in memory due to Raft application, but buffering them here can cause them to stick around and pile up. We should consider adding memory accounting here, and use a store-wide memory limit rather than a per-replica fixed-size queue. See: cockroachdb#73616.

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Jan 27, 2022
2 parents c2023ef + f048ab0 commit 11da0cf
Show file tree
Hide file tree
Showing 33 changed files with 1,317 additions and 336 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
); err != nil {
return err
}
case *roachpb.RangeFeedSSTable:
// For now, we just error on SST ingestion, since we currently don't
// expect SST ingestion into spans with active changefeeds.
return errors.Errorf("unexpected SST ingestion: %v", t)

default:
return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (a *activeRangeFeed) onRangeEvent(
) {
a.Lock()
defer a.Unlock()
if event.Val != nil {
if event.Val != nil || event.SST != nil {
a.LastValueReceived = timeutil.Now()
} else if event.Checkpoint != nil {
a.Resolved = event.Checkpoint.ResolvedTS
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ go_test(
"//build/bazelutil:noop",
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -92,6 +93,7 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/sstutil",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type config struct {
onUnrecoverableError OnUnrecoverableError
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
onSSTable OnSSTable
extraPProfLabels []string
}

Expand Down Expand Up @@ -134,6 +135,31 @@ func WithOnCheckpoint(f OnCheckpoint) Option {
})
}

// OnSSTable is called when an SSTable is ingested. If this callback is not
// provided, a catchup scan will be run instead that will include the contents
// of these SSTs.
//
// Note that the SST is emitted as it was ingested, so it may contain keys
// outside of the rangefeed span, and the caller should prune the SST contents
// as appropriate. Futhermore, these events do not contain previous values as
// requested by WithDiff, and callers must obtain these themselves if needed.
//
// Also note that AddSSTable requests that do not set the
// WriteAtRequestTimestamp flag, possibly writing below the closed timestamp,
// will cause affected rangefeeds to be disconnected with a terminal
// MVCCHistoryMutationError and thus will not be emitted here -- there should be
// no such requests into spans with rangefeeds across them, but it is up to
// callers to ensure this.
type OnSSTable func(ctx context.Context, sst *roachpb.RangeFeedSSTable)

// WithOnSSTable sets up a callback that's invoked whenever an SSTable is
// ingested.
func WithOnSSTable(f OnSSTable) Option {
return optionFunc(func(c *config) {
c.onSSTable = f
})
}

// OnFrontierAdvance is called when the rangefeed frontier is advanced with the
// new frontier timestamp.
type OnFrontierAdvance func(ctx context.Context, timestamp hlc.Timestamp)
Expand Down
39 changes: 17 additions & 22 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -270,7 +271,6 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
// TODO(ajwerner): Consider adding event buffering. Doing so would require
// draining when the rangefeed fails.
eventCh := make(chan *roachpb.RangeFeedEvent)
errCh := make(chan error)

for i := 0; r.Next(); i++ {
ts := frontier.Frontier()
Expand All @@ -280,19 +280,16 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {

start := timeutil.Now()

// Note that the below channel send will not block forever because
// processEvents will wait for the worker to send. RunWorker is safe here
// because processEvents is guaranteed to consume the error before
// returning.
if err := f.stopper.RunAsyncTask(ctx, "rangefeed", func(ctx context.Context) {
errCh <- f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh)
}); err != nil {
log.VEventf(ctx, 1, "exiting rangefeed due to stopper")
return
rangeFeedTask := func(ctx context.Context) error {
return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh)
}
processEventsTask := func(ctx context.Context) error {
return f.processEvents(ctx, frontier, eventCh)
}

err := f.processEvents(ctx, frontier, eventCh, errCh)
if errors.HasType(err, &roachpb.BatchTimestampBeforeGCError{}) {
err := ctxgroup.GoAndWait(ctx, rangeFeedTask, processEventsTask)
if errors.HasType(err, &roachpb.BatchTimestampBeforeGCError{}) ||
errors.HasType(err, &roachpb.MVCCHistoryMutationError{}) {
if errCallback := f.onUnrecoverableError; errCallback != nil {
errCallback(ctx, err)
}
Expand Down Expand Up @@ -325,13 +322,9 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
}
}

// processEvents processes events sent by the rangefeed on the eventCh. It waits
// for the rangefeed to signal that it has exited by sending on errCh.
// processEvents processes events sent by the rangefeed on the eventCh.
func (f *RangeFeed) processEvents(
ctx context.Context,
frontier *span.Frontier,
eventCh <-chan *roachpb.RangeFeedEvent,
errCh <-chan error,
ctx context.Context, frontier *span.Frontier, eventCh <-chan *roachpb.RangeFeedEvent,
) error {
for {
select {
Expand All @@ -350,16 +343,18 @@ func (f *RangeFeed) processEvents(
if advanced && f.onFrontierAdvance != nil {
f.onFrontierAdvance(ctx, frontier.Frontier())
}
case ev.SST != nil:
if f.onSSTable == nil {
return errors.AssertionFailedf(
"received unexpected rangefeed SST event with no OnSSTable handler")
}
f.onSSTable(ctx, ev.SST)
case ev.Error != nil:
// Intentionally do nothing, we'll get an error returned from the
// call to RangeFeed.
}
case <-ctx.Done():
// Ensure that the RangeFeed goroutine stops.
<-errCh
return ctx.Err()
case err := <-errCh:
return err
}
}
}
Loading

0 comments on commit 11da0cf

Please sign in to comment.