Skip to content

Commit

Permalink
engine: Support reading objects without decoding
Browse files Browse the repository at this point in the history
Previously, the local object storage engine always decoded the requested
objects. When only the binary form of an object was needed, this caused a
redundant decode-encode round trip.

Now a `GetBytes` method is provided to read an object in binary form. The method
accepts an optional parameter that lets the caller optimize memory buffer allocations.

Closes #1723.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Feb 26, 2024
1 parent 9756cc4 commit 0ceb48b
Show file tree
Hide file tree
Showing 18 changed files with 500 additions and 113 deletions.
6 changes: 6 additions & 0 deletions cmd/blobovnicza-to-peapod/blobovniczatree/get.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blobovniczatree

import (
"errors"
"fmt"
"path/filepath"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -141,3 +143,7 @@ func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.G

return common.GetRes{Object: obj, RawData: data}, nil
}

// GetBytes is a placeholder: reading an object in binary form is not
// supported by Blobovniczas, so every call fails with an "unimplemented"
// error and no buffer is ever returned.
//
// NOTE(review): common.Storage declares GetBytes(oid.Address, GetBytesOptions),
// while this method takes a bare allocation function — confirm whether
// Blobovniczas is expected to satisfy that interface.
func (b *Blobovniczas) GetBytes(_ oid.Address, _ func(ln int) []byte) ([]byte, error) {
	errUnimplemented := errors.New("unimplemented Blobovniczas.GetBytes")
	return nil, errUnimplemented
}
8 changes: 8 additions & 0 deletions pkg/local_object_storage/blobstor/common/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,11 @@ type GetRes struct {
Object *objectSDK.Object
RawData []byte
}

// GetBytesOptions groups options for [Storage.GetBytes].
type GetBytesOptions struct {
// GetBuffer is an optional allocator: when set, it replaces the built-in
// make to allocate a memory buffer of ln length, which lets the caller
// reuse or pool buffers. A buffer allocated via GetBuffer is returned from
// GetBytes regardless of error, so the caller can handle it properly
// (e.g. reuse it for the next call).
GetBuffer func(ln int) []byte
}
4 changes: 4 additions & 0 deletions pkg/local_object_storage/blobstor/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Storage represents key-value object storage.
Expand All @@ -20,6 +21,9 @@ type Storage interface {
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))

// GetBytes reads object by address into memory buffer in a canonical NeoFS
// binary format. Returns [apistatus.ObjectNotFound] if object is missing.
GetBytes(oid.Address, GetBytesOptions) ([]byte, error)
Get(GetPrm) (GetRes, error)
GetRange(GetRangePrm) (GetRangeRes, error)
Exists(ExistsPrm) (ExistsRes, error)
Expand Down
12 changes: 11 additions & 1 deletion pkg/local_object_storage/blobstor/compression/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,22 @@ func (c *Config) NeedsCompression(obj *objectSDK.Object) bool {
return c.Enabled
}

// IsCompressed reports whether data begins with the 4-byte zstd frame magic,
// i.e. whether it looks like compressed content. Inputs shorter than 4 bytes
// are never considered compressed.
func (c *Config) IsCompressed(data []byte) bool {
	const magicLen = 4
	if len(data) < magicLen {
		return false
	}
	prefix := data[:magicLen]
	return bytes.Equal(prefix, zstdFrameMagic)
}

// Decompress decompresses data if it starts with the magic
// and returns data untouched otherwise.
func (c *Config) Decompress(data []byte) ([]byte, error) {
if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) {
if !c.IsCompressed(data) {
return data, nil
}
return c.DecompressForce(data)
}

// DecompressForce decompresses data unconditionally, without first checking
// whether it carries the compression magic. The result is decoded into a
// freshly allocated buffer (no destination buffer is supplied to the decoder).
func (c *Config) DecompressForce(data []byte) ([]byte, error) {
	var dst []byte // nil: no destination buffer is passed to the decoder
	return c.decoder.DecodeAll(data, dst)
}

