
Support generic parsers/codecs #1532

Closed
dlvenable opened this issue Jun 23, 2022 · 4 comments · Fixed by #2519, #2527 or #2715
Labels: enhancement (New feature or request)

@dlvenable (Member)

Is your feature request related to a problem? Please describe.

Both the S3 Source and the HTTP Source use similar concepts of codecs for parsing input data. The S3 Source currently makes these codecs available as plugins, so they can be extended for the S3 Source. But if another source wanted to use these plugins, it would be unable to.

Describe the solution you'd like

Create a core concept in Data Prepper of source-based codecs or parsers. These should be generic enough to take any Java InputStream and produce events from them.

I propose that we base this concept on the S3 codec. It has a few advantages:

  1. It uses an InputStream. This is advantageous for large inputs.
  2. It has a Consumer for each event. This allows the source using the codec to receive Event objects and decide, independently of the codec, how best to handle them.
  3. It is not connected to HTTP in any way.

Describe alternatives you've considered (Optional)

Data Prepper can have a similar concept for output codecs/parsers. However, I see no reason to force these to be the same concept. (Implementors may choose to pair them together to avoid code duplication).
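
As a rough illustration of what a paired-but-separate sink-side concept could look like, here is a hypothetical sketch (OutputCodec and writeEvent are assumed names, not an existing interface):

public interface OutputCodec {
    /**
     * Writes a single event to the given {@link OutputStream}.
     * A sink would call this once per outgoing event.
     */
    void writeEvent(Event event, OutputStream outputStream) throws IOException;
}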

Additional context

S3 Codec interface:

public interface Codec {
    /**
     * Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
     * {@link Record} loaded from the {@link InputStream}.
     *
     * @param inputStream The input stream for the S3 object
     * @param eventConsumer The consumer which handles each event from the stream
     */
    void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
}
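
To make the calling pattern concrete, here is a minimal, hedged sketch of an implementation against this interface. LineCodec is hypothetical; it simply treats each line of the stream as one event. Record, Event, and JacksonEvent come from data-prepper-api (imports for them omitted).

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

public class LineCodec implements Codec {
    @Override
    public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
        final BufferedReader reader = new BufferedReader(
                new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            // Wrap each line in an Event and hand it to the source's consumer.
            eventConsumer.accept(new Record<>(JacksonEvent.fromMessage(line)));
        }
    }
}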

HTTP Codec interface:

public interface Codec<T> {
    /**
     * Parses the request into a custom type.
     *
     * @param httpData The content of the original HTTP request
     * @return The target data type
     */
    T parse(HttpData httpData) throws IOException;
}

@dlvenable (Member, Author)

Here is a concept for how Data Prepper can provide this to pipeline authors.

Define Interfaces in data-prepper-api

The source codec interface can exist in the data-prepper-api package. In this way, any source will have access to the interface. None of the implementations should be in data-prepper-api.

It might look like the following.

public interface InputCodec {
  void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException; 
}

Create plugin projects for each type

Under data-prepper-plugins, create new projects which hold the implementations for each type. These projects could also contain sink codecs.

Taking CSV as an example, we could have a project: data-prepper-plugins/csv-codecs. It would be able to have both the CSV source codec and the CSV sink codec. In this way, they can share some logic and dependencies.

Use the plugin framework for loading codecs

The Data Prepper plugin framework supports arbitrary interfaces. This can follow the same pattern as HTTP authentication in Armeria.

Following the CSV example, we might have the following class in data-prepper-plugins/csv-codecs:

@DataPrepperPlugin(name = "csv",
        pluginType = InputCodec.class,
        pluginConfigurationType = CsvCodecConfig.class)
public class CsvInputCodec implements InputCodec {
 ...
}

Sources load using the plugin framework

The S3 source, for example, can load codecs from the plugin framework, similar to how the HTTP source loads its authentication provider. Unlike the HTTP source, the S3 source should not have a default value.
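
For illustration, resolving the configured codec inside a source might look roughly like the following sketch. Here pluginFactory, codecPluginName, and codecPluginSettings are hypothetical variables, and the exact PluginFactory/PluginSetting usage is an assumption about the plugin framework's API.

// Sketch: load the codec plugin named in the source's configuration.
final PluginSetting codecSetting = new PluginSetting(codecPluginName, codecPluginSettings);
final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecSetting);

// The source then streams each S3 object's bytes through the codec.
codec.parse(s3ObjectInputStream, record -> {
    // Hand each parsed record to the pipeline buffer (details elided).
});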

@dlvenable (Member, Author)

Based on some of the changes coming to support this, I think the interface should have some modifications to support additional flexibility.

Here are some additional capabilities that InputCodec implementations may need:

  • The content length of the stream. If the Parquet input codec needs to read all the data before creating events, it could be useful to have the total content length. With it, the Parquet input codec could use an in-memory byte array for smaller files and the file system for larger files.
  • Two forms of exception callbacks:
    • When the whole stream cannot be parsed (say it is not JSON), the parse method should throw an exception. This is already in place.
    • When a single event cannot be parsed (say a single invalid JSON object in an array), a method to report to the source that a single event could not be parsed.
  • A context. We may wish to re-use a single file for the Parquet input codec. The source can set this value to the pipeline name, or, if it uses multiple threads, to the pipeline name plus an identifier.

A sketch of interfaces supporting these follows.

public interface InputCodecStream {
  InputStream getInputStream();

  /**
   * Optional. If provided, gives the total length of the content supplied.
   */
  Integer getContentLength();
}

public interface InputCodecContext {
  Consumer<Record<Event>> getEventConsumer();

  /**
   * Gets a `Consumer` for handling any errors parsing individual events.
   * This is called when a single event cannot be parsed from the stream.
   */
  Consumer<Exception> getEventExceptionConsumer();

  /**
   * Represents a name that is unique across calls and is not used by multiple calls in parallel.
   * This could be used by the Parquet codec to re-use a file.
   */
  String getContextName();
}

public interface InputCodec {
  void parse(InputCodecStream inputCodecStream, InputCodecContext inputCodecContext) throws IOException; 
}
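
To illustrate the per-event exception callback, here is a hedged sketch of a hypothetical newline-delimited JSON codec written against the proposed interfaces. Record, Event, and JacksonEvent come from data-prepper-api, and Jackson's ObjectMapper does the JSON parsing (imports omitted).

public class NdjsonInputCodec implements InputCodec {
  private final ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public void parse(final InputCodecStream inputCodecStream, final InputCodecContext inputCodecContext) throws IOException {
    final BufferedReader reader = new BufferedReader(
        new InputStreamReader(inputCodecStream.getInputStream(), StandardCharsets.UTF_8));
    String line;
    while ((line = reader.readLine()) != null) {
      try {
        // Each line is expected to be one JSON object.
        final Map<String, Object> data =
            objectMapper.readValue(line, new TypeReference<Map<String, Object>>() {});
        final Event event = JacksonEvent.builder()
            .withEventType("event")
            .withData(data)
            .build();
        inputCodecContext.getEventConsumer().accept(new Record<>(event));
      } catch (final JsonProcessingException e) {
        // Report the single bad event to the source instead of failing the whole stream.
        inputCodecContext.getEventExceptionConsumer().accept(e);
      }
    }
  }
}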

Thoughts on this approach? @kkondaka , @graytaylor0 , @umairofficial

github-project-automation bot moved this from In progress to To do in Data Prepper Tracking Board on Apr 24, 2023
@dlvenable (Member, Author)

I'm re-opening this issue to improve the interface before we release 2.3.

@dlvenable (Member, Author)

In order to support Parquet codecs, we may need to update the codec interface to support seekable input.

Ideally, this means we have two forms of codecs: a base codec and a seekable codec. It is possible that not all sources will be able to provide random access to the underlying bytes.
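
One possible shape, sketched with hypothetical names (SeekableInputCodec and SeekableInputStream are assumptions, not settled API):

public interface SeekableInputCodec extends InputCodec {
  /**
   * Parses input that supports random access, which formats such as Parquet
   * need in order to read footers and column chunks.
   */
  void parse(SeekableInputStream seekableInputStream, InputCodecContext inputCodecContext) throws IOException;
}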
