Skip to content

Commit

Permalink
blobstor: replace IterateLazily with IterateAddresses
Browse files Browse the repository at this point in the history
Object getter callback is useless for FSTree and a bit more efficient for
Peapod. But the only user of this interface is writecache which only uses it
with FSTree, so some deduplication can be helpful.

Signed-off-by: Roman Khimov <[email protected]>
  • Loading branch information
roman-khimov committed Dec 2, 2024
1 parent 2a00009 commit 7c842b9
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 64 deletions.
2 changes: 1 addition & 1 deletion cmd/neofs-lens/internal/peapod/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func listFunc(cmd *cobra.Command, _ []string) error {
}
defer ppd.Close()

err = ppd.IterateAddresses(wAddr)
err = ppd.IterateAddresses(wAddr, false)

Check warning on line 39 in cmd/neofs-lens/internal/peapod/list.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-lens/internal/peapod/list.go#L39

Added line #L39 was not covered by tests
if err != nil {
return fmt.Errorf("Peapod iterator failure: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/blobstor/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Storage interface {
Put(oid.Address, []byte) error
Delete(oid.Address) error
Iterate(func(oid.Address, []byte, []byte) error, func(oid.Address, error) error) error
IterateLazily(func(oid.Address, func() ([]byte, error)) error, bool) error
IterateAddresses(func(oid.Address) error, bool) error
}

// Copy copies all objects from source Storage into the destination one. If any
Expand Down
20 changes: 10 additions & 10 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,23 @@ func (t *FSTree) Iterate(objHandler func(addr oid.Address, data []byte, id []byt
return t.iterate(0, []string{t.RootPath}, objHandler, errorHandler, nil)
}

// IterateLazily is similar to Iterate, but allows to skip/defer object
// retrieval in the handler. Use getter function when needed.
func (t *FSTree) IterateLazily(lazyHandler func(addr oid.Address, getter func() ([]byte, error)) error, ignoreErrors bool) error {
// IterateAddresses iterates over all objects stored in the underlying storage
// and passes their addresses into f. If f returns an error, IterateAddresses
// returns it and breaks. ignoreErrors allows to continue if internal errors
// happen.
func (t *FSTree) IterateAddresses(f func(addr oid.Address) error, ignoreErrors bool) error {
var errorHandler func(oid.Address, error) error
if ignoreErrors {
errorHandler = func(oid.Address, error) error { return nil }

Check warning on line 152 in pkg/local_object_storage/blobstor/fstree/fstree.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/fstree/fstree.go#L152

Added line #L152 was not covered by tests
}

return t.iterate(0, []string{t.RootPath}, nil, errorHandler, lazyHandler)
return t.iterate(0, []string{t.RootPath}, nil, errorHandler, f)
}

func (t *FSTree) iterate(depth uint64, curPath []string,
objHandler func(oid.Address, []byte, []byte) error,
errorHandler func(oid.Address, error) error,
lazyHandler func(oid.Address, func() ([]byte, error)) error) error {
addrHandler func(oid.Address) error) error {
curName := strings.Join(curPath[1:], "")
dir := filepath.Join(curPath...)
des, err := os.ReadDir(dir)
Expand All @@ -175,7 +177,7 @@ func (t *FSTree) iterate(depth uint64, curPath []string,
curPath[l] = des[i].Name()

if !isLast && des[i].IsDir() {
err := t.iterate(depth+1, curPath, objHandler, errorHandler, lazyHandler)
err := t.iterate(depth+1, curPath, objHandler, errorHandler, addrHandler)
if err != nil {
// Must be error from handler in case errors are ignored.
// Need to report.
Expand All @@ -192,10 +194,8 @@ func (t *FSTree) iterate(depth uint64, curPath []string,
continue
}

if lazyHandler != nil {
err = lazyHandler(*addr, func() ([]byte, error) {
return getRawObjectBytes(addr.Object(), filepath.Join(curPath...))
})
if addrHandler != nil {
err = addrHandler(*addr)
} else {
var data []byte
p := filepath.Join(curPath...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/blobstor/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (x *getBytesOnlySubStorage) Iterate(_ func(oid.Address, []byte, []byte) err
panic("must not be called")
}

func (x *getBytesOnlySubStorage) IterateLazily(_ func(oid.Address, func() ([]byte, error)) error, _ bool) error {
func (x *getBytesOnlySubStorage) IterateAddresses(_ func(oid.Address) error, _ bool) error {
panic("must not be called")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
}
})

t.Run("lazy handler", func(t *testing.T) {
t.Run("addresses", func(t *testing.T) {
seen := make(map[string]objectDesc)

var lazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
data, err := f()
var addrHandler = func(addr oid.Address) error {
data, err := s.GetBytes(addr)
require.NoError(t, err)

seen[addr.String()] = objectDesc{
Expand All @@ -59,7 +59,7 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
return nil
}

err := s.IterateLazily(lazyHandler, false)
err := s.IterateAddresses(addrHandler, false)
require.NoError(t, err)
require.Equal(t, len(objects), len(seen))
for i := range objects {
Expand Down
47 changes: 10 additions & 37 deletions pkg/local_object_storage/blobstor/peapod/peapod.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,19 +486,9 @@ func (x *Peapod) Iterate(objHandler func(addr oid.Address, data []byte, id []byt
return x.iterate(objHandler, errorHandler, nil)
}

// IterateLazily is similar to Iterate, but allows to skip/defer object
// retrieval in the handler. Use getter function when needed.
func (x *Peapod) IterateLazily(lazyHandler func(addr oid.Address, getter func() ([]byte, error)) error, ignoreErrors bool) error {
var errorHandler func(oid.Address, error) error
if ignoreErrors {
errorHandler = func(oid.Address, error) error { return nil }
}
return x.iterate(nil, errorHandler, lazyHandler)
}

func (x *Peapod) iterate(objHandler func(oid.Address, []byte, []byte) error,
errorHandler func(oid.Address, error) error,
lazyHandler func(oid.Address, func() ([]byte, error)) error) error {
addrHandler func(oid.Address) error) error {
err := x.bolt.View(func(tx *bbolt.Tx) error {
bktRoot := tx.Bucket(rootBucket)
if bktRoot == nil {
Expand All @@ -525,10 +515,8 @@ func (x *Peapod) iterate(objHandler func(oid.Address, []byte, []byte) error,
return fmt.Errorf("decompress value for object '%s': %w", addr, err)
}

if lazyHandler != nil {
return lazyHandler(addr, func() ([]byte, error) {
return v, nil
})
if addrHandler != nil {
return addrHandler(addr)
}

return objHandler(addr, v, storageID)
Expand All @@ -543,28 +531,13 @@ func (x *Peapod) iterate(objHandler func(oid.Address, []byte, []byte) error,

// IterateAddresses iterates over all objects stored in the underlying database
// and passes their addresses into f. If f returns an error, IterateAddresses
// returns it and breaks.
func (x *Peapod) IterateAddresses(f func(addr oid.Address) error) error {
var addr oid.Address

err := x.bolt.View(func(tx *bbolt.Tx) error {
bktRoot := tx.Bucket(rootBucket)
if bktRoot == nil {
return errMissingRootBucket
}

return bktRoot.ForEach(func(k, v []byte) error {
err := decodeKeyForObject(&addr, k)
if err != nil {
return fmt.Errorf("decode object address from bucket key: %w", err)
}

return f(addr)
})
})
if err != nil {
return fmt.Errorf("exec read-only BoltDB transaction: %w", err)
// returns it and breaks. ignoreErrors allows to continue if internal errors
// happen.
func (x *Peapod) IterateAddresses(f func(addr oid.Address) error, ignoreErrors bool) error {
var errorHandler func(oid.Address, error) error
if ignoreErrors {
errorHandler = func(oid.Address, error) error { return nil }

Check warning on line 539 in pkg/local_object_storage/blobstor/peapod/peapod.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/blobstor/peapod/peapod.go#L539

Added line #L539 was not covered by tests
}

return nil
return x.iterate(nil, errorHandler, f)
}
4 changes: 2 additions & 2 deletions pkg/local_object_storage/blobstor/peapod/peapod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestPeapod_IterateAddresses(t *testing.T) {
return nil
}

err := ppd.IterateAddresses(f)
err := ppd.IterateAddresses(f, false)
require.NoError(t, err)
require.Empty(t, mDst)

Expand All @@ -229,7 +229,7 @@ func TestPeapod_IterateAddresses(t *testing.T) {
require.NoError(t, err)
}

err = ppd.IterateAddresses(f)
err = ppd.IterateAddresses(f, false)
require.NoError(t, err)
require.Equal(t, mSrc, mDst)
}
6 changes: 3 additions & 3 deletions pkg/local_object_storage/writecache/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
}

func (c *cache) flushFSTree(ignoreErrors bool) error {
var lazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
var addrHandler = func(addr oid.Address) error {
sAddr := addr.EncodeToString()

if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}

data, err := f()
data, err := c.fsTree.GetBytes(addr)
if err != nil {
if errors.As(err, new(apistatus.ObjectNotFound)) {
// an object can be removed b/w iterating over it
Expand Down Expand Up @@ -219,7 +219,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return nil
}

return c.fsTree.IterateLazily(lazyHandler, ignoreErrors)
return c.fsTree.IterateAddresses(addrHandler, ignoreErrors)
}

// flushWorker writes objects to the main storage.
Expand Down
4 changes: 2 additions & 2 deletions pkg/local_object_storage/writecache/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func (c *cache) initFlushMarks() {
c.log.Info("filling flush marks for objects in FSTree")

var lazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
var addrHandler = func(addr oid.Address) error {
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
Expand All @@ -31,7 +31,7 @@ func (c *cache) initFlushMarks() {
}
return nil
}
_ = c.fsTree.IterateLazily(lazyHandler, false)
_ = c.fsTree.IterateAddresses(addrHandler, false)

c.log.Info("filling flush marks for objects in database")

Expand Down
6 changes: 3 additions & 3 deletions pkg/local_object_storage/writecache/iterate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func (c *cache) Iterate(handler func(oid.Address, []byte) error, ignoreErrors bo
return err
}

var lazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
var addrHandler = func(addr oid.Address) error {

Check warning on line 43 in pkg/local_object_storage/writecache/iterate.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/writecache/iterate.go#L43

Added line #L43 was not covered by tests
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
return nil
}
data, err := f()
data, err := c.fsTree.GetBytes(addr)

Check warning on line 47 in pkg/local_object_storage/writecache/iterate.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/writecache/iterate.go#L47

Added line #L47 was not covered by tests
if err != nil {
if ignoreErrors || errors.As(err, new(apistatus.ObjectNotFound)) {
// an object can be removed b/w iterating over it
Expand All @@ -56,7 +56,7 @@ func (c *cache) Iterate(handler func(oid.Address, []byte) error, ignoreErrors bo
return handler(addr, data)
}

return c.fsTree.IterateLazily(lazyHandler, ignoreErrors)
return c.fsTree.IterateAddresses(addrHandler, ignoreErrors)

Check warning on line 59 in pkg/local_object_storage/writecache/iterate.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/writecache/iterate.go#L59

Added line #L59 was not covered by tests
}

// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
Expand Down

0 comments on commit 7c842b9

Please sign in to comment.