Skip to content

Commit

Permalink
WIP: node: Serve new object replication service
Browse files Browse the repository at this point in the history
The NeoFS protocol has recently been extended with a new object replication
RPC, `ObjectService.Replicate`, separate from the general-purpose
`ObjectService.Put`. According to the API of the new RPC, the entire
physically stored object is transmitted in a single message. Also, the
replication request and response formats are much simpler than those of
other operations.

Serve new RPC by the storage node app.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Dec 14, 2023
1 parent 0e4fe58 commit 02b9fb9
Show file tree
Hide file tree
Showing 10 changed files with 1,639 additions and 10 deletions.
42 changes: 42 additions & 0 deletions cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"math"
"net"
"time"

Expand All @@ -14,12 +15,53 @@ import (
)

func initGRPC(c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(c)
}

// limit max size of single messages received by the gRPC servers up to max
// object size setting of the NeoFS network: this is needed to serve
// ObjectService.Replicate RPC transmitting the entire stored object in one
// message
maxObjSize, err := c.maxObjectSize()
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)

maxRecvSize := maxObjSize
// don't forget about meta fields
const approxMaxMsgMetaSize = 10 << 10 // ^10K is definitely enough
if maxRecvSize < uint64(math.MaxUint64-approxMaxMsgMetaSize) { // just in case, always true in practice
maxRecvSize += approxMaxMsgMetaSize
}

var maxRecvMsgSizeOpt grpc.ServerOption
if maxRecvSize > maxMsgSize { // do not decrease default value
if maxRecvSize > math.MaxInt {
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: "+
"object of max size is bigger than gRPC server is able to support %d > %d",
maxRecvSize, math.MaxInt))
}

maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
}

var successCount int
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}

if maxRecvMsgSizeOpt != nil {
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
// TODO(@cthulhu-rider): max object size setting may change in general,
// but server configuration is static now
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
}

tlsCfg := sc.TLS()

