Skip to content

Commit

Permalink
Merge pull request #13481 from petermattis/pmattis/sync-raft-log
Browse files Browse the repository at this point in the history
storage: sync Raft log writes
  • Loading branch information
petermattis authored Feb 8, 2017
2 parents 7b91fbc + 04be417 commit 5e945a2
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 66 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func evalWriteBatch(
return storage.EvalResult{}, errors.New("WriteBatch can only be called on empty ranges")
}

if err := batch.ApplyBatchRepr(args.Data); err != nil {
if err := batch.ApplyBatchRepr(args.Data, false /* !sync */); err != nil {
return storage.EvalResult{}, err
}
return storage.EvalResult{}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/abort_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (sc *AbortCache) ClearData(e engine.Engine) error {
if err != nil {
return err
}
return b.Commit()
return b.Commit(false /* !sync */)
}

// Get looks up an abort cache entry recorded for this transaction ID.
Expand Down
18 changes: 9 additions & 9 deletions pkg/storage/engine/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch
func TestBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
return b.Commit()
return b.Commit(false /* !sync */)
})
}

Expand Down Expand Up @@ -231,14 +231,14 @@ func TestBatchRepr(t *testing.T) {
t.Fatalf("expected %v, but found %v", expOps, ops)
}

return e.ApplyBatchRepr(repr)
return e.ApplyBatchRepr(repr, false /* !sync */)
})
}

func TestWriteBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error {
return b.Commit()
return b.Commit(false /* !sync */)
})
}

