diff --git a/cmd/blobovnicza-to-peapod/blobovniczatree/get.go b/cmd/blobovnicza-to-peapod/blobovniczatree/get.go index 52792dad1c..4ee5b5998a 100644 --- a/cmd/blobovnicza-to-peapod/blobovniczatree/get.go +++ b/cmd/blobovnicza-to-peapod/blobovniczatree/get.go @@ -1,6 +1,7 @@ package blobovniczatree import ( + "errors" "fmt" "path/filepath" @@ -9,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -141,3 +143,7 @@ func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.G return common.GetRes{Object: obj, RawData: data}, nil } + +func (b *Blobovniczas) GetBytes(_ oid.Address, _ common.GetBytesOptions) ([]byte, error) { + return nil, errors.New("unimplemented Blobovniczas.GetBytes") +} diff --git a/pkg/local_object_storage/blobstor/common/get.go b/pkg/local_object_storage/blobstor/common/get.go index c8a96c5adc..04edef1e4b 100644 --- a/pkg/local_object_storage/blobstor/common/get.go +++ b/pkg/local_object_storage/blobstor/common/get.go @@ -15,3 +15,11 @@ type GetRes struct { Object *objectSDK.Object RawData []byte } + +// GetBytesOptions groups options for [Storage.GetBytes]. +type GetBytesOptions struct { + // GetBuffer specifies function replacing built-in make to allocate memory + // buffer of ln length. Buffer allocated via GetBuffer is returned from GetBytes + // regardless of error so caller could handle it properly. + GetBuffer func(ln int) []byte +} diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index 6a6ecd9a91..488b1e32eb 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) // Storage represents key-value object storage. @@ -20,6 +21,9 @@ type Storage interface { // This function MUST be called before Open. SetReportErrorFunc(f func(string, error)) + // GetBytes reads object by address into memory buffer in a canonical NeoFS + // binary format. Returns [apistatus.ObjectNotFound] if object is missing. + GetBytes(oid.Address, GetBytesOptions) ([]byte, error) Get(GetPrm) (GetRes, error) GetRange(GetRangePrm) (GetRangeRes, error) Exists(ExistsPrm) (ExistsRes, error) diff --git a/pkg/local_object_storage/blobstor/compression/compress.go b/pkg/local_object_storage/blobstor/compression/compress.go index fdca2f9a10..cd7982c279 100644 --- a/pkg/local_object_storage/blobstor/compression/compress.go +++ b/pkg/local_object_storage/blobstor/compression/compress.go @@ -71,12 +71,22 @@ func (c *Config) NeedsCompression(obj *objectSDK.Object) bool { return c.Enabled } +// IsCompressed checks whether given data is compressed. +func (c *Config) IsCompressed(data []byte) bool { + return len(data) >= 4 && bytes.Equal(data[:4], zstdFrameMagic) +} + // Decompress decompresses data if it starts with the magic // and returns data untouched otherwise. func (c *Config) Decompress(data []byte) ([]byte, error) { - if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) { + if !c.IsCompressed(data) { return data, nil } + return c.DecompressForce(data) +} + +// DecompressForce decompresses given compressed data. +func (c *Config) DecompressForce(data []byte) ([]byte, error) { return c.decoder.DecodeAll(data, nil) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index f3f8c9feab..5f4c3c7483 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -4,7 +4,9 @@ import ( "crypto/sha256" "errors" "fmt" + "io" "io/fs" + "math" "os" "path/filepath" "strings" @@ -297,6 +299,62 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) { return common.GetRes{Object: obj, RawData: data}, nil } +// GetBytes reads object from the FSTree by address into memory buffer in a +// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object +// is missing. +func (t *FSTree) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) { + p := t.treePath(addr) + + f, err := os.Open(p) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) + } + return nil, fmt.Errorf("open object file %q: %w", p, err) + } + + fi, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("stat object file %q: %w", p, err) + } + sz := fi.Size() + if sz > math.MaxInt { + return nil, fmt.Errorf("too big object file %d > %d", sz, math.MaxInt) + } + if sz == 0 { + return nil, nil + } + + var b []byte + if opts.GetBuffer != nil { + b = opts.GetBuffer(int(sz)) + } else { + b = make([]byte, sz) + } + + _, err = io.ReadFull(f, b) + if err != nil { + if errors.Is(err, io.EOF) { + err = io.ErrUnexpectedEOF + } + return b, fmt.Errorf("read all %d bytes from object file %q: %w", sz, p, err) + } + + if !t.IsCompressed(b) { + return b, nil + } + + dec, err := t.DecompressForce(b) + if err != nil { + if cap(dec) > cap(b) { + b = dec + } + return b, fmt.Errorf("decompress object file data %q: %w", p, err) + } + + return dec, nil +} + // GetRange implements common.Storage. func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { res, err := t.Get(common.GetPrm{Address: prm.Address}) diff --git a/pkg/local_object_storage/blobstor/get.go b/pkg/local_object_storage/blobstor/get.go index 72b51a246e..024fc1faa3 100644 --- a/pkg/local_object_storage/blobstor/get.go +++ b/pkg/local_object_storage/blobstor/get.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) // Get reads the object from b. @@ -30,3 +31,38 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) { } return b.storage[0].Storage.Get(prm) } + +// GetBytes reads object from the BlobStor by address into memory buffer in a +// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object +// is missing. +func (b *BlobStor) GetBytes(addr oid.Address, subStorageID []byte, opts common.GetBytesOptions) ([]byte, error) { + b.modeMtx.RLock() + defer b.modeMtx.RUnlock() + + var bs []byte + if subStorageID == nil { + getReusedBuffer := func(ln int) []byte { + if cap(bs) >= ln { + return bs[:ln] + } + if opts.GetBuffer != nil { + return opts.GetBuffer(ln) + } + return make([]byte, ln) + } + opts := common.GetBytesOptions{GetBuffer: getReusedBuffer} + for i := range b.storage { + var err error + bs, err = b.storage[i].Storage.GetBytes(addr, opts) + if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { + return bs, err + } + } + + return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) + } + if len(subStorageID) == 0 { + return b.storage[len(b.storage)-1].Storage.GetBytes(addr, opts) + } + return b.storage[0].Storage.GetBytes(addr, opts) +} diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/get.go b/pkg/local_object_storage/blobstor/internal/blobstortest/get.go index e8d8a4531b..47af50bc5f 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/get.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/get.go @@ -46,5 +46,24 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) { res, err = s.Get(gPrm) require.NoError(t, err) require.Equal(t, objects[i].raw, res.RawData) + + // Binary. + b, err := s.GetBytes(objects[i].addr, common.GetBytesOptions{}) + require.NoError(t, err) + require.Equal(t, objects[i].raw, b) + + b2, err := s.GetBytes(objects[i].addr, common.GetBytesOptions{ + GetBuffer: func(ln int) []byte { + if cap(b) >= ln { + return b[:ln] + } + return make([]byte, ln) + }, + }) + require.NoError(t, err) + require.Equal(t, objects[i].raw, b2) + if len(b) > 0 { + require.Equal(t, &b[0], &b2[0]) + } } } diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go index b579d205cd..95724e817b 100644 --- a/pkg/local_object_storage/blobstor/peapod/peapod.go +++ b/pkg/local_object_storage/blobstor/peapod/peapod.go @@ -315,6 +315,56 @@ func (x *Peapod) Get(prm common.GetPrm) (common.GetRes, error) { return common.GetRes{Object: obj, RawData: data}, err } +// GetBytes reads object from the Peapod by address into memory buffer in a +// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object +// is missing. +func (x *Peapod) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) { + var b []byte + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + val := bktRoot.Get(keyForObject(addr)) + if val == nil { + return apistatus.ErrObjectNotFound + } + + if opts.GetBuffer != nil { + b = opts.GetBuffer(len(val)) + } else { + b = make([]byte, len(val)) + } + + copy(b, val) + + return nil + }) + if err != nil { + if errors.Is(err, apistatus.ErrObjectNotFound) { + return nil, logicerr.Wrap(err) + } + return nil, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + // copy-paste from FSTree + if !x.compress.IsCompressed(b) { + return b, nil + } + + dec, err := x.compress.DecompressForce(b) + if err != nil { + if cap(dec) > cap(b) { + b = dec + } + return b, fmt.Errorf("decompress object BoltDB data: %w", err) + } + + return dec, nil +} + // GetRange works like Get but reads specific payload range. func (x *Peapod) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { // copy-paste from FSTree diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index a590fda035..3e052d3b57 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -22,7 +22,7 @@ import ( const errSmallSize = 256 -func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { +func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string, [2]*shard.ID) { if dir == "" { var err error @@ -31,10 +31,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) t.Cleanup(func() { _ = os.RemoveAll(dir) }) } - e := New( - WithLogger(zaptest.NewLogger(t)), - WithShardPoolSize(1), - WithErrorThreshold(errThreshold)) + e := New(append([]Option{WithShardPoolSize(1)}, opts...)...) var ids [2]*shard.ID var err error @@ -60,6 +57,10 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) return e, dir, ids } +func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { + return newEngine(t, dir, WithLogger(zaptest.NewLogger(t)), WithErrorThreshold(errThreshold)) +} + func TestErrorReporting(t *testing.T) { t.Run("ignore errors by default", func(t *testing.T) { e, dir, id := newEngineWithErrorThreshold(t, "", 0) diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 1b284ebd3f..0936884ed1 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -3,6 +3,7 @@ package engine import ( "errors" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -44,21 +45,30 @@ func (r GetRes) Object() *objectSDK.Object { // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) Get(prm GetPrm) (res GetRes, err error) { + var sp shard.GetPrm + sp.SetAddress(prm.addr) err = e.execIfNotBlocked(func() error { - res, err = e.get(prm) - return err + return e.get(prm.addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { + sp.SetIgnoreMeta(ignoreMetadata) + sr, err := s.Get(sp) + if err != nil { + return sr.HasMeta(), err + } + res.obj = sr.Object() + return sr.HasMeta(), nil + }) }) return } -func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { +func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error)) error { if e.metrics != nil { defer elapsed(e.metrics.AddGetDuration)() } var ( - obj *objectSDK.Object + ok bool siErr *objectSDK.SplitInfoError errNotFound apistatus.ObjectNotFound @@ -70,21 +80,16 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { metaError error ) - var shPrm shard.GetPrm - shPrm.SetAddress(prm.addr) - var hasDegraded bool var objectExpired bool - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() - shPrm.SetIgnoreMeta(noMeta) - hasDegraded = hasDegraded || noMeta - res, err := sh.Get(shPrm) + hasMetadata, err := shardFunc(sh.Shard, noMeta) if err != nil { - if res.HasMeta() { + if hasMetadata { shardWithMeta = sh metaError = err } @@ -122,51 +127,47 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { } } - obj = res.Object() + ok = true return true }) if outSI != nil { - return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + return logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) } if objectExpired { - return GetRes{}, errNotFound + return errNotFound } - if obj == nil { + if !ok { if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) { - return GetRes{}, outError + return outError } // If the object is not found but is present in metabase, // try to fetch it from blobstor directly. If it is found in any // blobstor, increase the error counter for the shard which contains the meta. - shPrm.SetIgnoreMeta(true) - - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { if sh.GetMode().NoMetabase() { // Already visited. return false } - res, err := sh.Get(shPrm) - obj = res.Object() - return err == nil + _, err := shardFunc(sh.Shard, true) + ok = err == nil + return ok }) - if obj == nil { - return GetRes{}, outError + if !ok { + return outError } if shardWithMeta.Shard != nil { e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, zap.Stringer("address", prm.addr)) + metaError, zap.Stringer("address", addr)) } } - return GetRes{ - obj: obj, - }, nil + return nil } // Get reads object from local storage by provided address. @@ -181,3 +182,33 @@ func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { return res.Object(), nil } + +// GetBytes reads object from the StorageEngine by address into memory buffer in +// a canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object +// is missing. +func (e *StorageEngine) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) { + var b []byte + var err error + getBufferBase := opts.GetBuffer + opts.GetBuffer = func(ln int) []byte { + if cap(b) >= ln { + return b[:ln] + } + if getBufferBase != nil { + return getBufferBase(ln) + } + return make([]byte, ln) + } + + err = e.execIfNotBlocked(func() error { + return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { + if ignoreMetadata { + b, err = s.GetBytes(addr, opts) + } else { + b, hasMetadata, err = s.GetBytesWithMetadataLookup(addr, opts) + } + return + }) + }) + return b, err +} diff --git a/pkg/local_object_storage/engine/get_test.go b/pkg/local_object_storage/engine/get_test.go new file mode 100644 index 0000000000..53ec66e988 --- /dev/null +++ b/pkg/local_object_storage/engine/get_test.go @@ -0,0 +1,40 @@ +package engine + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/stretchr/testify/require" +) + +func TestStorageEngine_GetBytes(t *testing.T) { + e, _, _ := newEngine(t, t.TempDir()) + obj := generateObjectWithCID(t, cidtest.ID()) + addr := object.AddressOf(obj) + + objBin, err := obj.Marshal() + require.NoError(t, err) + + err = Put(e, obj) + require.NoError(t, err) + + b, err := e.GetBytes(addr, common.GetBytesOptions{}) + require.NoError(t, err) + require.Equal(t, objBin, b) + + b2, err := e.GetBytes(addr, common.GetBytesOptions{ + GetBuffer: func(ln int) []byte { + if cap(b) >= ln { + return b[:ln] + } + return make([]byte, ln) + }, + }) + require.NoError(t, err) + require.Equal(t, objBin, b2) + if len(b) > 0 { + require.Equal(t, &b[0], &b2[0]) + } +} diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 708e8280cf..f2e721b8e0 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -14,10 +14,6 @@ import ( "go.uber.org/zap" ) -// storFetcher is a type to unify object fetching mechanism in `fetchObjectData` -// method. It represents generalization of `getSmall` and `getBig` methods. -type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) - // GetPrm groups the parameters of Get operation. type GetPrm struct { addr oid.Address @@ -65,38 +61,48 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) { s.m.RLock() defer s.m.RUnlock() - cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) { + var res GetRes + + cb := func(stor *blobstor.BlobStor, id []byte) error { var getPrm common.GetPrm getPrm.Address = prm.addr getPrm.StorageID = id - res, err := stor.Get(getPrm) + r, err := stor.Get(getPrm) if err != nil { - return nil, err + return err } - - return res.Object, nil + res.obj = r.Object + return nil } - wc := func(c writecache.Cache) (*objectSDK.Object, error) { - return c.Get(prm.addr) + wc := func(c writecache.Cache) error { + o, err := c.Get(prm.addr) + if err != nil { + return err + } + res.obj = o + return nil } skipMeta := prm.skipMeta || s.info.Mode.NoMetabase() - obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc) + var err error + res.hasMeta, err = s.fetchObjectData(prm.addr, skipMeta, cb, wc) - return GetRes{ - obj: obj, - hasMeta: hasMeta, - }, err + return res, err } // emptyStorageID is an empty storageID that indicates that // an object is big (and is stored in an FSTree, not in a peapod). var emptyStorageID = make([]byte, 0) -// fetchObjectData looks through writeCache and blobStor to find object. -func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) { +// fetchObjectData looks through writeCache and blobStor to find object. Returns +// true iff skipMeta flag is unset && referenced object is found in the +// underlying metaBase. +func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, + blobFunc func(bs *blobstor.BlobStor, subStorageID []byte) error, + wc func(w writecache.Cache) error, +) (bool, error) { var ( mErr error mRes meta.ExistsRes @@ -109,15 +115,15 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, mRes, mErr = s.metaBase.Exists(mPrm) if mErr != nil && !s.info.Mode.NoMetabase() { - return nil, false, mErr + return false, mErr } exists = mRes.Exists() } if s.hasWriteCache() { - res, err := wc(s.writeCache) + err := wc(s.writeCache) if err == nil || IsErrOutOfRange(err) { - return res, false, err + return false, err } if IsErrNotFound(err) { @@ -133,12 +139,12 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, } if skipMeta || mErr != nil { - res, err := cb(s.blobStor, nil) - return res, false, err + err := blobFunc(s.blobStor, nil) + return false, err } if !exists { - return nil, false, logicerr.Wrap(apistatus.ObjectNotFound{}) + return false, logicerr.Wrap(apistatus.ObjectNotFound{}) } var mPrm meta.StorageIDPrm @@ -146,7 +152,7 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, mExRes, err := s.metaBase.StorageID(mPrm) if err != nil { - return nil, true, fmt.Errorf("can't fetch storage id from metabase: %w", err) + return true, fmt.Errorf("can't fetch storage id from metabase: %w", err) } storageID := mExRes.StorageID() @@ -157,7 +163,48 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, storageID = emptyStorageID } - res, err := cb(s.blobStor, storageID) + return true, blobFunc(s.blobStor, storageID) +} + +// GetBytes reads object from the Shard by address into memory buffer in a +// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object +// is missing. +func (s *Shard) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) { + b, _, err := s.getBytesWithMetadataLookup(addr, opts, true) + return b, err +} + +// GetBytesWithMetadataLookup works similar to [shard.GetBytes], but pre-checks +// object presence in the underlying metabase: if object cannot be accessed from +// the metabase, GetBytesWithMetadataLookup returns an error. +func (s *Shard) GetBytesWithMetadataLookup(addr oid.Address, opts common.GetBytesOptions) ([]byte, bool, error) { + return s.getBytesWithMetadataLookup(addr, opts, false) +} + +func (s *Shard) getBytesWithMetadataLookup(addr oid.Address, opts common.GetBytesOptions, skipMeta bool) ([]byte, bool, error) { + s.m.RLock() + defer s.m.RUnlock() + + var b []byte + getBufferBase := opts.GetBuffer + opts.GetBuffer = func(ln int) []byte { + if cap(b) >= ln { + return b[:ln] + } + if getBufferBase != nil { + return getBufferBase(ln) + } + return make([]byte, ln) + } - return res, true, err + hasMeta, err := s.fetchObjectData(addr, skipMeta, func(bs *blobstor.BlobStor, subStorageID []byte) error { + var err error + b, err = bs.GetBytes(addr, subStorageID, opts) + return err + }, func(w writecache.Cache) error { + var err error + b, err = w.GetBytes(addr, opts) + return err + }) + return b, hasMeta, err } diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index b7ca6dde41..f45ae9432d 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -70,47 +70,47 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) { s.m.RLock() defer s.m.RUnlock() - cb := func(stor *blobstor.BlobStor, id []byte) (*object.Object, error) { + var res RngRes + + cb := func(stor *blobstor.BlobStor, id []byte) error { var getRngPrm common.GetRangePrm getRngPrm.Address = prm.addr getRngPrm.Range.SetOffset(prm.off) getRngPrm.Range.SetLength(prm.ln) getRngPrm.StorageID = id - res, err := stor.GetRange(getRngPrm) + r, err := stor.GetRange(getRngPrm) if err != nil { - return nil, err + return err } - obj := object.New() - obj.SetPayload(res.Data) + res.obj = object.New() + res.obj.SetPayload(r.Data) - return obj, nil + return nil } - wc := func(c writecache.Cache) (*object.Object, error) { - res, err := c.Get(prm.addr) + wc := func(c writecache.Cache) error { + o, err := c.Get(prm.addr) if err != nil { - return nil, err + return err } - payload := res.Payload() + payload := o.Payload() from := prm.off to := from + prm.ln if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { - return nil, logicerr.Wrap(apistatus.ObjectOutOfRange{}) + return logicerr.Wrap(apistatus.ObjectOutOfRange{}) } - obj := object.New() - obj.SetPayload(payload[from:to]) - return obj, nil + res.obj = object.New() + res.obj.SetPayload(payload[from:to]) + return nil } skipMeta := prm.skipMeta || s.info.Mode.NoMetabase() - obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc) + var err error + res.hasMeta, err = s.fetchObjectData(prm.addr, skipMeta, cb, wc) - return RngRes{ - obj: obj, - hasMeta: hasMeta, - }, err + return res, err } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index f3be34aaa3..1b19081faf 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -9,7 +9,6 @@ import ( objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" @@ -37,33 +36,7 @@ func TestFlush(t *testing.T) { ) newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) { - dir := t.TempDir() - mb := meta.New( - meta.WithPath(filepath.Join(dir, "meta")), - meta.WithEpochState(dummyEpoch{})) - require.NoError(t, mb.Open(false)) - require.NoError(t, mb.Init()) - - fsTree := fstree.New( - fstree.WithPath(filepath.Join(dir, "blob")), - fstree.WithDepth(0), - fstree.WithDirNameLen(1)) - bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ - {Storage: fsTree}, - })) - require.NoError(t, bs.Open(false)) - require.NoError(t, bs.Init()) - - wc := New( - append([]Option{ - WithLogger(zaptest.NewLogger(t)), - WithPath(filepath.Join(dir, "writecache")), - WithSmallObjectSize(smallSize), - WithMetabase(mb), - WithBlobstor(bs), - }, opts...)...) - require.NoError(t, wc.Open(false)) - require.NoError(t, wc.Init()) + wc, bs, mb := newCache(t, smallSize, append(opts, WithLogger(zaptest.NewLogger(t)))...) // First set mode for metabase and blobstor to prevent background flushes. require.NoError(t, mb.SetMode(mode.ReadOnly)) diff --git a/pkg/local_object_storage/writecache/generic_test.go b/pkg/local_object_storage/writecache/generic_test.go index 4e97b21ea8..164d3acd43 100644 --- a/pkg/local_object_storage/writecache/generic_test.go +++ b/pkg/local_object_storage/writecache/generic_test.go @@ -6,7 +6,10 @@ import ( "strconv" "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/storagetest" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -27,3 +30,34 @@ func TestGeneric(t *testing.T) { storagetest.TestAll(t, newCache) } + +func newCache(tb testing.TB, smallSize uint64, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) { + dir := tb.TempDir() + mb := meta.New( + meta.WithPath(filepath.Join(dir, "meta")), + meta.WithEpochState(dummyEpoch{})) + require.NoError(tb, mb.Open(false)) + require.NoError(tb, mb.Init()) + + fsTree := fstree.New( + fstree.WithPath(filepath.Join(dir, "blob")), + fstree.WithDepth(0), + fstree.WithDirNameLen(1)) + bs := blobstor.New( + blobstor.WithStorages([]blobstor.SubStorage{{Storage: fsTree}}), + blobstor.WithCompressObjects(true)) + require.NoError(tb, bs.Open(false)) + require.NoError(tb, bs.Init()) + + wc := New( + append([]Option{ + WithPath(filepath.Join(dir, "writecache")), + WithSmallObjectSize(smallSize), + WithMetabase(mb), + WithBlobstor(bs), + }, opts...)...) + require.NoError(tb, wc.Open(false)) + require.NoError(tb, wc.Init()) + + return wc, bs, mb +} diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 2e591fda40..0a412e3435 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -1,7 +1,6 @@ package writecache import ( - "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -49,18 +48,56 @@ func (c *cache) Head(addr oid.Address) (*objectSDK.Object, error) { // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db. func Get(db *bbolt.DB, key []byte) ([]byte, error) { + return get(db, key, nil) +} + +func get(db *bbolt.DB, key []byte, getBuffer func(ln int) []byte) ([]byte, error) { var value []byte err := db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) if b == nil { return ErrNoDefaultBucket } - value = b.Get(key) - if value == nil { + v := b.Get(key) + if v == nil { return logicerr.Wrap(apistatus.ObjectNotFound{}) } - value = slice.Copy(value) + if getBuffer != nil { + value = getBuffer(len(v)) + } else { + value = make([]byte, len(v)) + } + copy(value, v) return nil }) return value, err } + +func (c *cache) GetBytes(addr oid.Address, opts common.GetBytesOptions) ([]byte, error) { + saddr := addr.EncodeToString() + var b []byte + getBufferBase := opts.GetBuffer + opts.GetBuffer = func(ln int) []byte { + if cap(b) >= ln { + return b[:ln] + } + if getBufferBase != nil { + return opts.GetBuffer(ln) + } + return make([]byte, ln) + } + + b, err := get(c.db, []byte(saddr), opts.GetBuffer) + if err == nil { + c.flushed.Get(saddr) + return b, nil + } + + b, err = c.fsTree.GetBytes(addr, opts) + if err != nil { + return b, logicerr.Wrap(apistatus.ObjectNotFound{}) + } + + c.flushed.Get(saddr) + return b, nil +} diff --git a/pkg/local_object_storage/writecache/get_test.go b/pkg/local_object_storage/writecache/get_test.go new file mode 100644 index 0000000000..9ba3e692fc --- /dev/null +++ b/pkg/local_object_storage/writecache/get_test.go @@ -0,0 +1,29 @@ +package writecache + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/stretchr/testify/require" +) + +func TestCache_GetBytes(t *testing.T) { + const maxObjSize = 4 << 10 + c, _, _ := newCache(t, maxObjSize) + + o := putObject(t, c, maxObjSize/2) + objBin, err := o.obj.Marshal() + require.NoError(t, err) + + b, err := c.GetBytes(o.addr, common.GetBytesOptions{}) + require.NoError(t, err) + require.Equal(t, objBin, b) + + o = putObject(t, c, 2*maxObjSize) + objBin, err = o.obj.Marshal() + require.NoError(t, err) + + b, err = c.GetBytes(o.addr, common.GetBytesOptions{}) + require.NoError(t, err) + require.Equal(t, objBin, b) +} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 9bd9977fde..08c5c50ca6 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -21,6 +21,10 @@ type Info struct { // Cache represents write-cache for objects. type Cache interface { Get(address oid.Address) (*object.Object, error) + // GetBytes reads object from the Cache by address into memory buffer in a + // canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object + // is missing. + GetBytes(oid.Address, common.GetBytesOptions) ([]byte, error) Head(oid.Address) (*object.Object, error) // Delete removes object referenced by the given oid.Address from the // Cache. Returns any error encountered that prevented the object to be