Skip to content

Commit

Permalink
increase timeout to 30s (#3422)
Browse files Browse the repository at this point in the history
* increase timeout to 30s

* add debug

* use context logger and include size in logs

* close reader

* address comments
  • Loading branch information
ahrav authored Oct 18, 2024
1 parent 88b8c86 commit 6e055ea
Showing 1 changed file with 45 additions and 21 deletions.
66 changes: 45 additions & 21 deletions pkg/sources/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (s *Source) setMaxObjectSize(maxObjectSize int64) {
func (s *Source) newClient(region, roleArn string) (*s3.S3, error) {
cfg := aws.NewConfig()
cfg.CredentialsChainVerboseErrors = aws.Bool(true)
cfg.LogLevel = aws.LogLevel(aws.LogDebugWithRequestErrors)
cfg.Region = aws.String(region)

switch cred := s.conn.GetCredential().(type) {
Expand Down Expand Up @@ -271,46 +272,62 @@ func (s *Source) getRegionalClientForBucket(ctx context.Context, defaultRegionCl
}

// pageChunker emits chunks onto the given channel from a page
func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan *sources.Chunk, bucket string, page *s3.ListObjectsV2Output, errorCount *sync.Map, pageNumber int, objectCount *uint64) {
func (s *Source) pageChunker(
ctx context.Context,
client *s3.S3,
chunksChan chan *sources.Chunk,
bucket string,
page *s3.ListObjectsV2Output,
errorCount *sync.Map,
pageNumber int,
objectCount *uint64,
) {
for _, obj := range page.Contents {
obj := obj
if common.IsDone(ctx) {
return
}

if obj == nil {
continue
}

// skip GLACIER and GLACIER_IR objects
ctx = context.WithValues(
ctx,
"key", *obj.Key,
"bucket", bucket,
"page", pageNumber,
"size", *obj.Size,
)

if common.IsDone(ctx) {
return
}

// Skip GLACIER and GLACIER_IR objects.
if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
s.log.V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass, "object", *obj.Key)
ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass)
continue
}

// ignore large files
// Ignore large files.
if *obj.Size > s.maxObjectSize {
s.log.V(5).Info("Skipping %d byte file (over maxObjectSize limit)", "object", *obj.Key)
ctx.Logger().V(5).Info("Skipping %d byte file (over maxObjectSize limit)")
continue
}

// file empty file
// File empty file.
if *obj.Size == 0 {
s.log.V(5).Info("Skipping 0 byte file", "object", *obj.Key)
ctx.Logger().V(5).Info("Skipping empty file")
continue
}

// skip incompatible extensions
// Skip incompatible extensions.
if common.SkipFile(*obj.Key) {
s.log.V(5).Info("Skipping file with incompatible extension", "object", *obj.Key)
ctx.Logger().V(5).Info("Skipping file with incompatible extension")
continue
}

s.jobPool.Go(func() error {
defer common.RecoverWithExit(ctx)

if strings.HasSuffix(*obj.Key, "/") {
s.log.V(5).Info("Skipping directory", "object", *obj.Key)
ctx.Logger().V(5).Info("Skipping directory")
return nil
}

Expand All @@ -322,14 +339,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
nErr = 0
}
if nErr.(int) > 3 {
s.log.V(2).Info("Skipped due to excessive errors", "object", *obj.Key)
ctx.Logger().V(2).Info("Skipped due to excessive errors")
return nil
}

// Use an anonymous function to retrieve the S3 object with a dedicated timeout context.
// This ensures that the timeout is isolated and does not affect any downstream operations. (e.g. HandleFile)
getObject := func() (*s3.GetObjectOutput, error) {
const getObjectTimeout = 5 * time.Second
const getObjectTimeout = 30 * time.Second
objCtx, cancel := context.WithTimeout(ctx, getObjectTimeout)
defer cancel()

Expand All @@ -342,22 +359,29 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
res, err := getObject()
if err != nil {
if !strings.Contains(err.Error(), "AccessDenied") {
s.log.Error(err, "could not get S3 object", "object", *obj.Key)
ctx.Logger().Error(err, "could not get S3 object")
}
// According to the documentation for GetObjectWithContext,
// the response can be non-nil even if there was an error.
// It's uncertain if the body will be nil in such cases,
// but we'll close it if it's not.
if res != nil && res.Body != nil {
res.Body.Close()
}

nErr, ok := errorCount.Load(prefix)
if !ok {
nErr = 0
}
if nErr.(int) > 3 {
s.log.V(3).Info("Skipped due to excessive errors", "object", *obj.Key)
ctx.Logger().V(3).Info("Skipped due to excessive errors")
return nil
}
nErr = nErr.(int) + 1
errorCount.Store(prefix, nErr)
// too many consecutive errors on this page
if nErr.(int) > 3 {
s.log.V(2).Info("Too many consecutive errors, excluding prefix", "prefix", prefix)
ctx.Logger().V(2).Info("Too many consecutive errors, excluding prefix", "prefix", prefix)
}
return nil
}
Expand Down Expand Up @@ -393,7 +417,7 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
}

atomic.AddUint64(objectCount, 1)
s.log.V(5).Info("S3 object scanned.", "object_count", objectCount, "page_number", pageNumber)
ctx.Logger().V(5).Info("S3 object scanned.", "object_count", objectCount)
nErr, ok = errorCount.Load(prefix)
if !ok {
nErr = 0
Expand Down

0 comments on commit 6e055ea

Please sign in to comment.