From 2ed9c7c443d42c859e8694fd0a414fff37e845ac Mon Sep 17 00:00:00 2001 From: umairofficial Date: Fri, 19 May 2023 19:17:42 +0530 Subject: [PATCH 1/4] -Support for Source Codecs Signed-off-by: umairofficial --- data-prepper-plugins/avro-codecs/build.gradle | 7 --- .../plugins/codec/avro/AvroInputCodec.java | 39 +++++++------ .../codec/avro/AvroInputCodecTest.java | 55 +++++++++++++++++-- 3 files changed, 72 insertions(+), 29 deletions(-) diff --git a/data-prepper-plugins/avro-codecs/build.gradle b/data-prepper-plugins/avro-codecs/build.gradle index 2948725b91..dfc373d370 100644 --- a/data-prepper-plugins/avro-codecs/build.gradle +++ b/data-prepper-plugins/avro-codecs/build.gradle @@ -2,13 +2,6 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -plugins { - id 'java' -} - -repositories { - mavenCentral() -} dependencies { implementation project(path: ':data-prepper-api') diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java index feb0908de7..3ef4eb56af 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java @@ -60,14 +60,10 @@ private void parseAvroStream(final InputStream inputStream, final Consumer eventData = new HashMap<>(); GenericRecord avroRecord= stream.next(); - for(Schema.Field field : schema.getFields()) { - Object value=decodeValueIfEncoded(avroRecord.get(field.name())); - eventData.put(field.name(), value); + final Map eventData = convertRecordToMap(avroRecord, schema); - } final Event event = JacksonLog.builder().withData(eventData).build(); eventConsumer.accept(new Record<>(event)); } @@ -78,20 +74,31 @@ private void parseAvroStream(final InputStream inputStream, final Consumer convertRecordToMap(GenericRecord record, Schema schema) throws Exception { + + final Map eventData = new HashMap<>(); + + for(Schema.Field field : schema.getFields()){ + + Object value = record.get(field.name()); + + if(value instanceof GenericRecord){ + Schema schemaOfNestedRecord = ((GenericRecord) value).getSchema(); + value = convertRecordToMap((GenericRecord) value, schemaOfNestedRecord); } - else if(rawValue instanceof GenericEnumSymbol || rawValue instanceof GenericData.EnumSymbol || rawValue instanceof GenericFixed || rawValue instanceof GenericRecord){ - throw new Exception("The Avro codec does not support this data type presently"); + + else if(value instanceof GenericEnumSymbol || value instanceof GenericData.EnumSymbol){ + value = value.toString(); } - return rawValue; - } - catch (Exception e){ - return rawValue; + + else if(value instanceof Utf8){ + byte[] utf8Bytes = value.toString().getBytes("UTF-8"); + value = new String(utf8Bytes, "UTF-8"); + } + + eventData.put(field.name(), value); } + return eventData; } } \ No newline at end of file diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java index 2fbd12031d..64e912db16 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java @@ -8,9 +8,11 @@ import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.util.Utf8; import org.junit.Test; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtendWith; @@ -166,18 +168,40 @@ private static Object decodeOutputIfEncoded(Map encodedOutput, Object key){ } } - private static Event getEvent(int index){ + private static Event getEvent(int index) throws Exception { List recordList=generateRecords(parseSchema(),numberOfRecords); GenericRecord record=recordList.get(index); - Schema schema=parseSchema(); + Schema schema = record.getSchema(); + final Map eventData = convertRecordToMap(record, schema); + final Event event = JacksonLog.builder().withData(eventData).build(); + return event; + } + + private static Map convertRecordToMap(GenericRecord record, Schema schema) throws Exception { + final Map eventData = new HashMap<>(); - for(Schema.Field field : schema.getFields()) { - eventData.put(field.name(), record.get(field.name())); + for(Schema.Field field : schema.getFields()){ + + Object value = record.get(field.name()); + if(value instanceof GenericRecord){ + Schema schemaOfNestedRecord = ((GenericRecord) value).getSchema(); + value = convertRecordToMap((GenericRecord) value, schemaOfNestedRecord); + } + + else if(value instanceof GenericEnumSymbol){ + value = value.toString(); + } + + else if(value instanceof Utf8){ + byte[] utf8Bytes = value.toString().getBytes("UTF-8"); + value = new String(utf8Bytes, "UTF-8"); + } + + eventData.put(field.name(), value); } - final Event event = JacksonLog.builder().withData(eventData).build(); - return event; + return eventData; } @@ -211,9 +235,13 @@ private static List generateRecords(Schema schema, int numberOfRe for(int rows = 0; rows < numberOfRecords; rows++){ GenericRecord record = new GenericData.Record(schema); + GenericRecord innerRecord = new GenericData.Record(parseInnerSchemaForNestedRecord()); + innerRecord.put("firstFieldInNestedRecord", "testString"+rows); + innerRecord.put("secondFieldInNestedRecord", rows); record.put("name", "Person"+rows); record.put("age", rows); + record.put("nestedRecord", innerRecord); recordList.add((record)); } @@ -224,14 +252,29 @@ private static List generateRecords(Schema schema, int numberOfRe private static Schema parseSchema() { + Schema innerSchema=parseInnerSchemaForNestedRecord(); return SchemaBuilder.record("Person") .fields() .name("name").type().stringType().noDefault() .name("age").type().intType().noDefault() + .name("nestedRecord").type(innerSchema).noDefault() .endRecord(); } + private static Schema parseInnerSchemaForNestedRecord(){ + return SchemaBuilder + .record("InnerRecord") + .fields() + .name("firstFieldInNestedRecord") + .type(Schema.create(Schema.Type.STRING)) + .noDefault() + .name("secondFieldInNestedRecord") + .type(Schema.create(Schema.Type.INT)) + .noDefault() + .endRecord(); + } + private static InputStream createInvalidAvroStream() { return new ByteArrayInputStream(INVALID_AVRO_INPUT_STREAM.getBytes()); } From 09df7eda8e344dc8dea4a73479d4d7b74adb6c50 Mon Sep 17 00:00:00 2001 From: umairofficial Date: Fri, 26 May 2023 22:16:18 +0530 Subject: [PATCH 2/4] -Support for Source Codecs Signed-off-by: umairofficial --- .../plugins/codec/avro/AvroInputCodec.java | 1 - .../codec/avro/AvroInputCodecTest.java | 76 ++++--------------- 2 files changed, 16 insertions(+), 61 deletions(-) diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java index 3ef4eb56af..4bb218fe45 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java @@ -9,7 +9,6 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericFixed; import org.apache.avro.file.DataFileStream; import org.apache.avro.util.Utf8; diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java index 64e912db16..5ef977d7b5 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodecTest.java @@ -8,11 +8,9 @@ import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.util.Utf8; import org.junit.Test; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtendWith; @@ -24,16 +22,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventType; -import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; -import org.json.JSONObject; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.FileInputStream; import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -141,67 +136,28 @@ public void test_HappyCaseAvroInputStream_then_callsConsumerWithParsedEvents(fin assertThat(actualRecord.getData().getMetadata(),notNullValue()); assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); - Map expectedMap=getEvent(index).toMap(); - assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + Map expectedMap=new HashMap<>(); + GenericRecord record=generateRecords(parseSchema(),numberOfRecords).get(index); + for(Schema.Field field:record.getSchema().getFields()){ + expectedMap.put(field.name(),record.get(field.name())); + } - for(Object key: actualRecord.getData().toMap().keySet()){ - Object decodedOutput = decodeOutputIfEncoded(actualRecord.getData().toMap() , key); - Object expectedOutput = getEvent(index).toMap().get(key.toString()); - assertThat(decodedOutput, equalTo(expectedOutput)); + for(String key: expectedMap.keySet()){ + Object actualRecordValue=actualRecord.getData().toMap().get(key); + if(!(actualRecordValue instanceof Map)) + assertThat(actualRecord.getData().toMap().get(key), equalTo(expectedMap.get(key))); + else{ + GenericRecord expectedInnerRecord= (GenericRecord) expectedMap.get(key); + Schema innerSchema=expectedInnerRecord.getSchema(); + for(Schema.Field innerField : innerSchema.getFields()){ + assertThat(((Map)actualRecordValue).get(innerField.name()),equalTo(expectedInnerRecord.get(innerField.name()))); + } + } } index++; } fileInputStream.close(); Files.delete(path); - - } - - private static Object decodeOutputIfEncoded(Map encodedOutput, Object key){ - try{ - JSONObject outputJson = new JSONObject(encodedOutput); - Map innerJson= (Map) outputJson.get(key.toString()); - byte[] encodedString=(byte[]) innerJson.get("bytes"); - return new String(encodedString, StandardCharsets.UTF_8); - - }catch (Exception e){ - return encodedOutput.get(key); - } - } - - private static Event getEvent(int index) throws Exception { - List recordList=generateRecords(parseSchema(),numberOfRecords); - GenericRecord record=recordList.get(index); - Schema schema = record.getSchema(); - final Map eventData = convertRecordToMap(record, schema); - final Event event = JacksonLog.builder().withData(eventData).build(); - return event; - } - - private static Map convertRecordToMap(GenericRecord record, Schema schema) throws Exception { - - final Map eventData = new HashMap<>(); - - for(Schema.Field field : schema.getFields()){ - - Object value = record.get(field.name()); - - if(value instanceof GenericRecord){ - Schema schemaOfNestedRecord = ((GenericRecord) value).getSchema(); - value = convertRecordToMap((GenericRecord) value, schemaOfNestedRecord); - } - - else if(value instanceof GenericEnumSymbol){ - value = value.toString(); - } - - else if(value instanceof Utf8){ - byte[] utf8Bytes = value.toString().getBytes("UTF-8"); - value = new String(utf8Bytes, "UTF-8"); - } - - eventData.put(field.name(), value); - } - return eventData; } From 04131b7f800497e4594efd3c7b6cac199c1117b8 Mon Sep 17 00:00:00 2001 From: umairofficial Date: Wed, 31 May 2023 13:50:21 +0530 Subject: [PATCH 3/4] -Support for Sink Codecs Signed-off-by: umairofficial --- .../dataprepper/model/codec/OutputCodec.java | 28 +++++++ .../plugins/codec/avro/AvroOutputCodec.java | 36 +++++++++ .../codec/avro/AvroOutputCodecTest.java | 8 ++ .../plugins/codec/avro/Avrotest.java | 73 +++++++++++++++++++ .../plugins/codec/csv/CsvOutputCodec.java | 32 ++++++++ .../plugins/codec/csv/CsvOutputCodecTest.java | 8 ++ .../newline/NewlineDelimitedOutputCodec.java | 36 +++++++++ .../NewlineDelimitedOutputCodecTest.java | 8 ++ .../plugins/codec/json/JsonOutputCodec.java | 33 +++++++++ .../codec/json/JsonOutputCodecTest.java | 8 ++ 10 files changed, 270 insertions(+) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java create mode 100644 data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java create mode 100644 data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java create mode 100644 data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java create mode 100644 data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java create mode 100644 data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java create mode 100644 data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java create mode 100644 data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java create mode 100644 data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java create mode 100644 data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java new file mode 100644 index 0000000000..8ec1b68594 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.codec; + +import org.opensearch.dataprepper.model.event.Event; + +import java.io.IOException; +import java.io.OutputStream; + +public interface OutputCodec { + /** + * converts a data-prepper {@link Event} into x-format of data to be loaded into any sink + * + * @param outputStream Underlying stream into which Data-prepper events are written into + * @throws IOException throws IOException when invalid or incompatible event comes up + */ + + void start(OutputStream outputStream); + + void complete(OutputStream outputStream) throws IOException; + + void writeEvent(Event event, OutputStream outputStream) throws IOException; + + String getExtension(); +} diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java new file mode 100644 index 0000000000..5fd54ea56a --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.avro; + +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; + +import java.io.OutputStream; +import java.util.Objects; + +public class AvroOutputCodec implements OutputCodec { + + @Override + public void start(OutputStream outputStream) { + Objects.requireNonNull(outputStream); + + + } + + @Override + public void complete(OutputStream outputStream) { + + } + + @Override + public void writeEvent(Event event, OutputStream outputStream) { + + } + + @Override + public String getExtension() { + return null; + } +} diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java new file mode 100644 index 0000000000..96b40e0a74 --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java @@ -0,0 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.avro; + +public class AvroOutputCodecTest { +} diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java new file mode 100644 index 0000000000..ac73e0d9dc --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java @@ -0,0 +1,73 @@ +package org.opensearch.dataprepper.plugins.codec.avro; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +public class Avrotest { + public static void main(String[] args) throws IOException { + Schema schema = parseSchema(); + DatumWriter datumWriter = new GenericDatumWriter(schema); + ByteArrayOutputStream outputStream=new ByteArrayOutputStream(); + + DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.create(schema, outputStream); + System.out.println("EXECUTED!"); + for(GenericRecord record: generateRecords(schema,100)){ + dataFileWriter.append(record); + } + dataFileWriter.close(); + final byte[] avroData=outputStream.toByteArray(); + ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(avroData); + DataFileStream stream = new DataFileStream(byteArrayInputStream, new GenericDatumReader()); + Schema schema1=stream.getSchema(); + + while (stream.hasNext()) { + + GenericRecord avroRecord= stream.next(); + + for(Schema.Field field: avroRecord.getSchema().getFields()){ + System.out.print(" Key: "+ field.name()+" Value: "+avroRecord.get(field.name())); + } + System.out.println(); + } + + + } + private static Schema parseSchema() { + return SchemaBuilder.record("Person") + .fields() + .name("name").type().stringType().noDefault() + .name("age").type().intType().noDefault() + .endRecord(); + + } + + private static List generateRecords(Schema schema, int numberOfRecords) { + + List recordList = new ArrayList<>(); + + for(int rows = 0; rows < numberOfRecords; rows++){ + + GenericRecord record = new GenericData.Record(schema); + + record.put("name", "Person"+rows); + record.put("age", rows); + recordList.add((record)); + + } + + return recordList; + + } +} diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java new file mode 100644 index 0000000000..c2f63f361c --- /dev/null +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.csv; + +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; + +import java.io.OutputStream; + +public class CsvOutputCodec implements OutputCodec { + @Override + public void start(OutputStream outputStream) { + + } + + @Override + public void complete(OutputStream outputStream) { + + } + + @Override + public void writeEvent(Event event, OutputStream outputStream) { + + } + + @Override + public String getExtension() { + return null; + } +} diff --git a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java new file mode 100644 index 0000000000..ef388bca1b --- /dev/null +++ b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java @@ -0,0 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.csv; + +public class CsvOutputCodecTest { +} diff --git a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java new file mode 100644 index 0000000000..afa404cf33 --- /dev/null +++ b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.newline; + +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +public class NewlineDelimitedOutputCodec implements OutputCodec { + @Override + public void start(OutputStream outputStream) { + Objects.requireNonNull(outputStream); + } + + @Override + public void complete(OutputStream outputStream) throws IOException { + outputStream.close(); + } + + @Override + public void writeEvent(Event event, OutputStream outputStream) throws IOException { + final byte[] byteArr = event.toJsonString().getBytes(); + outputStream.write(byteArr); + outputStream.write("\n".getBytes()); + } + + @Override + public String getExtension() { + return null; + } +} diff --git a/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java b/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java new file mode 100644 index 0000000000..10fa143dcb --- /dev/null +++ b/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java @@ -0,0 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.newline; + +public class NewlineDelimitedOutputCodecTest { +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java new file mode 100644 index 0000000000..7c89b44d0e --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.json; + +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; + +import java.io.OutputStream; + +public class JsonOutputCodec implements OutputCodec { + @Override + public void start(OutputStream outputStream) { + + + } + + @Override + public void complete(OutputStream outputStream) { + + } + + @Override + public void writeEvent(Event event, OutputStream outputStream) { + + } + + @Override + public String getExtension() { + return null; + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java new file mode 100644 index 0000000000..d1f4581fe7 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java @@ -0,0 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.json; + +public class JsonOutputCodecTest { +} From 3956d3944927d38bef3b02a38a34b6f8379fbcaa Mon Sep 17 00:00:00 2001 From: umairofficial Date: Thu, 1 Jun 2023 12:42:09 +0530 Subject: [PATCH 4/4] -Support for Sink Codecs Signed-off-by: umairofficial --- .../dataprepper/model/codec/OutputCodec.java | 28 ------- .../plugins/codec/avro/AvroOutputCodec.java | 36 --------- .../codec/avro/AvroOutputCodecTest.java | 8 -- .../plugins/codec/avro/Avrotest.java | 73 ------------------- .../plugins/codec/csv/CsvOutputCodec.java | 32 -------- .../plugins/codec/csv/CsvOutputCodecTest.java | 8 -- .../newline/NewlineDelimitedOutputCodec.java | 36 --------- .../NewlineDelimitedOutputCodecTest.java | 8 -- .../plugins/codec/json/JsonOutputCodec.java | 33 --------- .../codec/json/JsonOutputCodecTest.java | 8 -- 10 files changed, 270 deletions(-) delete mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java delete mode 100644 data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java delete mode 100644 data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java delete mode 100644 data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java delete mode 100644 data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java delete mode 100644 data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java delete mode 100644 data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java delete mode 100644 data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java delete mode 100644 data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java delete mode 100644 data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java deleted file mode 100644 index 8ec1b68594..0000000000 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.model.codec; - -import org.opensearch.dataprepper.model.event.Event; - -import java.io.IOException; -import java.io.OutputStream; - -public interface OutputCodec { - /** - * converts a data-prepper {@link Event} into x-format of data to be loaded into any sink - * - * @param outputStream Underlying stream into which Data-prepper events are written into - * @throws IOException throws IOException when invalid or incompatible event comes up - */ - - void start(OutputStream outputStream); - - void complete(OutputStream outputStream) throws IOException; - - void writeEvent(Event event, OutputStream outputStream) throws IOException; - - String getExtension(); -} diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java deleted file mode 100644 index 5fd54ea56a..0000000000 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.avro; - -import org.opensearch.dataprepper.model.codec.OutputCodec; -import org.opensearch.dataprepper.model.event.Event; - -import java.io.OutputStream; -import java.util.Objects; - -public class AvroOutputCodec implements OutputCodec { - - @Override - public void start(OutputStream outputStream) { - Objects.requireNonNull(outputStream); - - - } - - @Override - public void complete(OutputStream outputStream) { - - } - - @Override - public void writeEvent(Event event, OutputStream outputStream) { - - } - - @Override - public String getExtension() { - return null; - } -} diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java deleted file mode 100644 index 96b40e0a74..0000000000 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.avro; - -public class AvroOutputCodecTest { -} diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java deleted file mode 100644 index ac73e0d9dc..0000000000 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/Avrotest.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.opensearch.dataprepper.plugins.codec.avro; - -import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; - -import java.io.*; -import java.util.ArrayList; -import java.util.List; - -public class Avrotest { - public static void main(String[] args) throws IOException { - Schema schema = parseSchema(); - DatumWriter datumWriter = new GenericDatumWriter(schema); - ByteArrayOutputStream outputStream=new ByteArrayOutputStream(); - - DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter); - dataFileWriter.create(schema, outputStream); - System.out.println("EXECUTED!"); - for(GenericRecord record: generateRecords(schema,100)){ - dataFileWriter.append(record); - } - dataFileWriter.close(); - final byte[] avroData=outputStream.toByteArray(); - ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(avroData); - DataFileStream stream = new DataFileStream(byteArrayInputStream, new GenericDatumReader()); - Schema schema1=stream.getSchema(); - - while (stream.hasNext()) { - - GenericRecord avroRecord= stream.next(); - - for(Schema.Field field: avroRecord.getSchema().getFields()){ - System.out.print(" Key: "+ field.name()+" Value: "+avroRecord.get(field.name())); - } - System.out.println(); - } - - - } - private static Schema parseSchema() { - return SchemaBuilder.record("Person") - .fields() - .name("name").type().stringType().noDefault() - .name("age").type().intType().noDefault() - .endRecord(); - - } - - private static List generateRecords(Schema schema, int numberOfRecords) { - - List recordList = new ArrayList<>(); - - for(int rows = 0; rows < numberOfRecords; rows++){ - - GenericRecord record = new GenericData.Record(schema); - - record.put("name", "Person"+rows); - record.put("age", rows); - recordList.add((record)); - - } - - return recordList; - - } -} diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java deleted file mode 100644 index c2f63f361c..0000000000 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.csv; - -import org.opensearch.dataprepper.model.codec.OutputCodec; -import org.opensearch.dataprepper.model.event.Event; - -import java.io.OutputStream; - -public class CsvOutputCodec implements OutputCodec { - @Override - public void start(OutputStream outputStream) { - - } - - @Override - public void complete(OutputStream outputStream) { - - } - - @Override - public void writeEvent(Event event, OutputStream outputStream) { - - } - - @Override - public String getExtension() { - return null; - } -} diff --git a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java deleted file mode 100644 index ef388bca1b..0000000000 --- a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.csv; - -public class CsvOutputCodecTest { -} diff --git a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java deleted file mode 100644 index afa404cf33..0000000000 --- a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.newline; - -import org.opensearch.dataprepper.model.codec.OutputCodec; -import org.opensearch.dataprepper.model.event.Event; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Objects; - -public class NewlineDelimitedOutputCodec implements OutputCodec { - @Override - public void start(OutputStream outputStream) { - Objects.requireNonNull(outputStream); - } - - @Override - public void complete(OutputStream outputStream) throws IOException { - outputStream.close(); - } - - @Override - public void writeEvent(Event event, OutputStream outputStream) throws IOException { - final byte[] byteArr = event.toJsonString().getBytes(); - outputStream.write(byteArr); - outputStream.write("\n".getBytes()); - } - - @Override - public String getExtension() { - return null; - } -} diff --git a/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java b/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java deleted file mode 100644 index 10fa143dcb..0000000000 --- a/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.newline; - -public class NewlineDelimitedOutputCodecTest { -} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java deleted file mode 100644 index 7c89b44d0e..0000000000 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.json; - -import org.opensearch.dataprepper.model.codec.OutputCodec; -import org.opensearch.dataprepper.model.event.Event; - -import java.io.OutputStream; - -public class JsonOutputCodec implements OutputCodec { - @Override - public void start(OutputStream outputStream) { - - - } - - @Override - public void complete(OutputStream outputStream) { - - } - - @Override - public void writeEvent(Event event, OutputStream outputStream) { - - } - - @Override - public String getExtension() { - return null; - } -} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java deleted file mode 100644 index d1f4581fe7..0000000000 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.codec.json; - -public class JsonOutputCodecTest { -}