Expand Down
58 changes: 58 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"crypto/sha256"
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -297,6 +299,62 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{Object: obj, RawData: data}, nil
}

// GetBytes reads object from the FSTree by address into memory buffer in a
// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object
// is missing.
//
// The read buffer is sized to the file and allocated via opts.GetBuffer when
// it is set, or via make otherwise. An empty object file yields (nil, nil).
// If the stored data is compressed, the decompressed data is returned instead
// of the read buffer. Once a buffer has been allocated, it is returned even on
// failure so that the caller can reuse it.
func (t *FSTree) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) {
	p := t.treePath(addr)

	f, err := os.Open(p)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
		}
		return nil, fmt.Errorf("open object file %q: %w", p, err)
	}
	// Release the file descriptor on every return path. The file is opened
	// read-only, so a close error cannot lose data and is safe to ignore.
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return nil, fmt.Errorf("stat object file %q: %w", p, err)
	}
	sz := fi.Size()
	// Guard the int conversion below (relevant on 32-bit platforms).
	if sz > math.MaxInt {
		return nil, fmt.Errorf("too big object file %d > %d", sz, math.MaxInt)
	}
	if sz == 0 {
		return nil, nil
	}

	var b []byte
	if opts.GetBuffer != nil {
		b = opts.GetBuffer(int(sz))
	} else {
		b = make([]byte, sz)
	}

	_, err = io.ReadFull(f, b)
	if err != nil {
		// The size is known from Stat, so a plain EOF here means the file is
		// shorter than expected: report it as an unexpected one.
		if errors.Is(err, io.EOF) {
			err = io.ErrUnexpectedEOF
		}
		return b, fmt.Errorf("read all %d bytes from object file %q: %w", sz, p, err)
	}

	if !t.IsCompressed(b) {
		return b, nil
	}

	dec, err := t.DecompressForce(b)
	if err != nil {
		// Hand back the larger of the two buffers for potential reuse.
		if cap(dec) > cap(b) {
			b = dec
		}
		return b, fmt.Errorf("decompress object file data %q: %w", p, err)
	}

	return dec, nil
}

// GetRange implements common.Storage.
func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
res, err := t.Get(common.GetPrm{Address: prm.Address})
Expand Down
36 changes: 36 additions & 0 deletions pkg/local_object_storage/blobstor/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Get reads the object from b.
Expand All @@ -30,3 +31,38 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
}
return b.storage[0].Storage.Get(prm)
}

// GetBytes reads object from the BlobStor by address into memory buffer in a
// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object
// is missing.
//
// A nil subStorageID means the sub-storage is unknown: all sub-storages are
// tried in order until one returns something other than "object not found".
// A non-nil empty subStorageID selects the last sub-storage, any other value
// selects the first one.
func (b *BlobStor) GetBytes(addr oid.Address, subStorageID []byte, opts common.GetBytesOptions) ([]byte, error) {
	b.modeMtx.RLock()
	defer b.modeMtx.RUnlock()

	// Known sub-storage: delegate directly with the caller's options.
	if subStorageID != nil {
		idx := 0
		if len(subStorageID) == 0 {
			idx = len(b.storage) - 1
		}
		return b.storage[idx].Storage.GetBytes(addr, opts)
	}

	// Unknown sub-storage: iterate, offering whatever buffer the previous
	// attempt returned before falling back to the caller's allocator or make.
	var buf []byte
	reuseOpts := common.GetBytesOptions{
		GetBuffer: func(ln int) []byte {
			switch {
			case cap(buf) >= ln:
				return buf[:ln]
			case opts.GetBuffer != nil:
				return opts.GetBuffer(ln)
			default:
				return make([]byte, ln)
			}
		},
	}

	for i := range b.storage {
		var err error
		buf, err = b.storage[i].Storage.GetBytes(addr, reuseOpts)
		notFound := err != nil && errors.As(err, new(apistatus.ObjectNotFound))
		if !notFound {
			return buf, err
		}
	}

	return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
19 changes: 19 additions & 0 deletions pkg/local_object_storage/blobstor/internal/blobstortest/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,24 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
res, err = s.Get(gPrm)
require.NoError(t, err)
require.Equal(t, objects[i].raw, res.RawData)

// Binary.
b, err := s.GetBytes(objects[i].addr, common.GetBytesOptions{})
require.NoError(t, err)
require.Equal(t, objects[i].raw, b)

b2, err := s.GetBytes(objects[i].addr, common.GetBytesOptions{
GetBuffer: func(ln int) []byte {
if cap(b) >= ln {
return b[:ln]
}
return make([]byte, ln)
},
})
require.NoError(t, err)
require.Equal(t, objects[i].raw, b2)
if len(b) > 0 {
require.Equal(t, &b[0], &b2[0])
}
}
}
50 changes: 50 additions & 0 deletions pkg/local_object_storage/blobstor/peapod/peapod.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,56 @@ func (x *Peapod) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{Object: obj, RawData: data}, err
}

