From efe12b87adc0be667f571f1d1731fe8a6e446a7e Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 25 Jan 2023 15:58:37 -0500 Subject: [PATCH] storage: add CommitNoSyncWait and SyncWait to Batch interface Extracted from #94165. Picks up github.com/cockroachdb/pebble/pull/2117. Release note: None Epic: None --- pkg/kv/kvserver/spanset/batch.go | 8 ++++++ pkg/storage/engine.go | 8 ++++++ pkg/storage/pebble_batch.go | 45 ++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 595b260e8c05..bdbbe338b709 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -752,6 +752,14 @@ func (s spanSetBatch) Commit(sync bool) error { return s.b.Commit(sync) } +func (s spanSetBatch) CommitNoSyncWait() error { + return s.b.CommitNoSyncWait() +} + +func (s spanSetBatch) SyncWait() error { + return s.b.CommitNoSyncWait() +} + func (s spanSetBatch) Empty() bool { return s.b.Empty() } diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 0ba221311b25..309e046fb87c 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -978,6 +978,14 @@ type Batch interface { // engine. This is a noop unless the batch was created via NewBatch(). If // sync is true, the batch is synchronously committed to disk. Commit(sync bool) error + // CommitNoSyncWait atomically applies any batched updates to the underlying + // engine and initiates a disk write, but does not wait for that write to + // complete. The caller must call SyncWait to wait for the fsync to complete. + // The caller must not Close the Batch without first calling SyncWait. + CommitNoSyncWait() error + // SyncWait waits for the disk write initiated by a call to CommitNoSyncWait + // to complete. + SyncWait() error // Empty returns whether the batch has been written to or not. Empty() bool // Count returns the number of memtable-modifying operations in the batch. diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 7cb4079e6187..8711e7a535ec 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -550,6 +550,51 @@ func (p *pebbleBatch) Commit(sync bool) error { } err := p.batch.Commit(opts) if err != nil { + // TODO(storage): ensure that these errors are only ever due to invariant + // violations and never due to unrecoverable Pebble states. Then switch to + // returning the error instead of panicking. + // + // Once we do that, document on the storage.Batch interface the meaning of + // an error returned from this method and the guarantees that callers have + // or don't have after they receive an error from this method. + panic(err) + } + return err +} + +// CommitNoSyncWait implements the Batch interface. +func (p *pebbleBatch) CommitNoSyncWait() error { + if p.batch == nil { + panic("called with nil batch") + } + err := p.db.ApplyNoSyncWait(p.batch, pebble.Sync) + if err != nil { + // TODO(storage): ensure that these errors are only ever due to invariant + // violations and never due to unrecoverable Pebble states. Then switch to + // returning the error instead of panicking. + // + // Once we do that, document on the storage.Batch interface the meaning of + // an error returned from this method and the guarantees that callers have + // or don't have after they receive an error from this method. + panic(err) + } + return err +} + +// SyncWait implements the Batch interface. +func (p *pebbleBatch) SyncWait() error { + if p.batch == nil { + panic("called with nil batch") + } + err := p.batch.SyncWait() + if err != nil { + // TODO(storage): ensure that these errors are only ever due to invariant + // violations and never due to unrecoverable Pebble states. Then switch to + // returning the error instead of panicking. + // + // Once we do that, document on the storage.Batch interface the meaning of + // an error returned from this method and the guarantees that callers have + // or don't have after they receive an error from this method. panic(err) } return err