Skip to content

Commit

Permalink
Add support for WAL preallocation
Browse files Browse the repository at this point in the history
On Linux, preallocation makes a huge difference in sync performance.
WAL reuse (aka recycling...not implemented yet) provides a further
improvement. And direct IO provides more stable performance on GCE Local
SSD. Note that direct IO implies WAL reuse. The numbers below were
gathered on an AWS m5.xlarge.

name                                  time/op
DirectIOWrite/wsize=4096-4            34.4µs ± 1%
DirectIOWrite/wsize=8192-4            61.0µs ± 0%
DirectIOWrite/wsize=16384-4            122µs ± 0%
DirectIOWrite/wsize=32768-4            244µs ± 0%
SyncWrite/no-prealloc/wsize=64-4       128µs ± 8%
SyncWrite/no-prealloc/wsize=512-4      146µs ± 0%
SyncWrite/no-prealloc/wsize=1024-4     155µs ± 0%
SyncWrite/no-prealloc/wsize=2048-4     172µs ± 0%
SyncWrite/no-prealloc/wsize=4096-4     206µs ± 0%
SyncWrite/no-prealloc/wsize=8192-4     206µs ± 0%
SyncWrite/no-prealloc/wsize=16384-4    274µs ± 0%
SyncWrite/no-prealloc/wsize=32768-4    407µs ± 4%
SyncWrite/prealloc-4MB/wsize=64-4     34.2µs ± 7%
SyncWrite/prealloc-4MB/wsize=512-4    47.5µs ± 0%
SyncWrite/prealloc-4MB/wsize=1024-4   60.4µs ± 0%
SyncWrite/prealloc-4MB/wsize=2048-4   86.4µs ± 0%
SyncWrite/prealloc-4MB/wsize=4096-4    137µs ± 0%
SyncWrite/prealloc-4MB/wsize=8192-4    143µs ± 7%
SyncWrite/prealloc-4MB/wsize=16384-4   214µs ± 0%
SyncWrite/prealloc-4MB/wsize=32768-4   337µs ± 0%
SyncWrite/reuse/wsize=64-4            31.6µs ± 4%
SyncWrite/reuse/wsize=512-4           31.8µs ± 4%
SyncWrite/reuse/wsize=1024-4          32.4µs ± 7%
SyncWrite/reuse/wsize=2048-4          31.3µs ± 1%
SyncWrite/reuse/wsize=4096-4          32.2µs ± 5%
SyncWrite/reuse/wsize=8192-4          61.1µs ± 0%
SyncWrite/reuse/wsize=16384-4          122µs ± 0%
SyncWrite/reuse/wsize=32768-4          244µs ± 0%

See #41
  • Loading branch information
petermattis committed Apr 17, 2019
1 parent 86ac56a commit 00f6085
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 29 deletions.
15 changes: 14 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,16 @@ func (d *DB) AsyncFlush() error {
return err
}

// walPreallocateSize returns the number of bytes to preallocate for a WAL
// file: the configured memtable size plus one tenth of it (roughly 110%).
//
// The two quantities are only loosely comparable. MemTableSize describes the
// memory usage of the memtable, whereas the WAL size is the size of the
// batches (plus overhead) stored in the WAL.
func (d *DB) walPreallocateSize() int {
	memSize := d.opts.MemTableSize
	return memSize + memSize/10
}

func (d *DB) throttleWrite() {
if len(d.mu.versions.currentVersion().files[0]) <= d.opts.L0SlowdownWritesThreshold {
return
Expand Down Expand Up @@ -726,7 +736,10 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
newLogFile.Close()
}
}
newLogFile = storage.NewSyncingFile(newLogFile, d.opts.BytesPerSync)
newLogFile = storage.NewSyncingFile(newLogFile, storage.SyncingFileOptions{
BytesPerSync: d.opts.BytesPerSync,
PreallocateSize: d.walPreallocateSize(),
})

