Skip to content

Commit

Permalink
cli: Collect storagegroup members with new V2 split
Browse files Browse the repository at this point in the history
It was easier to implement it from scratch than to try to understand callbacks
and recursion.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Mar 20, 2024
1 parent ba7fe68 commit fb9b4e7
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 29 deletions.
44 changes: 40 additions & 4 deletions cmd/neofs-cli/modules/storagegroup/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/key"
objectCli "github.com/nspcc-dev/neofs-node/cmd/neofs-cli/modules/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -80,6 +81,7 @@ func putSG(cmd *cobra.Command, _ []string) {
headPrm internalclient.HeadObjectPrm
putPrm internalclient.PutObjectPrm
getCnrPrm internalclient.GetContainerPrm
getPrm internalclient.GetObjectPrm

Check warning on line 84 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L84

Added line #L84 was not covered by tests
)

cli := internalclient.GetSDKClientByFlag(ctx, cmd, commonflags.RPC)
Expand All @@ -96,12 +98,19 @@ func putSG(cmd *cobra.Command, _ []string) {
headPrm.SetClient(cli)
headPrm.SetPrivateKey(*pk)

headPrm.SetRawFlag(true)
getPrm.SetClient(cli)
getPrm.SetPrivateKey(*pk)
objectCli.Prepare(cmd, &getPrm)

Check warning on line 104 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L101-L104

Added lines #L101 - L104 were not covered by tests

sg, err := storagegroup.CollectMembers(sgHeadReceiver{
ctx: ctx,
cmd: cmd,
key: pk,
ownerID: &ownerID,
prm: headPrm,
prmHead: headPrm,
cli: cli,
getPrm: getPrm,

Check warning on line 113 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L111-L113

Added lines #L111 - L113 were not covered by tests
}, cnr, members, !resGetCnr.Container().IsHomomorphicHashingDisabled())
common.ExitOnErr(cmd, "could not collect storage group members: %w", err)

Expand Down Expand Up @@ -138,13 +147,40 @@ type sgHeadReceiver struct {
cmd *cobra.Command
key *ecdsa.PrivateKey
ownerID *user.ID
prm internalclient.HeadObjectPrm
prmHead internalclient.HeadObjectPrm
cli *client.Client
getPrm internalclient.GetObjectPrm
}

type payloadWriter struct {
payload []byte
}

func (pw *payloadWriter) Write(p []byte) (n int, err error) {
pw.payload = append(pw.payload, p...)
return len(p), nil

Check warning on line 161 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L159-L161

Added lines #L159 - L161 were not covered by tests
}

func (c sgHeadReceiver) Get(addr oid.Address) (object.Object, error) {
pw := &payloadWriter{}
c.getPrm.SetPayloadWriter(pw)
c.getPrm.SetAddress(addr)

Check warning on line 167 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L164-L167

Added lines #L164 - L167 were not covered by tests

res, err := internalclient.GetObject(c.ctx, c.getPrm)
if err != nil {
return object.Object{}, fmt.Errorf("rpc error: %w", err)

Check warning on line 171 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L169-L171

Added lines #L169 - L171 were not covered by tests
}

obj := res.Header()
obj.SetPayload(pw.payload)

Check warning on line 175 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L174-L175

Added lines #L174 - L175 were not covered by tests

return *obj, nil

Check warning on line 177 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L177

Added line #L177 was not covered by tests
}

func (c sgHeadReceiver) Head(addr oid.Address) (any, error) {
c.prm.SetAddress(addr)
c.prmHead.SetAddress(addr)

Check warning on line 181 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L181

Added line #L181 was not covered by tests

res, err := internalclient.HeadObject(c.ctx, c.prm)
res, err := internalclient.HeadObject(c.ctx, c.prmHead)

Check warning on line 183 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L183

Added line #L183 was not covered by tests

var errSplitInfo *object.SplitInfoError

Expand Down
184 changes: 161 additions & 23 deletions pkg/services/object/util/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@ import (
"errors"
"fmt"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// HeadReceiver is an interface of entity that can receive
// object header or the information about the object relations.
type HeadReceiver interface {
// ObjectSource is an interface of entity that can receive
// object header, the whole object or the information about
// the object relations.
type ObjectSource interface {
// Head must return one of:
// * object header (*object.Object);
// * structured information about split-chain (*object.SplitInfo).
Head(id oid.Address) (any, error)

// Get must return object by its address.
Get(address oid.Address) (object.Object, error)
}

// SplitMemberHandler is a handler of next split-chain element.
Expand All @@ -24,7 +29,7 @@ type HeadReceiver interface {
type SplitMemberHandler func(member *object.Object, reverseDirection bool) (stop bool)

// IterateAllSplitLeaves is an iterator over all object split-tree leaves in direct order.
func IterateAllSplitLeaves(r HeadReceiver, addr oid.Address, h func(*object.Object)) error {
func IterateAllSplitLeaves(r ObjectSource, addr oid.Address, h func(*object.Object)) error {

Check warning on line 32 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L32

Added line #L32 was not covered by tests
return IterateSplitLeaves(r, addr, func(leaf *object.Object) bool {
h(leaf)
return false
Expand All @@ -34,45 +39,178 @@ func IterateAllSplitLeaves(r HeadReceiver, addr oid.Address, h func(*object.Obje
// IterateSplitLeaves is an iterator over object split-tree leaves in direct order.
//
// If member handler returns true, then the iterator aborts without error.
func IterateSplitLeaves(r HeadReceiver, addr oid.Address, h func(*object.Object) bool) error {
var (
reverse bool
leaves []*object.Object
)

if err := TraverseSplitChain(r, addr, func(member *object.Object, reverseDirection bool) (stop bool) {
reverse = reverseDirection

if reverse {
leaves = append(leaves, member)
return false
func IterateSplitLeaves(r ObjectSource, addr oid.Address, h func(*object.Object) bool) error {
info, err := r.Head(addr)
if err != nil {
return fmt.Errorf("receiving information about the object: %w", err)

Check warning on line 45 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L42-L45

Added lines #L42 - L45 were not covered by tests
}

switch res := info.(type) {
default:
panic(fmt.Sprintf("unexpected result of %T: %T", r, info))
case *object.Object:
h(res)
case *object.SplitInfo:
if res.SplitID() == nil {
return iterateV2Split(r, res, addr.Container(), h)
} else {
return iterateV1Split(r, res, addr.Container(), h)

Check warning on line 57 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L48-L57

Added lines #L48 - L57 were not covered by tests
}
}

return h(member)
}); err != nil {
return err
return nil

Check warning on line 61 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L61

Added line #L61 was not covered by tests
}

func iterateV1Split(r ObjectSource, info *object.SplitInfo, cID cid.ID, handler func(*object.Object) bool) error {
var addr oid.Address
addr.SetContainer(cID)

Check warning on line 66 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L64-L66

Added lines #L64 - L66 were not covered by tests

linkID, ok := info.Link()
if ok {
addr.SetObject(linkID)

Check warning on line 70 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L68-L70

Added lines #L68 - L70 were not covered by tests

linkObj, err := headFromReceiver(r, addr)
if err != nil {
return err

Check warning on line 74 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L72-L74

Added lines #L72 - L74 were not covered by tests
}

for _, child := range linkObj.Children() {
addr.SetObject(child)

Check warning on line 78 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L77-L78

Added lines #L77 - L78 were not covered by tests

childHeader, err := headFromReceiver(r, addr)
if err != nil {
return err

Check warning on line 82 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L80-L82

Added lines #L80 - L82 were not covered by tests
}

if stop := handler(childHeader); stop {
return nil

Check warning on line 86 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}
}

handler(linkObj)

Check warning on line 90 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L90

Added line #L90 was not covered by tests

return nil

Check warning on line 92 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L92

Added line #L92 was not covered by tests
}

lastID, ok := info.LastPart()
if ok {
addr.SetObject(lastID)
return iterateFromLastObject(r, addr, handler)

Check warning on line 98 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L95-L98

Added lines #L95 - L98 were not covered by tests
}

for i := len(leaves) - 1; i >= 0; i-- {
if h(leaves[i]) {
return errors.New("neither link, nor last object ID is found")

Check warning on line 101 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L101

Added line #L101 was not covered by tests
}

func iterateV2Split(r ObjectSource, info *object.SplitInfo, cID cid.ID, handler func(*object.Object) bool) error {
var addr oid.Address
addr.SetContainer(cID)

Check warning on line 106 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L104-L106

Added lines #L104 - L106 were not covered by tests

linkID, ok := info.Link()
if ok {
addr.SetObject(linkID)

Check warning on line 110 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L108-L110

Added lines #L108 - L110 were not covered by tests

linkObjRaw, err := r.Get(addr)
if err != nil {
return fmt.Errorf("receiving link object %s: %w", addr, err)

Check warning on line 114 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L112-L114

Added lines #L112 - L114 were not covered by tests
}

if stop := handler(&linkObjRaw); stop {
return nil

Check warning on line 118 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

var linkObj object.Link
err = linkObjRaw.ReadLink(&linkObj)
if err != nil {
return fmt.Errorf("decoding link object (%d): %w", addr, err)

Check warning on line 124 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L121-L124

Added lines #L121 - L124 were not covered by tests
}

for _, child := range linkObj.Objects() {
addr.SetObject(child.ObjectID())

Check warning on line 128 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L127-L128

Added lines #L127 - L128 were not covered by tests

childObj, err := headFromReceiver(r, addr)
if err != nil {
return fmt.Errorf("fetching child object (%s): %w", addr, err)

Check warning on line 132 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}

if stop := handler(childObj); stop {
return nil

Check warning on line 136 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L135-L136

Added lines #L135 - L136 were not covered by tests
}
}

return nil

Check warning on line 140 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L140

Added line #L140 was not covered by tests
}

lastID, ok := info.LastPart()
if ok {
addr.SetObject(lastID)
return iterateFromLastObject(r, addr, handler)

Check warning on line 146 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L143-L146

Added lines #L143 - L146 were not covered by tests
}

return errors.New("neither link, nor last object ID is found")

Check warning on line 149 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L149

Added line #L149 was not covered by tests
}

func iterateFromLastObject(r ObjectSource, lastAddr oid.Address, handler func(*object.Object) bool) error {
var idBuff []oid.ID
addr := lastAddr

Check warning on line 154 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L152-L154

Added lines #L152 - L154 were not covered by tests

for {
obj, err := headFromReceiver(r, addr)
if err != nil {
return err

Check warning on line 159 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L156-L159

Added lines #L156 - L159 were not covered by tests
}

oID, _ := obj.ID()
idBuff = append(idBuff, oID)

Check warning on line 163 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L162-L163

Added lines #L162 - L163 were not covered by tests

prevOID, set := obj.PreviousID()
if !set {

Check warning on line 166 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L165-L166

Added lines #L165 - L166 were not covered by tests
break
}

addr.SetObject(prevOID)

Check warning on line 170 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L170

Added line #L170 was not covered by tests
}

for i := len(idBuff) - 1; i >= 0; i-- {
addr.SetObject(idBuff[i])

Check warning on line 174 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L173-L174

Added lines #L173 - L174 were not covered by tests

childObj, err := headFromReceiver(r, addr)
if err != nil {
return err

Check warning on line 178 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L176-L178

Added lines #L176 - L178 were not covered by tests
}

if stop := handler(childObj); stop {
return nil

Check warning on line 182 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L181-L182

Added lines #L181 - L182 were not covered by tests
}
}

return nil
}

func headFromReceiver(r ObjectSource, addr oid.Address) (*object.Object, error) {
res, err := r.Head(addr)
if err != nil {
return nil, fmt.Errorf("fetching information about %s: %w", addr, err)

Check warning on line 192 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L189-L192

Added lines #L189 - L192 were not covered by tests
}

switch v := res.(type) {
case *object.Object:
return v, nil
default:
return nil, fmt.Errorf("unexpected information: %T", res)

Check warning on line 199 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L195-L199

Added lines #L195 - L199 were not covered by tests
}
}

// TraverseSplitChain is an iterator over object split-tree leaves.
//
// Traversal occurs in one of two directions, which depends on what pslit info was received:
// * in direct order for link part;
// * in reverse order for last part.
func TraverseSplitChain(r HeadReceiver, addr oid.Address, h SplitMemberHandler) error {
func TraverseSplitChain(r ObjectSource, addr oid.Address, h SplitMemberHandler) error {

Check warning on line 208 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L208

Added line #L208 was not covered by tests
_, err := traverseSplitChain(r, addr, h)
return err
}

func traverseSplitChain(r HeadReceiver, addr oid.Address, h SplitMemberHandler) (bool, error) {
func traverseSplitChain(r ObjectSource, addr oid.Address, h SplitMemberHandler) (bool, error) {

Check warning on line 213 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L213

Added line #L213 was not covered by tests
v, err := r.Head(addr)
if err != nil {
return false, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/object_manager/storagegroup/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ var (
)

// CollectMembers creates new storage group structure and fills it
// with information about members collected via HeadReceiver.
// with information about members collected via ObjectSource.
//
// Resulting storage group consists of physically stored objects only.
func CollectMembers(r objutil.HeadReceiver, cnr cid.ID, members []oid.ID, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
func CollectMembers(r objutil.ObjectSource, cnr cid.ID, members []oid.ID, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
var (
sumPhySize uint64
phyMembers []oid.ID
Expand Down
4 changes: 4 additions & 0 deletions pkg/services/object_manager/storagegroup/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type mockedObjects struct {
hdr *object.Object
}

func (x *mockedObjects) Get(address oid.Address) (object.Object, error) {
return *x.hdr, nil
}

func (x *mockedObjects) Head(_ oid.Address) (any, error) {
return x.hdr, nil
}
Expand Down

0 comments on commit fb9b4e7

Please sign in to comment.