From 7c842b9c5c730d8d7781d00887a7dcc1a5a5ef4a Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 2 Dec 2024 22:09:11 +0300 Subject: [PATCH] blobstor: replace IterateLazily with IterateAddresses Object getter callback is useless for FSTree and a bit more efficient for Peapod. But the only user of this interface is writecache which only uses it with FSTree, so some deduplication can be helpful. Signed-off-by: Roman Khimov --- cmd/neofs-lens/internal/peapod/list.go | 2 +- .../blobstor/common/storage.go | 2 +- .../blobstor/fstree/fstree.go | 20 ++++---- pkg/local_object_storage/blobstor/get_test.go | 2 +- .../blobstor/internal/blobstortest/iterate.go | 8 ++-- .../blobstor/peapod/peapod.go | 47 ++++--------------- .../blobstor/peapod/peapod_test.go | 4 +- pkg/local_object_storage/writecache/flush.go | 6 +-- pkg/local_object_storage/writecache/init.go | 4 +- .../writecache/iterate.go | 6 +-- 10 files changed, 37 insertions(+), 64 deletions(-) diff --git a/cmd/neofs-lens/internal/peapod/list.go b/cmd/neofs-lens/internal/peapod/list.go index 9dc7bdd4e2..e5b0212552 100644 --- a/cmd/neofs-lens/internal/peapod/list.go +++ b/cmd/neofs-lens/internal/peapod/list.go @@ -36,7 +36,7 @@ func listFunc(cmd *cobra.Command, _ []string) error { } defer ppd.Close() - err = ppd.IterateAddresses(wAddr) + err = ppd.IterateAddresses(wAddr, false) if err != nil { return fmt.Errorf("Peapod iterator failure: %w", err) } diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index 78109ccbd4..ffcf7ea81d 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -30,7 +30,7 @@ type Storage interface { Put(oid.Address, []byte) error Delete(oid.Address) error Iterate(func(oid.Address, []byte, []byte) error, func(oid.Address, error) error) error - IterateLazily(func(oid.Address, func() ([]byte, error)) error, bool) error + IterateAddresses(func(oid.Address) error, bool) error } // Copy copies all objects from source Storage into the destination one. If any diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 1e609ed29b..c6518d26ea 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -142,21 +142,23 @@ func (t *FSTree) Iterate(objHandler func(addr oid.Address, data []byte, id []byt return t.iterate(0, []string{t.RootPath}, objHandler, errorHandler, nil) } -// IterateLazily is similar to Iterate, but allows to skip/defer object -// retrieval in the handler. Use getter function when needed. -func (t *FSTree) IterateLazily(lazyHandler func(addr oid.Address, getter func() ([]byte, error)) error, ignoreErrors bool) error { +// IterateAddresses iterates over all objects stored in the underlying storage +// and passes their addresses into f. If f returns an error, IterateAddresses +// returns it and breaks. ignoreErrors allows to continue if internal errors +// happen. +func (t *FSTree) IterateAddresses(f func(addr oid.Address) error, ignoreErrors bool) error { var errorHandler func(oid.Address, error) error if ignoreErrors { errorHandler = func(oid.Address, error) error { return nil } } - return t.iterate(0, []string{t.RootPath}, nil, errorHandler, lazyHandler) + return t.iterate(0, []string{t.RootPath}, nil, errorHandler, f) } func (t *FSTree) iterate(depth uint64, curPath []string, objHandler func(oid.Address, []byte, []byte) error, errorHandler func(oid.Address, error) error, - lazyHandler func(oid.Address, func() ([]byte, error)) error) error { + addrHandler func(oid.Address) error) error { curName := strings.Join(curPath[1:], "") dir := filepath.Join(curPath...) des, err := os.ReadDir(dir) @@ -175,7 +177,7 @@ func (t *FSTree) iterate(depth uint64, curPath []string, curPath[l] = des[i].Name() if !isLast && des[i].IsDir() { - err := t.iterate(depth+1, curPath, objHandler, errorHandler, lazyHandler) + err := t.iterate(depth+1, curPath, objHandler, errorHandler, addrHandler) if err != nil { // Must be error from handler in case errors are ignored. // Need to report. @@ -192,10 +194,8 @@ func (t *FSTree) iterate(depth uint64, curPath []string, continue } - if lazyHandler != nil { - err = lazyHandler(*addr, func() ([]byte, error) { - return getRawObjectBytes(addr.Object(), filepath.Join(curPath...)) - }) + if addrHandler != nil { + err = addrHandler(*addr) } else { var data []byte p := filepath.Join(curPath...) diff --git a/pkg/local_object_storage/blobstor/get_test.go b/pkg/local_object_storage/blobstor/get_test.go index 87f8e5f224..64e8945a6c 100644 --- a/pkg/local_object_storage/blobstor/get_test.go +++ b/pkg/local_object_storage/blobstor/get_test.go @@ -55,7 +55,7 @@ func (x *getBytesOnlySubStorage) Iterate(_ func(oid.Address, []byte, []byte) err panic("must not be called") } -func (x *getBytesOnlySubStorage) IterateLazily(_ func(oid.Address, func() ([]byte, error)) error, _ bool) error { +func (x *getBytesOnlySubStorage) IterateAddresses(_ func(oid.Address) error, _ bool) error { panic("must not be called") } diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go index 8439afee82..4baf700cc6 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go @@ -45,11 +45,11 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) { } }) - t.Run("lazy handler", func(t *testing.T) { + t.Run("addresses", func(t *testing.T) { seen := make(map[string]objectDesc) - var lazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { - data, err := f() + var addrHandler = func(addr oid.Address) error { + data, err := s.GetBytes(addr) require.NoError(t, err) seen[addr.String()] = objectDesc{ @@ -59,7 +59,7 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) { return nil } - err := s.IterateLazily(lazyHandler, false) + err := s.IterateAddresses(addrHandler, false) require.NoError(t, err) require.Equal(t, len(objects), len(seen)) for i := range objects { diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go index 3b63346560..1ba8491e65 100644 --- a/pkg/local_object_storage/blobstor/peapod/peapod.go +++ b/pkg/local_object_storage/blobstor/peapod/peapod.go @@ -486,19 +486,9 @@ func (x *Peapod) Iterate(objHandler func(addr oid.Address, data []byte, id []byt return x.iterate(objHandler, errorHandler, nil) } -// IterateLazily is similar to Iterate, but allows to skip/defer object -// retrieval in the handler. Use getter function when needed. -func (x *Peapod) IterateLazily(lazyHandler func(addr oid.Address, getter func() ([]byte, error)) error, ignoreErrors bool) error { - var errorHandler func(oid.Address, error) error - if ignoreErrors { - errorHandler = func(oid.Address, error) error { return nil } - } - return x.iterate(nil, errorHandler, lazyHandler) -} - func (x *Peapod) iterate(objHandler func(oid.Address, []byte, []byte) error, errorHandler func(oid.Address, error) error, - lazyHandler func(oid.Address, func() ([]byte, error)) error) error { + addrHandler func(oid.Address) error) error { err := x.bolt.View(func(tx *bbolt.Tx) error { bktRoot := tx.Bucket(rootBucket) if bktRoot == nil { @@ -525,10 +515,8 @@ func (x *Peapod) iterate(objHandler func(oid.Address, []byte, []byte) error, return fmt.Errorf("decompress value for object '%s': %w", addr, err) } - if lazyHandler != nil { - return lazyHandler(addr, func() ([]byte, error) { - return v, nil - }) + if addrHandler != nil { + return addrHandler(addr) } return objHandler(addr, v, storageID) @@ -543,28 +531,13 @@ func (x *Peapod) iterate(objHandler func(oid.Address, []byte, []byte) error, // IterateAddresses iterates over all objects stored in the underlying database // and passes their addresses into f. If f returns an error, IterateAddresses -// returns it and breaks. -func (x *Peapod) IterateAddresses(f func(addr oid.Address) error) error { - var addr oid.Address - - err := x.bolt.View(func(tx *bbolt.Tx) error { - bktRoot := tx.Bucket(rootBucket) - if bktRoot == nil { - return errMissingRootBucket - } - - return bktRoot.ForEach(func(k, v []byte) error { - err := decodeKeyForObject(&addr, k) - if err != nil { - return fmt.Errorf("decode object address from bucket key: %w", err) - } - - return f(addr) - }) - }) - if err != nil { - return fmt.Errorf("exec read-only BoltDB transaction: %w", err) +// returns it and breaks. ignoreErrors allows to continue if internal errors +// happen. +func (x *Peapod) IterateAddresses(f func(addr oid.Address) error, ignoreErrors bool) error { + var errorHandler func(oid.Address, error) error + if ignoreErrors { + errorHandler = func(oid.Address, error) error { return nil } } - return nil + return x.iterate(nil, errorHandler, f) } diff --git a/pkg/local_object_storage/blobstor/peapod/peapod_test.go b/pkg/local_object_storage/blobstor/peapod/peapod_test.go index a1ef089c34..9a104127b1 100644 --- a/pkg/local_object_storage/blobstor/peapod/peapod_test.go +++ b/pkg/local_object_storage/blobstor/peapod/peapod_test.go @@ -220,7 +220,7 @@ func TestPeapod_IterateAddresses(t *testing.T) { return nil } - err := ppd.IterateAddresses(f) + err := ppd.IterateAddresses(f, false) require.NoError(t, err) require.Empty(t, mDst) @@ -229,7 +229,7 @@ func TestPeapod_IterateAddresses(t *testing.T) { require.NoError(t, err) } - err = ppd.IterateAddresses(f) + err = ppd.IterateAddresses(f, false) require.NoError(t, err) require.Equal(t, mSrc, mDst) } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 42a6443633..e08f9df77f 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -173,14 +173,14 @@ func (c *cache) reportFlushError(msg string, addr string, err error) { } func (c *cache) flushFSTree(ignoreErrors bool) error { - var lazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { + var addrHandler = func(addr oid.Address) error { sAddr := addr.EncodeToString() if _, ok := c.store.flushed.Peek(sAddr); ok { return nil } - data, err := f() + data, err := c.fsTree.GetBytes(addr) if err != nil { if errors.As(err, new(apistatus.ObjectNotFound)) { // an object can be removed b/w iterating over it @@ -219,7 +219,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error { return nil } - return c.fsTree.IterateLazily(lazyHandler, ignoreErrors) + return c.fsTree.IterateAddresses(addrHandler, ignoreErrors) } // flushWorker writes objects to the main storage. diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index 3000c0a2c7..9ff22bcf38 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -14,7 +14,7 @@ import ( func (c *cache) initFlushMarks() { c.log.Info("filling flush marks for objects in FSTree") - var lazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error { + var addrHandler = func(addr oid.Address) error { flushed, needRemove := c.flushStatus(addr) if flushed { c.store.flushed.Add(addr.EncodeToString(), true) @@ -31,7 +31,7 @@ func (c *cache) initFlushMarks() { } return nil } - _ = c.fsTree.IterateLazily(lazyHandler, false) + _ = c.fsTree.IterateAddresses(addrHandler, false) c.log.Info("filling flush marks for objects in database") diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 071bf68e01..243b53340e 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -40,11 +40,11 @@ func (c *cache) Iterate(handler func(oid.Address, []byte) error, ignoreErrors bo return err } - var lazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { + var addrHandler = func(addr oid.Address) error { if _, ok := c.flushed.Peek(addr.EncodeToString()); ok { return nil } - data, err := f() + data, err := c.fsTree.GetBytes(addr) if err != nil { if ignoreErrors || errors.As(err, new(apistatus.ObjectNotFound)) { // an object can be removed b/w iterating over it @@ -56,7 +56,7 @@ func (c *cache) Iterate(handler func(oid.Address, []byte) error, ignoreErrors bo return handler(addr, data) } - return c.fsTree.IterateLazily(lazyHandler, ignoreErrors) + return c.fsTree.IterateAddresses(addrHandler, ignoreErrors) } // IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.