d.mu.Lock()
d.mu.mem.switching = false
Expand Down
5 changes: 4 additions & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func Open(dirname string, opts *db.Options) (*DB, error) {
if err != nil {
return nil, err
}
logFile = storage.NewSyncingFile(logFile, d.opts.BytesPerSync)
logFile = storage.NewSyncingFile(logFile, storage.SyncingFileOptions{
BytesPerSync: d.opts.BytesPerSync,
PreallocateSize: d.walPreallocateSize(),
})
d.mu.log.LogWriter = record.NewLogWriter(logFile)

// Write a new manifest to disk.
Expand Down
4 changes: 3 additions & 1 deletion sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,9 @@ func NewWriter(f storage.File, o *db.Options, lo db.LevelOptions) *Writer {
lo = *lo.EnsureDefaults()

if f != nil {
f = storage.NewSyncingFile(f, o.BytesPerSync)
f = storage.NewSyncingFile(f, storage.SyncingFileOptions{
BytesPerSync: o.BytesPerSync,
})
}

w := &Writer{
Expand Down
64 changes: 64 additions & 0 deletions storage/preallocate_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build darwin

package storage

import (
"syscall"
"unsafe"
)

// preallocExtend first reserves space through preallocFixed and, when that
// succeeds, sets the file size to offset+length with ftruncate. The first
// error encountered is returned.
func preallocExtend(fd uintptr, offset, length int64) error {
	err := preallocFixed(fd, offset, length)
	if err == nil {
		err = syscall.Ftruncate(int(fd), offset+length)
	}
	return err
}

// preallocFixed issues fcntl(F_PREALLOCATE) on fd, requesting length bytes
// with the F_ALLOCATEALL flag and F_PEOFPOSMODE position mode.
//
// It returns nil when the call succeeds, when the filesystem reports ENOTSUP,
// or when the call fails with EINVAL but the file already appears to have at
// least offset+length bytes worth of blocks allocated. In every other case
// the errno from fcntl is returned.
func preallocFixed(fd uintptr, offset, length int64) error {
	// Allocate all requested space or no space at all.
	// TODO: allocate contiguous space on disk with F_ALLOCATECONTIG flag
	fstore := &syscall.Fstore_t{
		Flags:   syscall.F_ALLOCATEALL,
		Posmode: syscall.F_PEOFPOSMODE,
		Length:  length,
	}
	_, _, errno := syscall.Syscall(
		syscall.SYS_FCNTL, fd, uintptr(syscall.F_PREALLOCATE), uintptr(unsafe.Pointer(fstore)))
	if errno == 0 || errno == syscall.ENOTSUP {
		return nil
	}
	if errno != syscall.EINVAL {
		return errno
	}

	// EINVAL: the argument was rejected. Check whether the space is in fact
	// already allocated before reporting the failure.
	//
	// Filesystem "st_blocks" are allocated in the units of "Allocation Block
	// Size" (run "diskutil info /" command).
	var stat syscall.Stat_t
	if err := syscall.Fstat(int(fd), &stat); err != nil {
		// The allocation state cannot be inspected; surface the original
		// fcntl failure rather than guessing from a zero-valued stat.
		return errno
	}

	// syscall.Statfs_t.Bsize is the "optimal transfer block size"; per the
	// original authors it matches the 4096 filesystem block size on recent
	// OS X kernels.
	// NOTE(review): st_blocks is conventionally counted in 512-byte units;
	// confirm that multiplying it by Bsize is the intended estimate.
	var statfs syscall.Statfs_t
	if err := syscall.Fstatfs(int(fd), &statfs); err != nil {
		return errno
	}

	if stat.Blocks*int64(statfs.Bsize) >= offset+length {
		// Enough blocks are already allocated.
		return nil
	}
	return errno
}
34 changes: 34 additions & 0 deletions storage/preallocate_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build linux

package storage

import (
"syscall"
)

// preallocExtend calls fallocate (mode 0) for the range [offset,
// offset+length) of fd. When fallocate fails with ENOTSUP (not supported) or
// EINTR (which happens frequently in some environments), it falls back to
// ftruncate to offset+length instead. Any other fallocate error is returned
// unchanged.
func preallocExtend(fd uintptr, offset, length int64) error {
	allocErr := syscall.Fallocate(int(fd), 0 /* mode */, offset, length)
	if allocErr == nil {
		return nil
	}
	switch allocErr {
	case syscall.ENOTSUP, syscall.EINTR:
		return syscall.Ftruncate(int(fd), offset+length)
	}
	return allocErr
}
61 changes: 48 additions & 13 deletions storage/syncing_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,24 @@ import (
"sync/atomic"
)

// SyncingFileOptions holds the options for a syncingFile.
type SyncingFileOptions struct {
// BytesPerSync is copied into the syncingFile by NewSyncingFile and
// consulted by maybeSync; a value <= 0 makes maybeSync return immediately.
BytesPerSync int
// PreallocateSize is the block size, in bytes, used by preallocate: space is
// reserved in whole multiples of this size. A value of 0 disables
// preallocation.
PreallocateSize int
}

type syncingFile struct {
File
fd int
useSyncRange bool
bytesPerSync int64
atomic struct {
fd uintptr
useSyncRange bool
bytesPerSync int64
preallocateSize int64
atomic struct {
offset int64
syncOffset int64
}
syncTo func(offset int64) error
preallocatedBlocks int64
syncTo func(offset int64) error
}

// NewSyncingFile wraps a writable file and ensures that data is synced
Expand All @@ -26,21 +34,28 @@ type syncingFile struct {
// decides to write out a large chunk of dirty filesystem buffers. If
// bytesPerSync is zero, the original file is returned as no syncing is
// requested.
func NewSyncingFile(f File, bytesPerSync int) File {
if bytesPerSync <= 0 {
return f
}
func NewSyncingFile(f File, opts SyncingFileOptions) File {
s := &syncingFile{
File: f,
fd: -1,
bytesPerSync: int64(bytesPerSync),
File: f,
bytesPerSync: int64(opts.BytesPerSync),
preallocateSize: int64(opts.PreallocateSize),
}

type fd interface {
Fd() uintptr
}
if d, ok := f.(fd); ok {
s.fd = d.Fd()
}

s.init()
return s
}

// NB: syncingFile.Write is unsafe for concurrent use!
func (f *syncingFile) Write(p []byte) (n int, err error) {
_ = f.preallocate(atomic.LoadInt64(&f.atomic.offset) + int64(n))

n, err = f.File.Write(p)
if err != nil {
return n, err
Expand All @@ -54,6 +69,22 @@ func (f *syncingFile) Write(p []byte) (n int, err error) {
return n, nil
}

// preallocate makes sure that enough preallocateSize-sized blocks have been
// reserved to cover the file up to offset. The number of blocks already
// requested is tracked in f.preallocatedBlocks, so preallocExtend is only
// invoked when offset crosses into a block that has not been requested yet.
//
// It does nothing and returns nil when f.fd is 0 or when preallocateSize is
// not positive. Otherwise it returns the result of preallocExtend.
func (f *syncingFile) preallocate(offset int64) error {
	// A non-positive block size would make the block arithmetic below
	// meaningless (and could pass a negative length to preallocExtend), so
	// treat it the same as "preallocation disabled".
	if f.fd == 0 || f.preallocateSize <= 0 {
		return nil
	}

	// Round offset up to a whole number of blocks.
	blocksNeeded := (offset + f.preallocateSize - 1) / f.preallocateSize
	if blocksNeeded <= f.preallocatedBlocks {
		return nil
	}

	start := f.preallocateSize * f.preallocatedBlocks
	length := f.preallocateSize * (blocksNeeded - f.preallocatedBlocks)
	// The block count is advanced before the call, so a range whose
	// preallocation fails is not requested again by later calls.
	f.preallocatedBlocks = blocksNeeded
	return preallocExtend(f.fd, start, length)
}

func (f *syncingFile) ratchetSyncOffset(offset int64) {
for {
syncOffset := atomic.LoadInt64(&f.atomic.syncOffset)
Expand All @@ -76,6 +107,10 @@ func (f *syncingFile) Sync() error {
}

func (f *syncingFile) maybeSync() error {
if f.bytesPerSync <= 0 {
return nil
}

// From the RocksDB source:
//
// We try to avoid sync to the last 1MB of data. For two reasons:
Expand All @@ -98,7 +133,7 @@ func (f *syncingFile) maybeSync() error {
return nil
}

if f.fd < 0 {
if f.fd == 0 {
return f.Sync()
}
return f.syncTo(syncToOffset)
Expand Down
15 changes: 6 additions & 9 deletions storage/syncing_file_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
package storage

import (
"os"
"sync/atomic"
"syscall"
)

func isSyncRangeSupported(fd int) bool {
func isSyncRangeSupported(fd uintptr) bool {
var stat syscall.Statfs_t
if err := syscall.Fstatfs(fd, &stat); err != nil {
if err := syscall.Fstatfs(int(fd), &stat); err != nil {
return false
}

Expand All @@ -32,11 +31,9 @@ func isSyncRangeSupported(fd int) bool {
}

func (f *syncingFile) init() {
t, ok := f.File.(*os.File)
if !ok {
if f.fd == 0 {
return
}
f.fd = int(t.Fd())
f.useSyncRange = isSyncRangeSupported(f.fd)
if f.useSyncRange {
f.syncTo = f.syncToRange
Expand All @@ -46,10 +43,10 @@ func (f *syncingFile) init() {
}

func (f *syncingFile) syncData() error {
if f.fd < 0 {
if f.fd == 0 {
return f.File.Sync()
}
return syscall.Fdatasync(f.fd)
return syscall.Fdatasync(int(f.fd))
}

func (f *syncingFile) syncToFdatasync(_ int64) error {
Expand All @@ -65,5 +62,5 @@ func (f *syncingFile) syncToRange(offset int64) error {
)

f.ratchetSyncOffset(offset)
return syscall.SyncFileRange(f.fd, 0, offset, write|waitBefore)
return syscall.SyncFileRange(int(f.fd), 0, offset, write|waitBefore)
}
81 changes: 81 additions & 0 deletions storage/syncing_file_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

// +build linux

package storage

import (
"fmt"
"io/ioutil"
"os"
"syscall"
"testing"
"unsafe"
)

// BenchmarkDirectIOWrite times WriteAt calls on a file opened with O_DIRECT,
// using a write buffer whose address is aligned to 4096 bytes. One
// sub-benchmark is run per write size: only 4096 bytes by default, or 4 KiB
// through 32 KiB when the tests run in verbose mode.
//
// Each timed write goes to the next offset of a 16 MiB temporary file. When
// the end of the file is reached, the file is closed and, with the timer
// stopped, reopened, filled with data, and synced before timing resumes at
// offset zero.
func BenchmarkDirectIOWrite(b *testing.B) {
	const targetSize = 16 << 20
	const alignment = 4096

	var wsizes []int
	if testing.Verbose() {
		wsizes = []int{4 << 10, 8 << 10, 16 << 10, 32 << 10}
	} else {
		wsizes = []int{4096}
	}

	for _, wsize := range wsizes {
		b.Run(fmt.Sprintf("wsize=%d", wsize), func(b *testing.B) {
			// Create the file up front only to obtain a unique name; it is
			// reopened with O_DIRECT inside the loop.
			tmpf, err := ioutil.TempFile("", "pebble-db-syncing-file-")
			if err != nil {
				b.Fatal(err)
			}
			filename := tmpf.Name()
			_ = tmpf.Close()
			defer os.Remove(filename)

			var f *os.File
			// The loop can end (or b.Fatal can fire) while a file is still
			// open; close it so the descriptor is not leaked. This runs
			// before the deferred os.Remove above.
			defer func() {
				if f != nil {
					_ = f.Close()
				}
			}()

			var size int
			// Over-allocate by one alignment unit, then slice so that the
			// first byte of buf sits on an alignment boundary.
			buf := make([]byte, wsize+alignment)
			if a := uintptr(unsafe.Pointer(&buf[0])) & uintptr(alignment-1); a != 0 {
				buf = buf[alignment-a:]
			}
			buf = buf[:wsize]
			// NOTE(review): init is never cleared, so the file is refilled
			// on every reopen (outside the timed region) — confirm whether a
			// one-time fill was intended.
			init := true

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				if f == nil {
					b.StopTimer()
					f, err = os.OpenFile(filename, syscall.O_DIRECT|os.O_RDWR, 0666)
					if err != nil {
						b.Fatal(err)
					}
					if init {
						// Write the whole file once so the timed writes
						// below overwrite existing data.
						for size = 0; size < targetSize; size += len(buf) {
							if _, err := f.WriteAt(buf, int64(size)); err != nil {
								b.Fatal(err)
							}
						}
					}
					if err := f.Sync(); err != nil {
						b.Fatal(err)
					}
					size = 0
					b.StartTimer()
				}
				if _, err := f.WriteAt(buf, int64(size)); err != nil {
					b.Fatal(err)
				}
				size += len(buf)
				if size >= targetSize {
					_ = f.Close()
					f = nil
				}
			}
			b.StopTimer()
		})
	}
}
Loading

0 comments on commit 00f6085

Please sign in to comment.