From c12ba6a5d0b4a0202eaafece2e16bf543cf4f277 Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Wed, 9 Nov 2022 14:45:01 -0500
Subject: [PATCH] db: add DB.ApplyNoSyncWait for asynchronous apply

ApplyNoSyncWait must only be used when WriteOptions.Sync is true. It
enqueues the Batch to the WAL, adds it to the memtable, waits until the
batch is visible in the memtable, and then returns to the caller. The
caller is responsible for calling Batch.SyncWait to wait until the
write to the WAL is fsynced.

This change required splitting the WaitGroup in the Batch into two
WaitGroups, so that waiting for visibility can happen separately from
waiting for the WAL write. Additionally, the channel used as a
semaphore for reserving space in the two lock-free queues is split
into two channels, since dequeueing from these queues can happen in
arbitrary order. There may be some performance overhead from pushing
to and popping from two channels instead of one. (Illustrative
sketches of the calling pattern and of the semaphore split follow the
diff below.)

Informs https://github.com/cockroachdb/cockroach/issues/17500

See discussion thread
https://github.com/cockroachdb/cockroach/pull/87050#pullrequestreview-1119047188
---
 batch.go                  | 25 ++++++++++
 batch_test.go             | 26 +++++++++++
 commit.go                 | 98 ++++++++++++++++++++++++++++++---------
 commit_test.go            | 54 +++++++++++++++++----
 db.go                     | 27 ++++++++++-
 open.go                   |  1 +
 record/log_writer.go      | 20 ++++++--
 record/log_writer_test.go | 27 ++++++++++-
 8 files changed, 242 insertions(+), 36 deletions(-)

diff --git a/batch.go b/batch.go
index 37fabc1270..1afb976cae 100644
--- a/batch.go
+++ b/batch.go
@@ -255,7 +255,22 @@ type Batch struct {
 	// memtable.
 	flushable *flushableBatch
 
+	// Synchronous Apply uses the commit WaitGroup for both publishing the
+	// seqnum and waiting for the WAL fsync (if needed). Asynchronous
+	// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
+	// WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
+	// waiting for the WAL fsync.
+	//
+	// TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
+	// SyncWait is causing higher memory usage because of the time duration
+	// between when the sync is already done, and a goroutine calls SyncWait
+	// (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
+	// into a separate struct that is allocated separately (using another
+	// sync.Pool), and only that struct needs to outlive Batch.Close (which
+	// could then be called immediately after ApplyNoSyncWait).
 	commit    sync.WaitGroup
+	fsyncWait sync.WaitGroup
+
 	commitErr error
 	applied   uint32 // updated atomically
 }
@@ -1089,6 +1104,7 @@ func (b *Batch) Reset() {
 	b.rangeKeysSeqNum = 0
 	b.flushable = nil
 	b.commit = sync.WaitGroup{}
+	b.fsyncWait = sync.WaitGroup{}
 	b.commitErr = nil
 	atomic.StoreUint32(&b.applied, 0)
 	if b.data != nil {
@@ -1205,6 +1221,15 @@ func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
 	return data[v:], data[:v], true
 }
 
+// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
+func (b *Batch) SyncWait() error {
+	b.fsyncWait.Wait()
+	if b.commitErr != nil {
+		b.db = nil // prevent batch reuse on error
+	}
+	return b.commitErr
+}
+
 // BatchReader iterates over the entries contained in a batch.
 type BatchReader []byte

diff --git a/batch_test.go b/batch_test.go
index 5c9902222f..1687fd238f 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -223,6 +223,31 @@ func TestBatchEmpty(t *testing.T) {
 	require.NoError(t, iter2.Close())
 }
 
+func TestBatchApplyNoSyncWait(t *testing.T) {
+	db, err := Open("", &Options{
+		FS: vfs.NewMem(),
+	})
+	require.NoError(t, err)
+	defer db.Close()
+	var batches []*Batch
+	options := &WriteOptions{Sync: true}
+	for i := 0; i < 10000; i++ {
+		b := db.NewBatch()
+		str := fmt.Sprintf("a%d", i)
+		require.NoError(t, b.Set([]byte(str), []byte(str), nil))
+		require.NoError(t, db.ApplyNoSyncWait(b, options))
+		val, closer, err := db.Get([]byte(str))
+		require.NoError(t, err)
+		require.Equal(t, str, string(val))
+		closer.Close()
+		batches = append(batches, b)
+	}
+	for _, b := range batches {
+		require.NoError(t, b.SyncWait())
+		b.Close()
+	}
+}
+
 func TestBatchReset(t *testing.T) {
 	db, err := Open("", &Options{
 		FS: vfs.NewMem(),
@@ -244,6 +269,7 @@ func TestBatchReset(t *testing.T) {
 	b.applied = 1
 	b.commitErr = errors.New("test-error")
 	b.commit.Add(1)
+	b.fsyncWait.Add(1)
 	require.Equal(t, uint32(3), b.Count())
 	require.Equal(t, uint64(1), b.countRangeDels)
 	require.Equal(t, uint64(1), b.countRangeKeys)
diff --git a/commit.go b/commit.go
index 9bec350d5f..d93824c2d6 100644
--- a/commit.go
+++ b/commit.go
@@ -60,7 +60,7 @@ func (q *commitQueue) enqueue(b *Batch) {
 	ptrs := atomic.LoadUint64(&q.headTail)
 	head, tail := q.unpack(ptrs)
 	if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {

[... intervening commit.go hunks lost in extraction; per the commit message
they split the single commitPipeline.sem channel into the commitQueueSem and
logSyncQSem semaphores, ending with the updated Commit signature below ...]

+// REQUIRES: noSyncWait => syncWAL
+func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
 	if b.Empty() {
 		return nil
 	}
-	p.sem <- struct{}{}
+	// Acquire semaphores.
+	p.commitQueueSem <- struct{}{}
+	if syncWAL {
+		p.logSyncQSem <- struct{}{}
+	}
 
 	// Prepare the batch for committing: enqueuing the batch in the pending
 	// queue, determining the batch sequence number and writing the data to the
@@ -250,27 +287,41 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
 	//
 	// NB: We set Batch.commitErr on error so that the batch won't be a candidate
 	// for reuse. See Batch.release().
-	mem, err := p.prepare(b, syncWAL)
+	mem, err := p.prepare(b, syncWAL, noSyncWait)
 	if err != nil {
 		b.db = nil // prevent batch reuse on error
+		// NB: we are not doing <-p.commitQueueSem since the batch is still
+		// sitting in the pending queue. We should consider fixing this by also
+		// removing the batch from the pending queue.
 		return err
 	}
 
 	// Apply the batch to the memtable.
 	if err := p.env.apply(b, mem); err != nil {
 		b.db = nil // prevent batch reuse on error
+		// NB: we are not doing <-p.commitQueueSem since the batch is still
+		// sitting in the pending queue. We should consider fixing this by also
+		// removing the batch from the pending queue.
 		return err
 	}
 
 	// Publish the batch sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 
-	if b.commitErr != nil {
-		b.db = nil // prevent batch reuse on error
+	if !noSyncWait {
+		// Already waited for commit, so look at the error.
+		if b.commitErr != nil {
+			b.db = nil // prevent batch reuse on error
+			err = b.commitErr
+		}
 	}
-	return b.commitErr
+	// Else noSyncWait. The LogWriter can be concurrently writing to
+	// b.commitErr. We will read b.commitErr in Batch.SyncWait after the
+	// LogWriter is done writing.
+
+	return err
 }
 
 // AllocateSeqNum allocates count sequence numbers, invokes the prepare
@@ -294,7 +345,7 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	b.setCount(uint32(count))
 	b.commit.Add(1)
 
-	p.sem <- struct{}{}
+	p.commitQueueSem <- struct{}{}
 
 	p.mu.Lock()
 
@@ -341,27 +392,30 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	// Publish the sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 }
 
-func (p *commitPipeline) prepare(b *Batch, syncWAL bool) (*memTable, error) {
+func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
 	n := uint64(b.Count())
 	if n == invalidBatchCount {
 		return nil, ErrInvalidBatch
 	}
-	count := 1
-	if syncWAL {
-		count++
-	}
-	// count represents the waiting needed for publish, and optionally the
-	// waiting needed for the WAL sync.
-	b.commit.Add(count)
-
 	var syncWG *sync.WaitGroup
 	var syncErr *error
+	commitCount := 1
 	if syncWAL {
-		syncWG, syncErr = &b.commit, &b.commitErr
+		syncErr = &b.commitErr
+		if noSyncWait {
+			syncWG = &b.fsyncWait
+			b.fsyncWait.Add(1)
+		} else {
+			syncWG = &b.commit
+			commitCount++
+		}
 	}
+	// commitCount represents the waiting needed for publish, and optionally the
+	// waiting needed for the WAL sync.
+	b.commit.Add(commitCount)
 
 	p.mu.Lock()
diff --git a/commit_test.go b/commit_test.go
index c680ef6527..ddac396dea 100644
--- a/commit_test.go
+++ b/commit_test.go
@@ -31,6 +31,7 @@ type testCommitEnv struct {
 		sync.Mutex
 		buf []uint64
 	}
+	queueSemChan chan struct{}
 }
 
 func (e *testCommitEnv) env() commitEnv {
@@ -49,10 +50,14 @@ func (e *testCommitEnv) apply(b *Batch, mem *memTable) error {
 	return nil
 }
 
-func (e *testCommitEnv) write(b *Batch, _ *sync.WaitGroup, _ *error) (*memTable, error) {
+func (e *testCommitEnv) write(b *Batch, wg *sync.WaitGroup, _ *error) (*memTable, error) {
 	n := int64(len(b.data))
 	atomic.AddInt64(&e.writePos, n)
 	atomic.AddUint64(&e.writeCount, 1)
+	if wg != nil {
+		wg.Done()
+		<-e.queueSemChan
+	}
 	return nil, nil
 }
 
@@ -100,7 +105,7 @@ func TestCommitPipeline(t *testing.T) {
 			defer wg.Done()
 			var b Batch
 			_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
-			_ = p.Commit(&b, false)
+			_ = p.Commit(&b, false, false)
 		}(i)
 	}
 	wg.Wait()
@@ -120,6 +125,37 @@ func TestCommitPipeline(t *testing.T) {
 	}
 }
 
+func TestCommitPipelineSync(t *testing.T) {
+	var e testCommitEnv
+	p := newCommitPipeline(e.env())
+	e.queueSemChan = p.logSyncQSem
+
+	n := 10000
+	if invariants.RaceEnabled {
+		// Under race builds we have to limit the concurrency or we hit the
+		// following error:
+		//
+		// race: limit on 8128 simultaneously alive goroutines is exceeded, dying
+		n = 1000
+	}
+
+	for _, noSyncWait := range []bool{false, true} {
+		t.Run(fmt.Sprintf("no-sync-wait=%t", noSyncWait), func(t *testing.T) {
+			var wg sync.WaitGroup
+			wg.Add(n)
+			for i := 0; i < n; i++ {
+				go func(i int) {
+					defer wg.Done()
+					var b Batch
+					_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
+					_ = p.Commit(&b, true, noSyncWait)
+				}(i)
+			}
+			wg.Wait()
+		})
+	}
+}
+
 func TestCommitPipelineAllocateSeqNum(t *testing.T) {
 	var e testCommitEnv
 	p := newCommitPipeline(e.env())
@@ -185,9 +221,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
 	}
 
 	// A basic commitEnv which writes to a WAL.
-	wal := record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
-		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
-	})
+	var wal *record.LogWriter
 	var walDone sync.WaitGroup
 	testEnv := commitEnv{
 		logSeqNum:     new(uint64),
 		visibleSeqNum: new(uint64),
 		apply: func(b *Batch, mem *memTable) error {
 			walDone.Done()
 			return nil
 		},
 		write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
 			_, err := wal.SyncRecord(b.data, syncWG, syncErr)
 			return nil, err
 		},
 	}
 	p := newCommitPipeline(testEnv)
+	wal = record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
+		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+		QueueSemChan:    p.logSyncQSem,
+	})
 
 	// Launch N (commitConcurrency) goroutines which each create a batch and
 	// commit it with sync==true. Because of the syncDelayFile, none of these
 	// operations can complete until syncDelayFile.done is closed.
-	errCh := make(chan error, cap(p.sem))
+	errCh := make(chan error, cap(p.commitQueueSem))
 	walDone.Add(cap(errCh))
 	for i := 0; i < cap(errCh); i++ {
 		go func(i int) {
 			b := db.NewBatch()
 			if err := b.LogData(scratch[:], nil); err != nil {
 				errCh <- err
 				return
 			}
-			errCh <- p.Commit(b, true /* sync */)
+			errCh <- p.Commit(b, true /* sync */, false)
 		}(i)
 	}
@@ -284,7 +322,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
 			batch := newBatch(nil)
 			binary.BigEndian.PutUint64(buf, rng.Uint64())
 			batch.Set(buf, buf, nil)
-			if err := p.Commit(batch, true /* sync */); err != nil {
+			if err := p.Commit(batch, true /* sync */, false); err != nil {
 				b.Fatal(err)
 			}
 			batch.release()
diff --git a/db.go b/db.go
index d5b294b507..ff5b718352 100644
--- a/db.go
+++ b/db.go
@@ -724,6 +724,30 @@ func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
 //
 // It is safe to modify the contents of the arguments after Apply returns.
 func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
+	return d.applyInternal(batch, opts, false)
+}
+
+// ApplyNoSyncWait must only be used when opts.Sync is true and the caller
+// does not want to wait for the WAL fsync to happen. The method will return
+// once the mutation is applied to the memtable and is visible (note that a
+// mutation is visible before the WAL sync even in the wait case, so we have
+// not weakened the durability semantics). The caller must call Batch.SyncWait
+// to wait for the WAL fsync. The caller must not Close the batch without
+// first calling Batch.SyncWait.
+//
+// RECOMMENDATION: Prefer using Apply unless you really understand why you
+// need ApplyNoSyncWait.
+// EXPERIMENTAL: API/feature subject to change. Do not yet use outside
+// CockroachDB.
+func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
+	if !opts.Sync {
+		return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
+	}
+	return d.applyInternal(batch, opts, true)
+}
+
+// REQUIRES: noSyncWait => opts.Sync
+func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
 	if err := d.closed.Load(); err != nil {
 		panic(err)
 	}
@@ -762,7 +786,7 @@ func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
 	if int(batch.memTableSize) >= d.largeBatchThreshold {
 		batch.flushable = newFlushableBatch(batch, d.opts.Comparer)
 	}
-	if err := d.commit.Commit(batch, sync); err != nil {
+	if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
 		// There isn't much we can do on an error here. The commit pipeline will be
 		// horked at this point.
d.opts.Logger.Fatalf("%v", err) @@ -1988,6 +2012,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error { d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{ WALFsyncLatency: d.mu.log.metrics.fsyncLatency, WALMinSyncInterval: d.opts.WALMinSyncInterval, + QueueSemChan: d.commit.logSyncQSem, }) } diff --git a/open.go b/open.go index 4d0cf96990..0c3614c710 100644 --- a/open.go +++ b/open.go @@ -426,6 +426,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { logWriterConfig := record.LogWriterConfig{ WALMinSyncInterval: d.opts.WALMinSyncInterval, WALFsyncLatency: d.mu.log.metrics.fsyncLatency, + QueueSemChan: d.commit.logSyncQSem, } d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig) d.mu.versions.metrics.WAL.Files++ diff --git a/record/log_writer.go b/record/log_writer.go index 6098d19260..a79892d543 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -133,7 +133,8 @@ func (q *syncQueue) load() (head, tail, realLength uint32) { return head, tail, realLength } -func (q *syncQueue) pop(head, tail uint32, err error) error { +// REQUIRES: queueSemChan is non-nil. +func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error { if tail == head { // Queue is empty. return nil @@ -153,6 +154,10 @@ func (q *syncQueue) pop(head, tail uint32, err error) error { // will try to enqueue before we've "freed" space in the queue. atomic.AddUint64(&q.headTail, 1) wg.Done() + // Is always non-nil in production. + if queueSemChan != nil { + <-queueSemChan + } } return nil @@ -291,12 +296,20 @@ type LogWriter struct { // used for min-sync-interval. In normal operation this points to // time.AfterFunc. afterFunc func(d time.Duration, f func()) syncTimer + + // See the comment for LogWriterConfig.QueueSemChan. + queueSemChan chan struct{} } // LogWriterConfig is a struct used for configuring new LogWriters type LogWriterConfig struct { WALMinSyncInterval durationFunc WALFsyncLatency prometheus.Histogram + // QueueSemChan is an optional channel to pop from when popping from + // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents + // the syncQueue from overflowing (which will cause a panic). All production + // code ensures this is non-nil. + QueueSemChan chan struct{} } // CapAllocatedBlocks is the maximum number of blocks allocated by the @@ -319,6 +332,7 @@ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterCon afterFunc: func(d time.Duration, f func()) syncTimer { return time.AfterFunc(d, f) }, + queueSemChan: logWriterConfig.QueueSemChan, } r.free.cond.L = &r.free.Mutex r.free.blocks = make([]*block, 0, CapAllocatedBlocks) @@ -441,7 +455,7 @@ func (w *LogWriter) flushLoop(context.Context) { // If flusher has an error, we propagate it to waiters. Note in spite of // error we consume the pending list above to free blocks for writers. if f.err != nil { - f.syncQ.pop(head, tail, f.err) + f.syncQ.pop(head, tail, f.err, w.queueSemChan) // Update the idleStartTime if work could not be done, so that we don't // include the duration we tried to do work as idle. We don't bother // with the rest of the accounting, which means we will undercount. 
@@ -518,7 +532,7 @@ func (w *LogWriter) flushPending(
 			syncLatency, err = w.syncWithLatency()
 		}
 		f := &w.flusher
-		if popErr := f.syncQ.pop(head, tail, err); popErr != nil {
+		if popErr := f.syncQ.pop(head, tail, err, w.queueSemChan); popErr != nil {
 			return synced, syncLatency, bytesWritten, popErr
 		}
 	}
diff --git a/record/log_writer_test.go b/record/log_writer_test.go
index 31574797e4..eaa603f700 100644
--- a/record/log_writer_test.go
+++ b/record/log_writer_test.go
@@ -42,7 +42,7 @@ func TestSyncQueue(t *testing.T) {
 				return
 			}
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -98,7 +98,7 @@ func TestFlusherCond(t *testing.T) {
 			}
 
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -199,6 +199,29 @@ func TestSyncRecord(t *testing.T) {
 	}
 }
 
+func TestSyncRecordWithSignalChan(t *testing.T) {
+	f := &syncFile{}
+	semChan := make(chan struct{}, 5)
+	for i := 0; i < cap(semChan); i++ {
+		semChan <- struct{}{}
+	}
+	w := NewLogWriter(f, 0, LogWriterConfig{
+		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+		QueueSemChan:    semChan,
+	})
+	require.Equal(t, cap(semChan), len(semChan))
+	var syncErr error
+	for i := 0; i < 5; i++ {
+		var syncWG sync.WaitGroup
+		syncWG.Add(1)
+		_, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
+		require.NoError(t, err)
+		syncWG.Wait()
+		require.NoError(t, syncErr)
+		require.Equal(t, cap(semChan)-(i+1), len(semChan))
+	}
+}
+
 type fakeTimer struct {
 	f func()
 }
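
Usage sketch (illustrative, not part of the diff above): the calling pattern
the commit message describes, i.e. apply without waiting for the fsync, then
acknowledge only after Batch.SyncWait. This is hypothetical caller code
mirroring TestBatchApplyNoSyncWait; the exported pebble.Sync WriteOptions and
the in-memory FS are the only assumptions beyond this patch.

	package main

	import (
		"log"

		"github.com/cockroachdb/pebble"
		"github.com/cockroachdb/pebble/vfs"
	)

	func main() {
		db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()

		// Stage a group of writes. Each ApplyNoSyncWait returns as soon as
		// the batch is applied to the memtable and visible to readers; the
		// WAL fsync is still pending.
		var pending []*pebble.Batch
		for _, k := range []string{"a", "b", "c"} {
			b := db.NewBatch()
			if err := b.Set([]byte(k), []byte(k), nil); err != nil {
				log.Fatal(err)
			}
			// WriteOptions.Sync must be true for ApplyNoSyncWait.
			if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
				log.Fatal(err)
			}
			pending = append(pending, b)
		}

		// Later (possibly on a separate goroutine), wait for the WAL fsync
		// before acknowledging the writes. SyncWait must precede Close.
		for _, b := range pending {
			if err := b.SyncWait(); err != nil {
				log.Fatal(err)
			}
			b.Close()
		}
	}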
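
Semaphore sketch (illustrative, not pebble code; the capacities are made up):
a buffered channel works as a counting semaphore, and giving each lock-free
queue its own channel lets the slots be released in whichever order the
queues are popped, which is the reason the commit message gives for splitting
the channel.

	package main

	import "fmt"

	func main() {
		// A buffered channel is a counting semaphore: a send reserves a
		// slot, a receive releases one. Each lock-free queue gets its own
		// channel so its occupancy is bounded independently.
		commitQueueSem := make(chan struct{}, 4) // bounds the pending queue
		logSyncQSem := make(chan struct{}, 4)    // bounds the WAL sync queue

		// Producer side: reserve a slot in each queue before enqueueing a
		// batch that wants a sync.
		commitQueueSem <- struct{}{}
		logSyncQSem <- struct{}{}

		// Consumer side: the pending queue is popped when the seqnum is
		// published, and the sync queue is popped when the WAL fsync
		// completes. These happen on different goroutines and in arbitrary
		// order, which is why one shared channel released once per batch no
		// longer suffices.
		<-logSyncQSem
		<-commitQueueSem

		fmt.Println("slots released independently")
	}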