Skip to content

Commit

Permalink
engine: Support reading objects without decoding
Browse files Browse the repository at this point in the history
Previously, the local object storage engine always decoded the requested
objects. In cases where it was necessary to obtain a binary object, an
excessive decoding-encoding round was performed.

Now `GetBytes` method to read object binary is provided. The method
allows to optimize memory buffer allocations via optional parameter.

Closes #1723.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Feb 26, 2024
1 parent 9756cc4 commit 8654d4f
Show file tree
Hide file tree
Showing 18 changed files with 500 additions and 113 deletions.
6 changes: 6 additions & 0 deletions cmd/blobovnicza-to-peapod/blobovniczatree/get.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blobovniczatree

import (
"errors"
"fmt"
"path/filepath"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -141,3 +143,7 @@ func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.G

return common.GetRes{Object: obj, RawData: data}, nil
}

func (b *Blobovniczas) GetBytes(_ oid.Address, _ common.GetBytesOptions) ([]byte, error) {
return nil, errors.New("unimplemented Blobovniczas.GetBytes")

Check warning on line 148 in cmd/blobovnicza-to-peapod/blobovniczatree/get.go

View check run for this annotation

Codecov / codecov/patch

cmd/blobovnicza-to-peapod/blobovniczatree/get.go#L147-L148

Added lines #L147 - L148 were not covered by tests
}
8 changes: 8 additions & 0 deletions pkg/local_object_storage/blobstor/common/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,11 @@ type GetRes struct {
Object *objectSDK.Object
RawData []byte
}

// GetBytesOptions groups options for [Storage.GetBytes].
type GetBytesOptions struct {
// GetBuffer specifies function replacing built-in make to allocate memory
// buffer of ln length. Buffer allocated via GetBuffer is returned from GetBytes
// regardless of error so caller could handle it properly.
GetBuffer func(ln int) []byte
}
4 changes: 4 additions & 0 deletions pkg/local_object_storage/blobstor/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Storage represents key-value object storage.
Expand All @@ -20,6 +21,9 @@ type Storage interface {
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))

// GetBytes reads object by address into memory buffer in a canonical NeoFS
// binary format. Returns [apistatus.ObjectNotFound] if object is missing.
GetBytes(oid.Address, GetBytesOptions) ([]byte, error)
Get(GetPrm) (GetRes, error)
GetRange(GetRangePrm) (GetRangeRes, error)
Exists(ExistsPrm) (ExistsRes, error)
Expand Down
12 changes: 11 additions & 1 deletion pkg/local_object_storage/blobstor/compression/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,22 @@ func (c *Config) NeedsCompression(obj *objectSDK.Object) bool {
return c.Enabled
}

// IsCompressed checks whether given data is compressed.
func (c *Config) IsCompressed(data []byte) bool {
return len(data) >= 4 && bytes.Equal(data[:4], zstdFrameMagic)

Check warning on line 76 in pkg/local_object_storage/blobstor/compression/compress.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/compression/compress.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}

// Decompress decompresses data if it starts with the magic
// and returns data untouched otherwise.
func (c *Config) Decompress(data []byte) ([]byte, error) {
if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) {
if !c.IsCompressed(data) {

Check warning on line 82 in pkg/local_object_storage/blobstor/compression/compress.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/compression/compress.go#L82

Added line #L82 was not covered by tests
return data, nil
}
return c.DecompressForce(data)

Check warning on line 85 in pkg/local_object_storage/blobstor/compression/compress.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/compression/compress.go#L85

Added line #L85 was not covered by tests
}

// DecompressForce decompresses given compressed data.
func (c *Config) DecompressForce(data []byte) ([]byte, error) {

Check warning on line 89 in pkg/local_object_storage/blobstor/compression/compress.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/compression/compress.go#L89

Added line #L89 was not covered by tests
return c.decoder.DecodeAll(data, nil)
}

