Skip to content

Commit

Permalink
-Support for Source Codecs (#2519)
Browse files Browse the repository at this point in the history
-Support for Source Codecs
Signed-off-by: umairofficial <[email protected]>

---------

Co-authored-by: umairofficial <[email protected]>
  • Loading branch information
umayr-codes and umairofficial authored Apr 18, 2023
1 parent 8ad4d8c commit 5dadde4
Show file tree
Hide file tree
Showing 22 changed files with 133 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,22 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.model.codec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

/**
* A codec parsing data through an input stream. Each implementation of this interface should
* support parsing a specific type or format of data. See the implementing classes for examples.
*/
public interface Codec {
public interface InputCodec {
/**
* Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
* {@link Record} loaded from the {@link InputStream}.
*
* @param inputStream The input stream to parse
* @param eventConsumer The consumer which handles each event from the stream
* @throws IOException when invalid input is received or an incorrect codec name is provided
*/
void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.csv;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,15 +30,15 @@
import java.util.function.Consumer;

/**
* An implementation of {@link Codec} which parses CSV records into fields.
* An implementation of {@link InputCodec} which parses CSV records into fields.
*/
@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CsvCodecConfig.class)
public class CsvCodec implements Codec {
private static final Logger LOG = LoggerFactory.getLogger(CsvCodec.class);
private final CsvCodecConfig config;
@DataPrepperPlugin(name = "csv", pluginType = InputCodec.class, pluginConfigurationType = CsvInputCodecConfig.class)
public class CsvInputCodec implements InputCodec {
private static final Logger LOG = LoggerFactory.getLogger(CsvInputCodec.class);
private final CsvInputCodecConfig config;

@DataPrepperPluginConstructor
public CsvCodec(final CsvCodecConfig config) {
public CsvInputCodec(CsvInputCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.csv;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
Expand All @@ -12,9 +12,9 @@
import java.util.Objects;

/**
* Configuration class for {@link CsvCodec}.
* Configuration class for {@link CsvInputCodec}.
*/
public class CsvCodecConfig {
public class CsvInputCodecConfig {
static final String DEFAULT_DELIMITER = ",";
static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote
static final Boolean DEFAULT_DETECT_HEADER = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.csv;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
Expand Down Expand Up @@ -50,17 +50,17 @@
@ExtendWith(MockitoExtension.class)
public class CsvCodecTest {
@Mock
private CsvCodecConfig config;
private CsvInputCodecConfig config;
@Mock
private Consumer<Record<Event>> eventConsumer;
private CsvCodec csvCodec;
private CsvCodec createObjectUnderTest() {
return new CsvCodec(config);
private CsvInputCodec csvCodec;
private CsvInputCodec createObjectUnderTest() {
return new CsvInputCodec(config);
}

@BeforeEach
void setup() {
CsvCodecConfig defaultCsvCodecConfig = new CsvCodecConfig();
CsvInputCodecConfig defaultCsvCodecConfig = new CsvInputCodecConfig();
lenient().when(config.getDelimiter()).thenReturn(defaultCsvCodecConfig.getDelimiter());
lenient().when(config.getQuoteCharacter()).thenReturn(defaultCsvCodecConfig.getQuoteCharacter());
lenient().when(config.getHeader()).thenReturn(defaultCsvCodecConfig.getHeader());
Expand Down
20 changes: 20 additions & 0 deletions data-prepper-plugins/newline-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Build script for the data-prepper-plugins/newline-codecs module.

// Apply Gradle's core Java plugin so this module is compiled and tested as a Java project.
plugins {
id 'java'
}


// External dependencies are resolved from Maven Central.
repositories {
mavenCentral()
}

dependencies {
// Local project dependency on the data-prepper-api module.
// NOTE(review): presumably this supplies the org.opensearch.dataprepper.model.* types
// (InputCodec, Event, Record) used by the codec - confirm.
implementation project(':data-prepper-api')
// Jackson is declared without a version here.
// NOTE(review): the version is presumably supplied by dependency management elsewhere
// in the build - confirm before relying on it.
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
// JUnit Jupiter API and engine are pinned to the same explicit version (5.8.1).
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

// Run tests on the JUnit Platform so JUnit Jupiter tests are discovered and executed.
test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.newline;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -20,14 +21,14 @@
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "newline", pluginType = Codec.class, pluginConfigurationType = NewlineDelimitedConfig.class)
public class NewlineDelimitedCodec implements Codec {
@DataPrepperPlugin(name = "newline", pluginType = InputCodec.class, pluginConfigurationType = NewlineDelimitedInputConfig.class)
public class NewlineDelimitedInputCodec implements InputCodec {
private static final String MESSAGE_FIELD_NAME = "message";
private final int skipLines;
private final String headerDestination;

@DataPrepperPluginConstructor
public NewlineDelimitedCodec(final NewlineDelimitedConfig config) {
public NewlineDelimitedInputCodec(final NewlineDelimitedInputConfig config) {
Objects.requireNonNull(config);
skipLines = config.getSkipLines();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.newline;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
Expand All @@ -13,7 +13,7 @@
/**
* Configuration class for the newline delimited codec.
*/
public class NewlineDelimitedConfig {
public class NewlineDelimitedInputConfig {
private int skipLines = 0;

@JsonProperty("header_destination")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.newline;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -32,17 +32,18 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verifyNoInteractions;


@ExtendWith(MockitoExtension.class)
class NewlineDelimitedCodecTest {

@Mock
private NewlineDelimitedConfig config;
private NewlineDelimitedInputConfig config;

private NewlineDelimitedCodec createObjectUnderTest() {
return new NewlineDelimitedCodec(config);
private NewlineDelimitedInputCodec createObjectUnderTest() {
return new NewlineDelimitedInputCodec(config);
}

@Test
Expand All @@ -54,7 +55,7 @@ void constructor_throws_if_config_is_null() {

@Test
void constructor_throws_if_header_destination_is_empty() throws NoSuchFieldException, IllegalAccessException {
final NewlineDelimitedConfig objectUnderTest = new NewlineDelimitedConfig();
final NewlineDelimitedInputConfig objectUnderTest = new NewlineDelimitedInputConfig();
reflectivelySetField(objectUnderTest, "headerDestination", "");
assertThat(objectUnderTest.isValidHeaderDestination(), equalTo(false));
}
Expand Down Expand Up @@ -242,12 +243,12 @@ private String generateMultilineString(final List<String> numberOfLines) {
return stringWriter.toString();
}

private void reflectivelySetField(final NewlineDelimitedConfig newlineDelimitedConfig, final String fieldName, final Object value)
private void reflectivelySetField(final NewlineDelimitedInputConfig newlineDelimitedInputConfig, final String fieldName, final Object value)
throws NoSuchFieldException, IllegalAccessException {
final Field field = NewlineDelimitedConfig.class.getDeclaredField(fieldName);
final Field field = NewlineDelimitedInputConfig.class.getDeclaredField(fieldName);
try {
field.setAccessible(true);
field.set(newlineDelimitedConfig, value);
field.set(newlineDelimitedInputConfig, value);
} finally {
field.setAccessible(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -22,10 +23,11 @@
import java.util.function.Consumer;

/**
* An implementation of {@link Codec} which parses JSON objects for arrays.
* An implementation of {@link InputCodec} which parses the JSON objects contained in arrays.
*/
@DataPrepperPlugin(name = "json", pluginType = Codec.class)
public class JsonCodec implements Codec {
@DataPrepperPlugin(name = "json", pluginType = InputCodec.class)
public class JsonInputCodec implements InputCodec {

private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

Expand All @@ -42,6 +44,7 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
parseRecordsArray(jsonParser, eventConsumer);
}
}

}

private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
Expand All @@ -60,4 +63,5 @@ private Record<Event> createRecord(final Map<String, Object> json) {

return new Record<>(event);
}

}
Loading

0 comments on commit 5dadde4

Please sign in to comment.