Skip to content

Commit

Permalink
neofs: Use slices like an option
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Sep 25, 2023
1 parent 86038b8 commit f3367c7
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 22 deletions.
18 changes: 13 additions & 5 deletions cmd/s3-authmate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
containerPolicies string
awcCliCredFile string
timeoutFlag time.Duration
slicerEnabledFlag bool

// pool timeouts flag.
poolDialTimeoutFlag time.Duration
Expand Down Expand Up @@ -293,6 +294,11 @@ It will be ceil rounded to the nearest amount of epoch.`,
Destination: &poolStreamTimeoutFlag,
Value: poolStreamTimeout,
},
&cli.BoolFlag{
Name: "with-slicer",
Usage: "Enable slicer for object uploading",
Destination: &slicerEnabledFlag,
},
},
Action: func(c *cli.Context) error {
ctx, log := prepare()
Expand Down Expand Up @@ -322,7 +328,7 @@ It will be ceil rounded to the nearest amount of epoch.`,
}
anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey)

neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner)
neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner, slicerEnabledFlag)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2)
}
Expand Down Expand Up @@ -663,7 +669,7 @@ func obtainSecret() *cli.Command {
}
anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey)

neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner)
neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner, slicerEnabledFlag)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2)
}
Expand Down Expand Up @@ -699,7 +705,7 @@ func obtainSecret() *cli.Command {
return command
}

func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigner user.Signer) (authmate.NeoFS, error) {
func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigner user.Signer, isSlicerEnabled bool) (authmate.NeoFS, error) {
log.Debug("prepare connection pool")

signer := user.NewAutoIDSignerRFC6979(*cfg.Key)
Expand Down Expand Up @@ -727,10 +733,12 @@ func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigne
}

neofsCfg := neofs.Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: isSlicerEnabled,
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
}

neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg)
neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg, ni)

return neofs.NewAuthmateNeoFS(neoFS), nil
}
19 changes: 17 additions & 2 deletions cmd/s3-gw/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,25 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
}

neofsCfg := neofs.Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: v.GetBool(cfgSlicerEnabled),
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
}

neoFS := neofs.NewNeoFS(conns, signer, anonSigner, neofsCfg)
// If slicer is disabled, we should use "static" getter, which doesn't make periodic requests to the NeoFS.
var epochGetter neofs.EpochGetter = ni

if neofsCfg.IsSlicerEnabled {
epochUpdateInterval := v.GetDuration(cfgEpochUpdateInterval)

if epochUpdateInterval == 0 {
epochUpdateInterval = defaultEpochUpdateInterval
}

epochGetter = neofs.NewPeriodicGetter(ctx, ni.CurrentEpoch(), epochUpdateInterval, conns, log.logger)
}

neoFS := neofs.NewNeoFS(conns, signer, anonSigner, neofsCfg, epochGetter)

// prepare auth center
ctr := auth.New(neofs.NewAuthmateNeoFS(neoFS), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger))
Expand Down
17 changes: 12 additions & 5 deletions cmd/s3-gw/app_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
)