Expand Down
58 changes: 58 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"crypto/sha256"
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -297,6 +299,62 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{Object: obj, RawData: data}, nil
}

// GetBytes reads object from the FSTree by address into memory buffer in a
// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object
// is missing.
func (t *FSTree) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) {
p := t.treePath(addr)

f, err := os.Open(p)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})

Check warning on line 311 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L310-L311

Added lines #L310 - L311 were not covered by tests
}
return nil, fmt.Errorf("open object file %q: %w", p, err)

Check warning on line 313 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L313

Added line #L313 was not covered by tests
}

fi, err := f.Stat()
if err != nil {
return nil, fmt.Errorf("stat object file %q: %w", p, err)

Check warning on line 318 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L318

Added line #L318 was not covered by tests
}
sz := fi.Size()
if sz > math.MaxInt {
return nil, fmt.Errorf("too big object file %d > %d", sz, math.MaxInt)

Check warning on line 322 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L322

Added line #L322 was not covered by tests
}
if sz == 0 {
return nil, nil

Check warning on line 325 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L325

Added line #L325 was not covered by tests
}

var b []byte
if opts.GetBuffer != nil {
b = opts.GetBuffer(int(sz))
} else {
b = make([]byte, sz)
}

_, err = io.ReadFull(f, b)
if err != nil {
if errors.Is(err, io.EOF) {
err = io.ErrUnexpectedEOF

Check warning on line 338 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L337-L338

Added lines #L337 - L338 were not covered by tests
}
return b, fmt.Errorf("read all %d bytes from object file %q: %w", sz, p, err)

Check warning on line 340 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L340

Added line #L340 was not covered by tests
}

if !t.IsCompressed(b) {
return b, nil
}

dec, err := t.DecompressForce(b)
if err != nil {
if cap(dec) > cap(b) {
b = dec

Check warning on line 350 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L347-L350

Added lines #L347 - L350 were not covered by tests
}
return b, fmt.Errorf("decompress object file data %q: %w", p, err)

Check warning on line 352 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L352

Added line #L352 was not covered by tests
}

return dec, nil

Check warning on line 355 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L355

Added line #L355 was not covered by tests
}

// GetRange implements common.Storage.
func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
res, err := t.Get(common.GetPrm{Address: prm.Address})
Expand Down
36 changes: 36 additions & 0 deletions pkg/local_object_storage/blobstor/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Get reads the object from b.
Expand All @@ -30,3 +31,38 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
}
return b.storage[0].Storage.Get(prm)
}

// GetBytes reads object from the BlobStor by address into memory buffer in a
// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object
// is missing.
func (b *BlobStor) GetBytes(addr oid.Address, subStorageID []byte, opts common.GetBytesOptions) ([]byte, error) {
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()

Check warning on line 40 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L38-L40

Added lines #L38 - L40 were not covered by tests

var bs []byte
if subStorageID == nil {
getReusedBuffer := func(ln int) []byte {
if cap(bs) >= ln {
return bs[:ln]

Check warning on line 46 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L42-L46

Added lines #L42 - L46 were not covered by tests
}
if opts.GetBuffer != nil {
return opts.GetBuffer(ln)

Check warning on line 49 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}
return make([]byte, ln)

Check warning on line 51 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L51

Added line #L51 was not covered by tests
}
opts := common.GetBytesOptions{GetBuffer: getReusedBuffer}
for i := range b.storage {
var err error
bs, err = b.storage[i].Storage.GetBytes(addr, opts)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return bs, err

Check warning on line 58 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L53-L58

Added lines #L53 - L58 were not covered by tests
}
}

return nil, logicerr.Wrap(apistatus.ObjectNotFound{})

Check warning on line 62 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L62

Added line #L62 was not covered by tests
}
if len(subStorageID) == 0 {
return b.storage[len(b.storage)-1].Storage.GetBytes(addr, opts)

Check warning on line 65 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}
return b.storage[0].Storage.GetBytes(addr, opts)

