diff --git a/pkg/ccl/storageccl/writebatch.go b/pkg/ccl/storageccl/writebatch.go index bc468825f511..17b32210dec4 100644 --- a/pkg/ccl/storageccl/writebatch.go +++ b/pkg/ccl/storageccl/writebatch.go @@ -72,7 +72,7 @@ func evalWriteBatch( return storage.EvalResult{}, errors.New("WriteBatch can only be called on empty ranges") } - if err := batch.ApplyBatchRepr(args.Data); err != nil { + if err := batch.ApplyBatchRepr(args.Data, false /* !sync */); err != nil { return storage.EvalResult{}, err } return storage.EvalResult{}, nil diff --git a/pkg/storage/abort_cache.go b/pkg/storage/abort_cache.go index 853fb698a69e..31a45ee2d855 100644 --- a/pkg/storage/abort_cache.go +++ b/pkg/storage/abort_cache.go @@ -94,7 +94,7 @@ func (sc *AbortCache) ClearData(e engine.Engine) error { if err != nil { return err } - return b.Commit() + return b.Commit(false /* !sync */) } // Get looks up an abort cache entry recorded for this transaction ID. diff --git a/pkg/storage/engine/batch_test.go b/pkg/storage/engine/batch_test.go index d27831a3ca20..9ea9c37f8001 100644 --- a/pkg/storage/engine/batch_test.go +++ b/pkg/storage/engine/batch_test.go @@ -129,7 +129,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch func TestBatchBasics(t *testing.T) { defer leaktest.AfterTest(t)() testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error { - return b.Commit() + return b.Commit(false /* !sync */) }) } @@ -231,14 +231,14 @@ func TestBatchRepr(t *testing.T) { t.Fatalf("expected %v, but found %v", expOps, ops) } - return e.ApplyBatchRepr(repr) + return e.ApplyBatchRepr(repr, false /* !sync */) }) } func TestWriteBatchBasics(t *testing.T) { defer leaktest.AfterTest(t)() testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error { - return b.Commit() + return b.Commit(false /* !sync */) }) } @@ -264,7 +264,7 @@ func TestApplyBatchRepr(t *testing.T) { b2 := e.NewBatch() defer b2.Close() - if err := b2.ApplyBatchRepr(repr1); err != nil { + if err := b2.ApplyBatchRepr(repr1, false /* !sync */); err != nil { t.Fatal(err) } repr2 := b2.Repr() @@ -290,11 +290,11 @@ func TestApplyBatchRepr(t *testing.T) { b4 := e.NewBatch() defer b4.Close() - if err := b4.ApplyBatchRepr(repr); err != nil { + if err := b4.ApplyBatchRepr(repr, false /* !sync */); err != nil { t.Fatal(err) } // Intentionally don't call Repr() because the expected user wouldn't. - if err := b4.Commit(); err != nil { + if err := b4.Commit(false /* !sync */); err != nil { t.Fatal(err) } @@ -456,7 +456,7 @@ func TestBatchProto(t *testing.T) { t.Fatalf("expected GetProto to fail ok=%t: %s", ok, err) } // Commit and verify the proto can be read directly from the engine. - if err := b.Commit(); err != nil { + if err := b.Commit(false /* !sync */); err != nil { t.Fatal(err) } if ok, _, _, err := e.GetProto(mvccKey("proto"), getVal); !ok || err != nil { @@ -545,7 +545,7 @@ func TestBatchScan(t *testing.T) { } // Now, commit batch and re-scan using engine direct to compare results. - if err := b.Commit(); err != nil { + if err := b.Commit(false /* !sync */); err != nil { t.Fatal(err) } for i, scan := range scans { @@ -912,7 +912,7 @@ func TestBatchDistinctPanics(t *testing.T) { func() { _ = batch.Put(a, nil) }, func() { _ = batch.Merge(a, nil) }, func() { _ = batch.Clear(a) }, - func() { _ = batch.ApplyBatchRepr(nil) }, + func() { _ = batch.ApplyBatchRepr(nil, false) }, func() { _, _ = batch.Get(a) }, func() { _, _, _, _ = batch.GetProto(a, nil) }, func() { _ = batch.Iterate(a, a, nil) }, diff --git a/pkg/storage/engine/bench_test.go b/pkg/storage/engine/bench_test.go index d7594dfbeb62..8ac7194b841f 100644 --- a/pkg/storage/engine/bench_test.go +++ b/pkg/storage/engine/bench_test.go @@ -124,7 +124,7 @@ func setupMVCCData( // sstables. if scaled := len(order) / 20; i > 0 && (i%scaled) == 0 { log.Infof(context.Background(), "committing (%d/~%d)", i/scaled, 20) - if err := batch.Commit(); err != nil { + if err := batch.Commit(false /* !sync */); err != nil { b.Fatal(err) } batch.Close() @@ -143,7 +143,7 @@ func setupMVCCData( b.Fatal(err) } } - if err := batch.Commit(); err != nil { + if err := batch.Commit(false /* !sync */); err != nil { b.Fatal(err) } batch.Close() @@ -352,7 +352,7 @@ func runMVCCBatchPut(emk engineMaker, valueSize, batchSize int, b *testing.B) { } } - if err := batch.Commit(); err != nil { + if err := batch.Commit(false /* !sync */); err != nil { b.Fatal(err) } @@ -403,7 +403,7 @@ func runMVCCBatchTimeSeries(emk engineMaker, batchSize int, b *testing.B) { } } - if err := batch.Commit(); err != nil { + if err := batch.Commit(false /* !sync */); err != nil { b.Fatal(err) } batch.Close() @@ -561,7 +561,7 @@ func runBatchApplyBatchRepr( } else { batch = eng.NewBatch() } - if err := batch.ApplyBatchRepr(repr); err != nil { + if err := batch.ApplyBatchRepr(repr, false /* !sync */); err != nil { b.Fatal(err) } batch.Close() diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index d2275e7b1060..a800c3483d4e 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -125,8 +125,10 @@ type Reader interface { type Writer interface { // ApplyBatchRepr atomically applies a set of batched updates. Created by // calling Repr() on a batch. Using this method is equivalent to constructing - // and committing a batch whose Repr() equals repr. - ApplyBatchRepr(repr []byte) error + // and committing a batch whose Repr() equals repr. If sync is true, the + // batch is synchronously written to disk. It is an error to specify + // sync=true if the Writer is a Batch. + ApplyBatchRepr(repr []byte, sync bool) error // Clear removes the item from the db with the given key. // Note that clear actually removes entries from the storage // engine, rather than inserting tombstones. @@ -202,8 +204,9 @@ type Engine interface { type Batch interface { ReadWriter // Commit atomically applies any batched updates to the underlying - // engine. This is a noop unless the engine was created via NewBatch(). - Commit() error + // engine. This is a noop unless the engine was created via NewBatch(). If + // sync is true, the batch is synchronously committed to disk. + Commit(sync bool) error // Distinct returns a view of the existing batch which only sees writes that // were performed before the Distinct batch was created. That is, the // returned batch will not read its own writes, but it will read writes to diff --git a/pkg/storage/engine/engine_test.go b/pkg/storage/engine/engine_test.go index 3574de21032d..8f8ffd5824b8 100644 --- a/pkg/storage/engine/engine_test.go +++ b/pkg/storage/engine/engine_test.go @@ -115,7 +115,7 @@ func TestEngineBatchCommit(t *testing.T) { t.Fatal(err) } } - if err := batch.Commit(); err != nil { + if err := batch.Commit(false /* !sync */); err != nil { t.Fatal(err) } close(writesDone) @@ -320,7 +320,7 @@ func TestEngineBatch(t *testing.T) { } iter.Close() // Commit the batch and try getting the value from the engine. - if err := b.Commit(); err != nil { + if err := b.Commit(false /* !sync */); err != nil { t.Errorf("%d: %v", i, err) continue } diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 8906f3b6612c..4c218d1445d0 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -441,8 +441,8 @@ func (r *RocksDB) Merge(key MVCCKey, value []byte) error { // ApplyBatchRepr atomically applies a set of batched updates. Created by // calling Repr() on a batch. Using this method is equivalent to constructing // and committing a batch whose Repr() equals repr. -func (r *RocksDB) ApplyBatchRepr(repr []byte) error { - return dbApplyBatchRepr(r.rdb, repr) +func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error { + return dbApplyBatchRepr(r.rdb, repr, sync) } // Get returns the value for the given key. @@ -965,13 +965,13 @@ func (r *rocksDBBatch) Merge(key MVCCKey, value []byte) error { // ApplyBatchRepr atomically applies a set of batched updates to the current // batch (the receiver). -func (r *rocksDBBatch) ApplyBatchRepr(repr []byte) error { +func (r *rocksDBBatch) ApplyBatchRepr(repr []byte, sync bool) error { if r.distinctOpen { panic("distinct batch open") } r.flushMutations() r.flushes++ // make sure that Repr() doesn't take a shortcut - return dbApplyBatchRepr(r.batch, repr) + return dbApplyBatchRepr(r.batch, repr, sync) } func (r *rocksDBBatch) Get(key MVCCKey) ([]byte, error) { @@ -1065,7 +1065,7 @@ func (r *rocksDBBatch) NewIterator(prefix bool) Iterator { return iter } -func (r *rocksDBBatch) Commit() error { +func (r *rocksDBBatch) Commit(sync bool) error { if r.closed() { panic("this batch was already committed") } @@ -1078,7 +1078,7 @@ func (r *rocksDBBatch) Commit() error { // We've previously flushed mutations to the C++ batch, so we have to flush // any remaining mutations as well and then commit the batch. r.flushMutations() - if err := statusToError(C.DBCommitAndCloseBatch(r.batch)); err != nil { + if err := statusToError(C.DBCommitAndCloseBatch(r.batch, C.bool(sync))); err != nil { return err } r.batch = nil @@ -1088,7 +1088,7 @@ func (r *rocksDBBatch) Commit() error { // Fast-path which avoids flushing mutations to the C++ batch. Instead, we // directly apply the mutations to the database. - if err := r.parent.ApplyBatchRepr(r.builder.Finish()); err != nil { + if err := r.parent.ApplyBatchRepr(r.builder.Finish(), sync); err != nil { return err } C.DBClose(r.batch) @@ -1132,7 +1132,7 @@ func (r *rocksDBBatch) flushMutations() { r.flushes++ r.flushedCount += r.builder.count r.flushedSize += len(r.builder.repr) - if err := r.ApplyBatchRepr(r.builder.Finish()); err != nil { + if err := r.ApplyBatchRepr(r.builder.Finish(), false); err != nil { panic(err) } // Force a seek of the underlying iterator on the next Seek/ReverseSeek. @@ -1471,8 +1471,8 @@ func dbMerge(rdb *C.DBEngine, key MVCCKey, value []byte) error { return statusToError(C.DBMerge(rdb, goToCKey(key), goToCSlice(value))) } -func dbApplyBatchRepr(rdb *C.DBEngine, repr []byte) error { - return statusToError(C.DBApplyBatchRepr(rdb, goToCSlice(repr))) +func dbApplyBatchRepr(rdb *C.DBEngine, repr []byte, sync bool) error { + return statusToError(C.DBApplyBatchRepr(rdb, goToCSlice(repr), C.bool(sync))) } // dbGet returns the value for the given key. diff --git a/pkg/storage/engine/rocksdb/db.cc b/pkg/storage/engine/rocksdb/db.cc index 77c31d00559a..bd989d407a23 100644 --- a/pkg/storage/engine/rocksdb/db.cc +++ b/pkg/storage/engine/rocksdb/db.cc @@ -62,8 +62,8 @@ struct DBEngine { virtual DBStatus Merge(DBKey key, DBSlice value) = 0; virtual DBStatus Delete(DBKey key) = 0; virtual DBStatus DeleteRange(DBKey start, DBKey end) = 0; - virtual DBStatus CommitBatch() = 0; - virtual DBStatus ApplyBatchRepr(DBSlice repr) = 0; + virtual DBStatus CommitBatch(bool sync) = 0; + virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync) = 0; virtual DBSlice BatchRepr() = 0; virtual DBStatus Get(DBKey key, DBString* value) = 0; virtual DBIterator* NewIter(bool prefix) = 0; @@ -103,8 +103,8 @@ struct DBImpl : public DBEngine { virtual DBStatus Merge(DBKey key, DBSlice value); virtual DBStatus Delete(DBKey key); virtual DBStatus DeleteRange(DBKey start, DBKey end); - virtual DBStatus CommitBatch(); - virtual DBStatus ApplyBatchRepr(DBSlice repr); + virtual DBStatus CommitBatch(bool sync); + virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync); virtual DBSlice BatchRepr(); virtual DBStatus Get(DBKey key, DBString* value); virtual DBIterator* NewIter(bool prefix); @@ -124,8 +124,8 @@ struct DBBatch : public DBEngine { virtual DBStatus Merge(DBKey key, DBSlice value); virtual DBStatus Delete(DBKey key); virtual DBStatus DeleteRange(DBKey start, DBKey end); - virtual DBStatus CommitBatch(); - virtual DBStatus ApplyBatchRepr(DBSlice repr); + virtual DBStatus CommitBatch(bool sync); + virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync); virtual DBSlice BatchRepr(); virtual DBStatus Get(DBKey key, DBString* value); virtual DBIterator* NewIter(bool prefix); @@ -144,8 +144,8 @@ struct DBWriteOnlyBatch : public DBEngine { virtual DBStatus Merge(DBKey key, DBSlice value); virtual DBStatus Delete(DBKey key); virtual DBStatus DeleteRange(DBKey start, DBKey end); - virtual DBStatus CommitBatch(); - virtual DBStatus ApplyBatchRepr(DBSlice repr); + virtual DBStatus CommitBatch(bool sync); + virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync); virtual DBSlice BatchRepr(); virtual DBStatus Get(DBKey key, DBString* value); virtual DBIterator* NewIter(bool prefix); @@ -169,8 +169,8 @@ struct DBSnapshot : public DBEngine { virtual DBStatus Merge(DBKey key, DBSlice value); virtual DBStatus Delete(DBKey key); virtual DBStatus DeleteRange(DBKey start, DBKey end); - virtual DBStatus CommitBatch(); - virtual DBStatus ApplyBatchRepr(DBSlice repr); + virtual DBStatus CommitBatch(bool sync); + virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync); virtual DBSlice BatchRepr(); virtual DBStatus Get(DBKey key, DBString* value); virtual DBIterator* NewIter(bool prefix); @@ -1554,6 +1554,10 @@ rocksdb::Options DBMakeOptions(DBOptions db_opts) { options.statistics = rocksdb::CreateDBStatistics(); options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); options.max_open_files = db_opts.max_open_files; + // Periodically sync the WAL to smooth out writes. Not performing + // such syncs can be faster but can cause performance blips when the + // OS decides it needs to flush data. + options.wal_bytes_per_sync = 256 << 10; // 256 KB // Do not create bloom filters for the last level (i.e. the largest // level which contains data in the LSM store). Setting this option @@ -1837,45 +1841,51 @@ DBStatus DBDeleteIterRange(DBEngine* db, DBIterator *iter, DBKey start, DBKey en return kSuccess; } -DBStatus DBImpl::CommitBatch() { +DBStatus DBImpl::CommitBatch(bool sync) { return FmtStatus("unsupported"); } -DBStatus DBBatch::CommitBatch() { +DBStatus DBBatch::CommitBatch(bool sync) { if (updates == 0) { return kSuccess; } rocksdb::WriteOptions options; + options.sync = sync; return ToDBStatus(rep->Write(options, batch.GetWriteBatch())); } -DBStatus DBWriteOnlyBatch::CommitBatch() { +DBStatus DBWriteOnlyBatch::CommitBatch(bool sync) { if (updates == 0) { return kSuccess; } rocksdb::WriteOptions options; + options.sync = sync; return ToDBStatus(rep->Write(options, &batch)); } -DBStatus DBSnapshot::CommitBatch() { +DBStatus DBSnapshot::CommitBatch(bool sync) { return FmtStatus("unsupported"); } -DBStatus DBCommitAndCloseBatch(DBEngine* db) { - DBStatus status = db->CommitBatch(); +DBStatus DBCommitAndCloseBatch(DBEngine* db, bool sync) { + DBStatus status = db->CommitBatch(sync); if (status.data == NULL) { DBClose(db); } return status; } -DBStatus DBImpl::ApplyBatchRepr(DBSlice repr) { +DBStatus DBImpl::ApplyBatchRepr(DBSlice repr, bool sync) { rocksdb::WriteBatch batch(ToString(repr)); rocksdb::WriteOptions options; + options.sync = sync; return ToDBStatus(rep->Write(options, &batch)); } -DBStatus DBBatch::ApplyBatchRepr(DBSlice repr) { +DBStatus DBBatch::ApplyBatchRepr(DBSlice repr, bool sync) { + if (sync) { + return FmtStatus("unsupported"); + } // TODO(peter): It would be slightly more efficient to iterate over // repr directly instead of first converting it to a string. DBBatchInserter inserter(&batch); @@ -1885,7 +1895,10 @@ DBStatus DBBatch::ApplyBatchRepr(DBSlice repr) { return kSuccess; } -DBStatus DBWriteOnlyBatch::ApplyBatchRepr(DBSlice repr) { +DBStatus DBWriteOnlyBatch::ApplyBatchRepr(DBSlice repr, bool sync) { + if (sync) { + return FmtStatus("unsupported"); + } // TODO(peter): It would be slightly more efficient to iterate over // repr directly instead of first converting it to a string. DBBatchInserter inserter(&batch); @@ -1895,12 +1908,12 @@ DBStatus DBWriteOnlyBatch::ApplyBatchRepr(DBSlice repr) { return kSuccess; } -DBStatus DBSnapshot::ApplyBatchRepr(DBSlice repr) { +DBStatus DBSnapshot::ApplyBatchRepr(DBSlice repr, bool sync) { return FmtStatus("unsupported"); } -DBStatus DBApplyBatchRepr(DBEngine* db, DBSlice repr) { - return db->ApplyBatchRepr(repr); +DBStatus DBApplyBatchRepr(DBEngine* db, DBSlice repr, bool sync) { + return db->ApplyBatchRepr(repr, sync); } DBSlice DBImpl::BatchRepr() { diff --git a/pkg/storage/engine/rocksdb/db.h b/pkg/storage/engine/rocksdb/db.h index 87c28cc92221..177db743cbe5 100644 --- a/pkg/storage/engine/rocksdb/db.h +++ b/pkg/storage/engine/rocksdb/db.h @@ -126,13 +126,13 @@ DBStatus DBDeleteIterRange(DBEngine* db, DBIterator *iter, DBKey start, DBKey en // this function on an engine created by DBNewBatch. If an error is // returned, the batch is not closed and it is the caller's // responsibility to call DBClose. -DBStatus DBCommitAndCloseBatch(DBEngine* db); +DBStatus DBCommitAndCloseBatch(DBEngine* db, bool sync); // ApplyBatchRepr applies a batch of mutations encoded using that // batch representation returned by DBBatchRepr(). It is only valid to // call this function on an engine created by DBOpen() or DBNewBatch() // (i.e. not a snapshot). -DBStatus DBApplyBatchRepr(DBEngine* db, DBSlice repr); +DBStatus DBApplyBatchRepr(DBEngine* db, DBSlice repr, bool sync); // Returns the internal batch representation. The returned value is // only valid until the next call to a method using the DBEngine and diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go index 7f394f267de5..363c78f82fe2 100644 --- a/pkg/storage/engine/rocksdb_test.go +++ b/pkg/storage/engine/rocksdb_test.go @@ -76,7 +76,7 @@ func TestBatchIterReadOwnWrite(t *testing.T) { t.Fatal("uncommitted write seen by non-batch iter") } - if err := b.Commit(); err != nil { + if err := b.Commit(false /* !sync */); err != nil { t.Fatal(err) } @@ -375,7 +375,7 @@ func TestConcurrentBatch(t *testing.T) { // Concurrently write all the batches. for _, batch := range batches { go func(batch Batch) { - errChan <- batch.Commit() + errChan <- batch.Commit(false /* !sync */) }(batch) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 073b701cf108..a479fd8d2635 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -91,6 +91,8 @@ var txnAutoGC = true var tickQuiesced = envutil.EnvOrDefaultBool("COCKROACH_TICK_QUIESCED", true) +var syncRaftLog = envutil.EnvOrDefaultBool("COCKROACH_SYNC_RAFT_LOG", true) + // Whether to enable experimental support for proposer-evaluated KV. var propEvalKV = func() bool { enabled := envutil.EnvOrDefaultBool("COCKROACH_PROPOSER_EVALUATED_KV", false) @@ -714,7 +716,7 @@ func (r *Replica) destroyDataRaftMuLocked( if err := r.setTombstoneKey(ctx, batch, &consistentDesc); err != nil { return err } - if err := batch.Commit(); err != nil { + if err := batch.Commit(false); err != nil { return err } commitTime := timeutil.Now() @@ -2702,7 +2704,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, err } } - if err := batch.Commit(); err != nil { + writer.Close() + // Synchronously commit the batch with the Raft log entries and Raft hard + // state as we're promising not to lose this data. + if err := batch.Commit(syncRaftLog); err != nil { return stats, err } @@ -3782,8 +3787,9 @@ func (r *Replica) applyRaftCommand( batch := r.store.Engine().NewWriteOnlyBatch() defer batch.Close() + if writeBatch != nil { - if err := batch.ApplyBatchRepr(writeBatch.Data); err != nil { + if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( errors.Wrap(err, "unable to apply WriteBatch"))) } @@ -3821,7 +3827,7 @@ func (r *Replica) applyRaftCommand( // the future. writer.Close() - if err := batch.Commit(); err != nil { + if err := batch.Commit(false); err != nil { return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( errors.Wrap(err, "could not commit batch"))) } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 78d47ed8b3d1..b95f1222026e 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -617,7 +617,7 @@ func (r *Replica) applySnapshot( // Write the snapshot into the range. for _, batchRepr := range inSnap.Batches { - if err := batch.ApplyBatchRepr(batchRepr); err != nil { + if err := batch.ApplyBatchRepr(batchRepr, false); err != nil { return err } } @@ -664,7 +664,8 @@ func (r *Replica) applySnapshot( s.RaftAppliedIndex, snap.Metadata.Index) } - if err := batch.Commit(); err != nil { + // We've written Raft log entries, so we need to sync the WAL. + if err := batch.Commit(syncRaftLog); err != nil { return err } stats.commit = timeutil.Now() diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 263ad0cfeb29..945923ab9ca0 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1040,7 +1040,7 @@ func (s *Store) migrate(ctx context.Context, desc roachpb.RangeDescriptor) { if err := migrate7310And6991(ctx, batch, desc); err != nil { log.Fatal(ctx, errors.Wrap(err, "during migration")) } - if err := batch.Commit(); err != nil { + if err := batch.Commit(false /* !sync */); err != nil { log.Fatal(ctx, errors.Wrap(err, "could not migrate Raft state")) } } @@ -1719,7 +1719,7 @@ func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error { } *ms = updatedMS - return batch.Commit() + return batch.Commit(true /* sync */) } // ClusterID accessor.