Pipeline DLQ #3857

Open
kkondaka opened this issue Dec 13, 2023 · 5 comments
Labels
enhancement New feature or request Roadmap:Stability/Availability/Resiliency Project-wide roadmap label

Comments

@kkondaka
Collaborator

Is your feature request related to a problem? Please describe.
Provide a way to send all failed events to a global/pipeline-level DLQ. Failed events from anywhere in the pipeline (sources, processors, and sinks) are sent directly to this DLQ. This will eventually replace the sink-level DLQs we have today.

Describe the solution you'd like
Preferred solution (based on @dlvenable's initial thoughts and a discussion meeting)

  1. An option to define a failure pipeline in the YAML file, like:
my-failure-pipeline:
  type: failure
  sink:
    - s3:
        bucket: "..."
        codec:
          ndjson:

Each sub-pipeline in the YAML file may then have an entry pointing to it, as follows:

sample-pipeline:
  failure-pipeline: my-failure-pipeline
  source:
     ...
  processor:
     ...
  sink:
     ...

In addition, there may be an option to have a default failure pipeline, which is used if a sub-pipeline does not name a failure pipeline:

default-failure-pipeline:
  type: failure
  sink:
    - s3:
        bucket: "..."
        codec:
          ndjson:

Finally, there is an implicit failure pipeline, which is created without any entries in the YAML file. The implicit failure pipeline sends all failed events to stdout.
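The three-level fallback above (the sub-pipeline's own failure pipeline, then the default failure pipeline, then the implicit stdout pipeline) can be sketched as a small lookup. This is a minimal sketch under stated assumptions: `FailureSink`, `StdoutFailureSink`, and `FailurePipelineResolver` are hypothetical names, events are plain strings instead of Data Prepper records, and the default pipeline is assumed to be recognized by the name `default-failure-pipeline` used in the example YAML.

```java
import java.io.PrintStream;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for "a failure pipeline that accepts failed events".
interface FailureSink {
    void accept(List<String> failedEvents);
}

// Implicit failure pipeline: prints every failed event to the given stream (stdout in practice).
final class StdoutFailureSink implements FailureSink {
    private final PrintStream out;

    StdoutFailureSink(PrintStream out) {
        this.out = out;
    }

    @Override
    public void accept(List<String> failedEvents) {
        failedEvents.forEach(out::println);
    }
}

final class FailurePipelineResolver {
    // Assumption: the default failure pipeline is identified by this pipeline name.
    private static final String DEFAULT_NAME = "default-failure-pipeline";

    private final Map<String, FailureSink> failurePipelinesByName;   // pipelines declared with type: failure
    private final Map<String, String> failurePipelineBySubPipeline;  // sub-pipeline -> its failure-pipeline entry
    private final FailureSink implicitFailurePipeline;

    FailurePipelineResolver(Map<String, FailureSink> failurePipelinesByName,
                            Map<String, String> failurePipelineBySubPipeline,
                            FailureSink implicitFailurePipeline) {
        this.failurePipelinesByName = Map.copyOf(failurePipelinesByName);
        this.failurePipelineBySubPipeline = Map.copyOf(failurePipelineBySubPipeline);
        this.implicitFailurePipeline = implicitFailurePipeline;
    }

    FailureSink resolve(String subPipelineName) {
        // 1. The failure pipeline named in the sub-pipeline's failure-pipeline entry.
        String configured = failurePipelineBySubPipeline.get(subPipelineName);
        if (configured != null) {
            FailureSink sink = failurePipelinesByName.get(configured);
            if (sink == null) {
                throw new IllegalArgumentException("Unknown failure pipeline: " + configured);
            }
            return sink;
        }
        // 2. The default failure pipeline, if one is declared.
        FailureSink defaultPipeline = failurePipelinesByName.get(DEFAULT_NAME);
        if (defaultPipeline != null) {
            return defaultPipeline;
        }
        // 3. The implicit failure pipeline.
        return implicitFailurePipeline;
    }
}
```

Treating an unknown `failure-pipeline` name as a configuration error, rather than silently falling back, is a design choice in this sketch; falling back to the default would also be defensible.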

This requires code changes in many places, so it is better to introduce a new API (for example, an executeWithFailures() API in processors that returns both output records and failed records). Data Prepper core code can then take the failed records and send them to the appropriate failure pipeline (the configured failure pipeline, the default failure pipeline, or the implicit failure pipeline). Similar new APIs may be added at the source and sink level. Once the APIs are added, the code can be migrated gradually so that all sources, sinks, and processors use them.
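A processor-side API of this kind might look like the sketch below. Only the name `executeWithFailures()` comes from the proposal; `ProcessorResult`, `FailureAwareProcessor`, `ParseIntProcessor`, and `ProcessorRunner` are hypothetical, and `String` stands in for `Record<Event>` so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical result type: records that continue down the pipeline plus records that failed.
record ProcessorResult<T>(List<T> outputRecords, List<T> failedRecords) {
}

// Hypothetical processor API with the executeWithFailures() method suggested above.
interface FailureAwareProcessor<T> {
    ProcessorResult<T> executeWithFailures(Collection<T> records);
}

// Example processor: normalizes numeric strings and reports non-numeric records as failed.
final class ParseIntProcessor implements FailureAwareProcessor<String> {
    @Override
    public ProcessorResult<String> executeWithFailures(Collection<String> records) {
        List<String> output = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        for (String record : records) {
            try {
                output.add(String.valueOf(Integer.parseInt(record.trim())));
            } catch (NumberFormatException e) {
                failed.add(record);
            }
        }
        return new ProcessorResult<>(output, failed);
    }
}

// How core code could split the result: output goes to the next stage,
// failed records go to whichever failure pipeline applies to this pipeline.
final class ProcessorRunner {
    static <T> List<T> run(FailureAwareProcessor<T> processor,
                           Collection<T> records,
                           Consumer<List<T>> failurePipeline) {
        ProcessorResult<T> result = processor.executeWithFailures(records);
        if (!result.failedRecords().isEmpty()) {
            failurePipeline.accept(result.failedRecords());
        }
        return result.outputRecords();
    }
}
```

Returning both lists from one call keeps plugins unaware of where failures end up; core decides the destination.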

Having a separate failure pipeline allows the same failure pipeline to be shared by multiple pipelines. It also makes it possible to write sub-pipelines under it and do conditional routing, etc.

Describe alternatives you've considered (Optional)
Instead of new APIs in processors, sources, and sinks, we could have a global singleton for DLQEvents, managed by a DLQEventsManager that each source obtains in its constructor. Failed events are handed over to the DLQEventsManager, which routes them to the failure pipeline. I think this approach is also OK.
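The singleton alternative could be sketched as follows. `DLQEventsManager` is the name used above, but its methods (`getInstance`, `registerFailurePipeline`, `sendFailedEvents`) and `ExampleSource` are hypothetical; the stdout fallback mirrors the implicit failure pipeline, and `String` again stands in for an event.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical singleton: plugins hand failed events to it, and it forwards them
// to whatever failure pipeline core registered at startup.
final class DLQEventsManager {
    private static final DLQEventsManager INSTANCE = new DLQEventsManager();

    // Until core registers a failure pipeline, failed events are printed to stdout.
    private final AtomicReference<Consumer<List<String>>> failurePipeline =
            new AtomicReference<>(events -> events.forEach(System.out::println));

    private DLQEventsManager() {
    }

    static DLQEventsManager getInstance() {
        return INSTANCE;
    }

    void registerFailurePipeline(Consumer<List<String>> target) {
        failurePipeline.set(Objects.requireNonNull(target));
    }

    void sendFailedEvents(List<String> events) {
        failurePipeline.get().accept(events);
    }
}

// A source obtains the manager in its constructor, as the alternative describes.
final class ExampleSource {
    private final DLQEventsManager dlqEventsManager;

    ExampleSource() {
        this.dlqEventsManager = DLQEventsManager.getInstance();
    }

    void onUnreadableRecord(String rawRecord) {
        dlqEventsManager.sendFailedEvents(List.of(rawRecord));
    }
}
```

The trade-off against the API approach is global mutable state: this sketch routes every plugin's failures through one registered target, so per-pipeline failure pipelines would need the manager to be keyed by pipeline name.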


@jw-amazon

If I understand correctly, right now only sink failures are sent to the DLQ. That might explain a bug I am seeing in my pipeline.

@dlvenable
Member

If I understand correctly, right now only sink failures are sent to the DLQ. That might explain a bug I am seeing in my pipeline.

@jw-amazon, yes, currently only sink failures are sent to the DLQ.

@chenqi0805
Collaborator

chenqi0805 commented Jan 24, 2024

My proposed approach is different: we can define the DLQ as an extension that any pipeline plugin can integrate with:
data-prepper-config.yaml

extensions:
  dlq:
    store:
      s3:
        bucket: test-bucket
        key_path_prefix: dlq/

In the pipeline definition, users no longer need to specify DLQ details within any plugin (except to enable or disable the DLQ). The plugin integration with the DLQ extension is handled in code.
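From a plugin's point of view, the extension approach could look like the sketch below: the plugin keeps only an enable/disable flag, and the framework injects a shared writer built from the extension settings. `DlqExtensionConfig`, `DlqWriter`, `InMemoryDlqWriter`, and `ExamplePlugin` are hypothetical, and the in-memory writer replaces an S3-backed one so the example runs without AWS.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical settings read from the dlq extension in data-prepper-config.yaml.
record DlqExtensionConfig(String bucket, String keyPathPrefix) {
}

// Hypothetical shared writer created once from the extension config.
interface DlqWriter {
    void write(List<String> failedEvents);
}

// In-memory stand-in for an S3-backed writer.
final class InMemoryDlqWriter implements DlqWriter {
    final DlqExtensionConfig config;
    final List<String> written = new ArrayList<>();

    InMemoryDlqWriter(DlqExtensionConfig config) {
        this.config = config;
    }

    @Override
    public void write(List<String> failedEvents) {
        written.addAll(failedEvents);
    }
}

// The plugin only decides whether the DLQ is enabled; the destination
// is supplied by the framework from the extension.
final class ExamplePlugin {
    private final boolean dlqEnabled;
    private final DlqWriter dlqWriter;

    ExamplePlugin(boolean dlqEnabled, DlqWriter dlqWriter) {
        this.dlqEnabled = dlqEnabled;
        this.dlqWriter = dlqWriter;
    }

    void handleFailure(List<String> failedEvents) {
        if (dlqEnabled) {
            dlqWriter.write(failedEvents);
        }
        // With the DLQ disabled, this sketch simply drops the failed events.
    }
}
```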

Alternatively, we can move the DLQ out of extensions and make it a standalone global config in data-prepper-config.yaml:

dlq:
  store:
    s3:
      bucket: test-bucket
      key_path_prefix: dlq/

@dlvenable
Member

@chenqi0805, the proposal in this issue creates a specific pipeline for handling failed events. This allows data to be processed before it is put in the DLQ.

I think there could be some overlap between your idea of extensions and the pipeline DLQ. For one, this proposal has a default failure pipeline.

Perhaps that could be configured via an extension:

extensions:
  dlq:
    default-failure-pipeline:
      type: failure
      sink:
      - s3:
          bucket: "..."
          codec:
            ndjson:

@dlvenable
Member

In terms of implementation, I propose that we create a new interface in data-prepper-api: FailurePipeline.

public interface FailurePipeline {
  void sendFailedEvents(Collection<Record<Event>> events);
}

We may also want some way to include additional information on the failures.
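One way to carry that additional information is to wrap each failed event with its failure context. In this sketch, `FailedEvent`, `FailurePipelineWithContext`, `CollectingFailurePipeline`, and `FailedEvents` are hypothetical names, the chosen fields (plugin name, reason, timestamp) are assumptions, and `String` stands in for `Record<Event>`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical wrapper carrying failure context alongside the original event.
record FailedEvent(String event, String pluginName, String reason, Instant failedAt) {
}

// Hypothetical variant of the proposed FailurePipeline interface that accepts that context.
interface FailurePipelineWithContext {
    void sendFailedEvents(Collection<FailedEvent> failedEvents);
}

// Test double that simply records what it receives.
final class CollectingFailurePipeline implements FailurePipelineWithContext {
    final List<FailedEvent> received = new ArrayList<>();

    @Override
    public void sendFailedEvents(Collection<FailedEvent> failedEvents) {
        received.addAll(failedEvents);
    }
}

final class FailedEvents {
    // Wraps a batch of events that failed in the same plugin for the same cause.
    static List<FailedEvent> of(Collection<String> events, String pluginName, Exception cause) {
        Instant now = Instant.now();
        List<FailedEvent> wrapped = new ArrayList<>();
        for (String event : events) {
            wrapped.add(new FailedEvent(event, pluginName, cause.getMessage(), now));
        }
        return wrapped;
    }
}
```

Keeping the context in a separate wrapper leaves the original event unchanged, so a failure pipeline can still write it out as-is or add the context as fields before sinking it.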

Then, sinks or processors write failed events through this interface. For example:

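try {
  doSomething(events);
} catch (final Exception e) {
  // Route the events that could not be handled to the failure pipeline instead of dropping them.
  failurePipeline.sendFailedEvents(events);
}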

Within data-prepper-core, this interface can have an implementation which acts as the source for the failure pipeline.

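class FailurePipelineSource implements Source<Record<Event>>, FailurePipeline {

  // Buffer.writeAll() requires a timeout; this value is an assumption.
  private static final int WRITE_TIMEOUT_MILLIS = 5_000;

  private Buffer<Record<Event>> buffer;

  @Override
  public void start(final Buffer<Record<Event>> buffer) {
    // Keep the failure pipeline's buffer so that plugins can write failed events into it.
    this.buffer = buffer;
  }

  @Override
  public void stop() {
    // Nothing to release: events are pushed in through sendFailedEvents().
  }

  @Override
  public void sendFailedEvents(final Collection<Record<Event>> events) {
    try {
      buffer.writeAll(events, WRITE_TIMEOUT_MILLIS);
    } catch (final Exception e) {
      throw new RuntimeException("Unable to write failed events to the failure pipeline buffer", e);
    }
  }
}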

When creating the pipeline in data-prepper-core, we can add an instance of FailurePipelineSource to the Pipeline class. I think that pipeline authors should not have to write the source: configuration for a failure pipeline.
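The end-to-end flow (core creates the failure pipeline's source and buffer itself, and hands the source to plugins only as a `FailurePipeline`) can be sketched with simplified stand-ins. `InMemoryBuffer`, `ExampleSink`, and `CoreWiring` are hypothetical; `FailurePipeline` and `FailurePipelineSource` here are reduced versions of the proposed types, using `String` instead of `Record<Event>` and a queue instead of a Data Prepper `Buffer`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Reduced version of the proposed interface.
interface FailurePipeline {
    void sendFailedEvents(Collection<String> events);
}

// Stand-in for the failure pipeline's buffer.
final class InMemoryBuffer {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void writeAll(Collection<String> records) {
        queue.addAll(records);
    }

    List<String> drain() {
        List<String> drained = new ArrayList<>();
        queue.drainTo(drained);
        return drained;
    }
}

// Core-owned source of the failure pipeline; plugins only see the FailurePipeline side.
final class FailurePipelineSource implements FailurePipeline {
    private InMemoryBuffer buffer;

    void start(InMemoryBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void sendFailedEvents(Collection<String> events) {
        if (buffer == null) {
            throw new IllegalStateException("Failure pipeline has not been started");
        }
        buffer.writeAll(events);
    }
}

// Example sink following the try/catch pattern from the proposal.
final class ExampleSink {
    private final FailurePipeline failurePipeline;
    private final boolean destinationAvailable;

    ExampleSink(FailurePipeline failurePipeline, boolean destinationAvailable) {
        this.failurePipeline = failurePipeline;
        this.destinationAvailable = destinationAvailable;
    }

    void output(Collection<String> events) {
        try {
            if (!destinationAvailable) {
                throw new IllegalStateException("destination unavailable"); // simulated write failure
            }
            // A real sink would write the events to its destination here.
        } catch (IllegalStateException e) {
            failurePipeline.sendFailedEvents(events);
        }
    }
}

// What core would do while building pipelines: no source: section is needed in YAML.
final class CoreWiring {
    static InMemoryBuffer wire(List<String> batch, boolean destinationAvailable) {
        InMemoryBuffer failureBuffer = new InMemoryBuffer();
        FailurePipelineSource failureSource = new FailurePipelineSource();
        failureSource.start(failureBuffer);
        new ExampleSink(failureSource, destinationAvailable).output(batch);
        return failureBuffer;
    }
}
```

Because the sink depends only on the `FailurePipeline` interface, it does not need to know whether the events end up in a configured, default, or implicit failure pipeline.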

Projects
Status: New
Development

No branches or pull requests

4 participants