From 02a206bd50f1d641ea9b2040c68bf06c373b285f Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 22 Sep 2023 13:05:07 +0400 Subject: [PATCH 1/3] neofs: Use config struct instead of separate parameters Signed-off-by: Evgenii Baidakov --- cmd/s3-authmate/main.go | 6 +++++- cmd/s3-gw/app.go | 12 ++++++++++-- internal/neofs/neofs.go | 25 +++++++++++++++---------- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/cmd/s3-authmate/main.go b/cmd/s3-authmate/main.go index 0297374a..0b1c1882 100644 --- a/cmd/s3-authmate/main.go +++ b/cmd/s3-authmate/main.go @@ -726,7 +726,11 @@ func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigne return nil, fmt.Errorf("networkInfo: %w", err) } - neoFS := neofs.NewNeoFS(p, signer, anonSigner, int64(ni.MaxObjectSize())) + neofsCfg := neofs.Config{ + MaxObjectSize: int64(ni.MaxObjectSize()), + } + + neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg) return neofs.NewAuthmateNeoFS(neoFS), nil } diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index fb8ad58c..d3c6d170 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -106,7 +106,11 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { log.logger.Fatal("newApp: networkInfo", zap.Error(err)) } - neoFS := neofs.NewNeoFS(conns, signer, anonSigner, int64(ni.MaxObjectSize())) + neofsCfg := neofs.Config{ + MaxObjectSize: int64(ni.MaxObjectSize()), + } + + neoFS := neofs.NewNeoFS(conns, signer, anonSigner, neofsCfg) // prepare auth center ctr := auth.New(neofs.NewAuthmateNeoFS(neoFS), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger)) @@ -162,8 +166,12 @@ func (a *App) initLayer(ctx context.Context, anonSigner user.Signer) { a.log.Fatal("initLayer: networkInfo", zap.Error(err)) } + neofsCfg := neofs.Config{ + MaxObjectSize: int64(ni.MaxObjectSize()), + } + // prepare object layer - a.obj = layer.NewLayer(a.log, neofs.NewNeoFS(a.pool, signer, anonSigner, int64(ni.MaxObjectSize())), layerCfg) + a.obj = layer.NewLayer(a.log, neofs.NewNeoFS(a.pool, signer, anonSigner, neofsCfg), layerCfg) if a.cfg.GetBool(cfgEnableNATS) { nopts := getNotificationsOptions(a.cfg, a.log) diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index bcc0e730..c41746d5 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -29,23 +29,28 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/waiter" ) +// Config allows to configure some [NeoFS] parameters. +type Config struct { + MaxObjectSize int64 +} + // NeoFS represents virtual connection to the NeoFS network. // It is used to provide an interface to dependent packages // which work with NeoFS. type NeoFS struct { - pool *pool.Pool - gateSigner user.Signer - anonSigner user.Signer - maxObjectSize int64 + pool *pool.Pool + gateSigner user.Signer + anonSigner user.Signer + cfg Config } // NewNeoFS creates new NeoFS using provided pool.Pool. -func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, maxObjectSize int64) *NeoFS { +func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config) *NeoFS { return &NeoFS{ - pool: p, - gateSigner: signer, - anonSigner: anonSigner, - maxObjectSize: maxObjectSize, + pool: p, + gateSigner: signer, + anonSigner: anonSigner, + cfg: cfg, } } @@ -284,7 +289,7 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi return oid.ID{}, fmt.Errorf("save object via connection pool: %w", err) } - chunk := make([]byte, x.maxObjectSize) + chunk := make([]byte, x.cfg.MaxObjectSize) _, err = io.CopyBuffer(writer, prm.Payload, chunk) if err != nil { return oid.ID{}, fmt.Errorf("read payload chunk: %w", err) From 86038b8835a74359786461c4ca653c26e1c52c83 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Mon, 25 Sep 2023 08:29:11 +0400 Subject: [PATCH 2/3] app: Reuse neofs component Signed-off-by: Evgenii Baidakov --- cmd/s3-gw/app.go | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index d3c6d170..6074f154 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -130,18 +130,18 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { settings: newAppSettings(log, v), } - app.init(ctx, anonSigner) + app.init(ctx, anonSigner, neoFS) return app } -func (a *App) init(ctx context.Context, anonSigner user.Signer) { - a.initAPI(ctx, anonSigner) +func (a *App) init(ctx context.Context, anonSigner user.Signer, neoFS *neofs.NeoFS) { + a.initAPI(ctx, anonSigner, neoFS) a.initMetrics() a.initServers(ctx) } -func (a *App) initLayer(ctx context.Context, anonSigner user.Signer) { +func (a *App) initLayer(ctx context.Context, anonSigner user.Signer, neoFS *neofs.NeoFS) { a.initResolver(ctx) treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint) @@ -159,19 +159,8 @@ func (a *App) initLayer(ctx context.Context, anonSigner user.Signer) { TreeService: treeService, } - signer := user.NewAutoIDSignerRFC6979(a.gateKey.PrivateKey) - - ni, err := a.pool.NetworkInfo(ctx, client.PrmNetworkInfo{}) - if err != nil { - a.log.Fatal("initLayer: networkInfo", zap.Error(err)) - } - - neofsCfg := neofs.Config{ - MaxObjectSize: int64(ni.MaxObjectSize()), - } - // prepare object layer - a.obj = layer.NewLayer(a.log, neofs.NewNeoFS(a.pool, signer, anonSigner, neofsCfg), layerCfg) + a.obj = layer.NewLayer(a.log, neoFS, layerCfg) if a.cfg.GetBool(cfgEnableNATS) { nopts := getNotificationsOptions(a.cfg, a.log) @@ -207,8 +196,8 @@ func getDefaultPolicyValue(v *viper.Viper) string { return defaultPolicyStr } -func (a *App) initAPI(ctx context.Context, anonSigner user.Signer) { - a.initLayer(ctx, anonSigner) +func (a *App) initAPI(ctx context.Context, anonSigner user.Signer, neoFS *neofs.NeoFS) { + a.initLayer(ctx, anonSigner, neoFS) a.initHandler() } From 39412e870edefb5bd27f8d764417d572f80a10c8 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 22 Sep 2023 15:06:22 +0400 Subject: [PATCH 3/3] neofs: Add slicer as an option closes #808 Signed-off-by: Evgenii Baidakov --- cmd/s3-authmate/main.go | 18 +++++++--- cmd/s3-gw/app.go | 19 ++++++++-- cmd/s3-gw/app_settings.go | 6 ++++ config/config.env | 6 ++++ config/config.yaml | 5 +++ internal/neofs/epoch.go | 74 +++++++++++++++++++++++++++++++++++++++ internal/neofs/neofs.go | 47 +++++++++++++++++++------ 7 files changed, 158 insertions(+), 17 deletions(-) create mode 100644 internal/neofs/epoch.go diff --git a/cmd/s3-authmate/main.go b/cmd/s3-authmate/main.go index 0b1c1882..de960212 100644 --- a/cmd/s3-authmate/main.go +++ b/cmd/s3-authmate/main.go @@ -79,6 +79,7 @@ var ( containerPolicies string awcCliCredFile string timeoutFlag time.Duration + slicerEnabledFlag bool // pool timeouts flag. poolDialTimeoutFlag time.Duration @@ -293,6 +294,11 @@ It will be ceil rounded to the nearest amount of epoch.`, Destination: &poolStreamTimeoutFlag, Value: poolStreamTimeout, }, + &cli.BoolFlag{ + Name: "internal-slicer", + Usage: "Enable slicer for object uploading", + Destination: &slicerEnabledFlag, + }, }, Action: func(c *cli.Context) error { ctx, log := prepare() @@ -322,7 +328,7 @@ It will be ceil rounded to the nearest amount of epoch.`, } anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey) - neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner) + neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner, slicerEnabledFlag) if err != nil { return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2) } @@ -663,7 +669,7 @@ func obtainSecret() *cli.Command { } anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey) - neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner) + neoFS, err := createNeoFS(ctx, log, poolCfg, anonSigner, slicerEnabledFlag) if err != nil { return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2) } @@ -699,7 +705,7 @@ func obtainSecret() *cli.Command { return command } -func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigner user.Signer) (authmate.NeoFS, error) { +func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigner user.Signer, isSlicerEnabled bool) (authmate.NeoFS, error) { log.Debug("prepare connection pool") signer := user.NewAutoIDSignerRFC6979(*cfg.Key) @@ -727,10 +733,12 @@ func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigne } neofsCfg := neofs.Config{ - MaxObjectSize: int64(ni.MaxObjectSize()), + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: isSlicerEnabled, + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), } - neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg) + neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg, ni) return neofs.NewAuthmateNeoFS(neoFS), nil } diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 6074f154..c86a24b3 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -107,10 +107,25 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { } neofsCfg := neofs.Config{ - MaxObjectSize: int64(ni.MaxObjectSize()), + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: v.GetBool(cfgSlicerEnabled), + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), } - neoFS := neofs.NewNeoFS(conns, signer, anonSigner, neofsCfg) + // If slicer is disabled, we should use "static" getter, which doesn't make periodic requests to the NeoFS. + var epochGetter neofs.EpochGetter = ni + + if neofsCfg.IsSlicerEnabled { + epochUpdateInterval := v.GetDuration(cfgEpochUpdateInterval) + + if epochUpdateInterval == 0 { + epochUpdateInterval = time.Duration(int64(ni.EpochDuration())/2*ni.MsPerBlock()) * time.Millisecond + } + + epochGetter = neofs.NewPeriodicGetter(ctx, ni.CurrentEpoch(), epochUpdateInterval, conns, log.logger) + } + + neoFS := neofs.NewNeoFS(conns, signer, anonSigner, neofsCfg, epochGetter) // prepare auth center ctr := auth.New(neofs.NewAuthmateNeoFS(neoFS), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger)) diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 354336b2..53a2dd9f 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -121,11 +121,17 @@ const ( // Settings. // Number of the object copies to consider PUT to NeoFS successful. cfgSetCopiesNumber = "neofs.set_copies_number" + // Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true. + cfgEpochUpdateInterval = "neofs.epoch_update_interval" + // List of allowed AccessKeyID prefixes. cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes" // envPrefix is an environment variables prefix used for configuration. envPrefix = "S3_GW" + + // Shows if slicer is enabled. If enabled slicer will be used for object put. + cfgSlicerEnabled = "internal_slicer" ) var ignore = map[string]struct{}{ diff --git a/config/config.env b/config/config.env index 3b64f72e..63049aeb 100644 --- a/config/config.env +++ b/config/config.env @@ -119,6 +119,12 @@ S3_GW_CORS_DEFAULT_MAX_AGE=600 # If not set, default value 0 will be used -- it means that object will be processed according to the container's placement policy S3_GW_NEOFS_SET_COPIES_NUMBER=0 +# Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true. +S3_GW_NEOFS_EPOCH_UPDATE_INTERVAL=2m + # List of allowed AccessKeyID prefixes # If not set, S3 GW will accept all AccessKeyIDs S3_GW_ALLOWED_ACCESS_KEY_ID_PREFIXES=Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn + +# Allows to use slicer for Object uploading. +S3_GW_INTERNAL_SLICER=false diff --git a/config/config.yaml b/config/config.yaml index 7037b03d..e2972b9d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -136,9 +136,14 @@ neofs: # Number of the object copies to consider PUT to NeoFS successful. # `0` means that object will be processed according to the container's placement policy set_copies_number: 0 + # Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true. + epoch_update_interval: 2m # List of allowed AccessKeyID prefixes # If the parameter is omitted, S3 GW will accept all AccessKeyIDs allowed_access_key_id_prefixes: - Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX - 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn + +# Allows to use slicer for Object uploading. +internal_slicer: false diff --git a/internal/neofs/epoch.go b/internal/neofs/epoch.go new file mode 100644 index 00000000..3cef716f --- /dev/null +++ b/internal/neofs/epoch.go @@ -0,0 +1,74 @@ +package neofs + +import ( + "context" + "sync/atomic" + "time" + + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "go.uber.org/zap" +) + +type ( + // NetworkInfoGetter represents provider to get actual [netmap.NetworkInfo]. + NetworkInfoGetter interface { + NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error) + } + + // EpochGetter represents provider to get actual NeoFS epoch. + EpochGetter interface { + CurrentEpoch() uint64 + } + + // PeriodicGetter implements [EpochGetter]. + PeriodicGetter struct { + logger *zap.Logger + netGetter NetworkInfoGetter + epoch atomic.Uint64 + timeOut time.Duration + } +) + +// NewPeriodicGetter is a constructor to [PeriodicGetter]. +func NewPeriodicGetter(ctx context.Context, initialEpoch uint64, timeOut time.Duration, netGetter NetworkInfoGetter, logger *zap.Logger) *PeriodicGetter { + getter := &PeriodicGetter{ + timeOut: timeOut, + netGetter: netGetter, + logger: logger, + } + + getter.epoch.Store(initialEpoch) + + go getter.update(ctx) + + return getter +} + +// CurrentEpoch returns actual epoch. +// +// CurrentEpoch implements [EpochGetter]. +func (g *PeriodicGetter) CurrentEpoch() uint64 { + return g.epoch.Load() +} + +func (g *PeriodicGetter) update(ctx context.Context) { + tm := time.NewTicker(g.timeOut) + + for { + select { + case <-ctx.Done(): + tm.Stop() + return + case <-tm.C: + ni, err := g.netGetter.NetworkInfo(ctx, client.PrmNetworkInfo{}) + if err != nil { + g.logger.Error("periodicGetter: networkInfo", zap.Error(err)) + continue + } + + g.logger.Info("periodicGetter", zap.Uint64("epoch", ni.CurrentEpoch())) + g.epoch.Store(ni.CurrentEpoch()) + } + } +} diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index c41746d5..106aa60e 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -22,6 +22,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/object/slicer" "github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/stat" @@ -31,26 +32,30 @@ import ( // Config allows to configure some [NeoFS] parameters. type Config struct { - MaxObjectSize int64 + MaxObjectSize int64 + IsSlicerEnabled bool + IsHomomorphicEnabled bool } // NeoFS represents virtual connection to the NeoFS network. // It is used to provide an interface to dependent packages // which work with NeoFS. type NeoFS struct { - pool *pool.Pool - gateSigner user.Signer - anonSigner user.Signer - cfg Config + pool *pool.Pool + gateSigner user.Signer + anonSigner user.Signer + cfg Config + epochGetter EpochGetter } // NewNeoFS creates new NeoFS using provided pool.Pool. -func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config) *NeoFS { +func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config, epochGetter EpochGetter) *NeoFS { return &NeoFS{ - pool: p, - gateSigner: signer, - anonSigner: anonSigner, - cfg: cfg, + pool: p, + gateSigner: signer, + anonSigner: anonSigner, + cfg: cfg, + epochGetter: epochGetter, } } @@ -273,6 +278,28 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi prm.Payload = bytes.NewReader(obj.Payload()) } + if x.cfg.IsSlicerEnabled { + opts := slicer.Options{} + opts.SetObjectPayloadLimit(uint64(x.cfg.MaxObjectSize)) + opts.SetCopiesNumber(prm.CopiesNumber) + opts.SetCurrentNeoFSEpoch(x.epochGetter.CurrentEpoch()) + + if x.cfg.IsHomomorphicEnabled { + opts.CalculateHomomorphicChecksum() + } + + if prm.BearerToken != nil { + opts.SetBearerToken(*prm.BearerToken) + } + + objID, err := slicer.Put(ctx, x.pool, obj, x.signer(ctx), prm.Payload, opts) + if err != nil { + return oid.ID{}, fmt.Errorf("slicer put: %w", err) + } + + return objID, nil + } + var prmObjPutInit client.PrmObjectPutInit prmObjPutInit.SetCopiesNumber(prm.CopiesNumber)