if tlsCfg != nil {
Expand Down
23 changes: 21 additions & 2 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
apiNetmap "github.com/nspcc-dev/neofs-sdk-go/netmap"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand All @@ -54,8 +55,12 @@ type objectSvc struct {
delete *deletesvcV2.Service
}

// maxObjectSize reads the maximum object size setting of the NeoFS network
// via the Netmap contract wrapper. Unlike [cfg.MaxObjectSize], it does not
// swallow the read error but returns it to the caller.
func (c *cfg) maxObjectSize() (uint64, error) {
	return c.cfgNetmap.wrapper.MaxObjectSize()
}

func (c *cfg) MaxObjectSize() uint64 {
sz, err := c.cfgNetmap.wrapper.MaxObjectSize()
sz, err := c.maxObjectSize()
if err != nil {
c.log.Error("could not get max object size value",
zap.String("error", err.Error()),
Expand Down Expand Up @@ -362,7 +367,21 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc)
replNode := newReplicationNode(c.log, ls, c.PublicKey, func(id cid.ID) (apiNetmap.PlacementPolicy, error) {
cnr, err := c.cfgObject.cnrSource.Get(id)
if err != nil {
return apiNetmap.PlacementPolicy{}, nil
}
return cnr.Value.PlacementPolicy(), nil
}, c.cfgNetmap.state.CurrentEpoch, func(epoch uint64) (apiNetmap.NetMap, error) {
nm, err := c.netMapSource.GetNetMapByEpoch(epoch)
if err != nil {
return apiNetmap.NetMap{}, err
}
return *nm, nil
})

server := objectTransportGRPC.New(firstSvc, replNode)

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down
199 changes: 199 additions & 0 deletions cmd/neofs-node/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package main

import (
"bytes"
"errors"
"fmt"
"strconv"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// replicationNodeLocalObjectStorage is the subset of the [engine.StorageEngine]
// API used by [replicationNode]. Declared as a separate interface to allow
// mocking the storage in tests.
type replicationNodeLocalObjectStorage interface {
	// IsLocked checks whether the object with the given address is LOCKed.
	IsLocked(oid.Address) (bool, error)
	// Put saves the object carried by the given parameters in the storage.
	Put(engine.PutPrm) (engine.PutRes, error)
}

// replicationNode is [objectTransportGRPC.Node] interface provider
// checking storage policy compliance against NeoFS network maps.
type replicationNode struct {
	// application logger
	log *zap.Logger

	// local object storage replicated objects are saved into
	localObjStorage replicationNodeLocalObjectStorage

	// dependencies injected as functions for testability
	getPubKey                 func() []byte                                   // local node's binary public key
	getContainerStoragePolicy func(cid.ID) (netmapsdk.PlacementPolicy, error) // container storage policy by container ID
	getCurrentEpoch           func() uint64                                   // latest NeoFS epoch number
	getNetmap                 func(epoch uint64) (netmapsdk.NetMap, error)    // network map of the given epoch
}

// newReplicationNode constructs a replicationNode from its dependencies. All
// arguments must be non-nil.
func newReplicationNode(
	log *zap.Logger,
	localObjStorage replicationNodeLocalObjectStorage,
	getLocalNodePubKey func() []byte,
	getContainerStoragePolicy func(cid.ID) (netmapsdk.PlacementPolicy, error),
	getCurrentEpoch func() uint64,
	getNetmap func(epoch uint64) (netmapsdk.NetMap, error),
) *replicationNode {
	n := new(replicationNode)
	n.log = log
	n.localObjStorage = localObjStorage
	n.getPubKey = getLocalNodePubKey
	n.getContainerStoragePolicy = getContainerStoragePolicy
	n.getCurrentEpoch = getCurrentEpoch
	n.getNetmap = getNetmap
	return n
}

// compliesStoragePolicyInPastNetmap reports whether the node with the given
// binary public key is present in the network map of the given epoch AND
// matches the given storage policy of the referenced container.
func (x *replicationNode) compliesStoragePolicyInPastNetmap(bPubKey []byte, cnrID cid.ID, policy netmapsdk.PlacementPolicy, epoch uint64) (bool, error) {
	nm, err := x.getNetmap(epoch)
	if err != nil {
		return false, fmt.Errorf("read network map: %w", err)
	}

	// the key must belong to some node of that epoch's network map at all
	var present bool
	for _, node := range nm.Nodes() {
		if present = bytes.Equal(node.PublicKey(), bPubKey); present {
			break
		}
	}
	if !present {
		return false, nil
	}

	// ...and that node must be selected by the container's storage policy
	vectors, err := nm.ContainerNodes(policy, cnrID)
	if err != nil {
		return false, fmt.Errorf("build list of container nodes from network map, storage policy and container ID: %w", err)
	}

	for _, vector := range vectors {
		for _, node := range vector {
			if bytes.Equal(node.PublicKey(), bPubKey) {
				return true, nil
			}
		}
	}

	return false, nil
}

// CompliesContainerStoragePolicy checks whether local node's public key is
// presented in network map of the latest NeoFS epoch and matches storage policy
// of the referenced container.
func (x *replicationNode) CompliesContainerStoragePolicy(cnrID cid.ID) (bool, error) {
	policy, err := x.getContainerStoragePolicy(cnrID)
	if err != nil {
		return false, fmt.Errorf("read container storage policy by container ID: %w", err)
	}

	// only the current epoch's network map is consulted for the local node
	ok, err := x.compliesStoragePolicyInPastNetmap(x.getPubKey(), cnrID, policy, x.getCurrentEpoch())
	if err != nil {
		return false, fmt.Errorf("check with the latest network map: %w", err)
	}

	return ok, nil
}

// ClientCompliesContainerStoragePolicy checks whether given public key belongs
// to any storage node present in network map of the latest or previous NeoFS
// epoch and matching storage policy of the referenced container.
func (x *replicationNode) ClientCompliesContainerStoragePolicy(bClientPubKey []byte, cnrID cid.ID) (bool, error) {
	policy, err := x.getContainerStoragePolicy(cnrID)
	if err != nil {
		return false, fmt.Errorf("read container storage policy by container ID: %w", err)
	}

	epoch := x.getCurrentEpoch()

	ok, err := x.compliesStoragePolicyInPastNetmap(bClientPubKey, cnrID, policy, epoch)
	if err != nil {
		return false, fmt.Errorf("check with the latest network map: %w", err)
	}
	if ok || epoch == 0 {
		return ok, nil
	}

	// the client may have been rotated out of the map just now: also accept
	// membership in the previous epoch's network map
	ok, err = x.compliesStoragePolicyInPastNetmap(bClientPubKey, cnrID, policy, epoch-1)
	if err != nil {
		return false, fmt.Errorf("check with previous network map: %w", err)
	}

	return ok, nil
}

// errObjectExpired is returned by [replicationNode.StoreBinaryObject] for
// objects whose expiration epoch has already passed.
var errObjectExpired = errors.New("object is expired")

// StoreBinaryObject checks that the binary-encoded NeoFS object from the
// referenced container is not expired (*) and, if so, saves the object in
// the underlying [engine.StorageEngine].
//
// (*) if the object is LOCKed locally, or its LOCK status cannot be determined
// at the moment, the object is saved even when expired: LOCKed objects are
// strictly protected from removal, incl. removal due to expiration.
func (x *replicationNode) StoreBinaryObject(cnr cid.ID, bObj []byte) error {
	var obj object.Object
	// TODO(@cthulhu-rider): avoid decoding the object completely
	err := obj.Unmarshal(bObj)
	if err != nil {
		return fmt.Errorf("decode object from binary: %w", err)
	}

	id, ok := obj.ID()
	if !ok {
		return errors.New("missing object ID")
	}

	// expiration check. Only the first expiration attribute is considered:
	// every branch below either breaks out of the loop or returns. Whenever an
	// "expired" verdict cannot be made reliably, the loop breaks and the
	// object is saved anyway.
	attrs := obj.Attributes()
	for i := range attrs {
		if attrs[i].Key() != object.AttributeExpirationEpoch {
			continue
		}

		expiresAfter, err := strconv.ParseUint(attrs[i].Value(), 10, 64)
		if err != nil {
			// malformed attribute value: save anyway
			x.log.Debug("failed to decode object expiration attribute, save the object anyway",
				zap.String("value", attrs[i].Value()), zap.Error(err))
			break
		}

		// not yet expired: the object lives through the attribute's epoch
		if x.getCurrentEpoch() <= expiresAfter {
			break
		}

		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(id)

		locked, err := x.localObjStorage.IsLocked(addr)
		if err != nil {
			// LOCK status unavailable: save anyway (see (*) above)
			x.log.Debug("failed to check whether object is LOCKed, save the object anyway",
				zap.Stringer("address", addr), zap.Error(err))
			break
		}

		if locked {
			// LOCKed objects are protected from expiration-based removal
			x.log.Debug("object is expired but LOCKed, save the object anyway")
			break
		}

		// expired and not LOCKed: refuse to store
		return errObjectExpired
	}

	var prm engine.PutPrm
	prm.WithObject(&obj)
	_, err = x.localObjStorage.Put(prm)
	if err != nil {
		return fmt.Errorf("save object in the object storage engine: %w", err)
	}

	return nil
}
Loading

0 comments on commit 02b9fb9

Please sign in to comment.