Skip to content

Commit

Permalink
WIP: replicator: Send local objects using new replication service
Browse files Browse the repository at this point in the history
Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Dec 14, 2023
1 parent 02b9fb9 commit 4293260
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 6 deletions.
3 changes: 3 additions & 0 deletions pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package client

import (
"context"
"io"

rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand All @@ -19,6 +21,7 @@ import (
type Client interface {
ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error
ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer, opts client.ReplicateObjectOptions) error
ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
Expand Down
21 changes: 21 additions & 0 deletions pkg/network/cache/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand Down Expand Up @@ -220,6 +222,25 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object
return
}

func (x *multiClient) ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer, opts client.ReplicateObjectOptions) error {
var errSeek error
err := x.iterateClients(ctx, func(c clientcore.Client) error {
err := c.ReplicateObject(ctx, src, signer, opts)
if err != nil {
_, errSeek = src.Seek(0, io.SeekStart)
if errSeek != nil {
return nil
}
}
return err
})
if err != nil {
return err
}

return errSeek
}

func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error {
return x.iterateClients(ctx, func(c clientcore.Client) error {
return c.ContainerAnnounceUsedSpace(ctx, announcements, prm)
Expand Down
34 changes: 34 additions & 0 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package putsvc
import (
"context"
"fmt"
"io"

clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-sdk-go/client"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
)
Expand Down Expand Up @@ -135,3 +138,34 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {

return nil
}

// CopyObjectToNode copies binary-encoded NeoFS object from the given
// [io.ReadSeeker] into local storage of the node described by specified
// [netmap.NodeInfo].
func (s *RemoteSender) CopyObjectToNode(ctx context.Context, nodeInfo netmap.NodeInfo, src io.ReadSeeker) error {
var nodeInfoForCons clientcore.NodeInfo

err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo))
if err != nil {
return fmt.Errorf("parse remote node info: %w", err)
}

key, err := s.keyStorage.GetKey(nil)
if err != nil {
return fmt.Errorf("fetch local node's private key: %w", err)
}

c, err := s.clientConstructor.Get(nodeInfoForCons)
if err != nil {
return fmt.Errorf("init NeoFS API client of the remote node: %w", err)
}

var opts client.ReplicateObjectOptions

err = c.ReplicateObject(ctx, src, (*neofsecdsa.Signer)(key), opts)
if err != nil {
return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err)
}

return nil
}
34 changes: 28 additions & 6 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package replicator

import (
"context"
"io"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
Expand All @@ -25,20 +25,28 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
)
}()

var binObjStream io.ReadSeekCloser // set it task.obj is unset only
var err error

if task.obj == nil {
var err error
task.obj, err = engine.Get(p.localStorage, task.addr)
binObjStream, err = p.localStorage.OpenObjectStream(task.addr)
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),
zap.Error(err))

return
}

defer func() {
if err := binObjStream.Close(); err != nil {
p.log.Debug("failed to close replicated object's binary stream from the local storage",
zap.Stringer("object", task.addr), zap.Error(err))
}
}()
}

prm := new(putsvc.RemotePutPrm).
WithObject(task.obj)
var prm putsvc.RemotePutPrm

for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
select {
Expand All @@ -47,14 +55,28 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
default:
}

if i > 0 && binObjStream != nil {
_, err = binObjStream.Seek(0, io.SeekStart)
if err != nil {
p.log.Error("failed to seek start of the replicated object's binary stream from the local storage",
zap.Stringer("object", task.addr), zap.Error(err))
return
}
}

log := p.log.With(
zap.String("node", netmap.StringifyPublicKey(task.nodes[i])),
zap.Stringer("object", task.addr),
)

callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)

err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
if binObjStream != nil {
err = p.remoteSender.CopyObjectToNode(ctx, task.nodes[i], binObjStream)
// FIXME: what if node is old?
} else {
err = p.remoteSender.PutObject(callCtx, prm.WithObject(task.obj).WithNodeInfo(task.nodes[i]))
}

cancel()

Expand Down

0 comments on commit 4293260

Please sign in to comment.