Add selective download feature to Data Prepper sources section #6247

Merged · 28 commits · Jun 27, 2024

70 changes: 59 additions & 11 deletions _data-prepper/common-use-cases/s3-logs.md

Data Prepper allows you to load logs from [Amazon Simple Storage Service](https://aws.amazon.com/s3/) (Amazon S3), including traditional logs, JSON documents, and CSV logs.


## Architecture

Data Prepper can read objects from S3 buckets using an [Amazon Simple Queue Service (SQS)](https://aws.amazon.com/sqs/) (Amazon SQS) queue and [Amazon S3 Event Notifications](https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html).
The following diagram shows the overall architecture of the components involved.

<img src="{{site.url}}{{site.baseurl}}/images/data-prepper/s3-source/s3-architecture.jpg" alt="S3 source architecture">{: .img-fluid}

The component data flow is as follows:

1. A system writes logs to the S3 bucket.
2. S3 creates an S3 event notification in the SQS queue.
3. Data Prepper polls Amazon SQS and receives a message.
4. Data Prepper downloads the content of the S3 object.
5. Data Prepper sends a document containing the S3 object's content to OpenSearch.
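
For reference, the S3 event notification that Data Prepper receives from the queue in steps 2 and 3 resembles the following (abbreviated here; the bucket name and object key are placeholders):

```json
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "<YOUR-BUCKET>" },
        "object": { "key": "logs/2024/06/27/app.log", "size": 1024 }
      }
    }
  ]
}
```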


## Pipeline overview

Data Prepper supports reading data from S3 using the [`s3` source]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/sources/s3/).

Before Data Prepper can read log data from S3, you need the following prerequisites:
- An S3 bucket.
- A log producer that writes logs to S3. The exact log producer varies by use case; it might write logs directly to S3 or through a service such as Amazon CloudWatch.


## Getting started

Use the following steps to begin loading logs from S3 with Data Prepper.

### Setting permissions for Data Prepper

To view S3 logs, Data Prepper needs access to Amazon SQS and S3. Use the following example to set up permissions:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
        },
        {
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:<YOUR-REGION>:<123456789012>:key/<YOUR-KMS-KEY>"
        }
    ]
}
```
{% include copy-curl.html %}

If your S3 objects or SQS queues do not use KMS, you can remove the `kms:Decrypt` permission.
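
Data Prepper also relies on the bucket sending `s3:ObjectCreated:*` event notifications to the queue. If you haven't set this up yet, a notification configuration similar to the following sketch can be applied to the bucket using the `aws s3api put-bucket-notification-configuration` command (the queue ARN is a placeholder):

```json
{
  "QueueConfigurations": [
    {
      "QueueArn": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>",
      "Events": ["s3:ObjectCreated:*"]
    }
  ]
}
```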

### SQS dead-letter queue

The following two options can be used to handle S3 object processing errors:

- Use an SQS dead-letter queue (DLQ) to track the failure. This is the recommended approach.
- Delete the message from SQS. You must manually find the S3 object and correct the error.

The following diagram shows the system architecture when using SQS with a DLQ.

To use an SQS dead-letter queue, perform the following steps:

1. Create a new SQS standard queue to act as the DLQ.
2. Configure your SQS redrive policy [to use your DLQ](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-dead-letter-queue.html). Consider using a low value, such as 2 or 3, for the **Maximum Receives** setting; an example redrive policy follows these steps.
3. Configure the Data Prepper `s3` source to use `retain_messages` for `on_error`. This is the default behavior.
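
For reference, the redrive policy attached to the source queue takes the following general form, in which `maxReceiveCount` corresponds to the **Maximum Receives** setting and the DLQ ARN is a placeholder:

```json
{
  "deadLetterTargetArn": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-DLQ>",
  "maxReceiveCount": 3
}
```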

## Pipeline design
Expand All @@ -125,6 +122,7 @@ s3-log-pipeline:
queue_url: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
visibility_timeout: "2m"
```
{% include copy-curl.html %}

Configure options such as `queue_url` and `visibility_timeout` according to your use case.

The following example shows a complete pipeline. The `grok` processor and the OpenSearch `hosts` value are illustrative; substitute the processors and sink settings that match your environment:

```yaml
s3-log-pipeline:
  source:
    s3:
      notification_type: sqs
      codec:
        newline:
      sqs:
        queue_url: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
        visibility_timeout: "2m"
  processor:
    - grok:
        match:
          message: [ "%{COMMONAPACHELOG}" ]
  sink:
    - opensearch:
        hosts: [ "https://localhost:9200" ]
        username: "admin"
        password: "admin"
        index: s3_logs
```
{% include copy-curl.html %}

## Multiple Data Prepper pipelines

It is recommended that you have one SQS queue per Data Prepper pipeline. In addition, you can have multiple nodes in the same cluster reading from the same SQS queue, which doesn't require additional Data Prepper configuration.

If you have multiple pipelines, you must create a separate SQS queue for each pipeline, even if the pipelines read from the same S3 bucket.
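
For example, two pipelines reading from the same bucket would each poll their own queue. The pipeline names and queue ARNs in the following sketch are illustrative:

```yaml
app-logs-pipeline:
  source:
    s3:
      notification_type: sqs
      sqs:
        queue_url: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<APP-LOGS-QUEUE>"
  ...

audit-logs-pipeline:
  source:
    s3:
      notification_type: sqs
      sqs:
        queue_url: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<AUDIT-LOGS-QUEUE>"
  ...
```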

## Amazon SNS fanout pattern

To meet the scale of logs produced in S3, some users require multiple SQS queues for their logs. You can use [Amazon Simple Notification Service](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) (Amazon SNS) to route event notifications from S3 to multiple SQS queues in a [fanout pattern](https://docs.aws.amazon.com/sns/latest/dg/sns-common-scenarios.html). Using SNS, all S3 event notifications are sent to a single SNS topic, to which you can subscribe multiple SQS queues.

To make sure that Data Prepper can directly parse the event from the SNS topic, configure [raw message delivery](https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html) on the SNS-to-SQS subscription. Applying this option does not affect other SQS queues subscribed to the SNS topic.
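
For example, if you manage the subscription with AWS CloudFormation, enabling raw message delivery looks similar to the following sketch (the resource name and ARNs are placeholders):

```yaml
FanoutQueueSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    Protocol: sqs
    TopicArn: "arn:aws:sns:<YOUR-REGION>:<123456789012>:<YOUR-SNS-TOPIC>"
    Endpoint: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
    RawMessageDelivery: true
```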

## Filtering and retrieving data using Amazon S3 Select

If a pipeline uses an S3 source, you can use SQL expressions to perform filtering and computations on the contents of S3 objects before ingesting them into the pipeline.

The `s3_select` option supports objects in the [Parquet File Format](https://parquet.apache.org/docs/). It also works with objects that are compressed with GZIP or BZIP2 (for CSV and JSON objects only) and supports columnar compression for the Parquet File Format using GZIP and Snappy.

Refer to [Filtering and retrieving data using Amazon S3 Select](https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html) and [SQL reference for Amazon S3 Select](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-select-sql-reference.html) for comprehensive information about using Amazon S3 Select.
{: .note}

The following example pipeline retrieves all data from S3 objects encoded in the Parquet File Format:

```yaml
pipeline:
  source:
    s3:
      s3_select:
        expression: "select * from s3object s"
        input_serialization: parquet
      notification_type: "sqs"
...
```
{% include copy-curl.html %}

The following example pipeline retrieves only the first 10,000 records in the objects:

```yaml
pipeline:
  source:
    s3:
      s3_select:
        expression: "select * from s3object s LIMIT 10000"
        input_serialization: parquet
      notification_type: "sqs"
...
```
{% include copy-curl.html %}

The following example pipeline retrieves records from S3 objects with a `data_value` between 200 and 500:

```yaml
pipeline:
  source:
    s3:
      s3_select:
        expression: "select s.* from s3object s where s.data_value > 200 and s.data_value < 500"
        input_serialization: parquet
      notification_type: "sqs"
...
```
{% include copy-curl.html %}