layout: default
title: s3
parent: Sinks
grand_parent: Pipelines
nav_order: 55

s3

The s3 sink writes batches of Data Prepper events to Amazon Simple Storage Service (Amazon S3) objects. The configured codec determines how the s3 sink serializes the data written to Amazon S3.

The s3 sink names each object that it writes using the following format:

${pathPrefix}events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}-${currentTimeInNanos}-${uniquenessId}.${codecSuppliedExtension}

When a batch of events is written to S3, the resulting object key looks similar to the following:

my-logs/2023/06/09/06/events-2023-06-09T06-00-01-1686290401871214927-ae15b8fa-512a-59c2-b917-295a0eff97c8.json

For more information about how to configure the object key, see the Object key configuration section.

Usage

The following example creates a pipeline configured with an s3 sink. It sets the event count, size, and timeout thresholds that determine when the pipeline writes events to S3, and it sets the codec type to ndjson:

pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        max_retries: 5
        bucket: bucket_name
        object_key:
          path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/
        threshold:
          event_count: 10000
          maximum_size: 50mb
          event_collect_timeout: 15s
        codec:
          ndjson:
        buffer_type: in_memory

IAM permissions

To use the s3 sink, configure AWS Identity and Access Management (IAM) to grant Data Prepper permissions to write to Amazon S3. You can use a policy similar to the following JSON configuration:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "s3-access",
            "Effect": "Allow",
            "Action": [
              "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
        }
    ]
}

Configuration

Use the following options when customizing the s3 sink.

Option | Required | Type | Description
:--- | :--- | :--- | :---
bucket | Yes | String | The name of the S3 bucket in which objects are stored. The name must match the name of your object store.
codec | Yes | Codec | The codec determining the format of output data.
aws | Yes | AWS | The AWS configuration. See aws for more information.
threshold | Yes | Threshold | Configures when to write an object to S3.
object_key | No | Object key | Sets the path_prefix and the file_pattern of the object store. The file pattern is always events-%{yyyy-MM-dd'T'hh-mm-ss}. By default, objects are written to the root directory of the bucket. The path_prefix is configurable.
compression | No | String | The compression algorithm to apply: none, gzip, or snappy. Default is none.
buffer_type | No | Buffer type | Determines the buffer type.
max_retries | No | Integer | The maximum number of times that a single request is retried when writing data to S3. Default is 5.
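
As a minimal sketch of how the optional settings in the preceding table fit alongside the required ones, the following sink compresses objects with gzip and lowers the retry limit. The bucket name, role ARN, and threshold values are placeholders rather than recommendations:

```yaml
pipeline:
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        bucket: bucket_name            # placeholder bucket name
        compression: gzip              # one of none, gzip, or snappy
        max_retries: 3                 # overrides the default of 5
        threshold:
          event_count: 10000
          event_collect_timeout: 15s
        codec:
          ndjson:
```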

aws

Option | Required | Type | Description
:--- | :--- | :--- | :---
region | No | String | The AWS Region to use for credentials. Defaults to standard SDK behavior to determine the Region.
sts_role_arn | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon S3. Defaults to null, which will use the standard SDK behavior for credentials.
sts_header_overrides | No | Map | A map of header overrides that the IAM role assumes for the sink plugin.
sts_external_id | No | String | An STS external ID used when Data Prepper assumes the role. For more information, see the ExternalId documentation in the STS AssumeRole API reference.
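
The following sketch shows one way the aws options might be combined when Data Prepper assumes a role that requires an external ID. The external ID, header name, and header value are hypothetical and only illustrate the expected shape of each option:

```yaml
pipeline:
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
          sts_external_id: example-external-id        # hypothetical value
          sts_header_overrides:
            x-example-header: example-value           # hypothetical header name and value
        bucket: bucket_name
        threshold:
          event_count: 10000
          event_collect_timeout: 15s
        codec:
          ndjson:
```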

Threshold configuration

Use the following options to set ingestion thresholds for the s3 sink. When any one of these conditions is met, Data Prepper writes the accumulated events to an S3 object.

Option | Required | Type | Description
:--- | :--- | :--- | :---
event_count | Yes | Integer | The number of Data Prepper events to accumulate before writing an object to S3.
maximum_size | No | String | The maximum number of bytes to accumulate before writing an object to S3. Default is 50mb.
event_collect_timeout | Yes | String | The maximum amount of time that Data Prepper waits before writing the accumulated events to S3. The value should be either an ISO-8601 duration, such as PT2M30S, or a simple notation, such as 60s or 1500ms.
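
As an illustration of the two duration notations, the following threshold sketch uses an ISO-8601 duration for the timeout. The specific values are arbitrary. With these settings, an object is written once 2,000 events have accumulated, once 10 MB have accumulated, or once 2 minutes and 30 seconds have elapsed, whichever happens first:

```yaml
pipeline:
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
        bucket: bucket_name
        threshold:
          event_count: 2000
          maximum_size: 10mb
          # ISO-8601 duration; the simple notation 150s describes the same length of time.
          event_collect_timeout: PT2M30S
        codec:
          ndjson:
```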

Buffer type

buffer_type is an optional configuration that determines how Data Prepper temporarily stores data before writing an object to S3. The default value is in_memory. Use one of the following options:

  • in_memory: Stores the record in memory.
  • local_file: Flushes the record into a file on your local machine. This uses your machine's temporary directory.
  • multipart: Writes using the S3 multipart upload. Every 10 MB is written as a part.
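
For example, a sink that produces large objects might use multipart rather than holding each batch in memory. This is a sketch only; the threshold values are placeholders, and which buffer type suits a workload depends on available memory and disk:

```yaml
pipeline:
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
        bucket: bucket_name
        threshold:
          event_count: 100000
          maximum_size: 50mb
          event_collect_timeout: 60s
        codec:
          ndjson:
        # Alternatives are in_memory (the default) and local_file.
        buffer_type: multipart
```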

Object key configuration

Option | Required | Type | Description
:--- | :--- | :--- | :---
path_prefix | No | String | The S3 key prefix path to use for objects written to S3. Accepts date-time formatting. For example, you can use %{yyyy}/%{MM}/%{dd}/%{HH}/ to create hourly folders in S3. The prefix path should end with /. By default, Data Prepper writes objects to the root of the S3 bucket.
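
The hourly layout described in the table can be sketched as follows. The my-logs/ prefix is a placeholder. With this prefix, a batch written during the 06:00 hour on June 9, 2023, would be stored under my-logs/2023/06/09/06/:

```yaml
pipeline:
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
        bucket: bucket_name
        object_key:
          # One folder per hour; the prefix ends with a trailing slash.
          path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/
        threshold:
          event_count: 10000
          event_collect_timeout: 15s
        codec:
          ndjson:
```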

codec

The codec determines how the s3 sink formats data written to each S3 object.

avro codec

The avro codec writes an event as an Apache Avro document.

Because Avro requires a schema, you can either define the schema yourself or have Data Prepper generate one automatically. In general, you should define your own schema because it will most accurately reflect your needs.

We recommend that you make your Avro fields use a null union. Without the null union, each field must be present or the data will fail to write to the sink. If you can be certain that each event has a given field, you can make it non-nullable.

When you provide your own Avro schema, that schema defines the final structure of your data. Therefore, any extra values in incoming events that are not mapped in the Avro schema will not be included in the final destination. To avoid confusion between a custom Avro schema and the include_keys or exclude_keys sink configurations, Data Prepper does not allow the use of include_keys or exclude_keys with a custom schema.

In cases where your data is uniform, you may be able to automatically generate a schema. Automatically generated schemas are based on the first event received by the codec. The schema will only contain keys from this event. Therefore, all keys must be present in every event for the automatically generated schema to work. Automatically generated schemas make all fields nullable. Use the sink's include_keys and exclude_keys configurations to control what data is included in the auto-generated schema.

Option | Required | Type | Description
:--- | :--- | :--- | :---
schema | Yes | String | The Avro schema declaration. Not required if auto_schema is set to true.
auto_schema | No | Boolean | When set to true, automatically generates the Avro schema declaration from the first event.
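
The two approaches can be sketched side by side. The pipeline names, the ExampleLog record, and its field names are all hypothetical. The first pipeline defines its own schema and uses null unions for fields that may be missing. The second relies on auto_schema and, as an assumption about placement, sets include_keys directly under the sink to limit which keys enter the generated schema:

```yaml
avro-custom-schema-pipeline:        # hypothetical pipeline name
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
        bucket: bucket_name
        threshold:
          event_count: 10000
          event_collect_timeout: 15s
        codec:
          avro:
            # "message" is assumed to be present in every event, so it is non-nullable.
            # "status" and "client_ip" may be missing, so each uses a null union.
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "ExampleLog",
                "fields" : [
                  { "name" : "message", "type" : "string" },
                  { "name" : "status", "type" : ["null", "int"] },
                  { "name" : "client_ip", "type" : ["null", "string"] }
                ]
              }

avro-auto-schema-pipeline:          # hypothetical pipeline name
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
        bucket: bucket_name
        # Assumed placement: include_keys as a sink-level option next to codec.
        include_keys: ["message", "status", "client_ip"]
        threshold:
          event_count: 10000
          event_collect_timeout: 15s
        codec:
          avro:
            auto_schema: true
```

Note that the first pipeline omits include_keys and exclude_keys because they cannot be combined with a custom schema.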

ndjson codec

The ndjson codec writes each event as a JSON object on its own line.

The ndjson codec does not take any configurations.

json codec

The json codec writes events in a single large JSON file. Each event is written into an object within a JSON array.

Option | Required | Type | Description
:--- | :--- | :--- | :---
key_name | No | String | The name of the key for the JSON array. By default this is events.
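
As a brief sketch, the following sink overrides the array key. The name records is a hypothetical choice; with it, each object would hold its events in an array under the records key instead of the default events key:

```yaml
pipeline:
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
        bucket: bucket_name
        threshold:
          event_count: 10000
          event_collect_timeout: 15s
        codec:
          json:
            key_name: records      # hypothetical key name; the default is events
```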

parquet codec

The parquet codec writes events into a Parquet file. When using the Parquet codec, set the buffer_type to in_memory.

The Parquet codec writes data using the Avro schema. Because Parquet requires an Avro schema, you can either define the schema yourself or have Data Prepper generate one automatically. However, we generally recommend that you define your own schema so that it can best meet your needs.

For details on the Avro schema and recommendations, see the Avro codec documentation.

Option | Required | Type | Description
:--- | :--- | :--- | :---
schema | Yes | String | The Avro schema declaration. Not required if auto_schema is set to true.
auto_schema | No | Boolean | When set to true, automatically generates the Avro schema declaration from the first event.
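
When events are uniform, a Parquet sink can rely on an automatically generated schema instead of a declared one. The following is a minimal sketch under that assumption; the bucket name and threshold values are placeholders:

```yaml
pipeline:
  # Source and processors omitted.
  sink:
    - s3:
        aws:
          region: us-east-1
        bucket: bucket_name
        threshold:
          event_count: 100000
          event_collect_timeout: PT15M
        codec:
          parquet:
            # The schema is derived from the first event, so every event
            # should carry the same keys.
            auto_schema: true
        buffer_type: in_memory       # the buffer type to set when using the parquet codec
```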

Setting a schema with Parquet

The following example shows you how to configure the s3 sink to write events into a Parquet file using a schema for VPC Flow Logs:

pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        bucket: mys3bucket
        object_key:
          path_prefix: vpc-flow-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/
        codec:
          parquet:
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "VpcFlowLog",
                "fields" : [
                  { "name" : "version", "type" : ["null", "string"]},
                  { "name" : "srcport", "type": ["null", "int"]},
                  { "name" : "dstport", "type": ["null", "int"]},
                  { "name" : "accountId", "type" : ["null", "string"]},
                  { "name" : "interfaceId", "type" : ["null", "string"]},
                  { "name" : "srcaddr", "type" : ["null", "string"]},
                  { "name" : "dstaddr", "type" : ["null", "string"]},
                  { "name" : "start", "type": ["null", "int"]},
                  { "name" : "end", "type": ["null", "int"]},
                  { "name" : "protocol", "type": ["null", "int"]},
                  { "name" : "packets", "type": ["null", "int"]},
                  { "name" : "bytes", "type": ["null", "int"]},
                  { "name" : "action", "type": ["null", "string"]},
                  { "name" : "logStatus", "type" : ["null", "string"]}
                ]
              }
        threshold:
          event_count: 500000000
          maximum_size: 20mb
          event_collect_timeout: PT15M
        buffer_type: in_memory