Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: expose separate WriteBatch interface #98712

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestPebbleEncryption(t *testing.T) {
require.Equal(t, int32(enginepbccl.EncryptionType_AES128_CTR), stats.EncryptionType)
t.Logf("EnvStats:\n%+v\n\n", *stats)

batch := db.NewUnindexedBatch(true /* writeOnly */)
batch := db.NewWriteBatch()
defer batch.Close()
require.NoError(t, batch.PutUnversioned(roachpb.Key("a"), []byte("a")))
require.NoError(t, batch.Commit(true))
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (sc *AbortSpan) max() roachpb.Key {

// ClearData removes all persisted items stored in the cache.
func (sc *AbortSpan) ClearData(e storage.Engine) error {
b := e.NewUnindexedBatch(false /* writeOnly */)
b := e.NewUnindexedBatch()
defer b.Close()
err := b.ClearMVCCIteratorRange(sc.min(), sc.max(), true /* pointKeys */, false /* rangeKeys */)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type SyncCallback interface {
func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
// Use an unindexed batch because we don't need to read our writes, and
// it is more efficient.
return eng.NewUnindexedBatch(false /* writeOnly */)
return eng.NewUnindexedBatch()
}

// StoreEntries persists newly appended Raft log Entries to the log storage,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_truncator.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (t *raftLogTruncator) tryEnactTruncations(
}
// Do the truncation of persistent raft entries, specified by enactIndex
// (this subsumes all the preceding queued truncations).
batch := t.store.getEngine().NewUnindexedBatch(false /* writeOnly */)
batch := t.store.getEngine().NewUnindexedBatch()
defer batch.Close()
apply, err := handleTruncatedStateBelowRaftPreApply(ctx, &truncState,
&pendingTruncs.mu.truncs[enactIndex].RaftTruncatedState, stateLoader, batch)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (t *RaftTransport) SendSnapshot(
storePool *storepool.StorePool,
header kvserverpb.SnapshotRequest_Header,
snap *OutgoingSnapshot,
newBatch func() storage.Batch,
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
Expand All @@ -644,7 +644,7 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()
return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent)
return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent)
}

// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3111,8 +3111,8 @@ func (r *Replica) followerSendSnapshot(
Strategy: kvserverpb.SnapshotRequest_KV_BATCH,
Type: req.Type,
}
newBatchFn := func() storage.Batch {
return r.store.TODOEngine().NewUnindexedBatch(true /* writeOnly */)
newBatchFn := func() storage.WriteBatch {
return r.store.TODOEngine().NewWriteBatch()
}
sent := func() {
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb
startTime := timeutil.Now()

ms := r.GetMVCCStats()
batch := r.store.TODOEngine().NewUnindexedBatch(true /* writeOnly */)
batch := r.store.TODOEngine().NewWriteBatch()
defer batch.Close()
desc := r.Desc()
inited := desc.IsInitialized()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1995,7 +1995,7 @@ func getExpectedSnapshotSizeBytes(
}
defer snap.Close()

b := originStore.TODOEngine().NewUnindexedBatch(true)
b := originStore.TODOEngine().NewWriteBatch()
defer b.Close()

err = rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */
Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type kvBatchSnapshotStrategy struct {
// Limiter for sending KV batches. Only used on the sender side.
limiter *rate.Limiter
// Only used on the sender side.
newBatch func() storage.Batch
newWriteBatch func() storage.WriteBatch

// The approximate size of the SST chunk to buffer in memory on the receiver
// before flushing to disk. Only used on the receiver side.
Expand Down Expand Up @@ -535,7 +535,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(

// Iterate over all keys (point keys and range keys) and stream out batches of
// key-values.
var b storage.Batch
var b storage.WriteBatch
defer func() {
if b != nil {
b.Close()
Expand Down Expand Up @@ -572,7 +572,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
kvs++
if b == nil {
b = kvSS.newBatch()
b = kvSS.newWriteBatch()
}
key, err := iter.UnsafeEngineKey()
if err != nil {
Expand All @@ -599,7 +599,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
for _, rkv := range iter.EngineRangeKeys() {
rangeKVs++
if b == nil {
b = kvSS.newBatch()
b = kvSS.newWriteBatch()
}
err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value)
if err != nil {
Expand Down Expand Up @@ -635,7 +635,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
func (kvSS *kvBatchSnapshotStrategy) sendBatch(
ctx context.Context,
stream outgoingSnapshotStream,
batch storage.Batch,
batch storage.WriteBatch,
timerTag *snapshotTimingTag,
) error {
timerTag.start("rateLimit")
Expand Down Expand Up @@ -1468,7 +1468,7 @@ func SendEmptySnapshot(
noopStorePool{},
header,
&outgoingSnap,
eng.NewBatch,
eng.NewWriteBatch,
func() {},
nil, /* recordBytesSent */
)
Expand All @@ -1488,7 +1488,7 @@ func sendSnapshot(
storePool SnapshotStorePool,
header kvserverpb.SnapshotRequest_Header,
snap *OutgoingSnapshot,
newBatch func() storage.Batch,
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
Expand Down Expand Up @@ -1555,10 +1555,10 @@ func sendSnapshot(
switch header.Strategy {
case kvserverpb.SnapshotRequest_KV_BATCH:
ss = &kvBatchSnapshotStrategy{
batchSize: batchSize,
limiter: limiter,
newBatch: newBatch,
st: st,
batchSize: batchSize,
limiter: limiter,
newWriteBatch: newWriteBatch,
st: st,
}
default:
log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3013,7 +3013,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
Desc: &roachpb.RangeDescriptor{RangeID: 1},
},
}
newBatch := e.NewBatch
newBatch := e.NewWriteBatch

// Test that a failed Recv() causes a fail throttle
{
Expand Down
32 changes: 18 additions & 14 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ func appender(s string) []byte {
return mustMarshal(v)
}

func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch) error) {
func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b WriteBatch) error) {
e := NewDefaultInMemForTesting()
defer e.Close()

var b Batch
var b WriteBatch
if writeOnly {
b = e.NewUnindexedBatch(true /* writeOnly */)
b = e.NewWriteBatch()
} else {
b = e.NewBatch()
}
Expand Down Expand Up @@ -107,9 +107,9 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch
{Key: mvccKey("c"), Value: appender("foobar")},
{Key: mvccKey("e"), Value: []byte{}},
}
if !writeOnly {
if r, ok := b.(Reader); !writeOnly && ok {
// Scan values from batch directly.
kvs, err = Scan(b, localMax, roachpb.KeyMax, 0)
kvs, err = Scan(r, localMax, roachpb.KeyMax, 0)
require.NoError(t, err)
require.Equal(t, expValues, kvs)
}
Expand All @@ -126,7 +126,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch
func TestBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error {
return b.Commit(false /* sync */)
})
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestReadOnlyBasics(t *testing.T) {
func TestBatchRepr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error {
repr := b.Repr()

r, err := NewPebbleBatchReader(repr)
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestBatchRepr(t *testing.T) {
func TestWriteBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error {
testBatchBasics(t, true /* writeOnly */, func(e Engine, b WriteBatch) error {
return b.Commit(false /* sync */)
})
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestUnindexedBatchThatSupportsReader(t *testing.T) {
t.Fatal(err)
}

b := e.NewUnindexedBatch(false /* writeOnly */)
b := e.NewUnindexedBatch()
defer b.Close()
if err := b.PutUnversioned(mvccKey("b").Key, []byte("c")); err != nil {
t.Fatal(err)
Expand All @@ -684,22 +684,26 @@ func TestUnindexedBatchThatSupportsReader(t *testing.T) {
require.Equal(t, []byte("c"), mvccGetRaw(t, e, mvccKey("b")))
}

func TestUnindexedBatchThatDoesNotSupportReaderPanics(t *testing.T) {
func TestWriteBatchPanicsAsReader(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

e := NewDefaultInMemForTesting()
defer e.Close()

batch := e.NewUnindexedBatch(true /* writeOnly */)
batch := e.NewWriteBatch()
defer batch.Close()

// The underlying type returned by NewWriteBatch does implement Reader.
// Ensure that if a user coerces the WriteBatch into a Reader, it panics.
r := batch.(Reader)

// The various Reader methods on the batch should panic.
a := mvccKey("a")
b := mvccKey("b")
testCases := []func(){
func() { _ = batch.MVCCIterate(a.Key, b.Key, MVCCKeyIterKind, IterKeyTypePointsOnly, nil) },
func() { _ = batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: roachpb.KeyMax}) },
func() { _ = r.MVCCIterate(a.Key, b.Key, MVCCKeyIterKind, IterKeyTypePointsOnly, nil) },
func() { _ = r.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: roachpb.KeyMax}) },
}
for i, f := range testCases {
func() {
Expand Down Expand Up @@ -823,7 +827,7 @@ func TestBatchCombine(t *testing.T) {
}
k := fmt.Sprint(v)

b := e.NewUnindexedBatch(true /* writeOnly */)
b := e.NewWriteBatch()
if err := b.PutUnversioned(mvccKey(k).Key, []byte(k)); err != nil {
errs <- errors.Wrap(err, "put failed")
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ func runClearRange(
b.ResetTimer()

for i := 0; i < b.N; i++ {
batch := eng.NewUnindexedBatch(false /* writeOnly */)
batch := eng.NewUnindexedBatch()
if err := clearRange(eng, batch, MVCCKey{Key: keys.LocalMax}, MVCCKeyMax); err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1487,7 +1487,7 @@ func runBatchApplyBatchRepr(
})
}

batch := eng.NewUnindexedBatch(true /* writeOnly */)
batch := eng.NewWriteBatch()
defer batch.Close() // NB: hold open so batch.Repr() doesn't get reused

for i := 0; i < batchSize; i++ {
Expand All @@ -1504,9 +1504,9 @@ func runBatchApplyBatchRepr(
b.ResetTimer()

for i := 0; i < b.N; i++ {
var batch Batch
var batch WriteBatch
if !indexed {
batch = eng.NewUnindexedBatch(true /* writeOnly */)
batch = eng.NewWriteBatch()
} else {
batch = eng.NewBatch()
}
Expand Down
36 changes: 22 additions & 14 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,21 +894,21 @@ type Engine interface {
// NewUnindexedBatch returns a new instance of a batched engine which wraps
// this engine. It is unindexed, in that writes to the batch are not
// visible to reads until after it commits. The batch accumulates all
// mutations and applies them atomically on a call to Commit(). Read
// operations return an error, unless writeOnly is set to false.
// mutations and applies them atomically on a call to Commit().
//
// When writeOnly is false, reads will be satisfied by reading from the
// underlying engine, i.e., the caller does not see its own writes. This
// setting should be used only when the caller is certain that this
// optimization is correct, and beneficial. There are subtleties here -- see
// the discussion on https://github.com/cockroachdb/cockroach/pull/57661 for
// more details.
// Reads will be satisfied by reading from the underlying engine, i.e., the
// caller does not see its own writes. This setting should be used only when
// the caller is certain that this optimization is correct, and beneficial.
// There are subtleties here -- see the discussion on
// https://github.com/cockroachdb/cockroach/pull/57661 for more details.
//
// TODO(sumeer): We should separate the writeOnly=true case into a
// separate method, that returns a WriteBatch interface. Even better would
// be not having an option to pass writeOnly=false, and have the caller
// explicitly work with a separate WriteBatch and Reader.
NewUnindexedBatch(writeOnly bool) Batch
// TODO(sumeer,jackson): Remove this method and force the caller to operate
// explicitly with a separate WriteBatch and Reader.
NewUnindexedBatch() Batch
// NewWriteBatch returns a new write batch that will commit to the
// underlying Engine. The batch accumulates all mutations and applies them
// atomically on a call to Commit().
NewWriteBatch() WriteBatch
// NewSnapshot returns a new instance of a read-only snapshot
// engine. Snapshots are instantaneous and, as long as they're
// released relatively quickly, inexpensive. Snapshots are released
Expand Down Expand Up @@ -974,7 +974,15 @@ type Batch interface {
// iterator creation. To guarantee that they see all the mutations, the
// iterator has to be repositioned using a seek operation, after the
// mutations were done.
ReadWriter
Reader
WriteBatch
}

// WriteBatch is the interface for write batch specific operations.
type WriteBatch interface {
Writer
// Close closes the batch, freeing up any outstanding resources.
Close()
// Commit atomically applies any batched updates to the underlying
// engine. This is a noop unless the batch was created via NewBatch(). If
// sync is true, the batch is synchronously committed to disk.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ func TestEngineIteratorVisibility(t *testing.T) {
readOwnWrites: true,
},
"UnindexedBatch": {
makeReader: func(e Engine) Reader { return e.NewUnindexedBatch(false) },
makeReader: func(e Engine) Reader { return e.NewUnindexedBatch() },
expectConsistent: true,
canWrite: true,
readOwnWrites: false,
Expand Down
11 changes: 8 additions & 3 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,7 @@ func (p *Pebble) ClearMVCCVersions(start, end MVCCKey) error {
// ClearMVCCIteratorRange implements the Engine interface.
func (p *Pebble) ClearMVCCIteratorRange(start, end roachpb.Key, pointKeys, rangeKeys bool) error {
// Write all the tombstones in one batch.
batch := p.NewUnindexedBatch(false /* writeOnly */)
batch := p.NewUnindexedBatch()
defer batch.Close()

if err := batch.ClearMVCCIteratorRange(start, end, pointKeys, rangeKeys); err != nil {
Expand Down Expand Up @@ -1842,8 +1842,13 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter {
}

// NewUnindexedBatch implements the Engine interface.
func (p *Pebble) NewUnindexedBatch(writeOnly bool) Batch {
return newPebbleBatch(p.db, p.db.NewBatch(), writeOnly, p.settings)
func (p *Pebble) NewUnindexedBatch() Batch {
return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings)
}

// NewWriteBatch implements the Engine interface.
func (p *Pebble) NewWriteBatch() WriteBatch {
return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings)
}

// NewSnapshot implements the Engine interface.
Expand Down