Skip to content

Commit

Permalink
rangefeed: remove obsolete LegacyIntentScanner
Browse files Browse the repository at this point in the history
LegacyIntentScanner was only used in tests to prep data.
This PR removes it and replaces its use with properly
populated engine instead.

Epic: none
Fixes: cockroachdb#108278

Release note: None
  • Loading branch information
aliher1911 committed Sep 25, 2023
1 parent 3d76521 commit 083fa93
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 195 deletions.
137 changes: 102 additions & 35 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,25 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *kvpb.RangeFeedEve
})
}

type storeOp struct {
kv storage.MVCCKeyValue
txn *roachpb.Transaction
}

func makeTestEngineWithData(ops []storeOp) (storage.Engine, error) {
ctx := context.Background()
engine := storage.NewDefaultInMemForTesting()
for _, op := range ops {
kv := op.kv
err := storage.MVCCPut(ctx, engine, kv.Key.Key, kv.Key.Timestamp, roachpb.Value{RawBytes: kv.Value}, storage.MVCCWriteOptions{Txn: op.txn})
if err != nil {
engine.Close()
return nil, err
}
}
return engine, nil
}

const testProcessorEventCCap = 16
const testProcessorEventCTimeout = 10 * time.Millisecond

Expand Down Expand Up @@ -238,9 +257,13 @@ func withMetrics(m *Metrics) option {
}
}

func withRtsIter(rtsIter storage.SimpleMVCCIterator) option {
func withRtsScanner(scanner IntentScanner) option {
return func(config *testConfig) {
config.isc = makeIntentScannerConstructor(rtsIter)
if scanner != nil {
config.isc = func() IntentScanner {
return scanner
}
}
}
}

Expand Down Expand Up @@ -268,11 +291,52 @@ func withSpan(span roachpb.RSpan) option {
}
}