const (
defaultRebalanceInterval = 60 * time.Second
defaultHealthcheckTimeout = 15 * time.Second
defaultConnectTimeout = 10 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultShutdownTimeout = 15 * time.Second
defaultRebalanceInterval = 60 * time.Second
defaultHealthcheckTimeout = 15 * time.Second
defaultConnectTimeout = 10 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultShutdownTimeout = 15 * time.Second
defaultEpochUpdateInterval = time.Minute

defaultPoolErrorThreshold uint32 = 100

Expand Down Expand Up @@ -121,11 +122,17 @@ const ( // Settings.
// Number of the object copies to consider PUT to NeoFS successful.
cfgSetCopiesNumber = "neofs.set_copies_number"

// Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true.
cfgEpochUpdateInterval = "neofs.epoch_update_interval"

// List of allowed AccessKeyID prefixes.
cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes"

// envPrefix is an environment variables prefix used for configuration.
envPrefix = "S3_GW"

// Shows if slicer is enabled. If enabled slicer will be used for object put.
cfgSlicerEnabled = "slicer.enabled"
)

var ignore = map[string]struct{}{
Expand Down
3 changes: 3 additions & 0 deletions config/config.env
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,6 @@ S3_GW_NEOFS_SET_COPIES_NUMBER=0
# List of allowed AccessKeyID prefixes
# If not set, S3 GW will accept all AccessKeyIDs
S3_GW_ALLOWED_ACCESS_KEY_ID_PREFIXES=Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn

# Allows to use slicer for Object uploading.
S3_GW_SLICER_ENABLED=false
6 changes: 6 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ neofs:
# Number of the object copies to consider PUT to NeoFS successful.
# `0` means that object will be processed according to the container's placement policy
set_copies_number: 0
# Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true.
epoch_update_interval: 60s

# List of allowed AccessKeyID prefixes
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs
allowed_access_key_id_prefixes:
- Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX
- 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn

# Allows to use slicer for Object uploading.
slicer:
enabled: false
71 changes: 71 additions & 0 deletions internal/neofs/epoch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package neofs

import (
"context"
"sync/atomic"
"time"

"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)

type (
// NetworkInfoGetter represents provider to get actual [netmap.NetworkInfo].
NetworkInfoGetter interface {
NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error)
}

// EpochGetter represents provider to get actual NeoFS epoch.
EpochGetter interface {
CurrentEpoch() uint64
}

// PeriodicGetter implements [EpochGetter].
PeriodicGetter struct {
logger *zap.Logger
netGetter NetworkInfoGetter
epoch atomic.Uint64
timeOut time.Duration
}
)

// NewPeriodicGetter is a constructor to [PeriodicGetter].
func NewPeriodicGetter(ctx context.Context, initialEpoch uint64, timeOut time.Duration, netGetter NetworkInfoGetter, logger *zap.Logger) *PeriodicGetter {
getter := &PeriodicGetter{
timeOut: timeOut,
netGetter: netGetter,
logger: logger,
}

getter.epoch.Store(initialEpoch)

go getter.update(ctx)

return getter

Check warning on line 45 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L34-L45

Added lines #L34 - L45 were not covered by tests
}

// CurrentEpoch returns actual epoch.
//
// CurrentEpoch implements [EpochGetter].
func (g *PeriodicGetter) CurrentEpoch() uint64 {
return g.epoch.Load()

Check warning on line 52 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func (g *PeriodicGetter) update(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(g.timeOut):
ni, err := g.netGetter.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
g.logger.Error("periodicGetter: networkInfo", zap.Error(err))
continue

Check warning on line 64 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L55-L64

Added lines #L55 - L64 were not covered by tests
}

g.logger.Info("periodicGetter", zap.Uint64("epoch", ni.CurrentEpoch()))
g.epoch.Store(ni.CurrentEpoch())

Check warning on line 68 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L67-L68

Added lines #L67 - L68 were not covered by tests
}
}
}
47 changes: 37 additions & 10 deletions internal/neofs/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/stat"
Expand All @@ -31,26 +32,30 @@ import (

// Config allows to configure some [NeoFS] parameters.
type Config struct {
MaxObjectSize int64
MaxObjectSize int64
IsSlicerEnabled bool
IsHomomorphicEnabled bool
}

// NeoFS represents virtual connection to the NeoFS network.
// It is used to provide an interface to dependent packages
// which work with NeoFS.
type NeoFS struct {
pool *pool.Pool
gateSigner user.Signer
anonSigner user.Signer
cfg Config
pool *pool.Pool
gateSigner user.Signer
anonSigner user.Signer
cfg Config
epochGetter EpochGetter
}

// NewNeoFS creates new NeoFS using provided pool.Pool.
func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config) *NeoFS {
func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config, epochGetter EpochGetter) *NeoFS {

Check warning on line 52 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L52

Added line #L52 was not covered by tests
return &NeoFS{
pool: p,
gateSigner: signer,
anonSigner: anonSigner,
cfg: cfg,
pool: p,
gateSigner: signer,
anonSigner: anonSigner,
cfg: cfg,
epochGetter: epochGetter,

Check warning on line 58 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L54-L58

Added lines #L54 - L58 were not covered by tests
}
}

Expand Down Expand Up @@ -273,6 +278,28 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
prm.Payload = bytes.NewReader(obj.Payload())
}

if x.cfg.IsSlicerEnabled {
opts := slicer.Options{}
opts.SetObjectPayloadLimit(uint64(x.cfg.MaxObjectSize))
opts.SetCopiesNumber(prm.CopiesNumber)
opts.SetCurrentNeoFSEpoch(x.epochGetter.CurrentEpoch())

if x.cfg.IsHomomorphicEnabled {
opts.CalculateHomomorphicChecksum()
}

Check warning on line 289 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L281-L289

Added lines #L281 - L289 were not covered by tests

if prm.BearerToken != nil {
opts.SetBearerToken(*prm.BearerToken)
}

Check warning on line 293 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L291-L293

Added lines #L291 - L293 were not covered by tests

objID, err := slicer.Put(ctx, x.pool, obj, x.signer(ctx), prm.Payload, opts)
if err != nil {
return oid.ID{}, fmt.Errorf("slicer put: %w", err)
}

Check warning on line 298 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L295-L298

Added lines #L295 - L298 were not covered by tests

return objID, nil

Check warning on line 300 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L300

Added line #L300 was not covered by tests
}

var prmObjPutInit client.PrmObjectPutInit
prmObjPutInit.SetCopiesNumber(prm.CopiesNumber)

Expand Down

0 comments on commit f3367c7

Please sign in to comment.