Skip to content

Commit

Permalink
storage: expose separate WriteBatch interface
Browse files Browse the repository at this point in the history
Adapt NewUnindexedBatch to no longer take a writeOnly parameter. Instead, a new
NewWriteBatch method is exposed that returns a WriteBatch type that does not
provide Reader facilities. Future work may remove UnindexedBatch altogether,
updating callers to explicitly maintain separate Readers and WriteBatches.

Epic: None
Release note: None
  • Loading branch information
jbowens committed Mar 15, 2023
1 parent 0f743af commit ad639da
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 84 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestPebbleEncryption(t *testing.T) {
require.Equal(t, int32(enginepbccl.EncryptionType_AES128_CTR), stats.EncryptionType)
t.Logf("EnvStats:\n%+v\n\n", *stats)

batch := db.NewUnindexedBatch(true /* writeOnly */)
batch := db.NewWriteBatch()
defer batch.Close()
require.NoError(t, batch.PutUnversioned(roachpb.Key("a"), []byte("a")))
require.NoError(t, batch.Commit(true))
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (sc *AbortSpan) max() roachpb.Key {

// ClearData removes all persisted items stored in the cache.
func (sc *AbortSpan) ClearData(e storage.Engine) error {
b := e.NewUnindexedBatch(false /* writeOnly */)
b := e.NewUnindexedBatch()
defer b.Close()
err := b.ClearMVCCIteratorRange(sc.min(), sc.max(), true /* pointKeys */, false /* rangeKeys */)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type SyncCallback interface {
func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
// Use an unindexed batch because we don't need to read our writes, and
// it is more efficient.
return eng.NewUnindexedBatch(false /* writeOnly */)
return eng.NewUnindexedBatch()
}

// StoreEntries persists newly appended Raft log Entries to the log storage,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_truncator.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (t *raftLogTruncator) tryEnactTruncations(
}
// Do the truncation of persistent raft entries, specified by enactIndex
// (this subsumes all the preceding queued truncations).
batch := t.store.getEngine().NewUnindexedBatch(false /* writeOnly */)
batch := t.store.getEngine().NewUnindexedBatch()
defer batch.Close()
apply, err := handleTruncatedStateBelowRaftPreApply(ctx, &truncState,
&pendingTruncs.mu.truncs[enactIndex].RaftTruncatedState, stateLoader, batch)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (t *RaftTransport) SendSnapshot(
storePool *storepool.StorePool,
header kvserverpb.SnapshotRequest_Header,
snap *OutgoingSnapshot,
newBatch func() storage.Batch,
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
Expand All @@ -644,7 +644,7 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()
return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent)
return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent)
}

// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3111,8 +3111,8 @@ func (r *Replica) followerSendSnapshot(
Strategy: kvserverpb.SnapshotRequest_KV_BATCH,
Type: req.Type,
}
newBatchFn := func() storage.Batch {
return r.store.TODOEngine().NewUnindexedBatch(true /* writeOnly */)
newBatchFn := func() storage.WriteBatch {
return r.store.TODOEngine().NewWriteBatch()
}
sent := func() {
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb
startTime := timeutil.Now()

ms := r.GetMVCCStats()
batch := r.store.TODOEngine().NewUnindexedBatch(true /* writeOnly */)
batch := r.store.TODOEngine().NewWriteBatch()
defer batch.Close()
desc := r.Desc()
inited := desc.IsInitialized()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1996,7 +1996,7 @@ func getExpectedSnapshotSizeBytes(
}
defer snap.Close()

b := originStore.TODOEngine().NewUnindexedBatch(true)
b := originStore.TODOEngine().NewWriteBatch()
defer b.Close()

err = rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */
Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type kvBatchSnapshotStrategy struct {
// Limiter for sending KV batches. Only used on the sender side.
limiter *rate.Limiter
// Only used on the sender side.
newBatch func() storage.Batch
newWriteBatch func() storage.WriteBatch

// The approximate size of the SST chunk to buffer in memory on the receiver
// before flushing to disk. Only used on the receiver side.
Expand Down Expand Up @@ -535,7 +535,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(

// Iterate over all keys (point keys and range keys) and stream out batches of
// key-values.
var b storage.Batch
var b storage.WriteBatch
defer func() {
if b != nil {
b.Close()
Expand Down Expand Up @@ -572,7 +572,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
kvs++
if b == nil {
b = kvSS.newBatch()
b = kvSS.newWriteBatch()
}
key, err := iter.UnsafeEngineKey()
if err != nil {
Expand All @@ -599,7 +599,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
for _, rkv := range iter.EngineRangeKeys() {
rangeKVs++
if b == nil {
b = kvSS.newBatch()
b = kvSS.newWriteBatch()
}
err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value)
if err != nil {
Expand Down Expand Up @@ -635,7 +635,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
func (kvSS *kvBatchSnapshotStrategy) sendBatch(
ctx context.Context,
stream outgoingSnapshotStream,
batch storage.Batch,
batch storage.WriteBatch,
timerTag *snapshotTimingTag,
) error {
timerTag.start("rateLimit")
Expand Down Expand Up @@ -1468,7 +1468,7 @@ func SendEmptySnapshot(
noopStorePool{},
header,
&outgoingSnap,
eng.NewBatch,
eng.NewWriteBatch,
func() {},
nil, /* recordBytesSent */
)
Expand All @@ -1488,7 +1488,7 @@ func sendSnapshot(
storePool SnapshotStorePool,
header kvserverpb.SnapshotRequest_Header,
snap *OutgoingSnapshot,
newBatch func() storage.Batch,
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
Expand Down Expand Up @@ -1555,10 +1555,10 @@ func sendSnapshot(
switch header.Strategy {
case kvserverpb.SnapshotRequest_KV_BATCH:
ss = &kvBatchSnapshotStrategy{
batchSize: batchSize,
limiter: limiter,
newBatch: newBatch,
st: st,
batchSize: batchSize,
limiter: limiter,
newWriteBatch: newWriteBatch,
st: st,
}
default:
log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3008,7 +3008,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
Desc: &roachpb.RangeDescriptor{RangeID: 1},
},
}
newBatch := e.NewBatch
newBatch := e.NewWriteBatch

// Test that a failed Recv() causes a fail throttle
{
Expand Down
51 changes: 10 additions & 41 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ func appender(s string) []byte {
return mustMarshal(v)
}

func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch) error) {
func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b WriteBatch) error) {
e := NewDefaultInMemForTesting()
defer e.Close()

var b Batch
var b WriteBatch
if writeOnly {
b = e.NewUnindexedBatch(true /* writeOnly */)
b = e.NewWriteBatch()
} else {
b = e.NewBatch()
}
Expand Down Expand Up @@ -107,9 +107,9 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch
{Key: mvccKey("c"), Value: appender("foobar")},
{Key: mvccKey("e"), Value: []byte{}},
}
if !writeOnly {
if r, ok := b.(Reader); !writeOnly && ok {
// Scan values from batch directly.
kvs, err = Scan(b, localMax, roachpb.KeyMax, 0)
kvs, err = Scan(r, localMax, roachpb.KeyMax, 0)
require.NoError(t, err)
require.Equal(t, expValues, kvs)
}
Expand All @@ -126,7 +126,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch
func TestBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error {
return b.Commit(false /* sync */)
})
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestReadOnlyBasics(t *testing.T) {
func TestBatchRepr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error {
repr := b.Repr()

r, err := NewPebbleBatchReader(repr)
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestBatchRepr(t *testing.T) {
func TestWriteBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error {
testBatchBasics(t, true /* writeOnly */, func(e Engine, b WriteBatch) error {
return b.Commit(false /* sync */)
})
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestUnindexedBatchThatSupportsReader(t *testing.T) {
t.Fatal(err)
}

b := e.NewUnindexedBatch(false /* writeOnly */)
b := e.NewUnindexedBatch()
defer b.Close()
if err := b.PutUnversioned(mvccKey("b").Key, []byte("c")); err != nil {
t.Fatal(err)
Expand All @@ -684,37 +684,6 @@ func TestUnindexedBatchThatSupportsReader(t *testing.T) {
require.Equal(t, []byte("c"), mvccGetRaw(t, e, mvccKey("b")))
}

func TestUnindexedBatchThatDoesNotSupportReaderPanics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

e := NewDefaultInMemForTesting()
defer e.Close()

batch := e.NewUnindexedBatch(true /* writeOnly */)
defer batch.Close()

// The various Reader methods on the batch should panic.
a := mvccKey("a")
b := mvccKey("b")
testCases := []func(){
func() { _ = batch.MVCCIterate(a.Key, b.Key, MVCCKeyIterKind, IterKeyTypePointsOnly, nil) },
func() { _ = batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: roachpb.KeyMax}) },
}
for i, f := range testCases {
func() {
defer func(i int) {
if r := recover(); r == nil {
t.Fatalf("%d: test did not panic", i)
} else if r != "write-only batch" {
t.Fatalf("%d: unexpected panic: %v", i, r)
}
}(i)
f()
}()
}
}

func TestBatchIteration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -823,7 +792,7 @@ func TestBatchCombine(t *testing.T) {
}
k := fmt.Sprint(v)

b := e.NewUnindexedBatch(true /* writeOnly */)
b := e.NewWriteBatch()
if err := b.PutUnversioned(mvccKey(k).Key, []byte(k)); err != nil {
errs <- errors.Wrap(err, "put failed")
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ func runClearRange(
b.ResetTimer()

for i := 0; i < b.N; i++ {
batch := eng.NewUnindexedBatch(false /* writeOnly */)
batch := eng.NewUnindexedBatch()
if err := clearRange(eng, batch, MVCCKey{Key: keys.LocalMax}, MVCCKeyMax); err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1487,7 +1487,7 @@ func runBatchApplyBatchRepr(
})
}

batch := eng.NewUnindexedBatch(true /* writeOnly */)
batch := eng.NewWriteBatch()
defer batch.Close() // NB: hold open so batch.Repr() doesn't get reused

for i := 0; i < batchSize; i++ {
Expand All @@ -1504,9 +1504,9 @@ func runBatchApplyBatchRepr(
b.ResetTimer()

for i := 0; i < b.N; i++ {
var batch Batch
var batch WriteBatch
if !indexed {
batch = eng.NewUnindexedBatch(true /* writeOnly */)
batch = eng.NewWriteBatch()
} else {
batch = eng.NewBatch()
}
Expand Down
34 changes: 21 additions & 13 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,21 +894,21 @@ type Engine interface {
// NewUnindexedBatch returns a new instance of a batched engine which wraps
// this engine. It is unindexed, in that writes to the batch are not
// visible to reads until after it commits. The batch accumulates all
// mutations and applies them atomically on a call to Commit(). Read
// operations return an error, unless writeOnly is set to false.
// mutations and applies them atomically on a call to Commit().
//
// When writeOnly is false, reads will be satisfied by reading from the
// underlying engine, i.e., the caller does not see its own writes. This
// setting should be used only when the caller is certain that this
// optimization is correct, and beneficial. There are subtleties here -- see
// the discussion on https://github.com/cockroachdb/cockroach/pull/57661 for
// more details.
// Reads will be satisfied by reading from the underlying engine, i.e., the
// caller does not see its own writes. This setting should be used only when
// the caller is certain that this optimization is correct, and beneficial.
// There are subtleties here -- see the discussion on
// https://github.com/cockroachdb/cockroach/pull/57661 for more details.
//
// TODO(sumeer): We should separate the writeOnly=true case into a
// separate method, that returns a WriteBatch interface. Even better would
// be not having an option to pass writeOnly=false, and have the caller
// explicitly work with a separate WriteBatch and Reader.
NewUnindexedBatch(writeOnly bool) Batch
// TODO(sumeer,jackson): Remove this method and force the caller to operate
// explicitly with a separate WriteBatch and Reader.
NewUnindexedBatch() Batch
// NewWriteBatch returns a new write batch that will commit to the
// underlying Engine. The batch accumulates all mutations and applies them
// atomically on a call to Commit().
NewWriteBatch() WriteBatch
// NewSnapshot returns a new instance of a read-only snapshot
// engine. Snapshots are instantaneous and, as long as they're
// released relatively quickly, inexpensive. Snapshots are released
Expand Down Expand Up @@ -975,6 +975,14 @@ type Batch interface {
// iterator has to be repositioned using a seek operation, after the
// mutations were done.
ReadWriter
WriteBatch
}

// WriteBatch is the interface for write batch specific operations.
type WriteBatch interface {
Writer
// Close closes the batch, freeing up any outstanding resources.
Close()
// Commit atomically applies any batched updates to the underlying
// engine. This is a noop unless the batch was created via NewBatch(). If
// sync is true, the batch is synchronously committed to disk.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ func TestEngineIteratorVisibility(t *testing.T) {
readOwnWrites: true,
},
"UnindexedBatch": {
makeReader: func(e Engine) Reader { return e.NewUnindexedBatch(false) },
makeReader: func(e Engine) Reader { return e.NewUnindexedBatch() },
expectConsistent: true,
canWrite: true,
readOwnWrites: false,
Expand Down
11 changes: 8 additions & 3 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ func (p *Pebble) ClearMVCCVersions(start, end MVCCKey) error {
// ClearMVCCIteratorRange implements the Engine interface.
func (p *Pebble) ClearMVCCIteratorRange(start, end roachpb.Key, pointKeys, rangeKeys bool) error {
// Write all the tombstones in one batch.
batch := p.NewUnindexedBatch(false /* writeOnly */)
batch := p.NewUnindexedBatch()
defer batch.Close()

if err := batch.ClearMVCCIteratorRange(start, end, pointKeys, rangeKeys); err != nil {
Expand Down Expand Up @@ -1820,8 +1820,13 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter {
}

// NewUnindexedBatch implements the Engine interface.
func (p *Pebble) NewUnindexedBatch(writeOnly bool) Batch {
return newPebbleBatch(p.db, p.db.NewBatch(), writeOnly, p.settings)
func (p *Pebble) NewUnindexedBatch() Batch {
return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings)
}

// NewWriteBatch implements the Engine interface.
func (p *Pebble) NewWriteBatch() WriteBatch {
return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings)
}

// NewSnapshot implements the Engine interface.
Expand Down

0 comments on commit ad639da

Please sign in to comment.