Add event aggregation use case to Data Prepper documentation #6206

Merged 12 commits on Feb 22, 2024

135 changes: 135 additions & 0 deletions in _data-prepper/common-use-cases/event-aggregation.md

---
layout: default
title: Event aggregation
parent: Common use cases
nav_order: 40
---

# Event aggregation

You can use Data Prepper to aggregate data from different events over a period of time. Aggregating events can help reduce unnecessary log volume and handle use cases like multiline logs that are received as separate events. The [`aggregate` processor]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/processors/aggregate/) is a stateful processor that groups events based on the values for a set of specified identification keys and performs a configurable action on each group.
The `aggregate` processor state is stored in memory. For example, in order to combine four events into one, the processor needs to retain pieces of the first three events. The state of an aggregate group of events is kept for a configurable amount of time. Depending on your logs, the aggregate action being used, and the number of memory options in the processor configuration, the aggregation could take place over a long period of time.
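The amount of time that group state is held is set with the `group_duration` option. The following minimal sketch illustrates only this setting; the identification keys and the 120-second duration are arbitrary examples, not recommendations:

```yaml
processor:
  - aggregate:
      # Events that share these values are collected into the same in-memory group.
      identification_keys: ["sourceIp", "destinationIp"]
      action:
        put_all:
      # Group state is held in memory for up to 120 seconds before the group concludes.
      group_duration: "120s"
```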

## Basic usage

The following example pipeline extracts the fields `sourceIp`, `destinationIp`, and `port` using the [`grok` processor]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/processors/grok/) and then aggregates on those fields over a period of 30 seconds using the [`aggregate` processor]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/processors/aggregate/) and the `put_all` action. At the end of the 30-second period, the aggregated log is sent to the OpenSearch sink.
```yaml
aggregate_pipeline:
  source:
    http:
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: ["%{IPORHOST:sourceIp} %{IPORHOST:destinationIp} %{NUMBER:port:int}"]
    - aggregate:
        group_duration: "30s"
        identification_keys: ["sourceIp", "destinationIp", "port"]
        action:
          put_all:
  sink:
    - opensearch:
        ...
        index: aggregated_logs
```
{% include copy-curl.html %}

For example, consider the following batch of logs:

```json
{ "log": "127.0.0.1 192.168.0.1 80", "status": 200 }
{ "log": "127.0.0.1 192.168.0.1 80", "bytes": 1000 }
{ "log": "127.0.0.1 192.168.0.1 80" "http_verb": "GET" }
```
{% include copy-curl.html %}

The `grok` processor will extract keys such that the log events will look like the following example. These events now have the data that the `aggregate` processor will need for the `identification_keys`.
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "http_verb": "GET" }
```
{% include copy-curl.html %}

Thirty seconds after the `aggregate` processor receives the first log in the group, the group concludes and the following aggregated log is written to the sink:
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200, "bytes": 1000, "http_verb": "GET" }
```
{% include copy-curl.html %}

## Removing duplicates

You can remove duplicate entries by deriving keys from incoming events and specifying the `remove_duplicates` option for the `aggregate` processor. This action immediately processes the first event for a group and drops all following events in that group.
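For reference, the processor section of such a pipeline might look like the following minimal sketch (the source and sink are omitted):

```yaml
processor:
  - aggregate:
      # Events with the same sourceIp and destinationIp values belong to the same group.
      identification_keys: ["sourceIp", "destinationIp"]
      action:
        # Process the first event in each group immediately and drop the rest.
        remove_duplicates:
```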
In the following example, the first event is processed with the identification keys `sourceIp` and `destinationIp`:

```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
```
{% include copy-curl.html %}

The pipeline will then drop the following event because it has the same keys:

```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
```
{% include copy-curl.html %}

The pipeline processes this event and creates a new group because the `sourceIp` is different:

```json
{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }
```
{% include copy-curl.html %}

## Log aggregation and conditional routing

You can use multiple plugins to combine log aggregation with conditional routing. In this example, the sub-pipeline `log-aggregate-pipeline` receives logs by using an HTTP client, like FluentBit, and extracts important values from the logs by matching the value in the `log` key against the Apache Common Log Format.
Two of the values that the sub-pipeline extracts from the logs with a Grok pattern include `response` and `clientip`. The `aggregate` processor then uses the `clientip` value, along with the `remove_duplicates` option, to drop any logs that contain a `clientip` that has already been processed within the given `group_duration`.
Three routes, or conditional statements, exist in the pipeline. These routes separate the value of the response into 2xx/3xx, 4xx, and 5xx responses. Logs with a 2xx or 3xx status are sent to the `aggregated_2xx_3xx` index, logs with a 4xx status are sent to the `aggregated_4xx` index, and logs with a 5xx status are sent to the `aggregated_5xx` index.
```yaml
log-aggregate-pipeline:
  source:
    http:
      # Provide the path for ingestion. ${pipelineName} will be replaced with the pipeline name configured for this pipeline.
      # In this case it would be "/log-aggregate-pipeline/logs". This will be the FluentBit output URI value.
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
    - aggregate:
        identification_keys: ["clientip"]
        action:
          remove_duplicates:
        group_duration: "180s"
  route:
    - 2xx_status: "/response >= 200 and /response < 300"
    - 3xx_status: "/response >= 300 and /response < 400"
    - 4xx_status: "/response >= 400 and /response < 500"
    - 5xx_status: "/response >= 500 and /response < 600"
  sink:
    - opensearch:
        ...
        index: "aggregated_2xx_3xx"
        routes:
          - 2xx_status
          - 3xx_status
    - opensearch:
        ...
        index: "aggregated_4xx"
        routes:
          - 4xx_status
    - opensearch:
        ...
        index: "aggregated_5xx"
        routes:
          - 5xx_status
```
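The comment in the `source` section notes that the ingestion path becomes the FluentBit output URI. As a rough sketch only, a matching Fluent Bit `http` output (written in Fluent Bit's YAML configuration format) could look like the following; the host value is a placeholder, and port 2021 assumes the Data Prepper HTTP source default:

```yaml
pipeline:
  outputs:
    - name: http
      match: "*"
      # Replace with the host where Data Prepper is running.
      host: data-prepper.example.com
      # Default port for the Data Prepper HTTP source.
      port: 2021
      # Matches the ingestion path configured in log-aggregate-pipeline.
      uri: /log-aggregate-pipeline/logs
      format: json
```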