[Feature] Document based ingest routing #63798

Closed
ruflin opened this issue Oct 16, 2020 · 7 comments · Fixed by #76511
Labels
:Data Management/Ingest Node (Execution or management of Ingest Pipelines including GeoIP), >enhancement, Team:Data Management (Meta label for data/management team)

Comments

@ruflin
Member

ruflin commented Oct 16, 2020

Problem

On the ingest side, many different datasets are available. Today, our story for building integrations assumes that a single source only produces one data stream: one log file contains the nginx access logs, another one contains the nginx error logs, and they are not mixed. This allows us to send each to a different data stream with a specific ingest pipeline that knows how to process the events.

Unfortunately, this is the ideal case; there are multiple scenarios where a single source contains a mix of datasets. This happens in both the logs and the metrics use cases. A few examples:

  • Docker logs: Docker only allows logging to stderr or stdout. If a service produces more than two log files, the data gets mixed together.
  • Prometheus metrics: A single Prometheus collector might collect metrics from many Prometheus endpoints.
  • Kinesis: Getting data out of Kinesis mixes many different datasets.
  • Syslog: Many services send their data to a single syslog input.

The routing of an event to the right destination could be done on the edge or centrally. This feature request is about centrally managed routing, as it would work well in the context of the integrations installed by Fleet. Doing it centrally has the advantage that when new integrations are installed or updated, no updates to the Agents are needed to add new routing information, and it also works well in the case of standalone agents.

Story

In the following, I'm going through a story of how I could imagine document based ingest routing to work. This is NOT a proposal for the API or how it should be designed, but it should help explain how it would be used from Fleet.

The following story uses a syslog input as the data sink: all data is sent to syslog.

Add routing rules

A user gets started with Elastic Agent by enrolling it into Fleet. The user starts collecting logs from syslog and all the logs go into logs-generic-default. The services that send logs to syslog are nginx and mysql. Based on some ML detection rules, Fleet recognises that the logs being shipped to logs-generic-default contain logs for nginx and mysql. It asks the user to install these two packages, which the user happily does.

On the Fleet side, this installs all the assets like dashboards, ingest pipelines and templates for nginx and mysql. But on the config side nothing changes, as the Agent still just pulls in data from syslog. Each package contains some ingest routing rules. These routing rules are added to logs-generic-* and applied to all data that lands there. Each package can contain a list of conditions. For nginx, the following happens:

PUT _routing/logs-generic-*
{
  "id": "nginx",
  "condition": "ctx.input.type == 'syslog' && contains('file.path', 'nginx')",
  // Data should be routed to the nginx data stream within the same namespace
  "target": "logs-nginx-${data_stream.namespace}"
}

As soon as the above routing rule is added, all documents coming into logs-generic-* are matched against this new rule. If no match is found, the documents are ingested into logs-generic-default as before. If a match is found, the documents are routed to the target data stream. If the user now also installs the mysql package, one more condition is added to the routing table for logs-generic-*:

PUT _routing/logs-generic-*
{
  "id": "mysql",
  "condition": "ctx.input.type == 'syslog' && contains('file.path', 'mysql')",
  "target": "logs-mysql-${data_stream.namespace}"
}

The nginx package contained routing rules not only for logs-generic-* but also for logs-nginx-*, to split up access and error logs:

PUT _routing/logs-nginx-*
{
  "id": "nginx.access",
  // The condition checks that a certain grok pattern matches; the example is made up and not "correct" syntax
  "condition": "grok(ctx.message, %{TIMESTAMP_ISO8601}) == true",
  "target": "logs-nginx.access-${data_stream.namespace}"
}

PUT _routing/logs-nginx-*
{
  "id": "nginx.error",
  "condition": "grok(ctx.message, %{IP}) == true",
  "target": "logs-nginx.error-${data_stream.namespace}"
}

All the data that is forwarded to logs-nginx-* now goes through these additional routing conditions. If no condition matches, the data stays in logs-nginx-*. As logs-nginx.access-default and logs-nginx.error-default already have an ingest pipeline configured as part of their templates, the processing of the events happens as expected. This works exactly as if the data had been sent to the logs-nginx.access-* data streams directly.

In some cases, the logs-nginx-* data streams might already need an ingest pipeline to do some preprocessing on all events. This preprocessing could be needed to simplify the conditions. Another use case with pipelines is that the logs-nginx-* data streams might want to add a final pipeline that runs at the end.
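As a rough sketch of how such a preprocessing step could look with today's primitives (the pipeline name and the event.module value below are made up for illustration), a default pipeline attached to the logs-nginx-* template could normalize fields before the routing conditions are evaluated:

PUT _ingest/pipeline/logs-nginx-preprocess
{
  "description": "Hypothetical preprocessing that runs before any routing conditions",
  "processors": [
    {
      "set": {
        "field": "event.module",
        "value": "nginx"
      }
    },
    {
      "trim": {
        "field": "message",
        "ignore_missing": true
      }
    }
  ]
}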

Remove routing rules

The user now has fully processed nginx and mysql logs. At some stage, the user decides to remove the nginx package because the nginx service is no longer in use and the package is not needed anymore. Removing a package means removing all ingest pipelines, templates, and in this case also the routing rules:

DELETE _routing/logs-generic-*
{
  "id": "nginx"
}
DELETE _routing/logs-nginx-*
{
  "id": "nginx.access"
}
DELETE _routing/logs-nginx-*
{
  "id": "nginx.error"
}

This deletes the routing rules for nginx. If any future data for nginx comes in, it is routed to logs-generic-default again, as no rule matches.

Additional notes

It might be that some routing rules need a priority to make sure they are applied before others. In case two rules apply to a single document, either the first or the last one should win.
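One way to express such a priority, sticking with the hypothetical _routing API from the story above (the priority field and its lower-value-wins semantics are made up for illustration), could be:

PUT _routing/logs-generic-*
{
  "id": "nginx",
  // Lower values are evaluated first; the first matching rule wins
  "priority": 10,
  "condition": "ctx.input.type == 'syslog' && contains('file.path', 'nginx')",
  "target": "logs-nginx-${data_stream.namespace}"
}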

It is possible today to do some basic routing by rewriting the _index field in an ingest pipeline. But this requires modifying ingest pipelines every time a package is installed or removed. An initial idea was to simplify this by potentially supporting multiple ingest pipelines per data stream, so that pipelines only need to be added or removed instead of modified, but this seems more like a workaround.
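For reference, here is a minimal sketch of how that looks today: a conditional set processor rewrites _index, and the document is then routed to the rewritten target. The pipeline name is made up, but the set processor, its if condition, and the mustache templating are existing functionality:

PUT _ingest/pipeline/logs-generic-routing
{
  "processors": [
    {
      "set": {
        "if": "ctx.input?.type == 'syslog' && ctx.file?.path != null && ctx.file.path.contains('nginx')",
        "field": "_index",
        "value": "logs-nginx-{{data_stream.namespace}}"
      }
    }
  ]
}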

@cbuescher added the :Data Management/Ingest Node and >enhancement labels on Oct 19, 2020
@elasticmachine
Collaborator

Pinging @elastic/es-core-features (:Core/Features/Ingest)

@elasticmachine added the Team:Data Management label on Oct 19, 2020
@leehinman

Adding two other use cases.

Splunk REST API: If we need to pull data from the Splunk REST API, it would be nice to be able to route data to specific ingest pipelines based on fields in the data. For example, if sourcetype = apache_error, send the document through the Apache error pipeline; if sourcetype = mysqld_error, send it to the mysql error pipeline. If we had that, we could have one config for connecting to the Splunk REST API and one set of routing mappings.

Kafka: Specific logs can be sent to specific topics, or several log types can be multiplexed onto one topic. Either way, it would be nice to configure Kafka once and then specify the rules that route documents to particular pipelines in one place.
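A minimal sketch of what that could look like with today's pipeline processor and conditions, assuming the data carries a sourcetype field and the referenced pipelines exist (all names below are made up):

PUT _ingest/pipeline/splunk-routing
{
  "processors": [
    {
      "pipeline": {
        "if": "ctx.sourcetype == 'apache_error'",
        "name": "apache-error-pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.sourcetype == 'mysqld_error'",
        "name": "mysql-error-pipeline"
      }
    }
  ]
}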

@Bernhard-Fluehmann

Adding yet another use case.

Routing based on query result: It is sometimes required to route logs based on criteria that are stored in a separate source, like an Elasticsearch index. An example would be to set the target based on the result of a query against an index with some inventory data. An idea would be to integrate enrichment.
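A rough sketch of this idea using the existing enrich processor combined with _index rewriting, assuming an inventory index with host.name and target_dataset fields (all names below are made up):

PUT _enrich/policy/inventory-routing
{
  "match": {
    "indices": "inventory",
    "match_field": "host.name",
    "enrich_fields": ["target_dataset"]
  }
}

PUT _enrich/policy/inventory-routing/_execute

PUT _ingest/pipeline/inventory-based-routing
{
  "processors": [
    {
      "enrich": {
        "policy_name": "inventory-routing",
        "field": "host.name",
        "target_field": "inventory"
      }
    },
    {
      "set": {
        "if": "ctx.inventory?.target_dataset != null",
        "field": "_index",
        "value": "logs-{{inventory.target_dataset}}-default"
      }
    }
  ]
}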

@felixbarny
Member

felixbarny commented Apr 11, 2022

I would like to propose a different approach to solving the problem. Instead of adding a first-class concept and API around routing to Elasticsearch, this proposal is based mostly on existing primitives: ingest pipelines, default index pipelines, and index templates. IMO, this makes it easier to reason about how a document gets processed and routed, as there's a single place to look at: the default pipeline of an index.

The only new concept needed for this is a data_stream_router processor. I’ve created a POC of this processor here: #76511.
The data_stream_router processor overrides the _index field based on static values or data_stream.* values from the document. It terminates the pipeline when it is executed, so that at most one data_stream_router processor is ever executed within a pipeline, simulating an if, else-if, else-if, … chaining of the routing conditions.

Here's the same example you added, routing logs from logs-generic-* to logs-nginx-* and on to logs-nginx.access-* or logs-nginx.error-*:

PUT _index_template/logs-generic
{
  "index_patterns": ["logs-generic-*"],
  "data_stream": { },
  "priority": 500,
  "template": {
    "settings": {
      "index.default_pipeline": "logs-generic"
    }
  }
}

PUT _ingest/pipeline/logs-generic
{
  "processors": [
    {
      "data_stream_router": {
        "tag": "nginx",
        "if": "ctx.input?.type == 'syslog' && contains('file.path','nginx')",
        "type": "logs",
        "dataset": "nginx"
      }
    }
  ]
}

PUT _index_template/logs-nginx
{
  "index_patterns": ["logs-nginx-*"],
  "data_stream": { },
  "priority": 500,
  "template": {
    "settings": {
      "index.default_pipeline": "logs-nginx"
    }
  }
}

PUT _ingest/pipeline/logs-nginx
{
  "processors": [
    {
      "data_stream_router": {
        "tag": "nginx.access",
        "if": "grok('ctx.message', %{TIMESTAMP_ISO8601}) == true",
        "type": "logs",
        "dataset": "nginx.access"
      }
    },
    {
      "data_stream_router": {
        "tag": "nginx.error",
        "if": "grok('ctx.message', %{IP}) == true",
        "type": "logs",
        "dataset": "nginx.error"
      }
    }
  ]
}

The approach also relates to my suggestion for specifying multiple/custom ingest pipelines for a data stream (#61185 (comment)) and the proposed convenience API that lets you add/upsert a processor to a pipeline or remove one from it. However, to keep the examples simple, I'll not conflate them in this proposal and just mention that they are compatible and work best when combined, yet they are decoupled and standalone.

@felixbarny felixbarny linked a pull request Apr 11, 2022 that will close this issue
@ruflin
Member Author

ruflin commented Apr 11, 2022

What you propose above will likely work @felixbarny. The part I worry about is that there is no Elasticsearch-native API on top of it. "Someone" (likely Fleet) will have to manage the list of processors added. If a user manually modifies some of the pipelines, adds their own rules, or removes a part by accident, things will break.

The problem here is, I think, similar to aliases with indices vs data streams. An alias with indices behind it works, as long as users don't wipe the alias by accident. If they do, there is trouble ahead. Having data streams as a native concept in Elasticsearch makes sure we can improve the user experience, and the user does not have to care about what happens behind a data stream.

The data stream processor PR you are putting together is great. The part I'm concerned about is that we keep adding stack features to solve the problem instead of having a simple API / solution we can expose to the user, which we could then keep improving behind the scenes.

@felixbarny
Member

If a user manually modifies some of the pipelines, adds their own rules, or removes a part by accident, things will break.

True, but my default instinct would be to enhance the existing mechanisms, for example by restricting who is allowed to modify a pipeline, rather than creating a new primitive that's very similar to an existing one.
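For illustration, today this can only be restricted at the cluster level: a role without pipeline-management cluster privileges (such as manage_pipeline) cannot modify ingest pipelines at all. The role below is a sketch with a made-up name; restricting individual pipelines would need an enhancement along the lines suggested above.

POST _security/role/logs_writer_no_pipeline_mgmt
{
  "cluster": ["monitor"],
  "indices": [
    {
      "names": ["logs-*-*"],
      "privileges": ["create_doc", "auto_configure"]
    }
  ]
}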

The part I'm concerned about is that we keep adding stack features to solve the problem instead of having a simple API / solution we can expose to the user, which we could then keep improving behind the scenes.

Got it. The part I'm personally more worried about is the cost of introducing a new primitive vs improving an existing one. Having several primitives that have a lot of overlap but do slightly different things makes it really hard to understand what's going on.

For ingest pipelines, we have an extensive management UI and simulate functionality. If we implemented event routing as a dedicated primitive, we'd need to re-implement something like that again. If we use ingest pipelines as an implementation detail of a dedicated event routing API, the implementation details would leak through in a way that wouldn't let us change the implementation afterwards. That's because users would want to see, customize, and simulate the rules, something we can already offer them with ingest pipelines.
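For example, the existing simulate API can run a document through the logs-generic pipeline from the earlier example to verify which routing condition fires (whether simulate fully renders the POC data_stream_router processor is an assumption here):

POST _ingest/pipeline/logs-generic/_simulate
{
  "docs": [
    {
      "_index": "logs-generic-default",
      "_source": {
        "input": { "type": "syslog" },
        "file": { "path": "/var/log/nginx/access.log" },
        "message": "127.0.0.1 - - [11/Apr/2022:13:55:36 +0000] \"GET / HTTP/1.1\" 200 612"
      }
    }
  ]
}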

Generally, I'd rather have a simple but flexible set of primitives that can be used to build complex systems. But there's a tipping point where flexibility turns into shoehorning. Finding out where that tipping point is, that's the hard part. But IMHO, in this particular case, I don't see why the existing primitives can't be enhanced to support event routing. And I think before we can consider creating a new primitive, we need to try to make it work with the existing primitives, or at least think through the implications of doing so. If the end result looks like a shoehorn, we have good arguments for creating a new primitive.

@ruflin
Member Author

ruflin commented Apr 19, 2023

Thank you @felixbarny @dakrone @joegallo for making this happen ❤️
