
[BUG] S3 source processes SQS notification when S3 folder is created #3727

Closed
asifsmohammed opened this issue Nov 29, 2023 · 1 comment · Fixed by #3806

@asifsmohammed (Collaborator)

Describe the bug
There is confusion among S3 source users when the source receives a notification from SQS after an S3 folder is created. The S3 source processes this notification and tries to get the object for the folder key, which results in 0 records being created, a warning message being logged, and the s3ObjectNoRecordsFound metric being incremented.
Failed to find any records in S3 object: s3ObjectReference=[bucketName=bucket-name, key=folder-name/].
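
For context, the SQS message body for a console folder creation is an S3 event notification along these lines (abridged; bucket and key names are illustrative). Note that the object key ends with / and the reported size is 0, which is what the checks proposed below rely on:

    {
      "Records": [
        {
          "eventSource": "aws:s3",
          "eventName": "ObjectCreated:Put",
          "s3": {
            "bucket": { "name": "bucket-name" },
            "object": { "key": "folder-name/", "size": 0 }
          }
        }
      ]
    }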

To Reproduce
Steps to reproduce the behavior:

  1. Go to S3
  2. Create a folder
  3. Check notification in SQS
  4. Start a pipeline whose S3 source reads from that SQS queue
  5. You will see the following warning log
    Failed to find any records in S3 object: s3ObjectReference=[bucketName=bucket-name, key=folder-name/].

Expected behavior
There shouldn't be a log saying that no records were found for that key; instead, we should skip the getObject call for this key or log a different message indicating that it is a folder.

There are a couple of ways we can achieve this:

  1. Check the object size; it is 0 for folders and for zero-byte objects. We already deserialize the SQS notification to S3EventNotification, which contains the size.
  2. Check whether the key ends with /; I don't see a way to create a regular object whose key ends with /.

We can achieve this by doing the following here:

    if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.S3)
            && !parsedMessage.isEmptyNotification()
            && isS3EventNameCreated(parsedMessage)
            && !parsedMessage.getObjectKey().endsWith("/")) {
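
A minimal sketch of option 1 (the size check) is below; the helper and the way the key and size would be obtained from the deserialized notification are assumptions for illustration, not the existing Data Prepper API:

    // Sketch only: console-created "folders" show up as zero-byte objects whose
    // key ends with "/", so skip the getObject call for them. The size would come
    // from the deserialized S3EventNotification.
    private static boolean isFolderPlaceholder(final String objectKey, final long objectSize) {
        return objectSize == 0 && objectKey.endsWith("/");
    }

The condition above could then additionally require that this check returns false before calling getObject.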

@chenqi0805 (Collaborator)

I was able to reproduce it with the parquet codec, where the error message is slightly different:

pipeline:

log-pipeline:
  source:
    s3:
      notification_type: "sqs"
      codec:
        parquet:
      sqs:
        queue_url: "https://sqs.us-east-1.amazonaws.com/253613578708/my-osi-sqs"
      compression: "none"
      aws:
        region: "us-east-1"
...
2023-12-04T21:41:59.712 [Thread-11] INFO  org.opensearch.dataprepper.plugins.source.s3.S3ObjectWorker - Read S3 object: [bucketName=my-osi-bucket, key=empty-folder/]
2023-12-04T21:41:59.998 [Thread-11] ERROR org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec - An exception occurred while parsing parquet InputStream
java.lang.RuntimeException: org.opensearch.dataprepper.plugins.source.s3.S3InputFile@560dad3f is not a Parquet file (length is too low: 0)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:548) ~[parquet-hadoop-1.13.1.jar:1.13.1]
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799) ~[parquet-hadoop-1.13.1.jar:1.13.1]
    at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666) ~[parquet-hadoop-1.13.1.jar:1.13.1]
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) ~[parquet-hadoop-1.13.1.jar:1.13.1]
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) ~[parquet-hadoop-1.13.1.jar:1.13.1]
    at org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec.parseParquetFile(ParquetInputCodec.java:84) ~[parquet-codecs-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec.parse(ParquetInputCodec.java:74) ~[parquet-codecs-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.source.s3.S3ObjectWorker.doParseObject(S3ObjectWorker.java:99) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.source.s3.S3ObjectWorker.lambda$parseS3Object$0(S3ObjectWorker.java:66) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.recordCallable(CompositeTimer.java:129) ~[micrometer-core-1.11.3.jar:1.11.3]
    at org.opensearch.dataprepper.plugins.source.s3.S3ObjectWorker.parseS3Object(S3ObjectWorker.java:65) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.source.s3.S3Service.addS3Object(S3Service.java:19) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.source.s3.SqsWorker.processS3Object(SqsWorker.java:308) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.source.s3.SqsWorker.processS3EventNotificationRecords(SqsWorker.java:290) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.source.s3.SqsWorker.processSqsMessages(SqsWorker.java:142) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.source.s3.SqsWorker.run(SqsWorker.java:117) ~[s3-source-2.6.0-SNAPSHOT.jar:?]
    at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