func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScannerConstructor {
if rtsIter == nil {
return nil
// blockingScanner is a test intent scanner that allows test to track lifecycle
// of tasks.
// 1. it will always block on startup and will wait for block to be closed to
// proceed
// 2. when closed it will close done channel to signal completion
type blockingScanner struct {
wrapped IntentScanner

block chan interface{}
done chan interface{}
}

func (s *blockingScanner) ConsumeIntents(
ctx context.Context, startKey roachpb.Key, endKey roachpb.Key, consumer eventConsumer,
) error {
if s.block != nil {
select {
case <-s.block:
case <-ctx.Done():
return ctx.Err()
}
}
return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
return s.wrapped.ConsumeIntents(ctx, startKey, endKey, consumer)
}

func (s *blockingScanner) Close() {
s.wrapped.Close()
close(s.done)
}

func makeIntentScanner(data []storeOp, span roachpb.RSpan) (*blockingScanner, func(), error) {
engine, err := makeTestEngineWithData(data)
if err != nil {
return nil, nil, err
}
scanner, err := NewSeparatedIntentScanner(engine, span)
if err != nil {
return nil, nil, err
}
return &blockingScanner{
wrapped: scanner,
block: make(chan interface{}),
done: make(chan interface{}),
}, func() {
engine.Close()
}, nil
}

func newTestProcessor(
Expand Down Expand Up @@ -824,32 +888,36 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) {
txn1, txn2 := uuid.MakeV4(), uuid.MakeV4()
rtsIter := newTestIterator([]storage.MVCCKeyValue{
makeKV("a", "val1", 10),
makeIntent("c", txn1, "txnKey1", 15),
makeProvisionalKV("c", "txnKey1", 15),
makeKV("c", "val3", 11),
makeKV("c", "val4", 9),
makeIntent("d", txn2, "txnKey2", 21),
makeProvisionalKV("d", "txnKey2", 21),
makeKV("d", "val5", 20),
makeKV("d", "val6", 19),
makeKV("m", "val8", 1),
makeIntent("n", txn1, "txnKey1", 12),
makeProvisionalKV("n", "txnKey1", 12),
makeIntent("r", txn1, "txnKey1", 19),
makeProvisionalKV("r", "txnKey1", 19),
makeKV("r", "val9", 4),
makeIntent("w", txn1, "txnKey1", 3),
makeProvisionalKV("w", "txnKey1", 3),
makeIntent("z", txn2, "txnKey2", 21),
makeProvisionalKV("z", "txnKey2", 21),
makeKV("z", "val11", 4),
}, nil)
rtsIter.block = make(chan struct{})

p, h, stopper := newTestProcessor(t, withRtsIter(rtsIter), withProcType(pt))
txn1 := makeTxn("txn1", uuid.MakeV4(), isolation.Serializable, hlc.Timestamp{})
txn2 := makeTxn("txn2", uuid.MakeV4(), isolation.Serializable, hlc.Timestamp{})
txnWithTs := func(txn roachpb.Transaction, ts int64) *roachpb.Transaction {
txnTs := hlc.Timestamp{WallTime: ts}
txn.TxnMeta.MinTimestamp = txnTs
txn.TxnMeta.WriteTimestamp = txnTs
txn.ReadTimestamp = txnTs
return &txn
}
data := []storeOp{
{kv: makeKV("a", "val1", 10)},
{kv: makeKV("c", "val4", 9)},
{kv: makeKV("c", "val3", 11)},
{kv: makeProvisionalKV("c", "txnKey1", 15), txn: txnWithTs(txn1, 15)},
{kv: makeKV("d", "val6", 19)},
{kv: makeKV("d", "val5", 20)},
{kv: makeProvisionalKV("d", "txnKey2", 21), txn: txnWithTs(txn2, 21)},
{kv: makeKV("m", "val8", 1)},
{kv: makeProvisionalKV("n", "txnKey1", 12), txn: txnWithTs(txn1, 12)},
{kv: makeKV("r", "val9", 4)},
{kv: makeProvisionalKV("r", "txnKey1", 19), txn: txnWithTs(txn1, 19)},
{kv: makeProvisionalKV("w", "txnKey1", 3), txn: txnWithTs(txn1, 3)},
{kv: makeKV("z", "val11", 4)},
{kv: makeProvisionalKV("z", "txnKey2", 21), txn: txnWithTs(txn2, 21)},
}
scanner, cleanup, err := makeIntentScanner(data, roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("zz")})
require.NoError(t, err, "failed to prepare test data")
defer cleanup()

p, h, stopper := newTestProcessor(t, withRtsScanner(scanner), withProcType(pt))
ctx := context.Background()
defer stopper.Stop(ctx)

Expand Down Expand Up @@ -893,9 +961,8 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
require.Equal(t, hlc.Timestamp{}, h.rts.Get())

// Let the scan proceed.
close(rtsIter.block)
<-rtsIter.done
require.True(t, rtsIter.closed)
close(scanner.block)
<-scanner.done

// Synchronize the event channel then verify that the resolved timestamp is
// initialized and that it's blocked on the oldest unresolved intent's txn
Expand Down
73 changes: 0 additions & 73 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,79 +170,6 @@ func (s *SeparatedIntentScanner) ConsumeIntents(
// Close implements the IntentScanner interface.
func (s *SeparatedIntentScanner) Close() { s.iter.Close() }

// LegacyIntentScanner is an IntentScanner that assumes intents might
// not be separated.
//
// MVCCIterator Contract:
//
// The provided MVCCIterator must observe all intents in the Processor's keyspan.
// An important implication of this is that if the iterator is a
// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest
// known resolved timestamp, if one has ever been recorded. If one has never
// been recorded, the TimeBoundIterator cannot have any lower bound.
//
// The LegacyIntentScanner is unused outside of tests and will be removed as
// part of #108278.
type LegacyIntentScanner struct {
iter storage.SimpleMVCCIterator
}

// NewLegacyIntentScanner returns an IntentScanner appropriate for use
// when the separated intents migration has not yet completed.
func NewLegacyIntentScanner(iter storage.SimpleMVCCIterator) IntentScanner {
return &LegacyIntentScanner{iter: iter}
}

// ConsumeIntents implements the IntentScanner interface.
func (l *LegacyIntentScanner) ConsumeIntents(
ctx context.Context, start roachpb.Key, end roachpb.Key, consumer eventConsumer,
) error {
startKey := storage.MakeMVCCMetadataKey(start)
endKey := storage.MakeMVCCMetadataKey(end)
// Iterate through all keys using NextKey. This will look at the first MVCC
// version for each key. We're only looking for MVCCMetadata versions, which
// will always be the first version of a key if it exists, so its fine that
// we skip over all other versions of keys.
var meta enginepb.MVCCMetadata
for l.iter.SeekGE(startKey); ; l.iter.NextKey() {
if ok, err := l.iter.Valid(); err != nil {
return err
} else if !ok || !l.iter.UnsafeKey().Less(endKey) {
break
}

// If the key is not a metadata key, ignore it.
unsafeKey := l.iter.UnsafeKey()
if unsafeKey.IsValue() {
continue
}

// Found a metadata key. Unmarshal.
v, err := l.iter.UnsafeValue()
if err != nil {
return err
}
if err := protoutil.Unmarshal(v, &meta); err != nil {
return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey)
}

// If this is an intent, inform the Processor.
if meta.Txn != nil {
consumer(enginepb.MVCCWriteIntentOp{
TxnID: meta.Txn.ID,
TxnKey: meta.Txn.Key,
TxnIsoLevel: meta.Txn.IsoLevel,
TxnMinTimestamp: meta.Txn.MinTimestamp,
Timestamp: meta.Txn.WriteTimestamp,
})
}
}
return nil
}

// Close implements the IntentScanner interface.
func (l *LegacyIntentScanner) Close() { l.iter.Close() }

// TxnPusher is capable of pushing transactions to a new timestamp and
// cleaning up the intents of transactions that are found to be committed.
type TxnPusher interface {
Expand Down
Loading

0 comments on commit 083fa93

Please sign in to comment.