Check warning on line 67 in pkg/local_object_storage/blobstor/get.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/get.go#L67

Added line #L67 was not covered by tests
}
19 changes: 19 additions & 0 deletions pkg/local_object_storage/blobstor/internal/blobstortest/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,24 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
res, err = s.Get(gPrm)
require.NoError(t, err)
require.Equal(t, objects[i].raw, res.RawData)

// Binary.
b, err := s.GetBytes(objects[i].addr, common.GetBytesOptions{})
require.NoError(t, err)
require.Equal(t, objects[i].raw, b)

b2, err := s.GetBytes(objects[i].addr, common.GetBytesOptions{
GetBuffer: func(ln int) []byte {
if cap(b) >= ln {
return b[:ln]
}
return make([]byte, ln)
},
})
require.NoError(t, err)
require.Equal(t, objects[i].raw, b2)
if len(b) > 0 {
require.Equal(t, &b[0], &b2[0])
}
}
}
50 changes: 50 additions & 0 deletions pkg/local_object_storage/blobstor/peapod/peapod.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,56 @@ func (x *Peapod) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{Object: obj, RawData: data}, err
}

// GetBytes reads object from the Peapod by address into memory buffer in a
// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object
// is missing.
func (x *Peapod) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) {
var b []byte

err := x.bolt.View(func(tx *bbolt.Tx) error {
bktRoot := tx.Bucket(rootBucket)
if bktRoot == nil {
return errMissingRootBucket

Check warning on line 327 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L327

Added line #L327 was not covered by tests
}

val := bktRoot.Get(keyForObject(addr))
if val == nil {
return apistatus.ErrObjectNotFound

Check warning on line 332 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L332

Added line #L332 was not covered by tests
}

if opts.GetBuffer != nil {
b = opts.GetBuffer(len(val))
} else {
b = make([]byte, len(val))
}

copy(b, val)

return nil
})
if err != nil {
if errors.Is(err, apistatus.ErrObjectNotFound) {
return nil, logicerr.Wrap(err)

Check warning on line 347 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L346-L347

Added lines #L346 - L347 were not covered by tests
}
return nil, fmt.Errorf("exec read-only BoltDB transaction: %w", err)

Check warning on line 349 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L349

Added line #L349 was not covered by tests
}

// copy-paste from FSTree
if !x.compress.IsCompressed(b) {
return b, nil
}

dec, err := x.compress.DecompressForce(b)
if err != nil {
if cap(dec) > cap(b) {
b = dec

Check warning on line 360 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L357-L360

Added lines #L357 - L360 were not covered by tests
}
return b, fmt.Errorf("decompress object BoltDB data: %w", err)

Check warning on line 362 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L362

Added line #L362 was not covered by tests
}

return dec, nil

Check warning on line 365 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L365

Added line #L365 was not covered by tests
}

// GetRange works like Get but reads specific payload range.
func (x *Peapod) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
// copy-paste from FSTree
Expand Down
11 changes: 6 additions & 5 deletions pkg/local_object_storage/engine/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const errSmallSize = 256

func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string, [2]*shard.ID) {
if dir == "" {
var err error

Expand All @@ -31,10 +31,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
}

e := New(
WithLogger(zaptest.NewLogger(t)),
WithShardPoolSize(1),
WithErrorThreshold(errThreshold))
e := New(append([]Option{WithShardPoolSize(1)}, opts...)...)

var ids [2]*shard.ID
var err error
Expand All @@ -60,6 +57,10 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
return e, dir, ids
}

func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
return newEngine(t, dir, WithLogger(zaptest.NewLogger(t)), WithErrorThreshold(errThreshold))
}

func TestErrorReporting(t *testing.T) {
t.Run("ignore errors by default", func(t *testing.T) {
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
Expand Down
Loading

0 comments on commit 8654d4f

Please sign in to comment.