Skip to content

Commit

Permalink
Make Slicer interface more generic (#466)
Browse files Browse the repository at this point in the history
closes #442
  • Loading branch information
roman-khimov authored Jul 29, 2023
2 parents d675d85 + d6858d9 commit af656fb
Show file tree
Hide file tree
Showing 10 changed files with 705 additions and 476 deletions.
5 changes: 3 additions & 2 deletions client/container_statistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,10 @@ func TestClientStatistic_ObjectPut(t *testing.T) {
writer, err := c.ObjectPutInit(ctx, hdr, signer, prm)
require.NoError(t, err)

require.True(t, writer.WritePayloadChunk(randBytes(10)))
_, err = writer.Write(randBytes(10))
require.NoError(t, err)

_, err = writer.Close()
err = writer.Close()
require.NoError(t, err)

require.Equal(t, 2, collector.methods[stat.MethodObjectPut].requests)
Expand Down
152 changes: 36 additions & 116 deletions client/object_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ import (
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
)
Expand Down Expand Up @@ -58,10 +56,15 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
}

// ObjectWriter is designed to write one object to NeoFS system.
type ObjectWriter interface {
io.WriteCloser
GetResult() ResObjectPut
}

// DefaultObjectWriter implements [ObjectWriter].
//
// Must be initialized using Client.ObjectPutInit, any other
// usage is unsafe.
type ObjectWriter struct {
// Must be initialized using [Client.ObjectPutInit], any other usage is unsafe.
type DefaultObjectWriter struct {
cancelCtxStream context.CancelFunc

client *Client
Expand Down Expand Up @@ -106,8 +109,8 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) {
}

// writeHeader writes header of the object. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) writeHeader(hdr object.Object) error {
// Failure reason can be received via [DefaultObjectWriter.Close].
func (x *DefaultObjectWriter) writeHeader(hdr object.Object) error {
if x.statisticCallback != nil {
defer func() {
x.statisticCallback(x.err)
Expand All @@ -134,8 +137,8 @@ func (x *ObjectWriter) writeHeader(hdr object.Object) error {
}

// WritePayloadChunk writes chunk of the object payload. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
// Failure reason can be received via [DefaultObjectWriter.Close].
func (x *DefaultObjectWriter) Write(chunk []byte) (n int, err error) {
if x.statisticCallback != nil {
defer func() {
x.statisticCallback(x.err)
Expand All @@ -147,6 +150,8 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
x.req.GetBody().SetObjectPart(&x.partChunk)
}

var writtenBytes int

for ln := len(chunk); ln > 0; ln = len(chunk) {
// maxChunkLen restricts maximum byte length of the chunk
// transmitted in a single stream message. It depends on
Expand Down Expand Up @@ -174,22 +179,23 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
x.err = signServiceMessage(x.signer, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
return writtenBytes, x.err
}

x.err = x.stream.Write(&x.req)
if x.err != nil {
return false
return writtenBytes, x.err
}

writtenBytes += len(chunk[:ln])
chunk = chunk[ln:]
}

return true
return writtenBytes, nil
}

// Close ends writing the object and returns the result of the operation
// along with the final results. Must be called after using the ObjectWriter.
// along with the final results. Must be called after using the [DefaultObjectWriter].
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
Expand All @@ -204,7 +210,7 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
// - [apistatus.ErrLockNonRegularObject]
// - [apistatus.ErrSessionTokenNotFound]
// - [apistatus.ErrSessionTokenExpired]
func (x *ObjectWriter) Close() (*ResObjectPut, error) {
func (x *DefaultObjectWriter) Close() error {
var err error
if x.statisticCallback != nil {
defer func() {
Expand All @@ -219,25 +225,25 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
// message. Server returns an error in response message (in status).
if x.err != nil && !errors.Is(x.err, io.EOF) {
err = x.err
return nil, err
return err
}

if x.err = x.stream.Close(); x.err != nil {
err = x.err
return nil, err
return err
}

if x.err = x.client.processResponse(&x.respV2); x.err != nil {
err = x.err
return nil, err
return err
}

const fieldID = "ID"

idV2 := x.respV2.GetBody().GetObjectID()
if idV2 == nil {
err = newErrMissingResponseField(fieldID)
return nil, err
return err
}

x.err = x.res.obj.ReadFromV2(*idV2)
Expand All @@ -246,27 +252,33 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
err = x.err
}

return &x.res, nil
return nil
}

// GetResult returns the put operation result.
func (x *DefaultObjectWriter) GetResult() ResObjectPut {
return x.res
}

// ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol.
//
// The call only opens the transmission channel, explicit recording is done using the ObjectWriter.
// The call only opens the transmission channel, explicit recording is done using the [ObjectWriter].
// Exactly one return value is non-nil. Resulting writer must be finally closed.
//
// Context is required and must not be nil. It is used for network communication.
// Context is required and must not be nil. It will be used for network communication for the whole object transmission,
// including put init (this method) and subsequent object payload writes via ObjectWriter.
//
// Signer is required and must not be nil. The operation is executed on behalf of
// the account corresponding to the specified Signer, which is taken into account, in particular, for access control.
//
// Returns errors:
// - [ErrMissingSigner]
func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm PrmObjectPutInit) (*ObjectWriter, error) {
func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm PrmObjectPutInit) (ObjectWriter, error) {
var err error
defer func() {
c.sendStatistic(stat.MethodObjectPut, err)()
}()
var w ObjectWriter
var w DefaultObjectWriter
w.statisticCallback = func(err error) {
c.sendStatistic(stat.MethodObjectPutStream, err)()
}
Expand All @@ -292,102 +304,10 @@ func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer us
c.prepareRequest(&w.req, &prm.meta)

if err = w.writeHeader(hdr); err != nil {
_, _ = w.Close()
_ = w.Close()
err = fmt.Errorf("header write: %w", err)
return nil, err
}

return &w, nil
}

type objectWriter struct {
context context.Context
client *Client
}

func (x *objectWriter) InitDataStream(header object.Object, signer user.Signer) (io.Writer, error) {
var prm PrmObjectPutInit

stream, err := x.client.ObjectPutInit(x.context, header, signer, prm)
if err != nil {
return nil, fmt.Errorf("init object stream: %w", err)
}

return &payloadWriter{
stream: stream,
}, nil
}

type payloadWriter struct {
stream *ObjectWriter
}

func (x *payloadWriter) Write(p []byte) (int, error) {
if !x.stream.WritePayloadChunk(p) {
return 0, x.Close()
}

return len(p), nil
}

func (x *payloadWriter) Close() error {
_, err := x.stream.Close()
if err != nil {
return err
}

return nil
}

// CreateObject creates new NeoFS object with given payload data and stores it
// in specified container of the NeoFS network using provided Client connection.
// The object is created on behalf of provided neofscrypto.Signer, and owned by
// the specified user.ID.
//
// In terms of NeoFS, parameterized neofscrypto.Signer represents object owner,
// object signer and request sender. Container SHOULD be public-write or sender
// SHOULD have corresponding rights.
//
// Client connection MUST be opened in advance, see Dial method for details.
// Network communication is carried out within a given context, so it MUST NOT
// be nil.
//
// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the
// future. Be ready to refactor your code regarding imports and call mechanics,
// in essence the operation will not change.
func CreateObject(ctx context.Context, cli *Client, signer user.Signer, cnr cid.ID, owner user.ID, data io.Reader, attributes ...string) (oid.ID, error) {
s, err := NewDataSlicer(ctx, cli, signer, cnr, owner)
if err != nil {
return oid.ID{}, err
}

return s.Slice(data, attributes...)
}

// NewDataSlicer creates slicer.Slicer that saves data in the NeoFS network
// through provided Client. The data is packaged into NeoFS objects stored in
// the specified container. Provided signer is being used to sign the resulting
// objects as a system requirement. Produced objects are owned by the
// parameterized NeoFS user.
//
// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the
// future. Be ready to refactor your code regarding imports and call mechanics,
// in essence the operation will not change.
func NewDataSlicer(ctx context.Context, cli *Client, signer user.Signer, cnr cid.ID, owner user.ID) (*slicer.Slicer, error) {
netInfo, err := cli.NetworkInfo(ctx, PrmNetworkInfo{})
if err != nil {
return nil, fmt.Errorf("read current network info: %w", err)
}

var opts slicer.Options
opts.SetObjectPayloadLimit(netInfo.MaxObjectSize())
opts.SetCurrentNeoFSEpoch(netInfo.CurrentEpoch())
if !netInfo.HomomorphicHashingDisabled() {
opts.CalculateHomomorphicChecksum()
}

return slicer.New(signer, cnr, owner, &objectWriter{
context: ctx,
client: cli,
}, opts), nil
}
17 changes: 17 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ func (o *Object) SetID(v oid.ID) {
SetObjectID(&v2)
}

// ResetID removes object identifier.
//
// See also [Object.SetID].
func (o *Object) ResetID() {
(*object.Object)(o).
SetObjectID(nil)
}

// Signature returns signature of the object identifier.
//
// See also [Object.SetSignature].
Expand Down Expand Up @@ -536,6 +544,15 @@ func (o *Object) SetParentID(v oid.ID) {
})
}

// ResetParentID removes identifier of the parent object.
//
// See also [Object.SetParentID].
func (o *Object) ResetParentID() {
o.setSplitFields(func(split *object.SplitHeader) {
split.SetParent(nil)
})
}

// Parent returns parent object w/o payload.
//
// See also [Object.SetParent].
Expand Down
31 changes: 31 additions & 0 deletions object/slicer/options.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package slicer

import (
"github.com/nspcc-dev/neofs-sdk-go/session"
)

// Options groups Slicer options.
type Options struct {
objectPayloadLimit uint64

currentNeoFSEpoch uint64

withHomoChecksum bool

sessionToken *session.Object
}

// SetObjectPayloadLimit specifies data size limit for produced physically
Expand All @@ -25,3 +31,28 @@ func (x *Options) SetCurrentNeoFSEpoch(e uint64) {
func (x *Options) CalculateHomomorphicChecksum() {
x.withHomoChecksum = true
}

// SetSession sets session object.
func (x *Options) SetSession(sess *session.Object) {
x.sessionToken = sess
}

// ObjectPayloadLimit returns required max object size.
func (x *Options) ObjectPayloadLimit() uint64 {
return x.objectPayloadLimit
}

// CurrentNeoFSEpoch returns epoch.
func (x *Options) CurrentNeoFSEpoch() uint64 {
return x.currentNeoFSEpoch
}

// IsHomomorphicChecksumEnabled indicates homomorphic checksum calculation status.
func (x *Options) IsHomomorphicChecksumEnabled() bool {
return x.withHomoChecksum
}

// Session returns session object.
func (x *Options) Session() *session.Object {
return x.sessionToken
}
Loading

0 comments on commit af656fb

Please sign in to comment.