diff --git a/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go b/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go index a207b1e1d921..eef8784373ee 100644 --- a/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go +++ b/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go @@ -278,7 +278,7 @@ func TestPebbleEncryption(t *testing.T) { require.Equal(t, int32(enginepbccl.EncryptionType_AES128_CTR), stats.EncryptionType) t.Logf("EnvStats:\n%+v\n\n", *stats) - batch := db.NewUnindexedBatch(true /* writeOnly */) + batch := db.NewWriteBatch() defer batch.Close() require.NoError(t, batch.PutUnversioned(roachpb.Key("a"), []byte("a"))) require.NoError(t, batch.Commit(true)) diff --git a/pkg/kv/kvserver/abortspan/abortspan.go b/pkg/kv/kvserver/abortspan/abortspan.go index e93e6043c1d1..01942e11faa5 100644 --- a/pkg/kv/kvserver/abortspan/abortspan.go +++ b/pkg/kv/kvserver/abortspan/abortspan.go @@ -76,7 +76,7 @@ func (sc *AbortSpan) max() roachpb.Key { // ClearData removes all persisted items stored in the cache. func (sc *AbortSpan) ClearData(e storage.Engine) error { - b := e.NewUnindexedBatch(false /* writeOnly */) + b := e.NewUnindexedBatch() defer b.Close() err := b.ClearMVCCIteratorRange(sc.min(), sc.max(), true /* pointKeys */, false /* rangeKeys */) if err != nil { diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index a8986cf826f1..a621f71fb284 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -119,7 +119,7 @@ type SyncCallback interface { func newStoreEntriesBatch(eng storage.Engine) storage.Batch { // Use an unindexed batch because we don't need to read our writes, and // it is more efficient. - return eng.NewUnindexedBatch(false /* writeOnly */) + return eng.NewUnindexedBatch() } // StoreEntries persists newly appended Raft log Entries to the log storage, diff --git a/pkg/kv/kvserver/raft_log_truncator.go b/pkg/kv/kvserver/raft_log_truncator.go index 47c34d7e612d..17f2ffb790c5 100644 --- a/pkg/kv/kvserver/raft_log_truncator.go +++ b/pkg/kv/kvserver/raft_log_truncator.go @@ -551,7 +551,7 @@ func (t *raftLogTruncator) tryEnactTruncations( } // Do the truncation of persistent raft entries, specified by enactIndex // (this subsumes all the preceding queued truncations). - batch := t.store.getEngine().NewUnindexedBatch(false /* writeOnly */) + batch := t.store.getEngine().NewUnindexedBatch() defer batch.Close() apply, err := handleTruncatedStateBelowRaftPreApply(ctx, &truncState, &pendingTruncs.mu.truncs[enactIndex].RaftTruncatedState, stateLoader, batch) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 9fc321ff86a5..2d2ef8a41e31 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -623,7 +623,7 @@ func (t *RaftTransport) SendSnapshot( storePool *storepool.StorePool, header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, - newBatch func() storage.Batch, + newWriteBatch func() storage.WriteBatch, sent func(), recordBytesSent snapshotRecordMetrics, ) error { @@ -644,7 +644,7 @@ func (t *RaftTransport) SendSnapshot( log.Warningf(ctx, "failed to close snapshot stream: %+v", err) } }() - return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent) + return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent) } // DelegateSnapshot sends a DelegateSnapshotRequest to a remote store diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index db370553da9e..ccdd699a403a 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3111,8 +3111,8 @@ func (r *Replica) followerSendSnapshot( Strategy: kvserverpb.SnapshotRequest_KV_BATCH, Type: req.Type, } - newBatchFn := func() storage.Batch { - return r.store.TODOEngine().NewUnindexedBatch(true /* writeOnly */) + newBatchFn := func() storage.WriteBatch { + return r.store.TODOEngine().NewWriteBatch() } sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 022c72d50145..26a3ed2a6ed6 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -111,7 +111,7 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb startTime := timeutil.Now() ms := r.GetMVCCStats() - batch := r.store.TODOEngine().NewUnindexedBatch(true /* writeOnly */) + batch := r.store.TODOEngine().NewWriteBatch() defer batch.Close() desc := r.Desc() inited := desc.IsInitialized() diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 08f3b4252317..34c162f58b51 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -1995,7 +1995,7 @@ func getExpectedSnapshotSizeBytes( } defer snap.Close() - b := originStore.TODOEngine().NewUnindexedBatch(true) + b := originStore.TODOEngine().NewWriteBatch() defer b.Close() err = rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 348bd1bf182f..72f68241c430 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -149,7 +149,7 @@ type kvBatchSnapshotStrategy struct { // Limiter for sending KV batches. Only used on the sender side. limiter *rate.Limiter // Only used on the sender side. - newBatch func() storage.Batch + newWriteBatch func() storage.WriteBatch // The approximate size of the SST chunk to buffer in memory on the receiver // before flushing to disk. Only used on the receiver side. @@ -535,7 +535,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // Iterate over all keys (point keys and range keys) and stream out batches of // key-values. - var b storage.Batch + var b storage.WriteBatch defer func() { if b != nil { b.Close() @@ -572,7 +572,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { kvs++ if b == nil { - b = kvSS.newBatch() + b = kvSS.newWriteBatch() } key, err := iter.UnsafeEngineKey() if err != nil { @@ -599,7 +599,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( for _, rkv := range iter.EngineRangeKeys() { rangeKVs++ if b == nil { - b = kvSS.newBatch() + b = kvSS.newWriteBatch() } err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) if err != nil { @@ -635,7 +635,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( func (kvSS *kvBatchSnapshotStrategy) sendBatch( ctx context.Context, stream outgoingSnapshotStream, - batch storage.Batch, + batch storage.WriteBatch, timerTag *snapshotTimingTag, ) error { timerTag.start("rateLimit") @@ -1468,7 +1468,7 @@ func SendEmptySnapshot( noopStorePool{}, header, &outgoingSnap, - eng.NewBatch, + eng.NewWriteBatch, func() {}, nil, /* recordBytesSent */ ) @@ -1488,7 +1488,7 @@ func sendSnapshot( storePool SnapshotStorePool, header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, - newBatch func() storage.Batch, + newWriteBatch func() storage.WriteBatch, sent func(), recordBytesSent snapshotRecordMetrics, ) error { @@ -1555,10 +1555,10 @@ func sendSnapshot( switch header.Strategy { case kvserverpb.SnapshotRequest_KV_BATCH: ss = &kvBatchSnapshotStrategy{ - batchSize: batchSize, - limiter: limiter, - newBatch: newBatch, - st: st, + batchSize: batchSize, + limiter: limiter, + newWriteBatch: newWriteBatch, + st: st, } default: log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy) diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index e14fce15435b..790417acdadc 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3013,7 +3013,7 @@ func TestSendSnapshotThrottling(t *testing.T) { Desc: &roachpb.RangeDescriptor{RangeID: 1}, }, } - newBatch := e.NewBatch + newBatch := e.NewWriteBatch // Test that a failed Recv() causes a fail throttle { diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 182b15595114..8eb6f26b9043 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -61,13 +61,13 @@ func appender(s string) []byte { return mustMarshal(v) } -func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch) error) { +func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b WriteBatch) error) { e := NewDefaultInMemForTesting() defer e.Close() - var b Batch + var b WriteBatch if writeOnly { - b = e.NewUnindexedBatch(true /* writeOnly */) + b = e.NewWriteBatch() } else { b = e.NewBatch() } @@ -107,9 +107,9 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch {Key: mvccKey("c"), Value: appender("foobar")}, {Key: mvccKey("e"), Value: []byte{}}, } - if !writeOnly { + if r, ok := b.(Reader); !writeOnly && ok { // Scan values from batch directly. - kvs, err = Scan(b, localMax, roachpb.KeyMax, 0) + kvs, err = Scan(r, localMax, roachpb.KeyMax, 0) require.NoError(t, err) require.Equal(t, expValues, kvs) } @@ -126,7 +126,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch func TestBatchBasics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error { + testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error { return b.Commit(false /* sync */) }) } @@ -248,7 +248,7 @@ func TestReadOnlyBasics(t *testing.T) { func TestBatchRepr(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error { + testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error { repr := b.Repr() r, err := NewPebbleBatchReader(repr) @@ -296,7 +296,7 @@ func TestBatchRepr(t *testing.T) { func TestWriteBatchBasics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error { + testBatchBasics(t, true /* writeOnly */, func(e Engine, b WriteBatch) error { return b.Commit(false /* sync */) }) } @@ -661,7 +661,7 @@ func TestUnindexedBatchThatSupportsReader(t *testing.T) { t.Fatal(err) } - b := e.NewUnindexedBatch(false /* writeOnly */) + b := e.NewUnindexedBatch() defer b.Close() if err := b.PutUnversioned(mvccKey("b").Key, []byte("c")); err != nil { t.Fatal(err) @@ -684,22 +684,26 @@ func TestUnindexedBatchThatSupportsReader(t *testing.T) { require.Equal(t, []byte("c"), mvccGetRaw(t, e, mvccKey("b"))) } -func TestUnindexedBatchThatDoesNotSupportReaderPanics(t *testing.T) { +func TestWriteBatchPanicsAsReader(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) e := NewDefaultInMemForTesting() defer e.Close() - batch := e.NewUnindexedBatch(true /* writeOnly */) + batch := e.NewWriteBatch() defer batch.Close() + // The underlying type returned by NewWriteBatch does implement Reader. + // Ensure that if a user coerces the WriteBatch into a Reader, it panics. + r := batch.(Reader) + // The various Reader methods on the batch should panic. a := mvccKey("a") b := mvccKey("b") testCases := []func(){ - func() { _ = batch.MVCCIterate(a.Key, b.Key, MVCCKeyIterKind, IterKeyTypePointsOnly, nil) }, - func() { _ = batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: roachpb.KeyMax}) }, + func() { _ = r.MVCCIterate(a.Key, b.Key, MVCCKeyIterKind, IterKeyTypePointsOnly, nil) }, + func() { _ = r.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: roachpb.KeyMax}) }, } for i, f := range testCases { func() { @@ -823,7 +827,7 @@ func TestBatchCombine(t *testing.T) { } k := fmt.Sprint(v) - b := e.NewUnindexedBatch(true /* writeOnly */) + b := e.NewWriteBatch() if err := b.PutUnversioned(mvccKey(k).Key, []byte(k)); err != nil { errs <- errors.Wrap(err, "put failed") return diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 5d7a5b8ff6ea..d747464f79b6 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -1268,7 +1268,7 @@ func runClearRange( b.ResetTimer() for i := 0; i < b.N; i++ { - batch := eng.NewUnindexedBatch(false /* writeOnly */) + batch := eng.NewUnindexedBatch() if err := clearRange(eng, batch, MVCCKey{Key: keys.LocalMax}, MVCCKeyMax); err != nil { b.Fatal(err) } @@ -1487,7 +1487,7 @@ func runBatchApplyBatchRepr( }) } - batch := eng.NewUnindexedBatch(true /* writeOnly */) + batch := eng.NewWriteBatch() defer batch.Close() // NB: hold open so batch.Repr() doesn't get reused for i := 0; i < batchSize; i++ { @@ -1504,9 +1504,9 @@ func runBatchApplyBatchRepr( b.ResetTimer() for i := 0; i < b.N; i++ { - var batch Batch + var batch WriteBatch if !indexed { - batch = eng.NewUnindexedBatch(true /* writeOnly */) + batch = eng.NewWriteBatch() } else { batch = eng.NewBatch() } diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index dde08f12a2d3..d4d621a2d0df 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -894,21 +894,21 @@ type Engine interface { // NewUnindexedBatch returns a new instance of a batched engine which wraps // this engine. It is unindexed, in that writes to the batch are not // visible to reads until after it commits. The batch accumulates all - // mutations and applies them atomically on a call to Commit(). Read - // operations return an error, unless writeOnly is set to false. + // mutations and applies them atomically on a call to Commit(). // - // When writeOnly is false, reads will be satisfied by reading from the - // underlying engine, i.e., the caller does not see its own writes. This - // setting should be used only when the caller is certain that this - // optimization is correct, and beneficial. There are subtleties here -- see - // the discussion on https://github.com/cockroachdb/cockroach/pull/57661 for - // more details. + // Reads will be satisfied by reading from the underlying engine, i.e., the + // caller does not see its own writes. This setting should be used only when + // the caller is certain that this optimization is correct, and beneficial. + // There are subtleties here -- see the discussion on + // https://github.com/cockroachdb/cockroach/pull/57661 for more details. // - // TODO(sumeer): We should separate the writeOnly=true case into a - // separate method, that returns a WriteBatch interface. Even better would - // be not having an option to pass writeOnly=false, and have the caller - // explicitly work with a separate WriteBatch and Reader. - NewUnindexedBatch(writeOnly bool) Batch + // TODO(sumeer,jackson): Remove this method and force the caller to operate + // explicitly with a separate WriteBatch and Reader. + NewUnindexedBatch() Batch + // NewWriteBatch returns a new write batch that will commit to the + // underlying Engine. The batch accumulates all mutations and applies them + // atomically on a call to Commit(). + NewWriteBatch() WriteBatch // NewSnapshot returns a new instance of a read-only snapshot // engine. Snapshots are instantaneous and, as long as they're // released relatively quickly, inexpensive. Snapshots are released @@ -974,7 +974,15 @@ type Batch interface { // iterator creation. To guarantee that they see all the mutations, the // iterator has to be repositioned using a seek operation, after the // mutations were done. - ReadWriter + Reader + WriteBatch +} + +// WriteBatch is the interface for write batch specific operations. +type WriteBatch interface { + Writer + // Close closes the batch, freeing up any outstanding resources. + Close() // Commit atomically applies any batched updates to the underlying // engine. This is a noop unless the batch was created via NewBatch(). If // sync is true, the batch is synchronously committed to disk. diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index 7302fdf2a153..c53cb0470700 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -1748,7 +1748,7 @@ func TestEngineIteratorVisibility(t *testing.T) { readOwnWrites: true, }, "UnindexedBatch": { - makeReader: func(e Engine) Reader { return e.NewUnindexedBatch(false) }, + makeReader: func(e Engine) Reader { return e.NewUnindexedBatch() }, expectConsistent: true, canWrite: true, readOwnWrites: false, diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index cf1adcc95481..d24762cbb81a 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1447,7 +1447,7 @@ func (p *Pebble) ClearMVCCVersions(start, end MVCCKey) error { // ClearMVCCIteratorRange implements the Engine interface. func (p *Pebble) ClearMVCCIteratorRange(start, end roachpb.Key, pointKeys, rangeKeys bool) error { // Write all the tombstones in one batch. - batch := p.NewUnindexedBatch(false /* writeOnly */) + batch := p.NewUnindexedBatch() defer batch.Close() if err := batch.ClearMVCCIteratorRange(start, end, pointKeys, rangeKeys); err != nil { @@ -1842,8 +1842,13 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter { } // NewUnindexedBatch implements the Engine interface. -func (p *Pebble) NewUnindexedBatch(writeOnly bool) Batch { - return newPebbleBatch(p.db, p.db.NewBatch(), writeOnly, p.settings) +func (p *Pebble) NewUnindexedBatch() Batch { + return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings) +} + +// NewWriteBatch implements the Engine interface. +func (p *Pebble) NewWriteBatch() WriteBatch { + return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings) } // NewSnapshot implements the Engine interface.