Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

O_TMPFILE #2566

Merged
merged 6 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Changelog for NeoFS Node

### Fixed

### Changed
- FSTree storage now uses more efficient and safer temporary files under Linux (#2566)

### Removed

### Updated
Expand Down
139 changes: 139 additions & 0 deletions pkg/local_object_storage/blobstor/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package blobstor_test

import (
"crypto/rand"
"fmt"
"path/filepath"
"sync"
"testing"
"time"

bbczt "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

// benchmarkPutMN benchmarks Put into a Blobovnicza tree with the given depth
// and width. When parallel is set, 20 goroutines write concurrently,
// otherwise a single writer is used.
func benchmarkPutMN(b *testing.B, depth, width uint64, parallel bool) {
	// The number of leaf Blobovniczas is width^(depth+1); the full size
	// limit is split evenly between them.
	nBlobovniczas := uint64(1)
	for i := uint64(1); i <= depth+1; i++ {
		nBlobovniczas *= width
	}

	const objSizeLimit = 4 << 10
	const fullSizeLimit = 100 << 20

	bbcz := bbczt.NewBlobovniczaTree(
		bbczt.WithRootPath(b.TempDir()),
		bbczt.WithObjectSizeLimit(objSizeLimit),
		bbczt.WithBlobovniczaSize(fullSizeLimit/nBlobovniczas),
		bbczt.WithBlobovniczaShallowWidth(width),
		bbczt.WithBlobovniczaShallowDepth(depth),
	)

	require.NoError(b, bbcz.Open(false))
	b.Cleanup(func() { _ = bbcz.Close() })
	require.NoError(b, bbcz.Init())

	// Honor the parallel flag: previously it was accepted but ignored, so
	// every variant benchmarked 20 concurrent writers and the
	// "sequential" sub-benchmarks measured the same thing as the
	// "_parallel" ones.
	nThreads := 1
	if parallel {
		nThreads = 20
	}

	benchmark(b, bbcz, objSizeLimit, nThreads)
}

// BenchmarkBlobovniczas_Put measures Put throughput for several Blobovnicza
// tree shapes, both with a single writer and with concurrent writers.
func BenchmarkBlobovniczas_Put(b *testing.B) {
	cases := []struct {
		width, depth uint64
	}{
		{1, 0},
		{10, 0},
		{2, 2},
		{4, 4},
	}

	for _, c := range cases {
		name := fmt.Sprintf("tree=%dx%d", c.width, c.depth)

		b.Run(name, func(b *testing.B) {
			benchmarkPutMN(b, c.depth, c.width, false)
		})
		b.Run(name+"_parallel", func(b *testing.B) {
			benchmarkPutMN(b, c.depth, c.width, true)
		})
	}
}

// testPeapodPath returns a path for a fresh Peapod database file inside a
// per-test temporary directory.
func testPeapodPath(tb testing.TB) string {
	dir := tb.TempDir()
	return filepath.Join(dir, "peapod.db")
}

// newTestPeapod constructs a Peapod storage backed by a temporary database
// file with a short flush interval suitable for benchmarks.
func newTestPeapod(tb testing.TB) common.Storage {
	const flushInterval = 10 * time.Millisecond
	return peapod.New(testPeapodPath(tb), 0600, flushInterval)
}

// newTestFSTree constructs an FSTree storage with default settings rooted in
// a per-test temporary directory.
func newTestFSTree(tb testing.TB) common.Storage {
	root := tb.TempDir()
	return fstree.New(
		fstree.WithPath(root),
		fstree.WithDepth(4),      // Default.
		fstree.WithDirNameLen(1), // Default.
		fstree.WithNoSync(false), // Default.
	)
}

// benchmark runs b.N rounds, each performing nThreads concurrent Put
// operations of objSize random bytes against the given storage.
func benchmark(b *testing.B, p common.Storage, objSize uint64, nThreads int) {
	data := make([]byte, objSize)
	// crypto/rand.Read may fail; previously its error was silently ignored.
	_, err := rand.Read(data)
	require.NoError(b, err)

	prm := common.PutPrm{
		RawData: data,
	}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		var wg sync.WaitGroup

		for j := 0; j < nThreads; j++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				prm := prm
				prm.Address = oidtest.Address()

				// require.NoError calls FailNow, which must only be
				// invoked from the goroutine running the benchmark;
				// b.Error is safe from other goroutines.
				if _, err := p.Put(prm); err != nil {
					b.Error(err)
				}
			}()
		}

		wg.Wait()
	}
}

