diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index e016cff731..ba676f77b0 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -1442,6 +1442,9 @@ agent: # @schema {"name": "agent.sidecar.config.blob_storage.s3.max_part_size", "type": "string", "pattern": "^[0-9]+(kb|mb|gb)$"} # agent.sidecar.config.blob_storage.s3.max_part_size -- s3 multipart upload max part size max_part_size: 64mb + # @schema {"name": "agent.sidecar.config.blob_storage.s3.max_chunk_size", "type": "string", "pattern": "^[0-9]+(kb|mb|gb)$"} + # agent.sidecar.config.blob_storage.s3.max_chunk_size -- s3 download max chunk size + max_chunk_size: 64mb # @schema {"name": "agent.sidecar.config.compress", "type": "object"} compress: # @schema {"name": "agent.sidecar.config.compress.compress_algorithm", "type": "string", "enum": ["gob", "gzip", "lz4", "zstd"]} @@ -1536,6 +1539,8 @@ agent: retry_count: 100 # agent.sidecar.config.client.transport.backoff.enable_error_log -- backoff error log enabled enable_error_log: true + # agent.sidecar.config.restore_backoff_enabled -- restore backoff enabled + restore_backoff_enabled: false # @schema {"name": "agent.sidecar.config.restore_backoff", "alias": "backoff"} restore_backoff: # agent.sidecar.config.restore_backoff.initial_duration -- restore backoff initial duration diff --git a/internal/config/blob.go b/internal/config/blob.go index 552c919338..c6c4138dd5 100644 --- a/internal/config/blob.go +++ b/internal/config/blob.go @@ -71,7 +71,8 @@ type S3Config struct { EnableEndpointDiscovery bool `json:"enable_endpoint_discovery" yaml:"enable_endpoint_discovery"` EnableEndpointHostPrefix bool `json:"enable_endpoint_host_prefix" yaml:"enable_endpoint_host_prefix"` - MaxPartSize string `json:"max_part_size" yaml:"max_part_size"` + MaxPartSize string `json:"max_part_size" yaml:"max_part_size"` + MaxChunkSize string `json:"max_chunk_size" yaml:"max_chunk_size"` } func (b *Blob) Bind() *Blob { @@ -94,6 +95,7 @@ func (s *S3Config) Bind() *S3Config { s.SecretAccessKey = GetActualValue(s.SecretAccessKey) s.Token = GetActualValue(s.Token) s.MaxPartSize = GetActualValue(s.MaxPartSize) + s.MaxChunkSize = GetActualValue(s.MaxChunkSize) return s } diff --git a/internal/config/sidecar.go b/internal/config/sidecar.go index 34a7605efe..d474e1d657 100644 --- a/internal/config/sidecar.go +++ b/internal/config/sidecar.go @@ -42,6 +42,9 @@ type AgentSidecar struct { // Compress represent compression configurations Compress *CompressCore `yaml:"compress" json:"compress"` + // RestoreBackoffEnabled represent backoff enabled or not + RestoreBackoffEnabled bool `yaml:"restore_backoff_enabled" json:"restore_backoff_enabled"` + // RestoreBackoff represent backoff configurations for restoring process RestoreBackoff *Backoff `yaml:"restore_backoff" json:"restore_backoff"` diff --git a/internal/db/storage/blob/s3/option.go b/internal/db/storage/blob/s3/option.go index cf8c3eeed6..712c4d8a1c 100644 --- a/internal/db/storage/blob/s3/option.go +++ b/internal/db/storage/blob/s3/option.go @@ -19,6 +19,7 @@ package s3 import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/unit" ) @@ -70,3 +71,37 @@ func WithMaxPartSize(size string) Option { return nil } } + +func WithMaxChunkSize(size string) Option { + return func(c *client) error { + b, err := unit.ParseBytes(size) + if err != nil { + return err + } + + if int64(b) >= s3manager.MinUploadPartSize { + c.maxChunkSize = int64(b) + } + + return nil + } +} + +func WithReaderBackoff(enabled bool) Option { + return func(c *client) error { + c.readerBackoffEnabled = enabled + return nil + } +} + +func WithReaderBackoffOpts(opts ...backoff.Option) Option { + return func(c *client) error { + if c.readerBackoffOpts == nil { + c.readerBackoffOpts = opts + } + + c.readerBackoffOpts = append(c.readerBackoffOpts, opts...) + + return nil + } +} diff --git a/internal/db/storage/blob/s3/reader/option.go b/internal/db/storage/blob/s3/reader/option.go index 9f2a437c93..92f910c85a 100644 --- a/internal/db/storage/blob/s3/reader/option.go +++ b/internal/db/storage/blob/s3/reader/option.go @@ -18,6 +18,7 @@ package reader import ( "github.com/aws/aws-sdk-go/service/s3" + "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/errgroup" ) @@ -26,6 +27,8 @@ type Option func(r *reader) var ( defaultOpts = []Option{ WithErrGroup(errgroup.Get()), + WithMaxChunkSize(512 * 1024 * 1024), + WithBackoff(false), } ) @@ -56,3 +59,25 @@ func WithKey(key string) Option { r.key = key } } + +func WithMaxChunkSize(size int64) Option { + return func(r *reader) { + r.maxChunkSize = size + } +} + +func WithBackoff(enabled bool) Option { + return func(r *reader) { + r.backoffEnabled = enabled + } +} + +func WithBackoffOpts(opts ...backoff.Option) Option { + return func(r *reader) { + if r.backoffOpts == nil { + r.backoffOpts = opts + } + + r.backoffOpts = append(r.backoffOpts, opts...) + } +} diff --git a/internal/db/storage/blob/s3/reader/reader.go b/internal/db/storage/blob/s3/reader/reader.go index 1f513ded6a..cd6c8e123b 100644 --- a/internal/db/storage/blob/s3/reader/reader.go +++ b/internal/db/storage/blob/s3/reader/reader.go @@ -17,15 +17,21 @@ package reader import ( + "bytes" "context" "io" + "io/ioutil" + "strconv" "sync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" + "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + ctxio "github.com/vdaas/vald/internal/io" + "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" ) @@ -37,6 +43,10 @@ type reader struct { pr io.ReadCloser wg *sync.WaitGroup + + backoffEnabled bool + backoffOpts []backoff.Option + maxChunkSize int64 } type Reader interface { @@ -54,42 +64,126 @@ func New(opts ...Option) Reader { } func (r *reader) Open(ctx context.Context) (err error) { - input := &s3.GetObjectInput{ - Bucket: aws.String(r.bucket), - Key: aws.String(r.key), - } - var pw io.WriteCloser r.pr, pw = io.Pipe() - resp, err := r.service.GetObjectWithContext(ctx, input) + r.wg = new(sync.WaitGroup) + r.wg.Add(1) + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer r.wg.Done() + defer pw.Close() + + var offset int64 + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + body, err := func() (io.Reader, error) { + if r.backoffEnabled { + return r.getObjectWithBackoff(ctx, offset, r.maxChunkSize) + } else { + return r.getObject(ctx, offset, r.maxChunkSize) + } + }() + if err != nil { + return err + } + + body, err = ctxio.NewReaderWithContext(ctx, body) + if err != nil { + return err + } + + chunk, err := io.Copy(pw, body) + if err != nil { + return err + } + + if chunk < r.maxChunkSize { + log.Debugf("read %d bytes.", offset+chunk) + return nil + } + + offset += chunk + } + })) + + return nil +} + +func (r *reader) getObjectWithBackoff(ctx context.Context, offset, length int64) (io.Reader, error) { + getFunc := func() (interface{}, error) { + return r.getObject(ctx, offset, length) + } + + b := backoff.New(r.backoffOpts...) + defer b.Close() + + res, err := b.Do(ctx, getFunc) + if err != nil { + return nil, err + } + + return res.(io.Reader), nil +} + +func (r *reader) getObject(ctx context.Context, offset, length int64) (io.Reader, error) { + log.Debugf("reading %d-%d bytes...", offset, offset+length-1) + resp, err := r.service.GetObjectWithContext( + ctx, + &s3.GetObjectInput{ + Bucket: aws.String(r.bucket), + Key: aws.String(r.key), + Range: aws.String("bytes=" + + strconv.FormatInt(offset, 10) + + "-" + + strconv.FormatInt(offset+length-1, 10), + ), + }, + ) + if err != nil { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { case s3.ErrCodeNoSuchBucket: - return errors.NewErrBlobNoSuchBucket(err, r.bucket) + log.Error(errors.NewErrBlobNoSuchBucket(err, r.bucket)) + return ioutil.NopCloser(bytes.NewReader(nil)), nil case s3.ErrCodeNoSuchKey: - return errors.NewErrBlobNoSuchKey(err, r.key) + log.Error(errors.NewErrBlobNoSuchKey(err, r.key)) + return ioutil.NopCloser(bytes.NewReader(nil)), nil + case "InvalidRange": + return ioutil.NopCloser(bytes.NewReader(nil)), nil } } - return err + return nil, err } - r.wg = new(sync.WaitGroup) - r.wg.Add(1) + res, err := ctxio.NewReadCloserWithContext(ctx, resp.Body) + if err != nil { + return nil, err + } - r.eg.Go(safety.RecoverFunc(func() (err error) { - defer r.wg.Done() - defer resp.Body.Close() - defer pw.Close() + buf := new(bytes.Buffer) - _, err = io.Copy(pw, resp.Body) - return err - })) + defer func() { + e := res.Close() + if e != nil { + log.Warn(e) + } + }() - return nil + _, err = io.Copy(buf, res) + if err != nil { + return nil, err + } + + return buf, nil } func (r *reader) Close() error { diff --git a/internal/db/storage/blob/s3/s3.go b/internal/db/storage/blob/s3/s3.go index 11cd8fd64e..e39b2be4e5 100644 --- a/internal/db/storage/blob/s3/s3.go +++ b/internal/db/storage/blob/s3/s3.go @@ -23,6 +23,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/db/storage/blob" "github.com/vdaas/vald/internal/db/storage/blob/s3/reader" "github.com/vdaas/vald/internal/db/storage/blob/s3/writer" @@ -36,7 +37,11 @@ type client struct { service *s3.S3 bucket string - maxPartSize int64 + maxPartSize int64 + maxChunkSize int64 + + readerBackoffEnabled bool + readerBackoffOpts []backoff.Option } func New(opts ...Option) (blob.Bucket, error) { @@ -66,6 +71,9 @@ func (c *client) Reader(ctx context.Context, key string) (io.ReadCloser, error) reader.WithService(c.service), reader.WithBucket(c.bucket), reader.WithKey(key), + reader.WithMaxChunkSize(c.maxChunkSize), + reader.WithBackoff(c.readerBackoffEnabled), + reader.WithBackoffOpts(c.readerBackoffOpts...), ) return r, r.Open(ctx) diff --git a/internal/db/storage/blob/s3/writer/option.go b/internal/db/storage/blob/s3/writer/option.go index 7cc3a0b8c8..48f6f9c18c 100644 --- a/internal/db/storage/blob/s3/writer/option.go +++ b/internal/db/storage/blob/s3/writer/option.go @@ -26,6 +26,7 @@ type Option func(w *writer) var ( defaultOpts = []Option{ WithErrGroup(errgroup.Get()), + WithContentType("application/octet-stream"), WithMaxPartSize(64 * 1024 * 1024), } ) @@ -58,6 +59,14 @@ func WithKey(key string) Option { } } +func WithContentType(ct string) Option { + return func(w *writer) { + if ct != "" { + w.contentType = ct + } + } +} + func WithMaxPartSize(max int64) Option { return func(w *writer) { w.maxPartSize = max diff --git a/internal/db/storage/blob/s3/writer/writer.go b/internal/db/storage/blob/s3/writer/writer.go index 759cb66e60..7d83b99098 100644 --- a/internal/db/storage/blob/s3/writer/writer.go +++ b/internal/db/storage/blob/s3/writer/writer.go @@ -36,6 +36,7 @@ type writer struct { bucket string key string + contentType string maxPartSize int64 pw io.WriteCloser @@ -103,9 +104,10 @@ func (w *writer) upload(ctx context.Context, body io.Reader) (err error) { }, ) input := &s3manager.UploadInput{ - Bucket: aws.String(w.bucket), - Key: aws.String(w.key), - Body: body, + Bucket: aws.String(w.bucket), + Key: aws.String(w.key), + Body: body, + ContentType: aws.String(w.contentType), } res, err := uploader.UploadWithContext(ctx, input) diff --git a/internal/errors/io.go b/internal/errors/io.go new file mode 100644 index 0000000000..5cc21cf9ac --- /dev/null +++ b/internal/errors/io.go @@ -0,0 +1,33 @@ +// +// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt ) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package errors provides error types and function +package errors + +var ( + // io + NewErrContextNotProvided = func() error { + return New("context not provided") + } + + NewErrReaderNotProvided = func() error { + return New("io.Reader not provided") + } + + NewErrWriterNotProvided = func() error { + return New("io.Writer not provided") + } +) diff --git a/internal/io/io.go b/internal/io/io.go new file mode 100644 index 0000000000..ef32023838 --- /dev/null +++ b/internal/io/io.go @@ -0,0 +1,141 @@ +// +// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt ) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package io provides io functions +package io + +import ( + "context" + "io" + + "github.com/vdaas/vald/internal/errors" +) + +type ctxReader struct { + ctx context.Context + r io.Reader +} + +func NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) { + if ctx == nil { + return nil, errors.NewErrContextNotProvided() + } + + if r == nil { + return nil, errors.NewErrReaderNotProvided() + } + + return &ctxReader{ + ctx: ctx, + r: r, + }, nil +} + +func NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + if ctx == nil { + return nil, errors.NewErrContextNotProvided() + } + + if r == nil { + return nil, errors.NewErrReaderNotProvided() + } + + return &ctxReader{ + ctx: ctx, + r: r, + }, nil +} + +func (r *ctxReader) Read(p []byte) (n int, err error) { + select { + case <-r.ctx.Done(): + return 0, r.ctx.Err() + default: + } + return r.r.Read(p) +} + +func (r *ctxReader) Close() error { + select { + case <-r.ctx.Done(): + return r.ctx.Err() + default: + } + + if c, ok := r.r.(io.Closer); ok { + return c.Close() + } + + return nil +} + +type ctxWriter struct { + ctx context.Context + w io.Writer +} + +func NewWriterWithContext(ctx context.Context, w io.Writer) (io.Writer, error) { + if ctx == nil { + return nil, errors.NewErrContextNotProvided() + } + + if w == nil { + return nil, errors.NewErrWriterNotProvided() + } + + return &ctxWriter{ + ctx: ctx, + w: w, + }, nil +} + +func NewWriteCloserWithContext(ctx context.Context, w io.WriteCloser) (io.WriteCloser, error) { + if ctx == nil { + return nil, errors.NewErrContextNotProvided() + } + + if w == nil { + return nil, errors.NewErrWriterNotProvided() + } + + return &ctxWriter{ + ctx: ctx, + w: w, + }, nil +} + +func (w *ctxWriter) Write(p []byte) (n int, err error) { + select { + case <-w.ctx.Done(): + return 0, w.ctx.Err() + default: + } + return w.w.Write(p) +} + +func (w *ctxWriter) Close() error { + select { + case <-w.ctx.Done(): + return w.ctx.Err() + default: + } + + if c, ok := w.w.(io.Closer); ok { + return c.Close() + } + + return nil +} diff --git a/pkg/agent/sidecar/service/observer/observer.go b/pkg/agent/sidecar/service/observer/observer.go index 04e357dd32..3fa16ef46a 100644 --- a/pkg/agent/sidecar/service/observer/observer.go +++ b/pkg/agent/sidecar/service/observer/observer.go @@ -30,6 +30,7 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/file/watch" + ctxio "github.com/vdaas/vald/internal/io" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" @@ -376,13 +377,23 @@ func (o *observer) backup(ctx context.Context) error { } }() - _, err = io.Copy(tw, data) + d, err := ctxio.NewReaderWithContext(ctx, data) + if err != nil { + return err + } + + _, err = io.Copy(tw, d) return err }() }) })) - _, err = io.Copy(sw, pr) + prr, err := ctxio.NewReaderWithContext(ctx, pr) + if err != nil { + return err + } + + _, err = io.Copy(sw, prr) if err != nil { return err } diff --git a/pkg/agent/sidecar/service/restorer/option.go b/pkg/agent/sidecar/service/restorer/option.go index e0300f26ba..5e62baa443 100644 --- a/pkg/agent/sidecar/service/restorer/option.go +++ b/pkg/agent/sidecar/service/restorer/option.go @@ -28,6 +28,7 @@ type Option func(r *restorer) error var ( defaultOpts = []Option{ WithErrGroup(errgroup.Get()), + WithBackoff(false), } ) @@ -61,6 +62,13 @@ func WithBlobStorage(storage storage.Storage) Option { } } +func WithBackoff(enabled bool) Option { + return func(r *restorer) error { + r.backoffEnabled = enabled + return nil + } +} + func WithBackoffOpts(opts ...backoff.Option) Option { return func(r *restorer) error { if r.backoffOpts == nil { diff --git a/pkg/agent/sidecar/service/restorer/restorer.go b/pkg/agent/sidecar/service/restorer/restorer.go index 0c37da06b1..032db0a0de 100644 --- a/pkg/agent/sidecar/service/restorer/restorer.go +++ b/pkg/agent/sidecar/service/restorer/restorer.go @@ -29,6 +29,7 @@ import ( "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + ctxio "github.com/vdaas/vald/internal/io" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" @@ -45,7 +46,8 @@ type restorer struct { storage storage.Storage - backoffOpts []backoff.Option + backoffEnabled bool + backoffOpts []backoff.Option } func New(opts ...Option) (Restorer, error) { @@ -109,29 +111,31 @@ func (r *restorer) startRestore(ctx context.Context) (<-chan error, error) { return ech, err } - r.eg.Go(safety.RecoverFunc(func() (err error) { - defer close(ech) + restore := func() (interface{}, error) { + err := r.restore(ctx) + if err != nil { + log.Errorf("restoring failed: %s", err) - b := backoff.New(r.backoffOpts...) - defer b.Close() + return nil, err + } - _, err = b.Do(ctx, func() (interface{}, error) { - err := r.restore(ctx) - if err != nil { - log.Errorf("restoring failed: %s", err) + return nil, nil + } - if errors.IsErrBlobNoSuchBucket(err) || - errors.IsErrBlobNoSuchKey(err) { - return nil, nil - } + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) - return nil, err - } + if r.backoffEnabled { + b := backoff.New(r.backoffOpts...) + defer b.Close() + + _, err = b.Do(ctx, restore) + } else { + _, err = restore() + } - return nil, nil - }) if err != nil { - return p.Signal(syscall.SIGKILL) // TODO: #403 + log.Errorf("couldn't restore: %s", err) } return p.Signal(syscall.SIGTERM) // TODO: #403 @@ -159,6 +163,11 @@ func (r *restorer) restore(ctx context.Context) (err error) { return err } + sr, err = ctxio.NewReaderWithContext(ctx, sr) + if err != nil { + return err + } + _, err = io.Copy(pw, sr) if err != nil { return err @@ -170,6 +179,12 @@ func (r *restorer) restore(ctx context.Context) (err error) { tr := tar.NewReader(pr) for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + header, err := tr.Next() if err != nil { if err == io.EOF { @@ -202,7 +217,12 @@ func (r *restorer) restore(ctx context.Context) (err error) { return err } - _, err = io.Copy(f, tr) + fw, err := ctxio.NewWriterWithContext(ctx, f) + if err != nil { + return errors.Wrap(f.Close(), err.Error()) + } + + _, err = io.Copy(fw, tr) if err != nil { return err } diff --git a/pkg/agent/sidecar/usecase/initcontainer/initcontainer.go b/pkg/agent/sidecar/usecase/initcontainer/initcontainer.go index 752bc4a4a0..4a23985a07 100644 --- a/pkg/agent/sidecar/usecase/initcontainer/initcontainer.go +++ b/pkg/agent/sidecar/usecase/initcontainer/initcontainer.go @@ -120,6 +120,9 @@ func New(cfg *config.Data) (r runner.Runner, err error) { ), storage.WithS3Opts( s3.WithMaxPartSize(cfg.AgentSidecar.BlobStorage.S3.MaxPartSize), + s3.WithMaxChunkSize(cfg.AgentSidecar.BlobStorage.S3.MaxChunkSize), + s3.WithReaderBackoff(cfg.AgentSidecar.RestoreBackoffEnabled), + s3.WithReaderBackoffOpts(cfg.AgentSidecar.RestoreBackoff.Opts()...), ), storage.WithCompressAlgorithm(cfg.AgentSidecar.Compress.CompressAlgorithm), storage.WithCompressionLevel(cfg.AgentSidecar.Compress.CompressionLevel), @@ -132,6 +135,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { restorer.WithErrGroup(eg), restorer.WithDir(cfg.AgentSidecar.WatchDir), restorer.WithBlobStorage(bs), + restorer.WithBackoff(cfg.AgentSidecar.RestoreBackoffEnabled), restorer.WithBackoffOpts(cfg.AgentSidecar.RestoreBackoff.Opts()...), ) if err != nil { diff --git a/pkg/agent/sidecar/usecase/sidecar/sidecar.go b/pkg/agent/sidecar/usecase/sidecar/sidecar.go index f3576464a2..dc3c93c933 100644 --- a/pkg/agent/sidecar/usecase/sidecar/sidecar.go +++ b/pkg/agent/sidecar/usecase/sidecar/sidecar.go @@ -120,6 +120,9 @@ func New(cfg *config.Data) (r runner.Runner, err error) { ), storage.WithS3Opts( s3.WithMaxPartSize(cfg.AgentSidecar.BlobStorage.S3.MaxPartSize), + s3.WithMaxChunkSize(cfg.AgentSidecar.BlobStorage.S3.MaxChunkSize), + s3.WithReaderBackoff(cfg.AgentSidecar.RestoreBackoffEnabled), + s3.WithReaderBackoffOpts(cfg.AgentSidecar.RestoreBackoff.Opts()...), ), storage.WithCompressAlgorithm(cfg.AgentSidecar.Compress.CompressAlgorithm), storage.WithCompressionLevel(cfg.AgentSidecar.Compress.CompressionLevel),