Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/Object transformer package #2671

Merged
merged 4 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions pkg/services/object/get/prm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"hash"

coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -93,15 +94,9 @@ type ChunkWriter interface {
WriteChunk([]byte) error
}

// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
WriteHeader(*object.Object) error
}

// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
HeaderWriter
internal.HeaderWriter
ChunkWriter
}

Expand Down Expand Up @@ -166,7 +161,7 @@ func (p *commonPrm) WithCachedSignerKey(signerKey *ecdsa.PrivateKey) {
}

// SetHeaderWriter sets target component to write the object header.
func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) {
func (p *HeadPrm) SetHeaderWriter(w internal.HeaderWriter) {
p.objWriter = &partWriter{
headWriter: w,
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/services/object/get/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
internal "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/object"
Expand All @@ -35,7 +35,7 @@
type partWriter struct {
ObjectWriter

headWriter HeaderWriter
headWriter internal.HeaderWriter

chunkWriter ChunkWriter
}
Expand Down Expand Up @@ -192,7 +192,7 @@
prm.SetRawFlag()
}

res, err := internal.GetObject(prm)
res, err := internalclient.GetObject(prm)

Check warning on line 195 in pkg/services/object/get/util.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/get/util.go#L195

Added line #L195 was not covered by tests
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package transformer
package internal

import (
"io"
Expand All @@ -7,13 +7,9 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// AccessIdentifiers groups the result of the writing object operation.
type AccessIdentifiers struct {
id oid.ID
}

// ObjectTarget is an interface of the object writer.
type ObjectTarget interface {
// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
// WriteHeader writes object header w/ payload part.
// The payload of the object may be incomplete.
//
Expand All @@ -23,7 +19,11 @@ type ObjectTarget interface {
//
// Must not be called after Close call.
WriteHeader(*object.Object) error
}

// Target is an interface of the object writer.
type Target interface {
HeaderWriter
// Write writes object payload chunk.
//
// Can be called multiple times.
Expand All @@ -33,31 +33,11 @@ type ObjectTarget interface {

// Close is used to finish object writing.
//
// Close must return access identifiers of the object
// Close must return ID of the object
// that has been written.
//
// Must be called no more than once. Control remains with the caller.
// Re-calling can lead to undefined behavior
// that depends on the implementation.
Close() (*AccessIdentifiers, error)
}

// TargetInitializer represents ObjectTarget constructor.
type TargetInitializer func() ObjectTarget

// SelfID returns identifier of the written object.
func (a AccessIdentifiers) SelfID() oid.ID {
return a.id
}

// WithSelfID returns AccessIdentifiers with passed self identifier.
func (a *AccessIdentifiers) WithSelfID(v oid.ID) *AccessIdentifiers {
res := a
if res == nil {
res = new(AccessIdentifiers)
}

res.id = v

return res
Close() (oid.ID, error)
}
17 changes: 8 additions & 9 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/object"
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

type preparedObjectTarget interface {
WriteObject(*objectSDK.Object, object.ContentMeta) error
Close() (*transformer.AccessIdentifiers, error)
Close() (oid.ID, error)
}

type distributedTarget struct {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (t *distributedTarget) Write(p []byte) (n int, err error) {
return len(p), nil
}

func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
func (t *distributedTarget) Close() (oid.ID, error) {
defer func() {
putPayload(t.payload)
t.payload = nil
Expand All @@ -137,7 +137,7 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
var err error

if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil {
return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
return oid.ID{}, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
}

if len(t.obj.Children()) > 0 {
Expand All @@ -163,14 +163,14 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
return nil
}

func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) {
func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (oid.ID, error) {
id, _ := t.obj.ID()

traverser, err := placement.NewTraverser(
append(t.traversal.opts, placement.ForObject(id))...,
)
if err != nil {
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
return oid.ID{}, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
}

var resErr atomic.Value
Expand Down Expand Up @@ -239,7 +239,7 @@ loop:

err.singleErr, _ = resErr.Load().(error)

return nil, err
return oid.ID{}, err
}

// perform additional container broadcast if needed
Expand All @@ -256,6 +256,5 @@ loop:

id, _ = t.obj.ID()

return new(transformer.AccessIdentifiers).
WithSelfID(id), nil
return id, nil
}
12 changes: 5 additions & 7 deletions pkg/services/object/put/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)
Expand Down Expand Up @@ -38,28 +37,27 @@ func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMet
return nil
}

func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
func (t *localTarget) Close() (oid.ID, error) {
switch t.meta.Type() {
case object.TypeTombstone:
err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil {
return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
return oid.ID{}, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
}
case object.TypeLock:
err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil {
return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err)
return oid.ID{}, fmt.Errorf("could not lock object from lock objects locally: %w", err)
}
default:
// objects that do not change meta storage
}

if err := t.storage.Put(t.obj); err != nil {
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
return oid.ID{}, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
}

id, _ := t.obj.ID()

return new(transformer.AccessIdentifiers).
WithSelfID(id), nil
return id, nil
}
13 changes: 6 additions & 7 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

type remoteTarget struct {
Expand Down Expand Up @@ -49,7 +49,7 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta)
return nil
}

func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
func (t *remoteTarget) Close() (oid.ID, error) {
var sessionInfo *util.SessionInfo

if tok := t.commonPrm.SessionToken(); tok != nil {
Expand All @@ -61,12 +61,12 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {

key, err := t.keyStorage.GetKey(sessionInfo)
if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
return oid.ID{}, fmt.Errorf("(%T) could not receive private key: %w", t, err)
}

c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil {
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
return oid.ID{}, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
}

var prm internalclient.PutObjectPrm
Expand All @@ -81,11 +81,10 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {

res, err := internalclient.PutObject(prm)
if err != nil {
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
return oid.ID{}, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
}

return new(transformer.AccessIdentifiers).
WithSelfID(res.ID()), nil
return res.ID(), nil
}

// NewRemoteSender creates, initializes and returns new RemoteSender instance.
Expand Down
33 changes: 16 additions & 17 deletions pkg/services/object/put/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, internal/smth is used, because internal itself doesn't mean a lot.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you suggest naming? object? target? objectarget?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange enough we have a whole package just for this interface. To me it looks a lot like a part of pkg/services/object/put. But there it'd be internal. But then who needs an internal interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2671 (comment) describes where i saw this interface and why

part of it is used by the get service too. i wanted to see more usages but somehow we have so many WriteChunk and Write(object.Object) error (not the oi.Writer) so it does not look so useful sadly. but we can continue this refactor

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get doesn't need it. It has the thing it needs already.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont you like how HeaderWriter is included in the Target and how every object interface can be placed in a single common file? every header should be written with the same interface does not matter what service does it

"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
Expand All @@ -20,31 +21,31 @@ type slicingTarget struct {
maxObjSize uint64
homoHashDisabled bool

initNextTarget transformer.TargetInitializer
nextTarget internal.Target

payloadWriter *slicer.PayloadWriter
}

// returns transformer.ObjectTarget for raw root object streamed by the client
// returns [internal.Target] for raw root object streamed by the client
// with payload slicing and child objects' formatting. Each ready child object
// is written into destination target constructed via the given transformer.TargetInitializer.
// is written into destination target constructed via the given [internal.Target].
func newSlicingTarget(
ctx context.Context,
maxObjSize uint64,
homoHashDisabled bool,
signer user.Signer,
sessionToken *session.Object,
curEpoch uint64,
initNextTarget transformer.TargetInitializer,
) transformer.ObjectTarget {
initNextTarget internal.Target,
) internal.Target {
return &slicingTarget{
ctx: ctx,
signer: signer,
sessionToken: sessionToken,
currentEpoch: curEpoch,
maxObjSize: maxObjSize,
homoHashDisabled: homoHashDisabled,
initNextTarget: initNextTarget,
nextTarget: initNextTarget,
}
}

Expand All @@ -61,7 +62,7 @@ func (x *slicingTarget) WriteHeader(hdr *object.Object) error {

var err error
x.payloadWriter, err = slicer.InitPut(x.ctx, &readyObjectWriter{
initNextTarget: x.initNextTarget,
nextTarget: x.nextTarget,
}, *hdr, x.signer, opts)
if err != nil {
return fmt.Errorf("init object slicer: %w", err)
Expand All @@ -74,36 +75,34 @@ func (x *slicingTarget) Write(p []byte) (n int, err error) {
return x.payloadWriter.Write(p)
}

func (x *slicingTarget) Close() (*transformer.AccessIdentifiers, error) {
func (x *slicingTarget) Close() (oid.ID, error) {
err := x.payloadWriter.Close()
if err != nil {
return nil, fmt.Errorf("finish object slicing: %w", err)
return oid.ID{}, fmt.Errorf("finish object slicing: %w", err)
}

return new(transformer.AccessIdentifiers).WithSelfID(x.payloadWriter.ID()), nil
return x.payloadWriter.ID(), nil
}

// implements slicer.ObjectWriter for ready child objects.
type readyObjectWriter struct {
initNextTarget transformer.TargetInitializer
nextTarget internal.Target
}

func (x *readyObjectWriter) ObjectPutInit(_ context.Context, hdr object.Object, _ user.Signer, _ client.PrmObjectPutInit) (client.ObjectWriter, error) {
tgt := x.initNextTarget()

err := tgt.WriteHeader(&hdr)
err := x.nextTarget.WriteHeader(&hdr)
if err != nil {
return nil, err
}

return &readyObjectPayloadWriter{
target: tgt,
target: x.nextTarget,
}, nil
}

// implements client.ObjectWriter for ready child objects.
type readyObjectPayloadWriter struct {
target transformer.ObjectTarget
target internal.Target
}

func (x *readyObjectPayloadWriter) Write(p []byte) (int, error) {
Expand Down
Loading
Loading