// BenchmarkPut compares Put performance of the Peapod and FSTree storages
// across a grid of object sizes and concurrent writer counts.
func BenchmarkPut(b *testing.B) {
	sizes := []uint64{1, 1 << 10, 100 << 10}
	threads := []int{1, 20, 100}

	storages := map[string]func(testing.TB) common.Storage{
		"peapod": newTestPeapod,
		"fstree": newTestFSTree,
	}

	for _, objSize := range sizes {
		for _, nThreads := range threads {
			b.Run(fmt.Sprintf("size=%d,thread=%d", objSize, nThreads), func(b *testing.B) {
				for name, newStorage := range storages {
					b.Run(name, func(b *testing.B) {
						s := newStorage(b)
						require.NoError(b, s.Open(false))
						require.NoError(b, s.Init())
						b.Cleanup(func() { _ = s.Close() })

						benchmark(b, s, objSize, nThreads)
					})
				}
			})
		}
	}
}
85 changes: 0 additions & 85 deletions pkg/local_object_storage/blobstor/blobovniczatree/put_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package blobovniczatree_test

import (
"crypto/rand"
"errors"
"fmt"
"sync"
"testing"

. "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
Expand Down Expand Up @@ -42,84 +38,3 @@ func TestSingleDir(t *testing.T) {
})
require.NoError(t, err)
}

// benchmarkPutMN benchmarks Blobovnicza tree Put for the given depth and
// width; when parallel is set, 20 goroutines write concurrently, otherwise
// one. NOTE(review): this is the pre-move version being deleted by this PR in
// favor of the shared bench_test.go helpers.
func benchmarkPutMN(b *testing.B, depth, width uint64, parallel bool) {
	// Number of leaf Blobovniczas is width^(depth+1); the full size limit
	// is split evenly between them.
	nBlobovniczas := uint64(1)
	for i := uint64(1); i <= depth+1; i++ {
		nBlobovniczas *= width
	}

	const objSizeLimit = 4 << 10
	const fullSizeLimit = 100 << 20

	bbcz := NewBlobovniczaTree(
		WithRootPath(b.TempDir()),
		WithObjectSizeLimit(objSizeLimit),
		WithBlobovniczaSize(fullSizeLimit/nBlobovniczas),
		WithBlobovniczaShallowWidth(width),
		WithBlobovniczaShallowDepth(depth),
	)

	require.NoError(b, bbcz.Open(false))
	b.Cleanup(func() { _ = bbcz.Close() })
	require.NoError(b, bbcz.Init())

	prm := common.PutPrm{
		RawData: make([]byte, objSizeLimit),
	}

	rand.Read(prm.RawData)

	var wg sync.WaitGroup

	// f writes b.N objects, stopping early (without failing) once the tree
	// reports it is out of space.
	f := func(prm common.PutPrm) {
		defer wg.Done()

		var err error

		for i := 0; i < b.N; i++ {
			prm.Address = oidtest.Address()

			_, err = bbcz.Put(prm)
			if err != nil {
				if errors.Is(err, common.ErrNoSpace) {
					break
				}
				require.NoError(b, err)
			}
		}
	}

	nRoutine := 1
	if parallel {
		nRoutine = 20
	}

	b.ReportAllocs()
	b.ResetTimer()

	for j := 0; j < nRoutine; j++ {
		wg.Add(1)
		go f(prm)
	}

	wg.Wait()
}

