Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add slicer as an option #831

Merged
merged 3 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions cmd/s3-authmate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
containerPolicies string
awcCliCredFile string
timeoutFlag time.Duration
slicerEnabledFlag bool

// pool timeouts flag.
poolDialTimeoutFlag time.Duration
Expand Down Expand Up @@ -293,6 +294,11 @@ It will be ceil rounded to the nearest amount of epoch.`,
Destination: &poolStreamTimeoutFlag,
Value: poolStreamTimeout,
},
&cli.BoolFlag{
Name: "internal-slicer",
Usage: "Enable slicer for object uploading",
Destination: &slicerEnabledFlag,
},
},
Action: func(c *cli.Context) error {
ctx, log := prepare()
Expand Down Expand Up @@ -322,7 +328,7 @@ It will be ceil rounded to the nearest amount of epoch.`,
}
anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey)

neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner)
neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner, slicerEnabledFlag)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2)
}
Expand Down Expand Up @@ -663,7 +669,7 @@ func obtainSecret() *cli.Command {
}
anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey)

neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner)
neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner, slicerEnabledFlag)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2)
}
Expand Down Expand Up @@ -699,7 +705,7 @@ func obtainSecret() *cli.Command {
return command
}

func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigner user.Signer) (authmate.NeoFS, error) {
func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigner user.Signer, isSlicerEnabled bool) (authmate.NeoFS, error) {
log.Debug("prepare connection pool")

signer := user.NewAutoIDSignerRFC6979(*cfg.Key)
Expand All @@ -726,7 +732,13 @@ func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigne
return nil, fmt.Errorf("networkInfo: %w", err)
}

neoFS := neofs.NewNeoFS(p, signer, anonSigner, int64(ni.MaxObjectSize()))
neofsCfg := neofs.Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: isSlicerEnabled,
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
}

neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg, ni)

return neofs.NewAuthmateNeoFS(neoFS), nil
}
42 changes: 27 additions & 15 deletions cmd/s3-gw/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,26 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
log.logger.Fatal("newApp: networkInfo", zap.Error(err))
}

neoFS := neofs.NewNeoFS(conns, signer, anonSigner, int64(ni.MaxObjectSize()))
neofsCfg := neofs.Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: v.GetBool(cfgSlicerEnabled),
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
}

// If slicer is disabled, we should use "static" getter, which doesn't make periodic requests to the NeoFS.
var epochGetter neofs.EpochGetter = ni

if neofsCfg.IsSlicerEnabled {
epochUpdateInterval := v.GetDuration(cfgEpochUpdateInterval)

if epochUpdateInterval == 0 {
epochUpdateInterval = time.Duration(int64(ni.EpochDuration())/2*ni.MsPerBlock()) * time.Millisecond
}

epochGetter = neofs.NewPeriodicGetter(ctx, ni.CurrentEpoch(), epochUpdateInterval, conns, log.logger)
}

neoFS := neofs.NewNeoFS(conns, signer, anonSigner, neofsCfg, epochGetter)

// prepare auth center
ctr := auth.New(neofs.NewAuthmateNeoFS(neoFS), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger))
Expand All @@ -126,18 +145,18 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
settings: newAppSettings(log, v),
}

app.init(ctx, anonSigner)
app.init(ctx, anonSigner, neoFS)

return app
}

