Skip to content

Commit

Permalink
Merge pull request #2 from mahesh724/parquetCodec
Browse files: browse the repository at this point in the history
Support for Source Codecs opensearch-project#1532
  • Loading branch information
mahesh724 authored Feb 27, 2023
2 parents 987b50c + 950ebe1 commit 3b77164
Show file tree
Hide file tree
Showing 25 changed files with 335 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.csvinputcodec;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
Expand All @@ -12,7 +12,7 @@
import java.util.Objects;

/**
* Configuration class for {@link CsvCodec}.
* Configuration class for {@link CsvInputCodec}.
*/
public class CsvCodecConfig {
static final String DEFAULT_DELIMITER = ",";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.csvinputcodec;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,15 +30,15 @@
import java.util.function.Consumer;

/**
* An implementation of {@link Codec} which parses CSV records into fields.
* An implementation of {@link InputCodec} which parses CSV records into fields.
*/
@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CsvCodecConfig.class)
public class CsvCodec implements Codec {
private static final Logger LOG = LoggerFactory.getLogger(CsvCodec.class);
@DataPrepperPlugin(name = "Csv", pluginType = InputCodec.class, pluginConfigurationType = CsvCodecConfig.class)
public class CsvInputCodec implements InputCodec {
private static final Logger LOG = LoggerFactory.getLogger(CsvInputCodec.class);
private final CsvCodecConfig config;

@DataPrepperPluginConstructor
public CsvCodec(final CsvCodecConfig config) {
public CsvInputCodec(CsvCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.csvinputcodec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvParser;
Expand All @@ -20,40 +17,33 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.function.Consumer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public class CsvCodecTest {
@Mock
private CsvCodecConfig config;
@Mock
private Consumer<Record<Event>> eventConsumer;
private CsvCodec csvCodec;
private CsvCodec createObjectUnderTest() {
return new CsvCodec(config);
private CsvInputCodec csvCodec;
private CsvInputCodec createObjectUnderTest() {
return new CsvInputCodec(config);
}

@BeforeEach
Expand Down
18 changes: 18 additions & 0 deletions data-prepper-plugins/newline-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Build script for the newline-codecs plugin module.
plugins {
    id('java')
}

// All dependencies below are resolved from Maven Central.
repositories {
    mavenCentral()
}

dependencies {
    // Data Prepper API module from this multi-project build.
    implementation(project(':data-prepper-api'))

    // JUnit 5: API on the test compile classpath, engine only at test runtime.
    testImplementation('org.junit.jupiter:junit-jupiter-api:5.8.1')
    testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.8.1')
}

// Run the test task on the JUnit Platform so JUnit 5 tests are discovered.
test.useJUnitPlatform()
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.newlineinputcodec;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -20,14 +21,14 @@
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "newline", pluginType = Codec.class, pluginConfigurationType = NewlineDelimitedConfig.class)
public class NewlineDelimitedCodec implements Codec {
@DataPrepperPlugin(name = "Newline", pluginType = InputCodec.class, pluginConfigurationType = NewlineDelimitedInputConfig.class)
public class NewlineDelimitedInputCodec implements InputCodec {
private static final String MESSAGE_FIELD_NAME = "message";
private final int skipLines;
private final String headerDestination;

@DataPrepperPluginConstructor
public NewlineDelimitedCodec(final NewlineDelimitedConfig config) {
public NewlineDelimitedInputCodec(final NewlineDelimitedInputConfig config) {
Objects.requireNonNull(config);
skipLines = config.getSkipLines();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.newlineinputcodec;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
Expand All @@ -13,7 +13,7 @@
/**
* Configuration class for the newline delimited codec.
*/
public class NewlineDelimitedConfig {
public class NewlineDelimitedInputConfig {
private int skipLines = 0;

@JsonProperty("header_destination")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.newlineinputcodec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -31,18 +31,16 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
class NewlineDelimitedCodecTest {

@Mock
private NewlineDelimitedConfig config;
private NewlineDelimitedInputConfig config;

private NewlineDelimitedCodec createObjectUnderTest() {
return new NewlineDelimitedCodec(config);
private NewlineDelimitedInputCodec createObjectUnderTest() {
return new NewlineDelimitedInputCodec(config);
}

@Test
Expand All @@ -54,7 +52,7 @@ void constructor_throws_if_config_is_null() {

@Test
void constructor_throws_if_header_destination_is_empty() throws NoSuchFieldException, IllegalAccessException {
final NewlineDelimitedConfig objectUnderTest = new NewlineDelimitedConfig();
final NewlineDelimitedInputConfig objectUnderTest = new NewlineDelimitedInputConfig();
reflectivelySetField(objectUnderTest, "headerDestination", "");
assertThat(objectUnderTest.isValidHeaderDestination(), equalTo(false));
}
Expand Down Expand Up @@ -242,12 +240,12 @@ private String generateMultilineString(final List<String> numberOfLines) {
return stringWriter.toString();
}

private void reflectivelySetField(final NewlineDelimitedConfig newlineDelimitedConfig, final String fieldName, final Object value)
private void reflectivelySetField(final NewlineDelimitedInputConfig newlineDelimitedInputConfig, final String fieldName, final Object value)
throws NoSuchFieldException, IllegalAccessException {
final Field field = NewlineDelimitedConfig.class.getDeclaredField(fieldName);
final Field field = NewlineDelimitedInputConfig.class.getDeclaredField(fieldName);
try {
field.setAccessible(true);
field.set(newlineDelimitedConfig, value);
field.set(newlineDelimitedInputConfig, value);
} finally {
field.setAccessible(false);
}
Expand Down
Binary file not shown.
26 changes: 26 additions & 0 deletions data-prepper-plugins/parquet-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Build script for the parquet-codecs plugin module.
plugins {
    id 'java'
}

repositories {
    mavenCentral()
    // NOTE(review): every dependency declared below is an Apache or JUnit
    // artifact that is normally published to Maven Central. Confirm whether
    // JitPack is actually required before keeping this extra repository.
    maven { url 'https://jitpack.io' }
}

dependencies {
    implementation project(':data-prepper-api')

    // Keep all Parquet modules on the same release. parquet-avro was pinned to
    // 1.10.1 while parquet-hadoop was on 1.12.0, which puts classes from two
    // different Parquet releases on one classpath.
    implementation 'org.apache.parquet:parquet-hadoop:1.12.0'
    implementation 'org.apache.parquet:parquet-avro:1.12.0'
    // Explicit Avro pin. Gradle selects the highest requested version, so a
    // newer Avro requested transitively by parquet-avro takes precedence.
    implementation 'org.apache.avro:avro:1.9.0'

    // Keep all Hadoop modules on the same 3.x release. The legacy
    // hadoop-core:1.2.1 jar (Hadoop 1.x) duplicates org.apache.hadoop classes
    // that hadoop-common 3.x already provides, with incompatible APIs; its
    // mapred/mapreduce classes live in hadoop-mapreduce-client-core on 3.x.
    implementation 'org.apache.hadoop:hadoop-common:3.3.3'
    implementation 'org.apache.hadoop:hadoop-mapreduce-client-core:3.3.3'

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
    testImplementation project(':data-prepper-test-common')
}

// Run tests on the JUnit Platform so JUnit 5 tests are discovered.
test {
    useJUnitPlatform()
}
Loading

0 comments on commit 3b77164

Please sign in to comment.