// BenchmarkBlobovniczas_Put runs benchmarkPutMN over several tree shapes,
// sequentially and with concurrent writers. NOTE(review): deleted by this PR;
// the replacement lives in blobstor/bench_test.go.
func BenchmarkBlobovniczas_Put(b *testing.B) {
	for _, testCase := range []struct {
		width, depth uint64
	}{
		{1, 0},
		{10, 0},
		{2, 2},
		{4, 4},
	} {
		b.Run(fmt.Sprintf("tree=%dx%d", testCase.width, testCase.depth), func(b *testing.B) {
			benchmarkPutMN(b, testCase.depth, testCase.width, false)
		})
		b.Run(fmt.Sprintf("tree=%dx%d_parallel", testCase.width, testCase.depth), func(b *testing.B) {
			benchmarkPutMN(b, testCase.depth, testCase.width, true)
		})
	}
}
100 changes: 1 addition & 99 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
Expand Down Expand Up @@ -247,103 +245,7 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) {
if !prm.DontCompress {
prm.RawData = t.Compress(prm.RawData)
}

// Here is a situation:
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""}
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"}
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1}
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false}
//
// 1. We put an object on node 1.
// 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2.
// 3. PUT operation started by client at (1) also puts an object here.
// 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error.
// Even more than that, concurrent writes can corrupt data.
//
// So here is a solution:
// 1. Write a file to 'name + 1'.
// 2. If it exists, retry with temporary name being 'name + 2'.
// 3. Set some reasonable number of attempts.
//
// It is a bit kludgey, but I am unusually proud about having found this out after
// hours of research on linux kernel, dirsync mount option and ext4 FS, turned out
// to be so hecking simple.
// In a very rare situation we can have multiple partially written copies on disk,
// this will be fixed in another issue (we should remove garbage on start).
const retryCount = 5
for i := 0; i < retryCount; i++ {
tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
err := t.writeAndRename(tmpPath, p, prm.RawData)
if err != syscall.EEXIST || i == retryCount-1 {
return common.PutRes{StorageID: []byte{}}, err
}
}

// unreachable, but precaution never hurts, especially 1 day before release.
return common.PutRes{StorageID: []byte{}}, fmt.Errorf("couldn't read file after %d retries", retryCount)
}

// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
// ENOSPC is mapped to common.ErrNoSpace (with the partial file removed);
// EEXIST is returned verbatim so the caller can retry under another temporary
// name. NOTE(review): removed by this PR in favor of O_TMPFILE-based writes.
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
	err := t.writeFile(tmpPath, data)
	if err != nil {
		var pe *fs.PathError
		if errors.As(err, &pe) {
			switch pe.Err {
			case syscall.ENOSPC:
				// Out of space: surface the storage-level sentinel
				// and clean up the partially written temporary file.
				err = common.ErrNoSpace
				_ = os.RemoveAll(tmpPath)
			case syscall.EEXIST:
				// Temporary name taken by a concurrent writer;
				// caller retries with a different suffix.
				return syscall.EEXIST
			}
		}
	} else {
		// Write succeeded: atomically publish the object under its
		// final path.
		err = os.Rename(tmpPath, p)
	}
	return err
}

// writeFlags returns the os.OpenFile flag set used for object writes:
// exclusive create plus truncate, with O_SYNC added unless synchronous
// writes are disabled via noSync.
func (t *FSTree) writeFlags() int {
	const base = os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL
	if !t.noSync {
		return base | os.O_SYNC
	}
	return base
}

// writeFile writes data to a file with path p.
// The logic mirrors os.WriteFile, but uses the tree's write flags and
// permissions; a write error takes precedence over a close error.
func (t *FSTree) writeFile(p string, data []byte) error {
	file, err := os.OpenFile(p, t.writeFlags(), t.Permissions)
	if err != nil {
		return err
	}

	_, writeErr := file.Write(data)
	closeErr := file.Close()

	if writeErr != nil {
		return writeErr
	}
	return closeErr
}

// PutStream puts executes handler on a file opened for write.
func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error {
if t.readOnly {
return common.ErrReadOnly
}

p := t.treePath(addr)

if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
return err
}

f, err := os.OpenFile(p, t.writeFlags(), t.Permissions)
if err != nil {
return err
}
defer f.Close()

return handler(f)
return common.PutRes{StorageID: []byte{}}, t.writeData(p, prm.RawData)
}

// Get returns an object from the storage by address.
Expand Down
Loading
Loading