Receive log data from S3 as a Source #251

Closed
13 tasks done
laneholloway opened this issue Sep 6, 2021 · 12 comments
laneholloway (Contributor) commented Sep 6, 2021:

Use-Case

Many users have external systems which write their logs to Amazon S3. These users want to use OpenSearch to analyze these logs. Data Prepper is an ingestion tool which can aid teams in extracting these logs from S3 and sending them to OpenSearch or elsewhere.

This proposal is to receive events from S3 notifications, read the corresponding objects from S3, and create Log Events from their contents.

Basic Configuration

This plugin will be a single source plugin which:

  • Polls a configured SQS standard queue which should hold S3 Event messages.
  • Reads S3 objects which the message indicates as created.
  • Uses a configured codec to parse the S3 object into Log Events.
  • Writes the Log Events into the Data Prepper buffer.

The following example shows what a basic configuration would look like.

source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      single-line:
processor:
  grok:
    match:
      message: [ "%{COMMONAPACHELOG}" ]

Detailed Process

The S3 Source will start a new thread for reading from S3. (The number of threads can be configured).

This thread will perform the following steps repeatedly until shutdown:

  1. Use the SQS ReceiveMessage API to receive messages from SQS.
  2. For each Message from SQS, it will:
    a. Parse the Message as an S3Event.
    b. Download the S3 Object which the S3Event indicates was created.
    c. Decompress the object if configured to do so.
    d. Parse the decompressed file using the configured codec into a list of Log Event objects.
    e. Write the Log objects into the Data Prepper buffer.
  3. Perform a DeleteMessageBatch with all of the messages which were successfully processed.
  4. Repeat

Error Handling

The S3 Source will suppress exceptions which occur during processing. Any Message which is not processed correctly will not be included in the DeleteMessageBatch request. Thus, the message will appear in the SQS queue again. Data Prepper expects that the SQS queue is correctly configured with a DLQ or MessageRetentionPeriod to prevent the SQS queue from filling up with invalid messages.
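For illustration, a dead-letter queue and retention period could be attached to the queue outside of Data Prepper, for example with a CloudFormation sketch like the following. The resource names are hypothetical and the values are examples only.

MyS3EventDlq:
  Type: AWS::SQS::Queue

MyS3EventQueue:
  Type: AWS::SQS::Queue
  Properties:
    # Keep unconsumed messages for up to 14 days (value is in seconds).
    MessageRetentionPeriod: 1209600
    RedrivePolicy:
      # Move a message to the DLQ after 5 failed receives.
      deadLetterTargetArn: !GetAtt MyS3EventDlq.Arn
      maxReceiveCount: 5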

Codecs

The S3 Source will use configurable codecs to support multiple data formats in the S3 objects. Initially, two codecs are planned:

  1. single-line - This is used for logs in which events are separated by newlines.
  2. json - A codec for parsing JSON logs.

Single Line

The single-line codec has no configuration items.

Below is an example S3 object.

POST /search
POST /index
PUT /document/12345

With single-line, the S3 source will produce 3 Events, each with the following structure.

"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.log",
"message" : "POST /search"
"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.log",
"message" : "POST /index"
"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.log",
"message" : "PUT /document/12345"

JSON

The json codec supports reading a JSON file and will create an Event for each JSON object in an array. This plugin starts with the expectation that the incoming JSON is formed as a large JSON array of JSON objects, where each JSON object in that array is an Event. Thus, this codec will find the first JSON array in the input and output each object within that array as an Event.

Future iterations of this plugin could allow for more customization. One possibility is to use JSON Pointer. However, the first iteration should meet many use-cases and allow for streaming the JSON to support parsing large JSON objects.

Below is an example configuration. It configures the S3 Source to use the json codec.

s3:
  codec:
    json:

Given the following S3 Object:

{
  "http_requests" : [
    { "status" : 200, "path" : "/search", "method" : "POST" },
    { "status" : 200, "path" : "/index", "method" : "POST" },
    { "status" : 200, "path" : "/document/12345", "method" : "PUT" }
  ]
}

The S3 source will output 3 Log events:

"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.json",
"message" : { "status" : 200, "path" : "/index", "method" : "POST" }
"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.json",
"message" : { "status" : 200, "path" : "/search", "method" : "POST" }
"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.json",
"message" : { "status" : 200, "path" : "/document/12345", "method" : "PUT" }

Compression

The S3 Source will support three configurations for compression.

  1. none - The object will be treated as uncompressed.
  2. gzip - The object will be decompressed using the gzip decompression algorithm.
  3. automatic - The S3 Source will examine the object key to guess whether it is compressed. If the key ends with .gz, the S3 Source will attempt to decompress it using gzip. Future iterations can support other heuristics to determine whether a file is compressed. (See the example after this list.)
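For example, a pipeline author could enable automatic detection with a configuration like the following sketch. The option names are taken from the table below; the values are illustrative.

source:
  s3:
    notification_type: sqs
    compression: automatic
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      single-line: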

Full Configuration Options

Option                 | Type                        | Required | Description
notification_type      | Enum: sqs                   | Yes      | Only SQS is supported. SNS may be a future option.
compression            | Enum: none, gzip, automatic | No       | Default is none.
codec                  | Codec                       | Yes      | See Codecs section above.
sqs.queue_url          | String - URL                | Yes      | The queue URL of the SQS queue.
sqs.maximum_messages   | Integer                     | No       | Passed directly to the SQS ReceiveMessage request. Default is 10.
sqs.visibility_timeout | Duration                    | No       | Passed directly to the SQS ReceiveMessage request. Default is TBD.
sqs.wait_time          | Duration                    | No       | Passed directly to the SQS ReceiveMessage request. Default is TBD.
sqs.poll_delay         | Duration                    | No       | An optional delay between iterations of the process. Default is 0 seconds.
sqs.thread_count       | Integer                     | No       | Number of threads polling SQS and reading from S3. Default is 1.
region                 | String                      | Yes      | The AWS Region. TBD.
sts_role_arn           | String                      | No       | Role used for accessing S3 and SQS.
access_key_id          | String                      | No       | Static credentials for accessing S3 and SQS.
secret_key_id          | String                      | No       | Static credentials for accessing S3 and SQS.
buckets                | String List                 | No       | If provided, only read objects from the buckets in this list.
account_ids            | String List                 | No       | If provided, only read objects from buckets owned by an accountId in this list.
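For illustration, the following sketch combines several of these options in one configuration. All values, including the duration syntax and the role ARN, are examples only.

source:
  s3:
    notification_type: sqs
    compression: gzip
    codec:
      json:
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
      maximum_messages: 10
      visibility_timeout: 30s
      wait_time: 20s
      poll_delay: 0s
      thread_count: 2
    region: "us-east-2"
    sts_role_arn: "arn:aws:iam::123456789012:role/data-prepper-s3-source"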

S3 Events

The S3 Source will parse all SQS Messages according to the S3 Event message structure.

The S3 Source will also filter out any event types which are not s3:ObjectCreated:*. These events will be silently ignored. That is, the S3 Source will remove them from the SQS queue and will not create any Events for them.

Additionally, this source will have optional buckets and account_ids lists. If supplied by the pipeline author, Data Prepper will only read objects for S3 events which match these lists. For the buckets list, only S3 buckets in the list are used. For the account_ids list, only buckets owned by accounts with matching IDs are used. If these lists are not provided, Data Prepper will read from any bucket which is owned by the accountId of the SQS queue.
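For example, a configuration sketch using these lists could look like the following; the bucket names and account ID are placeholders.

source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      single-line:
    buckets:
      - "application-logs-bucket"
      - "audit-logs-bucket"
    account_ids:
      - "123456789012"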

AWS Permissions Needed

The S3 Source will require the following permissions:

Action                 | Resource
s3:GetObject           | The S3 bucket and key path for any object needed
sqs:ReceiveMessage     | The ARN of the SQS queue specified by sqs.queue_url
sqs:DeleteMessageBatch | The ARN of the SQS queue specified by sqs.queue_url

Possible Future Enhancements

Direct SNS Notification

The notification_type currently only supports SQS. Some teams may want Data Prepper to receive notifications directly from SNS and thus remove the need for an SQS queue.

The notification_type could support an sns value in the future.
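A purely hypothetical sketch of what such a configuration might look like; the sns section and topic_arn option are not part of this proposal and are shown only for illustration.

source:
  s3:
    notification_type: sns
    sns:
      topic_arn: "arn:aws:sns:us-east-2:123456789012:MyS3EventTopic"
    codec:
      single-line: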

Additional Codecs

As needed, Data Prepper can support other codecs. Some possible candidates to consider are:

  • Multi-line
  • JSON List

Metrics

  • messagesReceived (Counter)
  • messagesDeleted (Counter)
  • messagesFailed (Counter)
  • eventsCreated (Counter)
  • requestsDuration (Timer)

Not Included

  • This proposal is focused only on reading S3 objects starting with a notification. Thus, any use-case for replay is not part of this scope. Also, use-cases for reading existing logs are not covered. These use-cases can have their own issues.
  • Updated S3 objects are not part of the scope. This work will only support use-cases in which a log file is written once.
  • Configuration of the SQS queue to receive SNS topics should be done externally. Data Prepper will not manage this.

Tasks

laneholloway added the "plugin - source" label (A plugin to receive data from a service or location) on Sep 6, 2021
dlvenable changed the title from "Accept log data from S3" to "Accept log data from S3 as a Source" on Feb 16, 2022
dlvenable (Member) commented:

There are at least two use-cases for this feature:

  • Playing back data which Data Prepper originally created via the S3 Sink (to be created in Support S3 as a Sink #1048)
  • Reading data which was created in S3 by other sources

stijnvanrenterghem commented:

+1

dlvenable changed the title from "Accept log data from S3 as a Source" to "Receive log data from S3 as a Source" on May 2, 2022
dlvenable added this to the v1.5 milestone on May 2, 2022
dlvenable self-assigned this on May 2, 2022
dlvenable added the "enhancement" label (New feature or request) on May 6, 2022
dlvenable (Member) commented:

I have created an initial draft for the S3 Source. Please see the description for details.

graytaylor0 (Member) commented:

Does it make sense to require users to use the json processor for json logs? Why not just drop the message key and make each json object its own Event?

dlvenable (Member) commented:

@graytaylor0 , That's a good question. At first, I was thinking that it may be valuable to use the json processor since it could possibly have more advanced features. But, as currently proposed, there isn't much there. Also, given the way Jackson works, Data Prepper may have to re-serialize the JSON under the current proposal.

So, I am for changing this to output Event objects which fit the JSON model.

I'm also interested in including bucket and key data in the resulting Event. So perhaps the JSON should go into a message or object key.

As an example, here is a possible input S3 object:

{
  "fieldA" : "abc",
  "fieldB" : "xyz",
  "http_requests" : [
    { "status" : 200, "path" : "/search", "method" : "POST" },
    { "status" : 200, "path" : "/index", "method" : "POST" },
    { "status" : 200, "path" : "/document/12345", "method" : "PUT" }
  ]
}

The output would be three Events:

"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.log",
"message" : { "status" : 200, "path" : "/index", "method" : "POST" }
"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.log",
"message" : { "status" : 200, "path" : "/search", "method" : "POST" }
"bucket" : "my-bucket",
"key" : "application1/instance200/2022-05-11.log",
"message" : { "status" : 200, "path" : "/document/12345", "method" : "PUT" }

Thoughts?

cmanning09 (Contributor) commented:

I have a few points to raise:

  • I think we have some potential configuration naming inconsistencies around credentials. The OpenSearch sink supports aws_region, aws_sts_role_arn, etc.
  • Would it be possible to make this solution a little more generic to support reading from a source based on a notification? I am thinking we could have plugin support for notifications (AWS SQS, Azure Event Grid) and sources (AWS S3, Azure Blob Storage, etc.).
  • What metrics will be available with this new source plugin?

dinujoh (Member) commented May 11, 2022:

Some of the additional SQS properties to consider for polling:
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html

  • ReceiveMessage AttributeName: attributes that need to be returned along with each message. This can be used to generate client-side metrics.
  • MaxNumberOfMessages: To handle batch requests, this can be configured to read up to 10 messages.

Are we planning to use long polling to receive messages?

asifsmohammed (Collaborator) commented:

  • I think we have some potential configuration naming inconsistencies around credentials. The OpenSearch sink supports aws_region, aws_sts_role_arn, etc.
  • Would it be possible to make this solution a little more generic to support reading from a source based on a notification? I am thinking we could have plugin support for notifications (AWS SQS, Azure Event Grid) and sources (AWS S3, Azure Blob Storage, etc.).
  • What metrics will be available with this new source plugin?

Thanks @cmanning09 for bringing up these points.

  • I addressed the naming inconsistencies in a PR which aligns with the OpenSearch sink naming.
  • We can create a separate section in the configuration for different notifications, as seen in the current configuration for AWS (a hypothetical sketch follows below).
  • Metrics will be added to the issue description.
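For instance, a hypothetical layout with a dedicated aws section might look like the following sketch; the section and option names here are illustrative only, not a final design.

source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      single-line:
    aws:
      region: "us-east-2"
      sts_role_arn: "arn:aws:iam::123456789012:role/data-prepper-s3-source"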

dlvenable (Member) commented:

Some of the additional SQS properties to consider for polling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html

  • ReceiveMessage AttributeName: attributes that need to be returned along with each message. This can be used to generate client-side metrics.
  • MaxNumberOfMessages: To handle batch requests, this can be configured to read up to 10 messages.

Are we planning to use long polling to receive messages?

@dinujoh ,

Regarding the AttributeName, I think the only metric that this plugin might support in this initial version is ApproximateNumberOfMessages. I had not noticed that this was available in this API. If this does indeed return the number of messages in the whole queue, it would be a very useful metric for knowing when to scale up Data Prepper, and it would be important to produce.

The plugin configuration sqs.maximum_messages will map directly to the MaxNumberOfMessages property. We plan to use the default value of 10 (which is the maximum allowed value).

By default, the S3 plugin will use long polling. This is controlled by the sqs.wait_time property in the plugin configuration. The value supplied here will be provided as the SQS WaitTimeSeconds (after converting to seconds). The default value will enable long polling. However, the pipeline author can configure 0s, which would disable long polling.
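For example, a pipeline author could disable long polling with a setting like the following sketch (the duration syntax shown is illustrative):

sqs:
  queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
  wait_time: 0s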

dlvenable (Member) commented:

I made a change to the proposal for how JSON is loaded. The new proposal is to choose the first JSON array found. The expectation for Data Prepper is to receive a JSON object which is a single array of many different events. This should work with systems such as AWS CloudTrail.

The original proposal was to use a JSON Pointer to select the array. The disadvantage of that approach is that it is not compatible with Jackson's streaming API. The new proposal will allow Data Prepper to use Jackson's streaming API, which will allow Data Prepper to load only parts of the JSON into memory at a time. It could also allow for retrying large files.

dlvenable (Member) commented Jun 4, 2022:

I'd like to propose adding a few other metrics:

  • eventsFailed - A count of the Record<Event> objects which the S3 Source created but was unable to place in the buffer before the buffer write timed out.
  • s3ObjectsFailed - Counter for the number of S3 objects which the S3 Source was unable to either load or parse. This could happen if the permissions are insufficient, or if the format is not correct (e.g. the source is configured for JSON, but it is a text file).
  • sqsApproximateNumberOfMessages - Counter for the ApproximateNumberOfMessages value provided by GetQueueAttributes. This will require that the S3 Source also poll the GetQueueAttributes endpoint to provide this metric. Including this metric should be optional since not all Data Prepper nodes will be given permissions to call GetQueueAttributes.

dlvenable (Member) commented:

Taking another look, I recommend that we remove the eventsCreated and eventsFailed metrics. These are redundant with the buffer's metrics - recordsWritten and writeTimeouts. Also, the Trace and HTTP sources do not have similar metrics.
