Commit

Add selective download feature to Data Prepper sources section (opens…
…earch-project#6247)

* Add feature to this section

Signed-off-by: Melissa Vagi <[email protected]>

* add content

Signed-off-by: Melissa Vagi <[email protected]>

* Copy edits

Signed-off-by: Melissa Vagi <[email protected]>

* Update selective-download.md

Signed-off-by: Melissa Vagi <[email protected]>

Signed-off-by: Melissa Vagi <[email protected]>

* Address tech review comments

Signed-off-by: Melissa Vagi <[email protected]>

* Address tech review comments

Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: David Venable <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: David Venable <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: David Venable <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: David Venable <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update s3-logs.md

Signed-off-by: Melissa Vagi <[email protected]>

Signed-off-by: Melissa Vagi <[email protected]>

* Update s3-logs.md

Signed-off-by: Melissa Vagi <[email protected]>

Signed-off-by: Melissa Vagi <[email protected]>

* Update s3-logs.md

Signed-off-by: Melissa Vagi <[email protected]>

Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: Nathan Bower <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: Nathan Bower <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: Nathan Bower <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: Nathan Bower <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Co-authored-by: Nathan Bower <[email protected]>
Signed-off-by: Melissa Vagi <[email protected]>

* Update _data-prepper/common-use-cases/s3-logs.md

Signed-off-by: Melissa Vagi <[email protected]>

---------

Signed-off-by: Melissa Vagi <[email protected]>
Co-authored-by: David Venable <[email protected]>
Co-authored-by: Nathan Bower <[email protected]>
Signed-off-by: Sander van de Geijn <[email protected]>
3 people authored and sandervandegeijn committed Jul 30, 2024
1 parent 8fdf388 commit 0f559d4
Showing 1 changed file with 59 additions and 11 deletions.
70 changes: 59 additions & 11 deletions _data-prepper/common-use-cases/s3-logs.md
@@ -9,7 +9,6 @@ nav_order: 40

Data Prepper allows you to load logs from [Amazon Simple Storage Service](https://aws.amazon.com/s3/) (Amazon S3), including traditional logs, JSON documents, and CSV logs.


## Architecture

Data Prepper can read objects from S3 buckets using an [Amazon Simple Queue Service (SQS)](https://aws.amazon.com/sqs/) (Amazon SQS) queue and [Amazon S3 Event Notifications](https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html).
@@ -20,15 +19,14 @@ The following diagram shows the overall architecture of the components involved.

<img src="{{site.url}}{{site.baseurl}}/images/data-prepper/s3-source/s3-architecture.jpg" alt="S3 source architecture">{: .img-fluid}

The flow of data is as follows.
The component data flow is as follows:

1. A system produces logs into the S3 bucket.
2. S3 creates an S3 event notification in the SQS queue.
3. Data Prepper polls Amazon SQS for messages and then receives a message.
4. Data Prepper downloads the content from the S3 object.
5. Data Prepper sends a document to OpenSearch for the content in the S3 object.


## Pipeline overview

Data Prepper supports reading data from S3 using the [`s3` source]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/sources/s3/).
@@ -44,7 +42,6 @@ Before Data Prepper can read log data from S3, you need the following prerequisi
- An S3 bucket.
- A log producer that writes logs to S3. The exact log producer will vary depending on your specific use case, but could include writing logs to S3 or a service such as Amazon CloudWatch.


## Getting started

Use the following steps to begin loading logs from S3 with Data Prepper.
@@ -57,8 +54,7 @@ Use the following steps to begin loading logs from S3 with Data Prepper.

### Setting permissions for Data Prepper

To view S3 logs, Data Prepper needs access to Amazon SQS and S3.
Use the following example to set up permissions:
To view S3 logs, Data Prepper needs access to Amazon SQS and S3. Use the following example to set up permissions:

```json
{
@@ -88,12 +84,13 @@ Use the following example to set up permissions:
]
}
```
{% include copy-curl.html %}

If your S3 objects or SQS queues do not use KMS, you can remove the `kms:Decrypt` permission.

### SQS dead-letter queue

The are two options for how to handle errors resulting from processing S3 objects.
The following two options can be used to handle S3 object processing errors:

- Use an SQS dead-letter queue (DLQ) to track the failure. This is the recommended approach.
- Delete the message from SQS. You must manually find the S3 object and correct the error.
@@ -104,8 +101,8 @@ The following diagram shows the system architecture when using SQS with DLQ.

To use an SQS dead-letter queue, perform the following steps:

1. Create a new SQS standard queue to act as your DLQ.
2. Configure your SQS's redrive policy [to use your DLQ](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-dead-letter-queue.html). Consider using a low value such as 2 or 3 for the "Maximum Receives" setting.
1. Create a new SQS standard queue to act as the DLQ.
2. Configure your SQS re-drive policy [to use DLQ](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-dead-letter-queue.html). Consider using a low value such as 2 or 3 for the **Maximum Receives** setting.
3. Configure the Data Prepper `s3` source to use `retain_messages` for `on_error`. This is the default behavior.
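
As a rough illustration of step 3, the following sketch sets `on_error` explicitly in the `s3` source. The queue URL, Region, and role ARN are placeholders rather than values from this guide, the `newline` codec is an assumption about the log format, and the sink is omitted for brevity:

```yaml
s3-log-pipeline:
  source:
    s3:
      notification_type: sqs
      # Keep failed messages in the queue so that the redrive policy
      # can move them to the DLQ after the maximum number of receives.
      on_error: retain_messages
      codec:
        newline:
      sqs:
        # Placeholder URL of the main (source) queue, not the DLQ.
        queue_url: "https://sqs.<YOUR-REGION>.amazonaws.com/<123456789012>/<YOUR-SQS-QUEUE>"
      aws:
        region: "<YOUR-REGION>"
        sts_role_arn: "arn:aws:iam::<123456789012>:role/<YOUR-DATA-PREPPER-ROLE>"
```

Because `retain_messages` is the default, setting it explicitly only documents the intent.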

## Pipeline design
@@ -125,6 +122,7 @@ s3-log-pipeline:
queue_url: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
visibility_timeout: "2m"
```
{% include copy-curl.html %}
Configure the following options according to your use case:
@@ -164,17 +162,67 @@ s3-log-pipeline:
password: "admin"
index: s3_logs
```
{% include copy-curl.html %}

## Multiple Data Prepper pipelines

We recommend that you have one SQS queue per Data Prepper pipeline. In addition, you can have multiple nodes in the same cluster reading from the same SQS queue, which doesn't require additional configuration with Data Prepper.
It is recommended that you have one SQS queue per Data Prepper pipeline. In addition, you can have multiple nodes in the same cluster reading from the same SQS queue, which doesn't require additional Data Prepper configuration.

If you have multiple pipelines, you must create a separate SQS queue for each pipeline, even if the pipelines use the same S3 bucket.
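
The one-queue-per-pipeline recommendation might look like the following sketch, in which two pipelines read from the same bucket through different queues. The pipeline names, queue names, index names, and host are hypothetical, and the `aws` settings and sink authentication are left out to keep the example short:

```yaml
# First pipeline: polls its own queue.
app-log-pipeline:
  source:
    s3:
      notification_type: sqs
      codec:
        newline:
      sqs:
        queue_url: "https://sqs.<YOUR-REGION>.amazonaws.com/<123456789012>/<APP-LOG-QUEUE>"
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        index: app_logs

# Second pipeline: same bucket, but a different queue.
audit-log-pipeline:
  source:
    s3:
      notification_type: sqs
      codec:
        newline:
      sqs:
        queue_url: "https://sqs.<YOUR-REGION>.amazonaws.com/<123456789012>/<AUDIT-LOG-QUEUE>"
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        index: audit_logs
```

If both pipelines pointed at one queue, each message would be consumed by only one of them, so neither pipeline would see every object.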

## Amazon SNS fanout pattern

To meet the scale of logs produced by S3, some users require multiple SQS queues for their logs. You can use [Amazon Simple Notification Service](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) (Amazon SNS) to route event notifications from S3 to an SQS [fanout pattern](https://docs.aws.amazon.com/sns/latest/dg/sns-common-scenarios.html). Using SNS, all S3 event notifications are sent directly to a single SNS topic, where you can subscribe to multiple SQS queues.

To make sure that Data Prepper can directly parse the event from the SNS topic, configure [raw message delivery](https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html) on the SNS to SQS subscription. Setting this option will not affect other SQS queues that are subscribed to that SNS topic.
To make sure that Data Prepper can directly parse the event from the SNS topic, configure [raw message delivery](https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html) on the SNS-to-SQS subscription. Applying this option does not affect other SQS queues subscribed to the SNS topic.
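
One way to enable raw message delivery is in an AWS CloudFormation template, sketched below. The logical resource name and the topic and queue ARNs are placeholders, and the queue also needs an access policy that allows the topic to send messages, which is not shown:

```yaml
Resources:
  # Hypothetical subscription that delivers S3 event notifications
  # from the SNS topic to the queue that Data Prepper polls.
  DataPrepperQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: sqs
      TopicArn: "arn:aws:sns:<YOUR-REGION>:<123456789012>:<YOUR-SNS-TOPIC>"
      Endpoint: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
      # Pass the S3 event through without the SNS envelope.
      RawMessageDelivery: true
```

The setting belongs to the individual subscription, which is why other queues subscribed to the same topic are unaffected.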

## Filtering and retrieving data using Amazon S3 Select

If a pipeline uses an S3 source, you can use SQL expressions to perform filtering and computations on the contents of S3 objects before ingesting them into the pipeline.

The `s3_select` option supports objects in the [Parquet File Format](https://parquet.apache.org/docs/). It also works with objects that are compressed with GZIP or BZIP2 (for CSV and JSON objects only) and supports columnar compression for the Parquet File Format using GZIP and Snappy.

Refer to [Filtering and retrieving data using Amazon S3 Select](https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html) and [SQL reference for Amazon S3 Select](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-select-sql-reference.html) for comprehensive information about using Amazon S3 Select.
{: .note}

The following example pipeline retrieves all data from S3 objects encoded in the Parquet File Format:

```json
pipeline:
  source:
    s3:
      s3_select:
        expression: "select * from s3object s"
        input_serialization: parquet
      notification_type: "sqs"
...
```
{% include copy-curl.html %}

The following example pipeline retrieves only the first 10,000 records in the objects:

```json
pipeline:
  source:
    s3:
      s3_select:
        expression: "select * from s3object s LIMIT 10000"
        input_serialization: parquet
      notification_type: "sqs"
...
```
{% include copy-curl.html %}

The following example pipeline retrieves records from S3 objects that have a `data_value` in the given range of 200--500:

```json
pipeline:
  source:
    s3:
      s3_select:
        expression: "select s.* from s3object s where s.data_value > 200 and s.data_value < 500 "
        input_serialization: parquet
      notification_type: "sqs"
...
```
{% include copy-curl.html %}
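
The compression support for CSV and JSON objects described earlier in this section could be configured along the following lines. This is a sketch only: it assumes that the `s3_select` block accepts a `compression_type` option with the value `gzip` and that no further JSON settings are needed, so verify both against the `s3` source reference before using it:

```yaml
pipeline:
  source:
    s3:
      s3_select:
        expression: "select * from s3object s"
        input_serialization: json
        # Assumed option name and value; check the s3 source reference.
        compression_type: gzip
      notification_type: "sqs"
...
```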
