diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index ef126ed268ce..db870460321c 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1934,7 +1934,7 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions iter = &p.prefixIter } if iter.inuse { - panic("iterator already in use") + return newPebbleIterator(p.parent.db, p.iter, opts, p.durability) } // Ensures no timestamp hints etc. checkOptionsForIterReuse(opts) @@ -1970,7 +1970,7 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { iter = &p.prefixEngineIter } if iter.inuse { - panic("iterator already in use") + return newPebbleIterator(p.parent.db, p.iter, opts, p.durability) } // Ensures no timestamp hints etc. checkOptionsForIterReuse(opts) diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index cd20b3cdf5ca..34f2d7d1e023 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -228,8 +228,12 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M if opts.Prefix { iter = &p.prefixIter } + handle := pebble.Reader(p.batch) + if !p.batch.Indexed() { + handle = p.db + } if iter.inuse { - panic("iterator already in use") + return newPebbleIterator(handle, p.iter, opts, StandardDurability) } // Ensures no timestamp hints etc. checkOptionsForIterReuse(opts) @@ -237,11 +241,7 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - if p.batch.Indexed() { - iter.init(p.batch, p.iter, p.iterUnused, opts, StandardDurability) - } else { - iter.init(p.db, p.iter, p.iterUnused, opts, StandardDurability) - } + iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability) if p.iter == nil { // For future cloning. 
p.iter = iter.iter @@ -271,8 +271,12 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { if opts.Prefix { iter = &p.prefixEngineIter } + handle := pebble.Reader(p.batch) + if !p.batch.Indexed() { + handle = p.db + } if iter.inuse { - panic("iterator already in use") + return newPebbleIterator(handle, p.iter, opts, StandardDurability) } // Ensures no timestamp hints etc. checkOptionsForIterReuse(opts) @@ -280,11 +284,7 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - if p.batch.Indexed() { - iter.init(p.batch, p.iter, p.iterUnused, opts, StandardDurability) - } else { - iter.init(p.db, p.iter, p.iterUnused, opts, StandardDurability) - } + iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability) if p.iter == nil { // For future cloning. p.iter = iter.iter diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 19831b167079..9ac2069a46a8 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1118,3 +1119,78 @@ func TestPebbleFlushCallbackAndDurabilityRequirement(t *testing.T) { defer roGuaranteed2.Close() require.Equal(t, "a1", string(checkGetAndIter(roGuaranteed2))) } + +// TestPebbleReaderMultipleIterators tests that all Pebble readers support +// multiple concurrent iterators of the same type. 
+func TestPebbleReaderMultipleIterators(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	eng := NewDefaultInMemForTesting()
+	defer eng.Close()
+
+	a1 := MVCCKey{Key: roachpb.Key("a"), Timestamp: hlc.Timestamp{WallTime: 1}}
+	b1 := MVCCKey{Key: roachpb.Key("b"), Timestamp: hlc.Timestamp{WallTime: 1}}
+	c1 := MVCCKey{Key: roachpb.Key("c"), Timestamp: hlc.Timestamp{WallTime: 1}}
+
+	// Seed the engine with one version of each key.
+	require.NoError(t, eng.PutMVCC(a1, []byte{1}))
+	require.NoError(t, eng.PutMVCC(b1, []byte{2}))
+	require.NoError(t, eng.PutMVCC(c1, []byte{3}))
+
+	// Create one reader of each kind and pin its engine state before the
+	// overwrites below are applied.
+	readOnly := eng.NewReadOnly(StandardDurability)
+	defer readOnly.Close()
+	require.NoError(t, readOnly.PinEngineStateForIterators())
+
+	snapshot := eng.NewSnapshot()
+	defer snapshot.Close()
+	require.NoError(t, snapshot.PinEngineStateForIterators())
+
+	batch := eng.NewBatch()
+	defer batch.Close()
+	require.NoError(t, batch.PinEngineStateForIterators())
+
+	// These writes should not be visible to any of the pinned iterators.
+	require.NoError(t, eng.PutMVCC(a1, []byte{9}))
+	require.NoError(t, eng.PutMVCC(b1, []byte{9}))
+	require.NoError(t, eng.PutMVCC(c1, []byte{9}))
+
+	// A slice (rather than a map) keeps the subtest order deterministic, which
+	// makes failures reproducible from run to run.
+	testcases := []struct {
+		name   string
+		reader Reader
+	}{
+		{name: "Engine", reader: eng},
+		{name: "ReadOnly", reader: readOnly},
+		{name: "Snapshot", reader: snapshot},
+		{name: "Batch", reader: batch},
+	}
+	for _, tc := range testcases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			r := tc.reader
+
+			// requireValid asserts that the iterator is positioned on an entry.
+			// The Unsafe* accessors are only read after this check, so an
+			// exhausted or errored iterator fails with a clear message instead
+			// of an arbitrary key/value mismatch.
+			requireValid := func(it MVCCIterator) {
+				ok, err := it.Valid()
+				require.NoError(t, err)
+				require.True(t, ok, "iterator is not positioned on a key")
+			}
+
+			// Make sure we can create two iterators of the same type.
+			i1 := r.NewMVCCIterator(MVCCKeyIterKind, IterOptions{LowerBound: a1.Key, UpperBound: keys.MaxKey})
+			i2 := r.NewMVCCIterator(MVCCKeyIterKind, IterOptions{LowerBound: b1.Key, UpperBound: keys.MaxKey})
+
+			// Make sure the iterators are independent.
+			i1.SeekGE(a1)
+			i2.SeekGE(a1)
+			requireValid(i1)
+			requireValid(i2)
+			require.Equal(t, a1, i1.UnsafeKey())
+			require.Equal(t, b1, i2.UnsafeKey()) // b1 because of LowerBound
+
+			// Check iterator consistency: readers that report consistent
+			// iterators are expected to show the values written before the
+			// overwrites, the others the overwritten values.
+			if r.ConsistentIterators() {
+				require.Equal(t, []byte{1}, i1.UnsafeValue())
+				require.Equal(t, []byte{2}, i2.UnsafeValue())
+			} else {
+				require.Equal(t, []byte{9}, i1.UnsafeValue())
+				require.Equal(t, []byte{9}, i2.UnsafeValue())
+			}
+
+			// Closing one iterator shouldn't affect the other.
+			i1.Close()
+			i2.Next()
+			requireValid(i2)
+			require.Equal(t, c1, i2.UnsafeKey())
+			i2.Close()
+
+			// Quick check for engine iterators too.
+			e1 := r.NewEngineIterator(IterOptions{UpperBound: keys.MaxKey})
+			defer e1.Close()
+			e2 := r.NewEngineIterator(IterOptions{UpperBound: keys.MaxKey})
+			defer e2.Close()
+		})
+	}
+}