
[RFC] Stateful Aggregation #699

Closed
dlvenable opened this issue Dec 3, 2021 · 2 comments
Labels: proposal (Proposed major changes to Data Prepper)

dlvenable commented Dec 3, 2021

Background and Current Design

Users of Data Prepper often want to aggregate data flowing through Data Prepper.
Two common examples are:

  • Deduplicating multiple events that represent a single conceptual event.
  • Combining multiple events whose data has been split across them.

These types of operations require more than one event over a period of time. For example, to combine four distinct events into one, Data Prepper needs to retain the first three events. When the fourth event arrives, then the data is combined and sent through the pipeline. Because Data Prepper must maintain previous events, this is stateful aggregation.

This RFC outlines a proposal for supporting stateful aggregation in Data Prepper.

Current Design

Data Prepper currently supports stateful aggregation only for Trace Analytics. Data Prepper can build an application service map using data from traces. There are two major components used for stateful aggregation in this scenario.

  • peer-forwarder
  • service-map-stateful

Data Prepper partitions stateful data in multi-node clusters by assigning each node a set of traces to process. Each node need only maintain the state for its set of data. The peer forwarder determines which node should handle a given trace and reroutes it to that node. It determines the dedicated node for a trace using consistent hashing and a hash ring.
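To make the routing concrete, consistent hashing over a ring could be sketched as follows. This is illustrative only; the class and method names are hypothetical, and the actual peer-forwarder implementation may hash differently.

import java.nio.charset.StandardCharsets;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch of consistent hashing for trace routing. Each node
// occupies several virtual points on the ring; a trace is routed to the
// first node at or after the hash of its traceId, wrapping around at the end.
public class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public void addNode(final String nodeAddress, final int virtualPoints) {
        for (int i = 0; i < virtualPoints; i++) {
            ring.put(hash(nodeAddress + "#" + i), nodeAddress);
        }
    }

    public String nodeForTrace(final String traceId) {
        final SortedMap<Long, String> tail = ring.tailMap(hash(traceId));
        // Wrap around to the start of the ring when the hash falls past the last node.
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static long hash(final String key) {
        final CRC32 crc32 = new CRC32();
        crc32.update(key.getBytes(StandardCharsets.UTF_8));
        return crc32.getValue();
    }
}

Because every node computes the same hash for a given traceId, all spans of one trace converge on one node regardless of which node received them first.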

The current Peer Forwarder takes an ExportTraceServiceRequest and splits it into individual spans. It groups these spans by traceId and determines which node should operate on each trace. It then rebuilds new ExportTraceServiceRequest objects from those traces and makes an HTTP request to the OTel Source on the destination node. Traces that already belong to the current node continue through the current pipeline.

The following diagram outlines the current approach for aggregating traces into a service map. (For simplicity, this diagram excludes the raw trace pipeline). It shows the flow of trace data through a pipeline.

[Diagram: Aggregation-TraceCurrent]

The current approach has limitations which prevent it from being used in situations other than trace analytics.

  1. Peer Forwarder must know the incoming message format.
  2. Peer Forwarder must use the same protocol as the source.
  3. Events must be rebuilt, which would be difficult to perform generally.
  4. Pipelines cannot enrich data before Peer Forwarder because the input must be of the correct shape for the OTel Source.

Data Prepper also has a service-map-stateful plugin which creates a service map from trace data. This plugin uses two windows to maintain state: a current window and a previous window. The plugin saves new state data in the current window and reads data from both the current and previous windows. When the window duration ends, it replaces the previous window with the current one and creates a fresh current window.
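The two-window mechanism can be sketched as follows (the class here is illustrative, not the actual service-map-stateful code):

import java.util.HashMap;
import java.util.Map;

// Illustrative two-window state: writes go to the current window, reads
// consult both windows, and rotation replaces the previous window with
// the current one when the window duration ends.
public class TwoWindowState<K, V> {
    private Map<K, V> currentWindow = new HashMap<>();
    private Map<K, V> previousWindow = new HashMap<>();

    public void put(final K key, final V value) {
        currentWindow.put(key, value);
    }

    public V get(final K key) {
        final V value = currentWindow.get(key);
        return value != null ? value : previousWindow.get(key);
    }

    // Called when the window duration ends; state older than two windows is dropped.
    public void rotate() {
        previousWindow = currentWindow;
        currentWindow = new HashMap<>();
    }
}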

Proposed Changes

Data Prepper will include a stateful aggregate processor. Data Prepper will also include peer forwarding as a core feature which the aggregate processor can use. Other plugins could also make use of this feature if they need it.

The following diagram outlines the flow of an Event through a pipeline with the Aggregate processor.

[Diagram: Aggregation]

Peer Forwarder Design

The proposed design is to create a more general Peer Forwarder as part of Data Prepper Core. In this design, any plugin can request peer forwarding of events between Data Prepper nodes. The details of the peer forwarder are outlined in #700.

For this design, the aggregate plugin will use the new Peer Forwarder which Data Prepper will provide.

Aggregate Plugin

Data Prepper will have a new processor named aggregate. The processor will handle the common aspects of aggregation such as storing state. Because the aggregations will vary between pipelines, users need to configure the actual aggregation logic. For the first iteration, the Aggregate processor will use the plugin framework. Customers can provide implementations and reference them in the pipeline configuration file.

The following example shows how the Aggregate processor could work.

processor:
  - aggregate:
      identification_keys:
        - 'sourceIp'
        - 'destinationIp'
        - 'port'
      window_duration: 180
      data_path: data/aggregate
      action:
        remove_duplicates:

Additionally, Data Prepper can include some default actions such as:

  • remove_duplicates
  • combine

User-Defined Aggregations

Some Data Prepper users will want their own custom aggregations. The action uses the plugin framework so that users can add custom actions. Users can write these plugins in Java and include them in their Data Prepper installations.

The following class diagram outlines the relevant classes.

[Class diagram: AggregationProcessorCD]

The AggregateProcessor is the Data Prepper Processor which performs the bulk of the aggregation work. The AggregateAction interface is a pluggable type for performing the custom aggregation steps.

Explanation of operations:

  1. The AggregateProcessor groups Events by the values of the identification_keys for each Event (see the sketch after this list).
  2. The AggregateProcessor creates a single Map<Object, Object> for each group. It persists the map between Events in the same group.
  3. For each Event, the AggregateProcessor calls the AggregateAction’s handleEvent method with the Event and with the shared Map for that group.
  4. The implementor of AggregateAction controls whether the AggregateProcessor returns individual events or aggregate events.
    1. If handleEvent returns an Event, then the AggregateProcessor passes that Event onto the next Processor.
    2. If handleEvent returns empty, then the AggregateProcessor drops that event and does not pass it onto the next Processor.
  5. After the window completes based on the window duration value, the AggregateProcessor calls concludeGroup with the shared map for that group.
    1. If concludeGroup returns an Event, then the AggregateProcessor passes that Event onto the next Processor.
    2. If concludeGroup returns empty, then the AggregateProcessor drops that Event and does not pass it onto the next Processor.
  6. The AggregateProcessor removes the shared map from memory after the window duration expires.
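As a sketch of step 1, the group identity could be derived from the configured identification_keys roughly like this (illustrative only; it reuses the Event.getAsMap accessor that appears later in this RFC):

// Illustrative: Events with equal values for every identification key
// belong to one group and therefore share one groupState map.
private List<Object> groupKeyFor(final Event event, final List<String> identificationKeys) {
    final Map<String, Object> eventMap = event.getAsMap();
    final List<Object> groupKey = new ArrayList<>();
    for (final String key : identificationKeys) {
        groupKey.add(eventMap.get(key));
    }
    return groupKey;
}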

The following interface represents what is necessary for aggregation.

import java.util.Map;
import java.util.Optional;

public interface AggregateAction {
    /**
     * Handles an event as part of aggregation.
     *
     * @param event The current event
     * @param groupState An arbitrary map for the current group
     * @return The Event to return. Empty if this event should be removed from processing.
     */
    default Optional<Event> handleEvent(Event event, Map<Object, Object> groupState) {
        return Optional.of(event);
    }

    /**
     * Concludes a group of Events
     *
     * @param groupState The groupState map from previous calls to handleEvent
     * @return The final Event to return. Return empty if the aggregate processor
     * should not pass an event
     */
    default Optional<Event> concludeGroup(Map<Object, Object> groupState) {
        return Optional.empty();
    }
}

The following sequence diagram outlines the interactions:

[Sequence diagram: AggregationSD]

This proposed design moves much of the complexity into the AggregateProcessor so that AggregateAction implementations can be as straightforward as possible.

An example implementation for combining Events is as follows:

@DataPrepperPlugin(name = "combine", pluginConfigurationType = AggregateAction.class)
public class CombineAggregateAction implements AggregateAction {
    @Override
    public Optional<Event> handleEvent(final Event event, final Map<Object, Object> groupState) {
        // Merge this event's data into the shared group state; the combined
        // event is emitted later by concludeGroup.
        groupState.putAll(event.getAsMap());
        return Optional.empty();
    }

    @Override
    public Optional<Event> concludeGroup(final Map<Object, Object> groupState) {
        return Optional.of(Event.fromMap(groupState));
    }
}

An example implementation for filtering duplicates:

@DataPrepperPlugin(name = "remove_duplicates", pluginConfigurationType = AggregateAction.class)
public class FilterDuplicatesAggregateAction implements AggregateAction {
    @Override
    public Optional<Event> handleEvent(Event event, Map<Object, Object> groupState) {
        if(groupState.containsKey("previousEvent"))
            return Optional.empty();

        groupState.put("event", event);
        return Optional.of(event);
    }
}

Thread Synchronization

The AggregateProcessor must perform locking so that multiple processor threads can run concurrently. Each group-state map can have its own lock, which avoids contending on a single lock across all Events.
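A minimal sketch of per-group locking, assuming groups are tracked in a concurrent map (illustrative, not the final implementation):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative: one lock object per group, so threads handling Events
// from different groups never contend with one another.
public class GroupStateManager {
    private static class Group {
        private final Object lock = new Object();
        private final Map<Object, Object> groupState = new HashMap<>();
    }

    private final ConcurrentHashMap<Object, Group> groups = new ConcurrentHashMap<>();

    public void withGroupState(final Object groupKey, final Consumer<Map<Object, Object>> action) {
        final Group group = groups.computeIfAbsent(groupKey, key -> new Group());
        synchronized (group.lock) {
            action.accept(group.groupState);
        }
    }
}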

Conclusion Conditions

Some Events have distinct ending conditions. In these cases, pipeline authors can configure a longer window and close the group early when the condition occurs. When the condition is true, then the AggregateProcessor will call the concludeGroup action immediately. Additionally, AggregateProcessor will clear the group state.

If the condition is not reached within the window, then the AggregateProcessor will call concludeGroup and clear the state when the window ends.

The conditions will use the same syntax as that proposed by Basic Conditional Logic in Preppers #522.
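Inside the processor, the check might look roughly like this sketch, where conditionEvaluator and concludeWhenExpression stand in for whatever #522 ultimately provides (both are hypothetical names):

// Illustrative: conclude a group early when the conclude_when condition holds.
List<Event> handle(final Event event, final Map<Object, Object> groupState) {
    final List<Event> output = new ArrayList<>();
    aggregateAction.handleEvent(event, groupState).ifPresent(output::add);
    if (conditionEvaluator.evaluate(concludeWhenExpression, event)) {
        // Condition met: conclude immediately rather than waiting for the window.
        aggregateAction.concludeGroup(groupState).ifPresent(output::add);
        groupState.clear();
    }
    return output;
}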

The following example shows the conclude_when property with an example of closing a network connection.

processor:
  - aggregate:
      identification_keys:
        - 'sourceIp'
        - 'destinationIp'
        - 'port'
      window_duration: 300
      data_path: data/aggregate
      action:
        remove_duplicates:
      conclude_when: "/event/type == 'CLOSED'"

This approach can allow for the following when there is a conclusion condition.

  • Events which all arrive close together in time can reach the sink quickly.
  • Events which all arrive within the window_duration will reach the sink as a group, even if it takes a few minutes, as defined by the window_duration.
  • Events which take much longer than the window_duration will send multiple aggregates to the sink. There will be duplicates for these.

Peer Forwarder Integration

This section is based on the Peer Forwarder RFC as detailed in #700.

The aggregate processor will provide the identification_keys as the value for the RequiresPeerForwarding::getCorrelationKeys method. It should look somewhat like the following.

// Set in the constructor as part of the plugin settings.
private final Set<String> identificationKeys;

@Override
public Set<String> getCorrelationKeys() {
  return identificationKeys;
}

Alternatives and Questions

What might a Complete Configuration Look Like?

Here are two example files. One is the pipeline configuration. The second is the Data Prepper configuration file.

pipelines.yaml:

log-aggregation-pipeline:
  source:
    http:
  processor:
    - grok:
        match: '%{IPORHOST:sourceIp} %{IPORHOST:destinationIp} %{NUMBER:port} %{NUMBER:status}'
    - aggregate:
        identification_keys:
          - 'sourceIp'
          - 'destinationIp'
          - 'port'
        window_duration: 180
        data_path: data/aggregate
        action:
          combine:
  sink:
    opensearch:
      hosts: ['https://opensearch.my-domain.net']

data-prepper-config.yaml:

ssl: true
peer_forwarder:
  max_batch_event_count: 48
  port: 4910
  time_out: 300
  discovery_mode: "dns"
  domain_name: "data-prepper-cluster.my-domain.net"

How will the Existing Trace Plugins Change?

The Trace Analytics pipeline currently uses the ExportTraceServiceRequest for trace data moving through peer-forwarder and service-map-stateful. The pipeline must be updated such that the specialized work of splitting up the ExportTraceServiceRequest happens prior to peer forwarding and building the service map. Each trace Event in Data Prepper should represent a single span rather than a batch of spans.
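For illustration, flattening a request into individual spans might look like the following sketch. The accessor names follow the OTLP protobuf Java bindings current at the time of this RFC and may differ in other versions.

import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.Span;
import java.util.ArrayList;
import java.util.List;

// Illustrative: walk the nested OTLP structure and collect one record per span.
// Each resulting Span would back a single Data Prepper Event.
static List<Span> splitIntoSpans(final ExportTraceServiceRequest request) {
    final List<Span> spans = new ArrayList<>();
    request.getResourceSpansList().forEach(resourceSpans ->
            resourceSpans.getInstrumentationLibrarySpansList().forEach(librarySpans ->
                    spans.addAll(librarySpans.getSpansList())));
    return spans;
}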

The current service map may be more complicated than what the aggregate plugin will support. Refactoring service-map-stateful to use the aggregate plugin is beyond the scope of this RFC.

The service-map-stateful plugin will use the core Peer Forwarder by implementing the RequiresPeerForwarding interface.

AggregateAction in Pipeline

An alternate design would be to support the aggregate action in code within the pipeline definition. This could be supported by parsing the string as Groovy or Kotlin.

processor:
  - aggregate:
      identification_keys:
        - 'sourceIp'
        - 'destinationIp'
        - 'port'
      window_duration: 180
      data_path: data/aggregate
      action_source:
        language: groovy
        handleEvent: |
          groupState.putAll(event.getAsMap())
          return Optional.empty()
        concludeGroup: return Optional.of(Event.fromMap(groupState))

This is a feature which should be considered in the future if users of Data Prepper have much interest in it.

Users value having pre-defined aggregations so that they don't have to re-write similar code. If this is added later, it would complement the proposed design of having a pluggable AggregateAction.

Aggregation Persistence

This RFC only includes in-memory storage of the group state information. A future extension could allow the aggregate plugin to use a configurable store. This can help for groups whose windows span more than a few minutes. Some likely options are local disk, Redis, or DynamoDB. Additionally, Redis or DynamoDB would help rebalance stored group state in scenarios where nodes leave or enter the cluster.
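A possible shape for such a store, purely as a sketch (no such interface exists yet):

import java.util.Map;

// Hypothetical pluggable store for group state. The default implementation
// would be in-memory; others could be backed by local disk, Redis, or DynamoDB.
public interface GroupStateStore {
    // Returns the existing state for a group, creating it if absent.
    Map<Object, Object> getOrCreate(Object groupKey);

    // Writes the state back so it survives restarts or rebalancing.
    void persist(Object groupKey, Map<Object, Object> groupState);

    // Discards the state once the group's window concludes.
    void remove(Object groupKey);
}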

Default Aggregations

Are there any aggregations which are so common that Data Prepper should make them available as part of the Aggregate plugin? This design includes deduplication and merging as possible candidates for defaults. Default implementations would be distributed along with the aggregate plugin.

dlvenable added the untriaged and proposal (Proposed major changes to Data Prepper) labels on Dec 3, 2021.
graytaylor0 self-assigned this issue on Jan 5, 2022.
dlvenable commented:

For persisted windows, one scenario to consider is the window time when Data Prepper restarts. What should the window time be for existing groups? I see three possible scenarios: 1) Use the initial window start time; 2) Start the window duration over; and 3) attempt to track the remaining time per window before shutting down and use whatever time remains.

Option 1 will not work well for short windows. This is because the window may have expired during the restart, so Data Prepper would immediately close the group even if new messages are waiting. But option 2 may not work well if window durations are very large. If a window duration is 24 hours and Data Prepper restarts a few times, then windows will never close.

Despite this possible issue, I suggest that Data Prepper initially use approach 2. We shouldn't assume that Data Prepper will restart very often, even during long windows. Also, we can make this a future configuration option so that pipeline authors can determine the behavior. But, for simplicity, I think 2 is a good start.

Option 3 would be a good balance, but may be slightly more complicated since it requires updating the remaining time each time a group is processed. I think it is a good approach, but don't believe the added complexity is completely necessary for the first iteration.
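For reference, option 3 would amount to bookkeeping roughly like the following sketch, where GroupWindow and the persistence helpers are hypothetical:

// Illustrative: persist the remaining window time at shutdown and restore
// it at startup, so a restart neither resets nor silently expires a window.
void onShutdown(final GroupWindow window) {
    final long remainingMillis = window.getWindowEndTime() - System.currentTimeMillis();
    persistRemainingTime(window.getGroupKey(), Math.max(0, remainingMillis));
}

void onStartup(final GroupWindow window) {
    final long remainingMillis = loadRemainingTime(window.getGroupKey());
    window.setWindowEndTime(System.currentTimeMillis() + remainingMillis);
}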

dlvenable commented:

Work on the implementation for this RFC started in #829. I'm closing this RFC now that time for comments is complete. We will track the feature development in #829.