Expand All @@ -264,7 +264,7 @@ func TestApplyBatchRepr(t *testing.T) {

b2 := e.NewBatch()
defer b2.Close()
if err := b2.ApplyBatchRepr(repr1); err != nil {
if err := b2.ApplyBatchRepr(repr1, false /* !sync */); err != nil {
t.Fatal(err)
}
repr2 := b2.Repr()
Expand All @@ -290,11 +290,11 @@ func TestApplyBatchRepr(t *testing.T) {

b4 := e.NewBatch()
defer b4.Close()
if err := b4.ApplyBatchRepr(repr); err != nil {
if err := b4.ApplyBatchRepr(repr, false /* !sync */); err != nil {
t.Fatal(err)
}
// Intentionally don't call Repr() because the expected user wouldn't.
if err := b4.Commit(); err != nil {
if err := b4.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -456,7 +456,7 @@ func TestBatchProto(t *testing.T) {
t.Fatalf("expected GetProto to fail ok=%t: %s", ok, err)
}
// Commit and verify the proto can be read directly from the engine.
if err := b.Commit(); err != nil {
if err := b.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}
if ok, _, _, err := e.GetProto(mvccKey("proto"), getVal); !ok || err != nil {
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestBatchScan(t *testing.T) {
}

// Now, commit batch and re-scan using engine direct to compare results.
if err := b.Commit(); err != nil {
if err := b.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}
for i, scan := range scans {
Expand Down Expand Up @@ -912,7 +912,7 @@ func TestBatchDistinctPanics(t *testing.T) {
func() { _ = batch.Put(a, nil) },
func() { _ = batch.Merge(a, nil) },
func() { _ = batch.Clear(a) },
func() { _ = batch.ApplyBatchRepr(nil) },
func() { _ = batch.ApplyBatchRepr(nil, false) },
func() { _, _ = batch.Get(a) },
func() { _, _, _, _ = batch.GetProto(a, nil) },
func() { _ = batch.Iterate(a, a, nil) },
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func setupMVCCData(
// sstables.
if scaled := len(order) / 20; i > 0 && (i%scaled) == 0 {
log.Infof(context.Background(), "committing (%d/~%d)", i/scaled, 20)
if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand All @@ -143,7 +143,7 @@ func setupMVCCData(
b.Fatal(err)
}
}
if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand Down Expand Up @@ -352,7 +352,7 @@ func runMVCCBatchPut(emk engineMaker, valueSize, batchSize int, b *testing.B) {
}
}

if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}

Expand Down Expand Up @@ -403,7 +403,7 @@ func runMVCCBatchTimeSeries(emk engineMaker, batchSize int, b *testing.B) {
}
}

if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand Down Expand Up @@ -561,7 +561,7 @@ func runBatchApplyBatchRepr(
} else {
batch = eng.NewBatch()
}
if err := batch.ApplyBatchRepr(repr); err != nil {
if err := batch.ApplyBatchRepr(repr, false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand Down
11 changes: 7 additions & 4 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ type Reader interface {
type Writer interface {
// ApplyBatchRepr atomically applies a set of batched updates. Created by
// calling Repr() on a batch. Using this method is equivalent to constructing
// and committing a batch whose Repr() equals repr.
ApplyBatchRepr(repr []byte) error
// and committing a batch whose Repr() equals repr. If sync is true, the
// batch is synchronously written to disk. It is an error to specify
// sync=true if the Writer is a Batch.
ApplyBatchRepr(repr []byte, sync bool) error
// Clear removes the item from the db with the given key.
// Note that clear actually removes entries from the storage
// engine, rather than inserting tombstones.
Expand Down Expand Up @@ -202,8 +204,9 @@ type Engine interface {
type Batch interface {
ReadWriter
// Commit atomically applies any batched updates to the underlying
// engine. This is a noop unless the engine was created via NewBatch().
Commit() error
// engine. This is a noop unless the engine was created via NewBatch(). If
// sync is true, the batch is synchronously committed to disk.
Commit(sync bool) error
// Distinct returns a view of the existing batch which only sees writes that
// were performed before the Distinct batch was created. That is, the
// returned batch will not read its own writes, but it will read writes to
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestEngineBatchCommit(t *testing.T) {
t.Fatal(err)
}
}
if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}
close(writesDone)
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestEngineBatch(t *testing.T) {
}
iter.Close()
// Commit the batch and try getting the value from the engine.
if err := b.Commit(); err != nil {
if err := b.Commit(false /* !sync */); err != nil {
t.Errorf("%d: %v", i, err)
continue
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ func (r *RocksDB) Merge(key MVCCKey, value []byte) error {
// ApplyBatchRepr atomically applies a set of batched updates. Created by
// calling Repr() on a batch. Using this method is equivalent to constructing
// and committing a batch whose Repr() equals repr.
func (r *RocksDB) ApplyBatchRepr(repr []byte) error {
return dbApplyBatchRepr(r.rdb, repr)
func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error {
return dbApplyBatchRepr(r.rdb, repr, sync)
}

// Get returns the value for the given key.
Expand Down Expand Up @@ -968,13 +968,13 @@ func (r *rocksDBBatch) Merge(key MVCCKey, value []byte) error {

// ApplyBatchRepr atomically applies a set of batched updates to the current
// batch (the receiver).
func (r *rocksDBBatch) ApplyBatchRepr(repr []byte) error {
func (r *rocksDBBatch) ApplyBatchRepr(repr []byte, sync bool) error {
if r.distinctOpen {
panic("distinct batch open")
}
r.flushMutations()
r.flushes++ // make sure that Repr() doesn't take a shortcut
return dbApplyBatchRepr(r.batch, repr)
return dbApplyBatchRepr(r.batch, repr, sync)
}

func (r *rocksDBBatch) Get(key MVCCKey) ([]byte, error) {
Expand Down Expand Up @@ -1068,7 +1068,7 @@ func (r *rocksDBBatch) NewIterator(prefix bool) Iterator {
return iter
}

func (r *rocksDBBatch) Commit() error {
func (r *rocksDBBatch) Commit(sync bool) error {
if r.closed() {
panic("this batch was already committed")
}
Expand All @@ -1081,7 +1081,7 @@ func (r *rocksDBBatch) Commit() error {
// We've previously flushed mutations to the C++ batch, so we have to flush
// any remaining mutations as well and then commit the batch.
r.flushMutations()
if err := statusToError(C.DBCommitAndCloseBatch(r.batch)); err != nil {
if err := statusToError(C.DBCommitAndCloseBatch(r.batch, C.bool(sync))); err != nil {
return err
}
r.batch = nil
Expand All @@ -1091,7 +1091,7 @@ func (r *rocksDBBatch) Commit() error {

// Fast-path which avoids flushing mutations to the C++ batch. Instead, we
// directly apply the mutations to the database.
if err := r.parent.ApplyBatchRepr(r.builder.Finish()); err != nil {
if err := r.parent.ApplyBatchRepr(r.builder.Finish(), sync); err != nil {
return err
}
C.DBClose(r.batch)
Expand Down Expand Up @@ -1135,7 +1135,7 @@ func (r *rocksDBBatch) flushMutations() {
r.flushes++
r.flushedCount += r.builder.count
r.flushedSize += len(r.builder.repr)
if err := r.ApplyBatchRepr(r.builder.Finish()); err != nil {
if err := r.ApplyBatchRepr(r.builder.Finish(), false); err != nil {
panic(err)
}
// Force a seek of the underlying iterator on the next Seek/ReverseSeek.
Expand Down Expand Up @@ -1474,8 +1474,8 @@ func dbMerge(rdb *C.DBEngine, key MVCCKey, value []byte) error {
return statusToError(C.DBMerge(rdb, goToCKey(key), goToCSlice(value)))
}

func dbApplyBatchRepr(rdb *C.DBEngine, repr []byte) error {
return statusToError(C.DBApplyBatchRepr(rdb, goToCSlice(repr)))
func dbApplyBatchRepr(rdb *C.DBEngine, repr []byte, sync bool) error {
return statusToError(C.DBApplyBatchRepr(rdb, goToCSlice(repr), C.bool(sync)))
}

// dbGet returns the value for the given key.
Expand Down
Loading

0 comments on commit 5e945a2

Please sign in to comment.