diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java
new file mode 100644
index 0000000000..9e80ab5791
--- /dev/null
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java
@@ -0,0 +1,18 @@
+package org.opensearch.dataprepper.model.codec;
+
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.Consumer;
+
+public interface InputCodec {
+    /**
+     * Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
+     * {@link Record} loaded from the {@link InputStream}.
+     *
+     * @param inputStream The input stream of the object from the source plugin (e.g. S3, HTTP, RSS feed)
+     * @param eventConsumer The consumer which handles each event parsed from the stream
+     */
+    void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
+}
diff --git a/data-prepper-plugins/json-codec/src/main/java/org/opensearch/dataprepper/jsonCodec/JsonInputCodec.java b/data-prepper-plugins/json-codec/src/main/java/org/opensearch/dataprepper/jsonCodec/JsonInputCodec.java
new file mode 100644
index 0000000000..e135a325df
--- /dev/null
+++ b/data-prepper-plugins/json-codec/src/main/java/org/opensearch/dataprepper/jsonCodec/JsonInputCodec.java
@@ -0,0 +1,59 @@
+package org.opensearch.dataprepper.jsonCodec;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.codec.InputCodec;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.log.JacksonLog;
+import org.opensearch.dataprepper.model.record.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+@DataPrepperPlugin(name = "json", pluginType = InputCodec.class)
+public class JsonInputCodec implements InputCodec {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final JsonFactory jsonFactory = new JsonFactory();
+
+    @Override
+    public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
+
+        Objects.requireNonNull(inputStream);
+        Objects.requireNonNull(eventConsumer);
+
+        final JsonParser jsonParser = jsonFactory.createParser(inputStream);
+
+        // Scan the wrapping JSON object for the array that holds the individual records.
+        while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
+            if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
+                parseRecordsArray(jsonParser, eventConsumer);
+            }
+        }
+    }
+
+    private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
+        while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
+            final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);
+
+            final Record<Event> record = createRecord(innerJson);
+            eventConsumer.accept(record);
+        }
+    }
+
+    private Record<Event> createRecord(final Map<String, Object> json) {
+        final JacksonEvent event = JacksonLog.builder()
+                .withData(json)
+                .build();
+
+        return new Record<>(event);
+    }
+
+}
diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/parquetInputCodec/parquetInputCodec.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/parquetInputCodec/parquetInputCodec.java
new file mode 100644
index 0000000000..6fba09f05a
--- /dev/null
+++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/parquetInputCodec/parquetInputCodec.java
@@ -0,0 +1,90 @@
+package org.opensearch.dataprepper.parquetInputCodec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.codec.InputCodec;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.log.JacksonLog;
+import org.opensearch.dataprepper.model.record.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+@DataPrepperPlugin(name = "parquet", pluginType = InputCodec.class)
+public class parquetInputCodec implements InputCodec {
+
+    private static final String MESSAGE_FIELD_NAME = "message";
+
+    private static final Logger LOG = LoggerFactory.getLogger(parquetInputCodec.class);
+
+    @DataPrepperPluginConstructor
+    public parquetInputCodec() {
+
+    }
+
+    @Override
+    public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
+        Objects.requireNonNull(inputStream);
+        Objects.requireNonNull(eventConsumer);
+
+        parseParquetStream(inputStream, eventConsumer);
+    }
+
+    private void parseParquetStream(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
+        // Parquet needs random access to the data, so copy the stream into a temporary file first.
+        final File tempFile = File.createTempFile("parquet-data", ".parquet");
+        tempFile.deleteOnExit();
+        Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+
+        try (ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build())) {
+            final ParquetMetadata footer = parquetFileReader.getFooter();
+            final MessageType schema = createdParquetSchema(footer);
+
+            PageReadStore pages;
+            while ((pages = parquetFileReader.readNextRowGroup()) != null) {
+                final long rows = pages.getRowCount();
+                final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+                final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
+
+                for (int i = 0; i < rows; i++) {
+                    // Read each row once and store its string representation as the event data.
+                    final SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
+                    final Map<String, Object> eventData = new HashMap<>();
+                    eventData.put(MESSAGE_FIELD_NAME, simpleGroup.toString());
+
+                    // Convert the row data into an event and hand it to the consumer.
+                    final Event event = JacksonLog.builder().withData(eventData).build();
+                    eventConsumer.accept(new Record<>(event));
+                }
+            }
+        }
+    }
+
+    private MessageType createdParquetSchema(final ParquetMetadata parquetMetadata) {
+        return parquetMetadata.getFileMetaData().getSchema();
+    }
+
+}
diff --git a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/parquetInputCodec/parquetInputCodecTest.java b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/parquetInputCodec/parquetInputCodecTest.java
new file mode 100644
index 0000000000..ed68c52b77
--- /dev/null
+++ b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/parquetInputCodec/parquetInputCodecTest.java
@@ -0,0 +1,47 @@
+package org.opensearch.dataprepper.parquetInputCodec;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.function.Consumer;
+
+@ExtendWith(MockitoExtension.class)
+public class parquetInputCodecTest {
+
+    @Mock
+    private Consumer<Record<Event>> eventConsumer;
+
+    // Writes a small parquet file with a fixed two-column schema and returns it as an input stream.
+    static InputStream createParquetRandomStream(final int numberOfColumns, final int numberOfRecords) throws IOException {
+
+        final MessageType schema = MessageTypeParser.parseMessageType("message schema { required int32 column1; required double column2; }");
+
+        final File tempFile = File.createTempFile("parquet-test-data", ".parquet");
+        tempFile.deleteOnExit();
+        final SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+        // ExampleParquetWriter (from parquet-hadoop) writes Group rows using the schema defined above.
+        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(tempFile.toURI()))
+                .withType(schema).withWriteMode(ParquetFileWriter.Mode.OVERWRITE).build()) {
+            for (int i = 0; i < numberOfRecords; i++) {
+                writer.write(groupFactory.newGroup().append("column1", i).append("column2", (double) i));
+            }
+        }
+        return Files.newInputStream(tempFile.toPath());
+    }
+
+}
diff --git a/settings.gradle b/settings.gradle
index 6db995999c..857139af59 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -58,4 +58,8 @@
 include 'release:docker'
 include 'release:maven'
 include 'e2e-test:peerforwarder'
 include 'rss-source'
+include 'data-prepper-plugins:json-codec'
+findProject(':data-prepper-plugins:json-codec')?.name = 'json-codec'
+include 'data-prepper-plugins:parquet-codecs'
+findProject(':data-prepper-plugins:parquet-codecs')?.name = 'parquet-codecs'
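
A minimal sketch of driving the new codec API, assuming the JsonInputCodec class from this diff is on the classpath. The sample JSON, the class name, and the printed values below are illustrative only and are not part of the change:

import org.opensearch.dataprepper.jsonCodec.JsonInputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class JsonInputCodecUsageSketch {
    public static void main(final String[] args) throws IOException {
        // The codec expects a wrapping JSON object whose array holds the individual records.
        final String json = "{\"events\":[{\"message\":\"hello\"},{\"message\":\"world\"}]}";
        final InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

        // Collect every Record<Event> the codec emits for the array elements.
        final List<Record<Event>> records = new ArrayList<>();
        new JsonInputCodec().parse(inputStream, records::add);

        System.out.println(records.size());                                        // 2
        System.out.println(records.get(0).getData().get("message", String.class)); // hello
    }
}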