diff --git a/batch.go b/batch.go index 20dd566003f..49c814d495f 100644 --- a/batch.go +++ b/batch.go @@ -319,7 +319,11 @@ type BatchCommitStats struct { // commitPipeline.Commit. SemaphoreWaitDuration time.Duration // WALQueueWaitDuration is the wait time for allocating memory blocks in the - // LogWriter (due to the LogWriter not writing fast enough). + // LogWriter (due to the LogWriter not writing fast enough). At the moment + // this is duration is always zero because a single WAL will allow + // allocating memory blocks up to the entire memtable size. In the future, + // we may pipeline WALs and bound the WAL queued blocks separately, so this + // field is preserved for that possibility. WALQueueWaitDuration time.Duration // MemTableWriteStallDuration is the wait caused by a write stall due to too // many memtables (due to not flushing fast enough). diff --git a/batch_test.go b/batch_test.go index 5a0d73dc7d5..c0b1fe0030f 100644 --- a/batch_test.go +++ b/batch_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/pebble/internal/batchskl" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/testkeys" - "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) @@ -1233,28 +1232,6 @@ func TestBatchCommitStats(t *testing.T) { } } - // WAL queue stall funcs. - // - // The LogWriter gets changed when stalling/unstalling the memtable, so we - // need to use a hook to tell us about the latest LogWriter. - var unstallWALQueue func() - stallWALQueue := func() { - var unstallLatestWALQueue func() - db.mu.Lock() - defer db.mu.Unlock() - db.mu.log.registerLogWriterForTesting = func(w *record.LogWriter) { - // db.mu will be held when this is called. - unstallLatestWALQueue = w.ReserveAllFreeBlocksForTesting() - } - db.mu.log.registerLogWriterForTesting(db.mu.log.LogWriter) - unstallWALQueue = func() { - db.mu.Lock() - defer db.mu.Unlock() - db.mu.log.registerLogWriterForTesting = nil - unstallLatestWALQueue() - } - } - // Commit wait stall funcs. var unstallCommitWait func() stallCommitWait := func() { @@ -1268,12 +1245,9 @@ func TestBatchCommitStats(t *testing.T) { stallCommitSemaphore() stallMemtable() stallL0ReadAmp() - stallWALQueue() stallCommitWait() // Exceed initialMemTableSize -- this is needed to make stallMemtable work. - // It also exceeds record.blockSize, requiring a new block to be allocated, - // which is what we need for stallWALQueue to work. require.NoError(t, b.Set(make([]byte, initialMemTableSize), nil, nil)) var commitWG sync.WaitGroup @@ -1291,8 +1265,6 @@ func TestBatchCommitStats(t *testing.T) { time.Sleep(sleepDuration) unstallL0ReadAmp() time.Sleep(sleepDuration) - unstallWALQueue() - time.Sleep(sleepDuration) unstallCommitWait() // Wait for Apply to return. @@ -1303,10 +1275,6 @@ func TestBatchCommitStats(t *testing.T) { return errors.Errorf("SemaphoreWaitDuration %s is too low", stats.SemaphoreWaitDuration.String()) } - if expectedDuration > stats.WALQueueWaitDuration { - return errors.Errorf("WALQueueWaitDuration %s is too low", - stats.WALQueueWaitDuration.String()) - } if expectedDuration > stats.MemTableWriteStallDuration { return errors.Errorf("MemTableWriteStallDuration %s is too low", stats.MemTableWriteStallDuration.String()) diff --git a/commit_test.go b/commit_test.go index 0e1ad4ac3c7..ee627a113f0 100644 --- a/commit_test.go +++ b/commit_test.go @@ -245,7 +245,7 @@ func TestCommitPipelineWALClose(t *testing.T) { return nil }, write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) { - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return nil, err }, } @@ -316,7 +316,7 @@ func BenchmarkCommitPipeline(b *testing.B) { break } - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return mem, err }, } diff --git a/db.go b/db.go index 4b4a6aa3c41..dd9c4ed8fa7 100644 --- a/db.go +++ b/db.go @@ -909,7 +909,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem b.flushable.setSeqNum(b.SeqNum()) if !d.opts.DisableWAL { var err error - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } @@ -947,7 +947,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem } if b.flushable == nil { - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } diff --git a/record/log_writer.go b/record/log_writer.go index 90f8c2ef340..7cc7a83063b 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -266,10 +266,7 @@ type LogWriter struct { block *block free struct { sync.Mutex - // Condition variable used to signal a block is freed. - cond sync.Cond - blocks []*block - allocated int + blocks []*block } flusher struct { @@ -313,9 +310,10 @@ type LogWriterConfig struct { QueueSemChan chan struct{} } -// CapAllocatedBlocks is the maximum number of blocks allocated by the -// LogWriter. -const CapAllocatedBlocks = 16 +// initialAllocatedBlocksCap is the initial capacity of the various slices +// intended to hold LogWriter blocks. The LogWriter may allocate more blocks +// than this threshold allows. +const initialAllocatedBlocksCap = 32 // NewLogWriter returns a new LogWriter. func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter { @@ -335,9 +333,7 @@ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterCon }, queueSemChan: logWriterConfig.QueueSemChan, } - r.free.cond.L = &r.free.Mutex - r.free.blocks = make([]*block, 0, CapAllocatedBlocks) - r.free.allocated = 1 + r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap) r.block = &block{} r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ) r.flusher.closed = make(chan struct{}) @@ -405,8 +401,12 @@ func (w *LogWriter) flushLoop(context.Context) { // the flush work (w.block.written.Load()). // The list of full blocks that need to be written. This is copied from - // f.pending on every loop iteration, though the number of elements is small - // (usually 1, max 16). + // f.pending on every loop iteration, though the number of elements is + // usually small (most frequently 1). In the case of the WAL LogWriter, the + // number of blocks is bounded by the size of the WAL's corresponding + // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables, + // this works out to at most 2048 elements if the entirety of the memtable's + // contents are queued. pending := make([]*block, 0, cap(f.pending)) for { for { @@ -432,8 +432,7 @@ func (w *LogWriter) flushLoop(context.Context) { // Found work to do, so no longer idle. workStartTime := time.Now() idleDuration := workStartTime.Sub(idleStartTime) - pending = pending[:len(f.pending)] - copy(pending, f.pending) + pending = append(pending[:0], f.pending...) f.pending = f.pending[:0] f.metrics.PendingBufferLen.AddSample(int64(len(pending))) @@ -556,28 +555,18 @@ func (w *LogWriter) flushBlock(b *block) error { b.flushed = 0 w.free.Lock() w.free.blocks = append(w.free.blocks, b) - w.free.cond.Signal() w.free.Unlock() return nil } // queueBlock queues the current block for writing to the underlying writer, // allocates a new block and reserves space for the next header. -func (w *LogWriter) queueBlock() (waitDuration time.Duration) { +func (w *LogWriter) queueBlock() { // Allocate a new block, blocking until one is available. We do this first // because w.block is protected by w.flusher.Mutex. w.free.Lock() if len(w.free.blocks) == 0 { - if w.free.allocated < cap(w.free.blocks) { - w.free.allocated++ - w.free.blocks = append(w.free.blocks, &block{}) - } else { - now := time.Now() - for len(w.free.blocks) == 0 { - w.free.cond.Wait() - } - waitDuration = time.Since(now) - } + w.free.blocks = append(w.free.blocks, &block{}) } nextBlock := w.free.blocks[len(w.free.blocks)-1] w.free.blocks = w.free.blocks[:len(w.free.blocks)-1] @@ -592,28 +581,6 @@ func (w *LogWriter) queueBlock() (waitDuration time.Duration) { f.Unlock() w.blockNum++ - return waitDuration -} - -// ReserveAllFreeBlocksForTesting is used to only for testing. -func (w *LogWriter) ReserveAllFreeBlocksForTesting() (releaseFunc func()) { - w.free.Lock() - defer w.free.Unlock() - free := w.free.blocks - w.free.blocks = nil - return func() { - w.free.Lock() - defer w.free.Unlock() - // It is possible that someone has pushed a free block and w.free.blocks - // is no longer nil. That is harmless. Also, the waiter loops on the - // condition len(w.free.blocks) == 0, so to actually unblock it we need to - // give it a free block. - if len(free) == 0 { - free = append(free, &block{}) - } - w.free.blocks = free - w.free.cond.Broadcast() - } } // Close flushes and syncs any unwritten data and closes the writer. @@ -665,7 +632,7 @@ func (w *LogWriter) Close() error { // of the record. // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) WriteRecord(p []byte) (int64, error) { - logSize, _, err := w.SyncRecord(p, nil, nil) + logSize, err := w.SyncRecord(p, nil, nil) return logSize, err } @@ -676,9 +643,9 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) { // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) SyncRecord( p []byte, wg *sync.WaitGroup, err *error, -) (logSize int64, waitDuration time.Duration, err2 error) { +) (logSize int64, err2 error) { if w.err != nil { - return -1, 0, w.err + return -1, w.err } // The `i == 0` condition ensures we handle empty records. Such records can @@ -686,9 +653,7 @@ func (w *LogWriter) SyncRecord( // MANIFEST is currently written using Writer, it is good to support the same // semantics with LogWriter. for i := 0; i == 0 || len(p) > 0; i++ { - var wd time.Duration - p, wd = w.emitFragment(i, p) - waitDuration += wd + p = w.emitFragment(i, p) } if wg != nil { @@ -707,7 +672,7 @@ func (w *LogWriter) SyncRecord( // race with our read. That's ok because the only error we could be seeing is // one to syncing for which the caller can receive notification of by passing // in a non-nil err argument. - return offset, waitDuration, nil + return offset, nil } // Size returns the current size of the file. @@ -728,7 +693,7 @@ func (w *LogWriter) emitEOFTrailer() { b.written.Store(i + int32(recyclableHeaderSize)) } -func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDuration time.Duration) { +func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) { b := w.block i := b.written.Load() first := n == 0 @@ -762,9 +727,9 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDurati for i := b.written.Load(); i < blockSize; i++ { b.buf[i] = 0 } - waitDuration = w.queueBlock() + w.queueBlock() } - return p[r:], waitDuration + return p[r:] } // Metrics must be called after Close. The callee will no longer modify the diff --git a/record/log_writer_test.go b/record/log_writer_test.go index e17577b74e4..c83544008a8 100644 --- a/record/log_writer_test.go +++ b/record/log_writer_test.go @@ -6,6 +6,7 @@ package record import ( "bytes" + "fmt" "math" "sort" "sync" @@ -14,7 +15,10 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/errorfs" + "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/vfs" + "github.com/cockroachdb/pebble/vfs/vfstest" "github.com/prometheus/client_golang/prometheus" prometheusgo "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -148,7 +152,7 @@ func TestSyncError(t *testing.T) { var syncErr error var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() if injectedErr != syncErr { @@ -186,7 +190,7 @@ func TestSyncRecord(t *testing.T) { for i := 0; i < 100000; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - offset, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + offset, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -214,7 +218,7 @@ func TestSyncRecordWithSignalChan(t *testing.T) { for i := 0; i < 5; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -273,7 +277,7 @@ func TestMinSyncInterval(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -344,7 +348,7 @@ func TestMinSyncIntervalClose(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -379,7 +383,7 @@ func TestMetricsWithoutSync(t *testing.T) { f := &syncFileWithWait{} f.writeWG.Add(1) w := NewLogWriter(f, 0, LogWriterConfig{WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) - offset, _, err := w.SyncRecord([]byte("hello"), nil, nil) + offset, err := w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) const recordSize = 16 require.EqualValues(t, recordSize, offset) @@ -388,7 +392,7 @@ func TestMetricsWithoutSync(t *testing.T) { // constitutes ~14 blocks (each 32KB). const numRecords = 28 << 10 for i := 0; i < numRecords; i++ { - _, _, err = w.SyncRecord([]byte("hello"), nil, nil) + _, err = w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) } // Unblock the flush loop. It will run once or twice to write these blocks, @@ -430,7 +434,7 @@ func TestMetricsWithSync(t *testing.T) { wg.Add(100) for i := 0; i < 100; i++ { var syncErr error - _, _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) require.NoError(t, err) } // Unblock the flush loop. It may have run once or twice for these writes, @@ -506,3 +510,73 @@ func valueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float return val } + +// TestQueueWALBlocks tests queueing many un-flushed WAL blocks when syncing is +// blocked. +func TestQueueWALBlocks(t *testing.T) { + blockSyncCh := make(chan struct{}, 1) + f := errorfs.WrapFile(vfstest.DiscardFile, errorfs.InjectorFunc(func(op errorfs.Op, path string) error { + if op == errorfs.OpFileSync { + <-blockSyncCh + } + return nil + })) + w := NewLogWriter(f, 0, LogWriterConfig{ + WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}), + }) + const numBlocks = 1024 + var b [blockSize]byte + var logSize int64 + for i := 0; i < numBlocks; i++ { + var err error + logSize, err = w.SyncRecord(b[:], nil, nil) + if err != nil { + t.Fatal(err) + } + } + m := w.Metrics() + t.Logf("LogSize is %s", humanize.Bytes.Int64(logSize)) + t.Logf("Mean pending buffer len is %.2f", m.PendingBufferLen.Mean()) + require.GreaterOrEqual(t, logSize, int64(numBlocks*blockSize)) + close(blockSyncCh) + require.NoError(t, w.Close()) +} + +// BenchmarkQueueWALBlocks exercises queueing within the LogWriter. It can be +// useful to measure allocations involved when flushing is slow enough to +// accumulate a large backlog fo queued blocks. +func BenchmarkQueueWALBlocks(b *testing.B) { + const dataVolume = 64 << 20 /* 64 MB */ + for _, writeSize := range []int64{64, 512, 1024, 2048, 32768} { + b.Run(fmt.Sprintf("record-size=%s", humanize.Bytes.Int64(writeSize)), func(b *testing.B) { + record := make([]byte, writeSize) + numRecords := int(dataVolume / writeSize) + + for j := 0; j < b.N; j++ { + b.StopTimer() + blockSyncCh := make(chan struct{}, 1) + f := errorfs.WrapFile(vfstest.DiscardFile, errorfs.InjectorFunc(func(op errorfs.Op, path string) error { + if op == errorfs.OpFileSync { + <-blockSyncCh + } + return nil + })) + w := NewLogWriter(f, 0, LogWriterConfig{ + WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}), + }) + + b.StartTimer() + for n := numRecords; n > 0; n-- { + if _, err := w.SyncRecord(record[:], nil, nil); err != nil { + b.Fatal(err) + } + } + b.StopTimer() + + b.SetBytes(dataVolume) + close(blockSyncCh) + require.NoError(b, w.Close()) + } + }) + } +}