kvserver: emit MVCC range tombstones over rangefeeds
This patch adds MVCC range tombstone support in rangefeeds. Whenever an
MVCC range tombstone is written, a new `MVCCDeleteRangeOp` logical op
is recorded and emitted across the rangefeed as a `RangeFeedDeleteRange`
event. MVCC range tombstones will only be written when the
`MVCCRangeTombstones` version gate has been enabled.
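
For illustration, a minimal write-side sketch. This assumes the experimental
`ExperimentalDelRangeUsingTombstone` client API exercised by this patch's
tests and the usual cluster-version check; the helper itself is hypothetical:

    package example

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/clusterversion"
        "github.com/cockroachdb/cockroach/pkg/kv"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
        "github.com/cockroachdb/errors"
    )

    // writeRangeTombstone sketches the write path that triggers the new event.
    func writeRangeTombstone(
        ctx context.Context, st *cluster.Settings, db *kv.DB, start, end roachpb.Key,
    ) error {
        // MVCC range tombstones may only be written once the gate is active.
        if !st.Version.IsActive(ctx, clusterversion.MVCCRangeTombstones) {
            return errors.New("MVCC range tombstones are not enabled")
        }
        // This records an MVCCDeleteRangeOp logical op, which the rangefeed
        // emits as a RangeFeedDeleteRange event to overlapping registrations.
        return db.ExperimentalDelRangeUsingTombstone(ctx, start, end)
    }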

Changefeeds will emit an error for these events. We do not expect to see
these in online spans with changefeeds, since they are initially only
planned for use with schema GC and import rollbacks.

The rangefeed client library has been extended with support for these
events, but no existing callers handle them for the same reason as
changefeeds. Initial scans do not emit regular tombstones, and thus do not
emit range tombstones either, but catchup scans will emit them if
encountered.

This patch has rudimentary testing of MVCC range tombstones in
rangefeeds. A later patch will add a data-driven test harness for
rangefeeds with more exhaustive tests.

Release note: None
erikgrinaker committed Jun 28, 2022
1 parent 443c4d5 commit 566dff2
Showing 22 changed files with 450 additions and 32 deletions.
15 changes: 15 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -123,6 +123,21 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
// expect SST ingestion into spans with active changefeeds.
return errors.Errorf("unexpected SST ingestion: %v", t)

case *roachpb.RangeFeedDeleteRange:
// For now, we just error on MVCC range tombstones. These are currently
// only expected to be used by schema GC and IMPORT INTO, and such spans
// should not have active changefeeds across them.
//
// TODO(erikgrinaker): Write an end-to-end test which verifies that an
// IMPORT INTO which gets rolled back using MVCC range tombstones will
// not be visible to a changefeed, neither when it was started before
// the import nor when resuming from a timestamp before the import. The
// table descriptor should be marked as offline during the import, and
// catchup scans should detect that this happened and prevent reading
// anything in that timespan. See:
// https://github.com/cockroachdb/cockroach/issues/70433
return errors.Errorf("unexpected MVCC range deletion: %v", t)

default:
return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -472,6 +472,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
return err
}
default:
// TODO(erikgrinaker): Handle DeleteRange events (MVCC range tombstones).
return errors.AssertionFailedf("unexpected event")
}
}
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
@@ -73,6 +73,7 @@ go_test(
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/sql/catalog/desctestutils",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
19 changes: 19 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
@@ -41,6 +41,7 @@ type config struct {
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
onSSTable OnSSTable
onDeleteRange OnDeleteRange
extraPProfLabels []string
}

@@ -178,6 +179,24 @@ func WithOnSSTable(f OnSSTable) Option {
})
}

// OnDeleteRange is called when an MVCC range tombstone is written (e.g. when
// DeleteRange is called with UseExperimentalRangeTombstone, but not when the
// range is deleted using point tombstones). If this callback is not provided,
// an error is emitted when these are encountered.
//
// MVCC range tombstones are currently experimental, and require the
// MVCCRangeTombstones version gate. They are only expected during certain
// operations like schema GC and IMPORT INTO (i.e. not across live tables).
type OnDeleteRange func(ctx context.Context, value *roachpb.RangeFeedDeleteRange)

// WithOnDeleteRange sets up a callback that's invoked whenever an MVCC range
// deletion tombstone is written.
func WithOnDeleteRange(f OnDeleteRange) Option {
return optionFunc(func(c *config) {
c.onDeleteRange = f
})
}

// OnFrontierAdvance is called when the rangefeed frontier is advanced with the
// new frontier timestamp.
type OnFrontierAdvance func(ctx context.Context, timestamp hlc.Timestamp)
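A minimal consumer sketch using the new option (this mirrors the factory API
exercised by the tests in this patch; the span, timestamp, and handler bodies
are illustrative):

    f, err := rangefeed.NewFactory(srv.Stopper(), db, srv.ClusterSettings(), nil)
    if err != nil {
        return err
    }
    r, err := f.RangeFeed(ctx, "tombstone-feed",
        []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}},
        db.Clock().Now(),
        func(ctx context.Context, v *roachpb.RangeFeedValue) {
            // Point-key updates arrive here.
        },
        rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
            // MVCC range tombstones arrive here, truncated to the
            // registration span.
            log.Infof(ctx, "range deleted: %s at %s", e.Span, e.Timestamp)
        }),
    )
    if err != nil {
        return err
    }
    defer r.Close()
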
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
@@ -357,6 +357,12 @@ func (f *RangeFeed) processEvents(
"received unexpected rangefeed SST event with no OnSSTable handler")
}
f.onSSTable(ctx, ev.SST)
case ev.DeleteRange != nil:
if f.onDeleteRange == nil {
return errors.AssertionFailedf(
"received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s", ev)
}
f.onDeleteRange(ctx, ev.DeleteRange)
case ev.Error != nil:
// Intentionally do nothing, we'll get an error returned from the
// call to RangeFeed.
157 changes: 157 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sstutil"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
@@ -615,6 +616,162 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) {
require.Equal(t, expectKVs, seenKVs)
}

// TestWithOnDeleteRange tests that the rangefeed emits MVCC range tombstones.
//
// TODO(erikgrinaker): These kinds of tests should really use a data-driven test
// harness, for more exhaustive testing. But it'll do for now.
func TestWithOnDeleteRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
srv := tc.Server(0)
db := srv.DB()

_, _, err := tc.SplitRange(roachpb.Key("a"))
require.NoError(t, err)
require.NoError(t, tc.WaitForFullReplication())

_, err = tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, err)
f, err := rangefeed.NewFactory(srv.Stopper(), db, srv.ClusterSettings(), nil)
require.NoError(t, err)

// We lay down a few MVCC range tombstones and points. The first range
// tombstone should not be visible, because initial scans do not emit
// tombstones, nor should the points covered by it. The second range tombstone
// should be visible, because catchup scans do emit tombstones. The range
// tombstone should be ordered after the initial point, but before the catchup
// points, and the previous values should respect the range tombstones.
require.NoError(t, db.Put(ctx, "covered", "covered"))
require.NoError(t, db.Put(ctx, "foo", "covered"))
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z"))
require.NoError(t, db.Put(ctx, "foo", "initial"))
rangeFeedTS := db.Clock().Now()
require.NoError(t, db.Put(ctx, "covered", "catchup"))
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z"))
require.NoError(t, db.Put(ctx, "foo", "catchup"))

// We start the rangefeed over a narrower span than the DeleteRanges (c-g),
// to ensure the DeleteRange event is truncated to the registration span.
var checkpointOnce sync.Once
checkpointC := make(chan struct{})
deleteRangeC := make(chan *roachpb.RangeFeedDeleteRange)
rowC := make(chan *roachpb.RangeFeedValue)

spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}}
r, err := f.RangeFeed(ctx, "test", spans, rangeFeedTS,
func(ctx context.Context, e *roachpb.RangeFeedValue) {
select {
case rowC <- e:
case <-ctx.Done():
}
},
rangefeed.WithDiff(true),
rangefeed.WithInitialScan(nil),
rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) {
checkpointOnce.Do(func() {
close(checkpointC)
})
}),
rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
select {
case deleteRangeC <- e:
case <-ctx.Done():
}
}),
)
require.NoError(t, err)
defer r.Close()

// Wait for initial scan. We should see the foo=initial point, but not the
// range tombstone nor the covered points.
select {
case e := <-rowC:
require.Equal(t, roachpb.Key("foo"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "initial", string(value))
prevValue, err := e.PrevValue.GetBytes()
require.NoError(t, err)
require.Equal(t, "initial", string(prevValue)) // initial scans supply current as prev
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for initial scan event")
}

// Wait for catchup scan. We should see the second range tombstone, truncated
// to the rangefeed bounds (c-g), and it should be ordered before the points
// covered=catchup and foo=catchup. Both points should have a tombstone as the
// previous value.
select {
case e := <-deleteRangeC:
require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span)
require.NotEmpty(t, e.Timestamp)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}

select {
case e := <-rowC:
require.Equal(t, roachpb.Key("covered"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "catchup", string(value))
prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes)
require.NoError(t, err)
require.True(t, prevValue.IsTombstone())
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for foo=catchup event")
}

select {
case e := <-rowC:
require.Equal(t, roachpb.Key("foo"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "catchup", string(value))
prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes)
require.NoError(t, err)
require.True(t, prevValue.IsTombstone())
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for foo=catchup event")
}

// Wait for checkpoint after catchup scan.
select {
case <-checkpointC:
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for checkpoint")
}

// Send another DeleteRange, and wait for the rangefeed event. This should
// be truncated to the rangefeed bounds (c-g).
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z"))
select {
case e := <-deleteRangeC:
require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span)
require.NotEmpty(t, e.Timestamp)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}

// A final point write should be emitted with a tombstone as the previous value.
require.NoError(t, db.Put(ctx, "foo", "final"))
select {
case e := <-rowC:
require.Equal(t, roachpb.Key("foo"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "final", string(value))
prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes)
require.NoError(t, err)
require.True(t, prevValue.IsTombstone())
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for foo=final event")
}
}

// TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced
// to callers.
func TestUnrecoverableErrors(t *testing.T) {
86 changes: 72 additions & 14 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -65,6 +65,8 @@ func NewCatchUpIterator(
return &CatchUpIterator{
simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
storage.MVCCIncrementalIterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
StartKey: span.Key,
EndKey: span.EndKey,
StartTime: startTime,
EndTime: hlc.MaxTimestamp,
@@ -96,25 +98,23 @@ type outputEventFn func(e *roachpb.RangeFeedEvent) error

// CatchUpScan iterates over all changes in the configured key/time span, and
// emits them as RangeFeedEvents via outputFn in chronological order.
//
// MVCC range tombstones are emitted at their start key, in chronological order.
// Because the start key itself is not timestamped, these will be ordered before
// all of the timestamped point keys that they overlap. For more details, see
// MVCC range key info on storage.SimpleMVCCIterator.
//
// For example, with MVCC range tombstones [a-f)@5 and [a-f)@3 overlapping point
// keys a@6, a@4, and b@2, the emitted order is [a-f)@3,[a-f)@5,a@4,a@6,b@2 because
// the start key "a" is ordered before all of the timestamped point keys.
func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
var a bufalloc.ByteAllocator
// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
// events for the same key until a different key is encountered, then output
// the encountered values in reverse. This also allows us to buffer events
// as we fill in previous values.
var lastKey roachpb.Key
reorderBuf := make([]roachpb.RangeFeedEvent, 0, 5)
addPrevToLastEvent := func(val []byte) {
if l := len(reorderBuf); l > 0 {
if reorderBuf[l-1].Val.PrevValue.IsPresent() {
panic("RangeFeedValue.PrevVal unexpectedly set")
}
// TODO(sumeer): find out if it is deliberate that we are not populating
// PrevValue.Timestamp.
reorderBuf[l-1].Val.PrevValue.RawBytes = val
}
}

outputEvents := func() error {
for i := len(reorderBuf) - 1; i >= 0; i-- {
@@ -130,7 +130,9 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
// Iterate through all keys using Next. We want to publish all committed
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var lastKey roachpb.Key
var meta enginepb.MVCCMetadata
var rangeKeysStart roachpb.Key
i.SeekGE(storage.MVCCKey{Key: i.span.Key})
for {
if ok, err := i.Valid(); err != nil {
Expand All @@ -139,6 +141,45 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
break
}

hasPoint, hasRange := i.HasPointAndRange()

// Emit any new MVCC range tombstones when their start key is encountered.
// Range keys can currently only be MVCC range tombstones.
//
// TODO(erikgrinaker): Find a faster/better way to detect range key changes
// that doesn't involve constant comparisons. Pebble probably already knows,
// we just need a way to ask it.
if hasRange {
if rangeBounds := i.RangeBounds(); !rangeBounds.Key.Equal(rangeKeysStart) {
rangeKeysStart = append(rangeKeysStart[:0], rangeBounds.Key...)

// Emit events for these MVCC range tombstones, in chronological order.
rangeKeys := i.RangeKeys()
for i := len(rangeKeys) - 1; i >= 0; i-- {
var span roachpb.Span
a, span.Key = a.Copy(rangeBounds.Key, 0)
a, span.EndKey = a.Copy(rangeBounds.EndKey, 0)
err := outputFn(&roachpb.RangeFeedEvent{
DeleteRange: &roachpb.RangeFeedDeleteRange{
Span: span,
Timestamp: rangeKeys[i].RangeKey.Timestamp,
},
})
if err != nil {
return err
}
}
}
}

// If there's no point key here (i.e. we found a bare range key above), then
// step onto the next key. This may be a point key version at the same key
// as the range key's start bound, or a later point/range key.
if !hasPoint {
i.Next()
continue
}

unsafeKey := i.UnsafeKey()
unsafeValRaw := i.UnsafeValue()
if !unsafeKey.IsValue() {
@@ -217,9 +258,26 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
var val []byte
a, val = a.Copy(unsafeVal, 0)
if withDiff {
// Update the last version with its
// previous value (this version).
addPrevToLastEvent(val)
// Update the last version with its previous value (this version).
if l := len(reorderBuf) - 1; l >= 0 {
if reorderBuf[l].Val.PrevValue.IsPresent() {
return errors.AssertionFailedf("unexpected previous value %s for key %s",
reorderBuf[l].Val.PrevValue, key)
}
// If an MVCC range tombstone exists between this value and the next
// one, we don't emit the value after all -- it should be a tombstone.
//
// TODO(erikgrinaker): We can't save range keys when we detect changes
// to rangeKeysStart above, because NextIgnoringTime() could reveal
// additional MVCC range tombstones below StartTime that cover this
// point. We need to find a more performant way to handle this.
if !hasRange || !storage.HasRangeKeyBetween(
i.RangeKeys(), reorderBuf[l].Val.Value.Timestamp, ts) {
// TODO(sumeer): find out if it is deliberate that we are not populating
// PrevValue.Timestamp.
reorderBuf[l].Val.PrevValue.RawBytes = val
}
}
}

if !ignore {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
@@ -175,7 +175,7 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo
opts.FS = vfs.Default
opts.LBaseMaxBytes = lBaseMaxBytes
opts.ReadOnly = readOnly
opts.FormatMajorVersion = pebble.FormatBlockPropertyCollector
opts.FormatMajorVersion = pebble.FormatRangeKeys
peb, err := storage.NewPebble(
context.Background(),
storage.PebbleConfig{