// GetBytes reads object from the Peapod by address into memory buffer in a
// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object
// is missing.
//
// The stored value is copied into a buffer obtained from opts.GetBuffer when
// it is set, or from make otherwise. If the stored data is compressed, the
// decompressed data is returned instead.
func (x *Peapod) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) {
	var buf []byte

	// readObject looks the object up in the root bucket and copies its value
	// into buf.
	readObject := func(tx *bbolt.Tx) error {
		root := tx.Bucket(rootBucket)
		if root == nil {
			return errMissingRootBucket
		}

		stored := root.Get(keyForObject(addr))
		if stored == nil {
			return apistatus.ErrObjectNotFound
		}

		if opts.GetBuffer == nil {
			buf = make([]byte, len(stored))
		} else {
			buf = opts.GetBuffer(len(stored))
		}
		copy(buf, stored)

		return nil
	}

	switch err := x.bolt.View(readObject); {
	case err == nil:
		// proceed to the decompression step below
	case errors.Is(err, apistatus.ErrObjectNotFound):
		return nil, logicerr.Wrap(err)
	default:
		return nil, fmt.Errorf("exec read-only BoltDB transaction: %w", err)
	}

	// copy-paste from FSTree
	if x.compress.IsCompressed(buf) {
		plain, err := x.compress.DecompressForce(buf)
		if err == nil {
			return plain, nil
		}
		// Hand back the larger of the two buffers for potential reuse.
		if cap(plain) > cap(buf) {
			buf = plain
		}
		return buf, fmt.Errorf("decompress object BoltDB data: %w", err)
	}

	return buf, nil
}

// GetRange works like Get but reads specific payload range.
func (x *Peapod) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
// copy-paste from FSTree
Expand Down
11 changes: 6 additions & 5 deletions pkg/local_object_storage/engine/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const errSmallSize = 256

func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string, [2]*shard.ID) {
if dir == "" {
var err error

Expand All @@ -31,10 +31,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
}

e := New(
WithLogger(zaptest.NewLogger(t)),
WithShardPoolSize(1),
WithErrorThreshold(errThreshold))
e := New(append([]Option{WithShardPoolSize(1)}, opts...)...)

var ids [2]*shard.ID
var err error
Expand All @@ -60,6 +57,10 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
return e, dir, ids
}

// newEngineWithErrorThreshold builds a test engine via newEngine, configured
// with a test logger bound to t and the given error threshold. It returns
// whatever newEngine returns.
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
	engineOpts := []Option{
		WithLogger(zaptest.NewLogger(t)),
		WithErrorThreshold(errThreshold),
	}
	return newEngine(t, dir, engineOpts...)
}

func TestErrorReporting(t *testing.T) {
t.Run("ignore errors by default", func(t *testing.T) {
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
Expand Down
Loading

0 comments on commit 0ceb48b

Please sign in to comment.