func (a *App) init(ctx context.Context, anonSigner user.Signer) {
a.initAPI(ctx, anonSigner)
func (a *App) init(ctx context.Context, anonSigner user.Signer, neoFS *neofs.NeoFS) {
a.initAPI(ctx, anonSigner, neoFS)
a.initMetrics()
a.initServers(ctx)
}

func (a *App) initLayer(ctx context.Context, anonSigner user.Signer) {
func (a *App) initLayer(ctx context.Context, anonSigner user.Signer, neoFS *neofs.NeoFS) {
a.initResolver(ctx)

treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint)
Expand All @@ -155,15 +174,8 @@ func (a *App) initLayer(ctx context.Context, anonSigner user.Signer) {
TreeService: treeService,
}

signer := user.NewAutoIDSignerRFC6979(a.gateKey.PrivateKey)

ni, err := a.pool.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
a.log.Fatal("initLayer: networkInfo", zap.Error(err))
}

// prepare object layer
a.obj = layer.NewLayer(a.log, neofs.NewNeoFS(a.pool, signer, anonSigner, int64(ni.MaxObjectSize())), layerCfg)
a.obj = layer.NewLayer(a.log, neoFS, layerCfg)

if a.cfg.GetBool(cfgEnableNATS) {
nopts := getNotificationsOptions(a.cfg, a.log)
Expand Down Expand Up @@ -199,8 +211,8 @@ func getDefaultPolicyValue(v *viper.Viper) string {
return defaultPolicyStr
}

func (a *App) initAPI(ctx context.Context, anonSigner user.Signer) {
a.initLayer(ctx, anonSigner)
func (a *App) initAPI(ctx context.Context, anonSigner user.Signer, neoFS *neofs.NeoFS) {
a.initLayer(ctx, anonSigner, neoFS)
a.initHandler()
}

Expand Down
6 changes: 6 additions & 0 deletions cmd/s3-gw/app_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,17 @@ const ( // Settings.
// Number of the object copies to consider PUT to NeoFS successful.
cfgSetCopiesNumber = "neofs.set_copies_number"

// Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true.
cfgEpochUpdateInterval = "neofs.epoch_update_interval"

// List of allowed AccessKeyID prefixes.
cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes"

// envPrefix is an environment variables prefix used for configuration.
envPrefix = "S3_GW"

// Shows if slicer is enabled. If enabled slicer will be used for object put.
cfgSlicerEnabled = "internal_slicer"
)

var ignore = map[string]struct{}{
Expand Down
6 changes: 6 additions & 0 deletions config/config.env
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ S3_GW_CORS_DEFAULT_MAX_AGE=600
# If not set, default value 0 will be used -- it means that object will be processed according to the container's placement policy
S3_GW_NEOFS_SET_COPIES_NUMBER=0

# Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true.
S3_GW_NEOFS_EPOCH_UPDATE_INTERVAL=2m

# List of allowed AccessKeyID prefixes
# If not set, S3 GW will accept all AccessKeyIDs
S3_GW_ALLOWED_ACCESS_KEY_ID_PREFIXES=Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn

# Allows to use slicer for Object uploading.
S3_GW_INTERNAL_SLICER=false
5 changes: 5 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,14 @@ neofs:
# Number of the object copies to consider PUT to NeoFS successful.
# `0` means that object will be processed according to the container's placement policy
set_copies_number: 0
# Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true.
epoch_update_interval: 2m

# List of allowed AccessKeyID prefixes
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs
allowed_access_key_id_prefixes:
- Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX
- 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn

# Allows to use slicer for Object uploading.
internal_slicer: false
74 changes: 74 additions & 0 deletions internal/neofs/epoch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package neofs

import (
"context"
"sync/atomic"
"time"

"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)

type (
// NetworkInfoGetter represents provider to get actual [netmap.NetworkInfo].
NetworkInfoGetter interface {
NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error)
}

// EpochGetter represents provider to get actual NeoFS epoch.
EpochGetter interface {
CurrentEpoch() uint64
}

// PeriodicGetter implements [EpochGetter].
PeriodicGetter struct {
logger *zap.Logger
netGetter NetworkInfoGetter
epoch atomic.Uint64
timeOut time.Duration
}
)

// NewPeriodicGetter is a constructor to [PeriodicGetter].
func NewPeriodicGetter(ctx context.Context, initialEpoch uint64, timeOut time.Duration, netGetter NetworkInfoGetter, logger *zap.Logger) *PeriodicGetter {
getter := &PeriodicGetter{
timeOut: timeOut,
netGetter: netGetter,
logger: logger,
}

getter.epoch.Store(initialEpoch)

go getter.update(ctx)

return getter

Check warning on line 45 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L34-L45

Added lines #L34 - L45 were not covered by tests
}

// CurrentEpoch returns actual epoch.
//
// CurrentEpoch implements [EpochGetter].
func (g *PeriodicGetter) CurrentEpoch() uint64 {
return g.epoch.Load()

Check warning on line 52 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func (g *PeriodicGetter) update(ctx context.Context) {
tm := time.NewTicker(g.timeOut)

for {
select {
case <-ctx.Done():
tm.Stop()
return
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
case <-tm.C:
ni, err := g.netGetter.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
g.logger.Error("periodicGetter: networkInfo", zap.Error(err))
continue

Check warning on line 67 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L55-L67

Added lines #L55 - L67 were not covered by tests
}

g.logger.Info("periodicGetter", zap.Uint64("epoch", ni.CurrentEpoch()))
g.epoch.Store(ni.CurrentEpoch())

Check warning on line 71 in internal/neofs/epoch.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/epoch.go#L70-L71

Added lines #L70 - L71 were not covered by tests
}
}
}
52 changes: 42 additions & 10 deletions internal/neofs/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,40 @@
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/waiter"
)

// Config allows to configure some [NeoFS] parameters.
type Config struct {
MaxObjectSize int64
IsSlicerEnabled bool
IsHomomorphicEnabled bool
}

// NeoFS represents virtual connection to the NeoFS network.
// It is used to provide an interface to dependent packages
// which work with NeoFS.
type NeoFS struct {
pool *pool.Pool
gateSigner user.Signer
anonSigner user.Signer
maxObjectSize int64
pool *pool.Pool
gateSigner user.Signer
anonSigner user.Signer
cfg Config
epochGetter EpochGetter
}

// NewNeoFS creates new NeoFS using provided pool.Pool.
func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, maxObjectSize int64) *NeoFS {
func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config, epochGetter EpochGetter) *NeoFS {

Check warning on line 52 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L52

Added line #L52 was not covered by tests
return &NeoFS{
pool: p,
gateSigner: signer,
anonSigner: anonSigner,
maxObjectSize: maxObjectSize,
pool: p,
gateSigner: signer,
anonSigner: anonSigner,
cfg: cfg,
epochGetter: epochGetter,

Check warning on line 58 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L54-L58

Added lines #L54 - L58 were not covered by tests
}
}

Expand Down Expand Up @@ -268,6 +278,28 @@
prm.Payload = bytes.NewReader(obj.Payload())
}

if x.cfg.IsSlicerEnabled {
opts := slicer.Options{}
opts.SetObjectPayloadLimit(uint64(x.cfg.MaxObjectSize))
opts.SetCopiesNumber(prm.CopiesNumber)
opts.SetCurrentNeoFSEpoch(x.epochGetter.CurrentEpoch())

if x.cfg.IsHomomorphicEnabled {
opts.CalculateHomomorphicChecksum()
}

Check warning on line 289 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L281-L289

Added lines #L281 - L289 were not covered by tests

if prm.BearerToken != nil {
opts.SetBearerToken(*prm.BearerToken)
}

Check warning on line 293 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L291-L293

Added lines #L291 - L293 were not covered by tests

objID, err := slicer.Put(ctx, x.pool, obj, x.signer(ctx), prm.Payload, opts)
if err != nil {
return oid.ID{}, fmt.Errorf("slicer put: %w", err)
}

Check warning on line 298 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L295-L298

Added lines #L295 - L298 were not covered by tests

return objID, nil

Check warning on line 300 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L300

Added line #L300 was not covered by tests
}

var prmObjPutInit client.PrmObjectPutInit
prmObjPutInit.SetCopiesNumber(prm.CopiesNumber)

Expand All @@ -284,7 +316,7 @@
return oid.ID{}, fmt.Errorf("save object via connection pool: %w", err)
}

chunk := make([]byte, x.maxObjectSize)
chunk := make([]byte, x.cfg.MaxObjectSize)

Check warning on line 319 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L319

Added line #L319 was not covered by tests
_, err = io.CopyBuffer(writer, prm.Payload, chunk)
if err != nil {
return oid.ID{}, fmt.Errorf("read payload chunk: %w", err)
Expand Down
Loading