Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add CommitNoSyncWait and SyncWait to Batch interface #95862

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,14 @@ func (s spanSetBatch) Commit(sync bool) error {
return s.b.Commit(sync)
}

func (s spanSetBatch) CommitNoSyncWait() error {
return s.b.CommitNoSyncWait()
}

func (s spanSetBatch) SyncWait() error {
return s.b.CommitNoSyncWait()
}

func (s spanSetBatch) Empty() bool {
return s.b.Empty()
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,14 @@ type Batch interface {
// engine. This is a noop unless the batch was created via NewBatch(). If
// sync is true, the batch is synchronously committed to disk.
Commit(sync bool) error
// CommitNoSyncWait atomically applies any batched updates to the underlying
// engine and initiates a disk write, but does not wait for that write to
// complete. The caller must call SyncWait to wait for the fsync to complete.
// The caller must not Close the Batch without first calling SyncWait.
CommitNoSyncWait() error
// SyncWait waits for the disk write initiated by a call to CommitNoSyncWait
// to complete.
SyncWait() error
// Empty returns whether the batch has been written to or not.
Empty() bool
// Count returns the number of memtable-modifying operations in the batch.
Expand Down
45 changes: 45 additions & 0 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,51 @@ func (p *pebbleBatch) Commit(sync bool) error {
}
err := p.batch.Commit(opts)
if err != nil {
// TODO(storage): ensure that these errors are only ever due to invariant
// violations and never due to unrecoverable Pebble states. Then switch to
// returning the error instead of panicking.
//
// Once we do that, document on the storage.Batch interface the meaning of
// an error returned from this method and the guarantees that callers have
// or don't have after they receive an error from this method.
panic(err)
}
return err
}

// CommitNoSyncWait implements the Batch interface.
func (p *pebbleBatch) CommitNoSyncWait() error {
if p.batch == nil {
panic("called with nil batch")
}
err := p.db.ApplyNoSyncWait(p.batch, pebble.Sync)
if err != nil {
// TODO(storage): ensure that these errors are only ever due to invariant
// violations and never due to unrecoverable Pebble states. Then switch to
// returning the error instead of panicking.
//
// Once we do that, document on the storage.Batch interface the meaning of
// an error returned from this method and the guarantees that callers have
// or don't have after they receive an error from this method.
panic(err)
}
return err
}

// SyncWait implements the Batch interface.
func (p *pebbleBatch) SyncWait() error {
if p.batch == nil {
panic("called with nil batch")
}
err := p.batch.SyncWait()
if err != nil {
// TODO(storage): ensure that these errors are only ever due to invariant
// violations and never due to unrecoverable Pebble states. Then switch to
// returning the error instead of panicking.
//
// Once we do that, document on the storage.Batch interface the meaning of
// an error returned from this method and the guarantees that callers have
// or don't have after they receive an error from this method.
panic(err)
}
return err
Expand Down