From 00f60854d65c93d990df5d57934b1eba968e27f8 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Sat, 13 Apr 2019 19:18:52 -0400 Subject: [PATCH] Add support for WAL preallocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Linux, preallocation makes a huge difference in sync performance. WAL reuse (aka recycling...not implemented yet) provides a further improvement. And direct IO provides more stable performance on GCE Local SSD. Note that direct IO implies WAL reuse. The numbers below were gathered on an AWS m5.xlarge. name time/op DirectIOWrite/wsize=4096-4 34.4µs ± 1% DirectIOWrite/wsize=8192-4 61.0µs ± 0% DirectIOWrite/wsize=16384-4 122µs ± 0% DirectIOWrite/wsize=32768-4 244µs ± 0% SyncWrite/no-prealloc/wsize=64-4 128µs ± 8% SyncWrite/no-prealloc/wsize=512-4 146µs ± 0% SyncWrite/no-prealloc/wsize=1024-4 155µs ± 0% SyncWrite/no-prealloc/wsize=2048-4 172µs ± 0% SyncWrite/no-prealloc/wsize=4096-4 206µs ± 0% SyncWrite/no-prealloc/wsize=8192-4 206µs ± 0% SyncWrite/no-prealloc/wsize=16384-4 274µs ± 0% SyncWrite/no-prealloc/wsize=32768-4 407µs ± 4% SyncWrite/prealloc-4MB/wsize=64-4 34.2µs ± 7% SyncWrite/prealloc-4MB/wsize=512-4 47.5µs ± 0% SyncWrite/prealloc-4MB/wsize=1024-4 60.4µs ± 0% SyncWrite/prealloc-4MB/wsize=2048-4 86.4µs ± 0% SyncWrite/prealloc-4MB/wsize=4096-4 137µs ± 0% SyncWrite/prealloc-4MB/wsize=8192-4 143µs ± 7% SyncWrite/prealloc-4MB/wsize=16384-4 214µs ± 0% SyncWrite/prealloc-4MB/wsize=32768-4 337µs ± 0% SyncWrite/reuse/wsize=64-4 31.6µs ± 4% SyncWrite/reuse/wsize=512-4 31.8µs ± 4% SyncWrite/reuse/wsize=1024-4 32.4µs ± 7% SyncWrite/reuse/wsize=2048-4 31.3µs ± 1% SyncWrite/reuse/wsize=4096-4 32.2µs ± 5% SyncWrite/reuse/wsize=8192-4 61.1µs ± 0% SyncWrite/reuse/wsize=16384-4 122µs ± 0% SyncWrite/reuse/wsize=32768-4 244µs ± 0% See #41 --- db.go | 15 +++- open.go | 5 +- sstable/writer.go | 4 +- storage/preallocate_darwin.go | 64 ++++++++++++++++ storage/preallocate_linux.go | 34 +++++++++ 
storage/syncing_file.go | 61 +++++++++++---- storage/syncing_file_linux.go | 15 ++-- storage/syncing_file_linux_test.go | 81 ++++++++++++++++++++ storage/syncing_file_test.go | 117 ++++++++++++++++++++++++++++- 9 files changed, 367 insertions(+), 29 deletions(-) create mode 100644 storage/preallocate_darwin.go create mode 100644 storage/preallocate_linux.go create mode 100644 storage/syncing_file_linux_test.go diff --git a/db.go b/db.go index b2da324ae08..3b5968f91ba 100644 --- a/db.go +++ b/db.go @@ -662,6 +662,16 @@ func (d *DB) AsyncFlush() error { return err } +func (d *DB) walPreallocateSize() int { + // Set the WAL preallocate size to 110% of the memtable size. Note that there + // is a bit of apples and oranges in units here as the memtable size + // corresponds to the memory usage of the memtable while the WAL size is the + // size of the batches (plus overhead) stored in the WAL. + size := d.opts.MemTableSize + size = (size / 10) + size + return size +} + func (d *DB) throttleWrite() { if len(d.mu.versions.currentVersion().files[0]) <= d.opts.L0SlowdownWritesThreshold { return @@ -726,7 +736,10 @@ func (d *DB) makeRoomForWrite(b *Batch) error { newLogFile.Close() } } - newLogFile = storage.NewSyncingFile(newLogFile, d.opts.BytesPerSync) + newLogFile = storage.NewSyncingFile(newLogFile, storage.SyncingFileOptions{ + BytesPerSync: d.opts.BytesPerSync, + PreallocateSize: d.walPreallocateSize(), + }) d.mu.Lock() d.mu.mem.switching = false diff --git a/open.go b/open.go index 9a74431984e..558fd882210 100644 --- a/open.go +++ b/open.go @@ -173,7 +173,10 @@ func Open(dirname string, opts *db.Options) (*DB, error) { if err != nil { return nil, err } - logFile = storage.NewSyncingFile(logFile, d.opts.BytesPerSync) + logFile = storage.NewSyncingFile(logFile, storage.SyncingFileOptions{ + BytesPerSync: d.opts.BytesPerSync, + PreallocateSize: d.walPreallocateSize(), + }) d.mu.log.LogWriter = record.NewLogWriter(logFile) // Write a new manifest to disk. 
diff --git a/sstable/writer.go b/sstable/writer.go index e74d5ae3676..b432ce410a9 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -537,7 +537,9 @@ func NewWriter(f storage.File, o *db.Options, lo db.LevelOptions) *Writer { lo = *lo.EnsureDefaults() if f != nil { - f = storage.NewSyncingFile(f, o.BytesPerSync) + f = storage.NewSyncingFile(f, storage.SyncingFileOptions{ + BytesPerSync: o.BytesPerSync, + }) } w := &Writer{ diff --git a/storage/preallocate_darwin.go b/storage/preallocate_darwin.go new file mode 100644 index 00000000000..946ab56b7a7 --- /dev/null +++ b/storage/preallocate_darwin.go @@ -0,0 +1,64 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// +build darwin + +package storage + +import ( + "syscall" + "unsafe" +) + +func preallocExtend(fd uintptr, offset, length int64) error { + if err := preallocFixed(fd, offset, length); err != nil { + return err + } + return syscall.Ftruncate(int(fd), offset+length) +} + +func preallocFixed(fd uintptr, offset, length int64) error { + // allocate all requested space or no space at all + // TODO: allocate contiguous space on disk with F_ALLOCATECONTIG flag + fstore := &syscall.Fstore_t{ + Flags: syscall.F_ALLOCATEALL, + Posmode: syscall.F_PEOFPOSMODE, + Length: length} + p := unsafe.Pointer(fstore) + _, _, errno := syscall.Syscall(syscall.SYS_FCNTL, fd, uintptr(syscall.F_PREALLOCATE), uintptr(p)) + if errno == 0 || errno == syscall.ENOTSUP { + return nil + } + + // wrong argument to fallocate syscall + if errno == syscall.EINVAL { + // filesystem "st_blocks" are allocated in the units of + // "Allocation Block Size" (run "diskutil info /" command) + var stat syscall.Stat_t + syscall.Fstat(int(fd), &stat) + + // syscall.Statfs_t.Bsize is "optimal transfer block size" + // and contains matching 4096 value when latest OS X kernel + // supports 4,096 KB filesystem block size + var statfs syscall.Statfs_t + syscall.Fstatfs(int(fd), &statfs) + blockSize := int64(statfs.Bsize) + + if stat.Blocks*blockSize >= offset+length { + // enough blocks are already allocated + return nil + } + } + return errno +} diff --git a/storage/preallocate_linux.go b/storage/preallocate_linux.go new file mode 100644 index 00000000000..d93018ca4ea --- /dev/null +++ b/storage/preallocate_linux.go @@ -0,0 +1,34 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package storage + +import ( + "syscall" +) + +func preallocExtend(fd uintptr, offset, length int64) error { + err := syscall.Fallocate(int(fd), 0 /* mode */, offset, length) + if err != nil { + errno, ok := err.(syscall.Errno) + // not supported; fallback + // fallocate EINTRs frequently in some environments; fallback + if ok && (errno == syscall.ENOTSUP || errno == syscall.EINTR) { + return syscall.Ftruncate(int(fd), offset+length) + } + } + return err +} diff --git a/storage/syncing_file.go b/storage/syncing_file.go index 8bd5fe13aa2..5a88e222bf3 100644 --- a/storage/syncing_file.go +++ b/storage/syncing_file.go @@ -8,16 +8,24 @@ import ( "sync/atomic" ) +// SyncingFileOptions holds the options for a syncingFile. +type SyncingFileOptions struct { + BytesPerSync int + PreallocateSize int +} + type syncingFile struct { File - fd int - useSyncRange bool - bytesPerSync int64 - atomic struct { + fd uintptr + useSyncRange bool + bytesPerSync int64 + preallocateSize int64 + atomic struct { offset int64 syncOffset int64 } - syncTo func(offset int64) error + preallocatedBlocks int64 + syncTo func(offset int64) error } // NewSyncingFile wraps a writable file and ensures that data is synced @@ -26,21 +34,28 @@ type syncingFile struct { // decides to write out a large chunk of dirty filesystem buffers. If // bytesPerSync is zero, the original file is returned as no syncing is // requested. 
-func NewSyncingFile(f File, bytesPerSync int) File { - if bytesPerSync <= 0 { - return f - } +func NewSyncingFile(f File, opts SyncingFileOptions) File { s := &syncingFile{ - File: f, - fd: -1, - bytesPerSync: int64(bytesPerSync), + File: f, + bytesPerSync: int64(opts.BytesPerSync), + preallocateSize: int64(opts.PreallocateSize), + } + + type fd interface { + Fd() uintptr } + if d, ok := f.(fd); ok { + s.fd = d.Fd() + } + s.init() return s } // NB: syncingFile.Write is unsafe for concurrent use! func (f *syncingFile) Write(p []byte) (n int, err error) { + _ = f.preallocate(atomic.LoadInt64(&f.atomic.offset) + int64(n)) + n, err = f.File.Write(p) if err != nil { return n, err @@ -54,6 +69,22 @@ func (f *syncingFile) Write(p []byte) (n int, err error) { return n, nil } +func (f *syncingFile) preallocate(offset int64) error { + if f.fd == 0 || f.preallocateSize == 0 { + return nil + } + + newPreallocatedBlocks := (offset + f.preallocateSize - 1) / f.preallocateSize + if newPreallocatedBlocks <= f.preallocatedBlocks { + return nil + } + + length := f.preallocateSize * (newPreallocatedBlocks - f.preallocatedBlocks) + offset = f.preallocateSize * f.preallocatedBlocks + f.preallocatedBlocks = newPreallocatedBlocks + return preallocExtend(f.fd, offset, length) +} + func (f *syncingFile) ratchetSyncOffset(offset int64) { for { syncOffset := atomic.LoadInt64(&f.atomic.syncOffset) @@ -76,6 +107,10 @@ func (f *syncingFile) Sync() error { } func (f *syncingFile) maybeSync() error { + if f.bytesPerSync <= 0 { + return nil + } + // From the RocksDB source: // // We try to avoid sync to the last 1MB of data. 
For two reasons: @@ -98,7 +133,7 @@ func (f *syncingFile) maybeSync() error { return nil } - if f.fd < 0 { + if f.fd == 0 { return f.Sync() } return f.syncTo(syncToOffset) diff --git a/storage/syncing_file_linux.go b/storage/syncing_file_linux.go index 670b7a3aacf..06984a056e5 100644 --- a/storage/syncing_file_linux.go +++ b/storage/syncing_file_linux.go @@ -7,14 +7,13 @@ package storage import ( - "os" "sync/atomic" "syscall" ) -func isSyncRangeSupported(fd int) bool { +func isSyncRangeSupported(fd uintptr) bool { var stat syscall.Statfs_t - if err := syscall.Fstatfs(fd, &stat); err != nil { + if err := syscall.Fstatfs(int(fd), &stat); err != nil { return false } @@ -32,11 +31,9 @@ func isSyncRangeSupported(fd int) bool { } func (f *syncingFile) init() { - t, ok := f.File.(*os.File) - if !ok { + if f.fd == 0 { return } - f.fd = int(t.Fd()) f.useSyncRange = isSyncRangeSupported(f.fd) if f.useSyncRange { f.syncTo = f.syncToRange @@ -46,10 +43,10 @@ func (f *syncingFile) init() { } func (f *syncingFile) syncData() error { - if f.fd < 0 { + if f.fd == 0 { return f.File.Sync() } - return syscall.Fdatasync(f.fd) + return syscall.Fdatasync(int(f.fd)) } func (f *syncingFile) syncToFdatasync(_ int64) error { @@ -65,5 +62,5 @@ func (f *syncingFile) syncToRange(offset int64) error { ) f.ratchetSyncOffset(offset) - return syscall.SyncFileRange(f.fd, 0, offset, write|waitBefore) + return syscall.SyncFileRange(int(f.fd), 0, offset, write|waitBefore) } diff --git a/storage/syncing_file_linux_test.go b/storage/syncing_file_linux_test.go new file mode 100644 index 00000000000..e18e39b0925 --- /dev/null +++ b/storage/syncing_file_linux_test.go @@ -0,0 +1,81 @@ +// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. 
+ +// +build linux + +package storage + +import ( + "fmt" + "io/ioutil" + "os" + "syscall" + "testing" + "unsafe" +) + +func BenchmarkDirectIOWrite(b *testing.B) { + const targetSize = 16 << 20 + const alignment = 4096 + + var wsizes []int + if testing.Verbose() { + wsizes = []int{4 << 10, 8 << 10, 16 << 10, 32 << 10} + } else { + wsizes = []int{4096} + } + + for _, wsize := range wsizes { + b.Run(fmt.Sprintf("wsize=%d", wsize), func(b *testing.B) { + tmpf, err := ioutil.TempFile("", "pebble-db-syncing-file-") + if err != nil { + b.Fatal(err) + } + filename := tmpf.Name() + _ = tmpf.Close() + defer os.Remove(filename) + + var f *os.File + var size int + buf := make([]byte, wsize+alignment) + if a := uintptr(unsafe.Pointer(&buf[0])) & uintptr(alignment-1); a != 0 { + buf = buf[alignment-a:] + } + buf = buf[:wsize] + init := true + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if f == nil { + b.StopTimer() + f, err = os.OpenFile(filename, syscall.O_DIRECT|os.O_RDWR, 0666) + if err != nil { + b.Fatal(err) + } + if init { + for size = 0; size < targetSize; size += len(buf) { + if _, err := f.WriteAt(buf, int64(size)); err != nil { + b.Fatal(err) + } + } + } + if err := f.Sync(); err != nil { + b.Fatal(err) + } + size = 0 + b.StartTimer() + } + if _, err := f.WriteAt(buf, int64(size)); err != nil { + b.Fatal(err) + } + size += len(buf) + if size >= targetSize { + _ = f.Close() + f = nil + } + } + b.StopTimer() + }) + } +} diff --git a/storage/syncing_file_test.go b/storage/syncing_file_test.go index fa4e45f779e..5f8fabaa935 100644 --- a/storage/syncing_file_test.go +++ b/storage/syncing_file_test.go @@ -5,6 +5,7 @@ package storage import ( + "fmt" "io/ioutil" "os" "sync/atomic" @@ -25,11 +26,11 @@ func TestSyncingFile(t *testing.T) { if err != nil { t.Fatal(err) } - s := NewSyncingFile(f, 0) - if s != f { - t.Fatalf("unexpected wrapping: %p != %p", f, s) + s := NewSyncingFile(f, SyncingFileOptions{}) + if s == f { + t.Fatalf("failed to wrap: %p != %p", f, s) } - s = 
NewSyncingFile(f, 8<<10 /* 8 KB */) + s = NewSyncingFile(f, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */}) s.(*syncingFile).fd = 1 s.(*syncingFile).syncTo = func(offset int64) error { s.(*syncingFile).ratchetSyncOffset(offset) @@ -59,3 +60,111 @@ func TestSyncingFile(t *testing.T) { } } } + +func BenchmarkSyncWrite(b *testing.B) { + const targetSize = 16 << 20 + + var wsizes []int + if testing.Verbose() { + wsizes = []int{64, 512, 1 << 10, 2 << 10, 4 << 10, 8 << 10, 16 << 10, 32 << 10} + } else { + wsizes = []int{64} + } + + run := func(b *testing.B, wsize int, newFile func(string) File) { + tmpf, err := ioutil.TempFile("", "pebble-db-syncing-file-") + if err != nil { + b.Fatal(err) + } + filename := tmpf.Name() + _ = tmpf.Close() + defer os.Remove(filename) + + var f File + var size int + buf := make([]byte, wsize) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if f == nil { + b.StopTimer() + f = newFile(filename) + size = 0 + b.StartTimer() + } + if _, err := f.Write(buf); err != nil { + b.Fatal(err) + } + if err := f.Sync(); err != nil { + b.Fatal(err) + } + size += len(buf) + if size >= targetSize { + _ = f.Close() + f = nil + } + } + b.StopTimer() + } + + b.Run("no-prealloc", func(b *testing.B) { + for _, wsize := range wsizes { + b.Run(fmt.Sprintf("wsize=%d", wsize), func(b *testing.B) { + run(b, wsize, func(filename string) File { + _ = os.Remove(filename) + t, err := os.Create(filename) + if err != nil { + b.Fatal(err) + } + return NewSyncingFile(t, SyncingFileOptions{PreallocateSize: 0}) + }) + }) + } + }) + + b.Run("prealloc-4MB", func(b *testing.B) { + for _, wsize := range wsizes { + b.Run(fmt.Sprintf("wsize=%d", wsize), func(b *testing.B) { + run(b, wsize, func(filename string) File { + _ = os.Remove(filename) + t, err := os.Create(filename) + if err != nil { + b.Fatal(err) + } + return NewSyncingFile(t, SyncingFileOptions{PreallocateSize: 4 << 20}) + }) + }) + } + }) + + b.Run("reuse", func(b *testing.B) { + for _, wsize := range wsizes { + 
b.Run(fmt.Sprintf("wsize=%d", wsize), func(b *testing.B) { + init := true + run(b, wsize, func(filename string) File { + if init { + init = false + + t, err := os.OpenFile(filename, os.O_RDWR, 0755) + if err != nil { + b.Fatal(err) + } + if _, err := t.Write(make([]byte, targetSize)); err != nil { + b.Fatal(err) + } + if err := t.Sync(); err != nil { + b.Fatal(err) + } + t.Close() + } + + t, err := os.OpenFile(filename, os.O_RDWR, 0755) + if err != nil { + b.Fatal(err) + } + return NewSyncingFile(t, SyncingFileOptions{PreallocateSize: 0}) + }) + }) + } + }) +}