
Commit

Merge pull request #1 from mahesh724/parquetCodec
Parquet codec code update
mahesh724 authored Feb 14, 2023
2 parents 9009741 + ff8867c commit 52ea369
Showing 5 changed files with 233 additions and 0 deletions.
18 changes: 18 additions & 0 deletions InputCodec.java
@@ -0,0 +1,18 @@
package org.opensearch.dataprepper.model.codec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

public interface InputCodec {
    /**
     * Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
     * {@link Record} loaded from the {@link InputStream}.
     *
     * @param inputStream The input stream from the source plugin (e.g. S3, HTTP, RSS feed) object
     * @param eventConsumer The consumer which handles each event from the stream
     * @throws IOException if the stream cannot be read or parsed
     */
    void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
}
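
For context, a minimal sketch of how a caller might drive an InputCodec. The example class, the local file path, and the collecting consumer are illustrative stand-ins for a real source plugin and pipeline buffer; they are not part of this change:

package org.opensearch.dataprepper.examples; // hypothetical package, for illustration only

import org.opensearch.dataprepper.jsonCodec.JsonInputCodec;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class InputCodecUsageExample {
    public static void main(String[] args) throws IOException {
        // assumption: the json codec from this change stands in for any InputCodec implementation
        final InputCodec codec = new JsonInputCodec();

        final List<Record<Event>> records = new ArrayList<>();
        try (InputStream inputStream = new FileInputStream("records.json")) { // stand-in for an S3/HTTP object stream
            codec.parse(inputStream, records::add); // the codec invokes the consumer once per parsed record
        }
        System.out.println("Parsed " + records.size() + " records");
    }
}
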
59 changes: 59 additions & 0 deletions JsonInputCodec.java
@@ -0,0 +1,59 @@
package org.opensearch.dataprepper.jsonCodec;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "json", pluginType = InputCodec.class)
public class JsonInputCodec implements InputCodec {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final JsonFactory jsonFactory = new JsonFactory();

    @Override
    public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
        Objects.requireNonNull(inputStream);
        Objects.requireNonNull(eventConsumer);

        final JsonParser jsonParser = jsonFactory.createParser(inputStream);

        // scan the top-level object and parse each array of records found within it
        while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
            if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
                parseRecordsArray(jsonParser, eventConsumer);
            }
        }
    }

    private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
        while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
            final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);

            final Record<Event> record = createRecord(innerJson);
            eventConsumer.accept(record);
        }
    }

    private Record<Event> createRecord(final Map<String, Object> json) {
        final JacksonEvent event = JacksonLog.builder()
                .withData(json)
                .build();

        return new Record<>(event);
    }
}
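
A usage note: parse scans the top-level JSON object for arrays and emits one event per array element. A small sketch of that behavior follows; the example class and sample JSON are illustrative, not part of this change:

package org.opensearch.dataprepper.jsonCodec; // hypothetical example class placed alongside the codec

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class JsonInputCodecExample {
    public static void main(String[] args) throws IOException {
        // a top-level object containing one array of two records
        final String json = "{\"records\": [{\"name\": \"a\"}, {\"name\": \"b\"}]}";
        final InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

        final List<Record<Event>> parsed = new ArrayList<>();
        new JsonInputCodec().parse(inputStream, parsed::add);

        System.out.println(parsed.size()); // prints 2: one record per array element
    }
}
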
114 changes: 114 additions & 0 deletions ParquetInputCodec.java
@@ -0,0 +1,114 @@
package org.opensearch.dataprepper.parquetInputCodec;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "parquet", pluginType = InputCodec.class)
public class parquetInputCodec implements InputCodec {

private static final String MESSAGE_FIELD_NAME = "message";

private static final Logger LOG = LoggerFactory.getLogger(parquetInputCodec.class);

@DataPrepperPluginConstructor
public parquetInputCodec() {

}

@Override
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {

Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

parseParquetStream(inputStream, eventConsumer);

}

    private void parseParquetStream(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
        // ParquetFileReader needs a seekable file, so spool the stream to a temporary file first
        final File tempFile = File.createTempFile("parquet-data", ".parquet");
        Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);

        try (ParquetFileReader parquetFileReader = new ParquetFileReader(
                HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()),
                ParquetReadOptions.builder().build())) {
            final ParquetMetadata footer = parquetFileReader.getFooter();
            final MessageType schema = createParquetSchema(footer);

            PageReadStore pages;
            while ((pages = parquetFileReader.readNextRowGroup()) != null) {
                final long rows = pages.getRowCount();
                final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));

                for (long row = 0; row < rows; row++) {
                    // read each row exactly once and render it as the event's message field
                    final SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
                    final Map<String, String> eventData = new HashMap<>();
                    eventData.put(MESSAGE_FIELD_NAME, simpleGroup.toString());

                    // wrap the row in an event and hand it to the consumer
                    final Event event = JacksonLog.builder().withData(eventData).build();
                    eventConsumer.accept(new Record<>(event));
                }
            }
        } finally {
            if (!tempFile.delete()) {
                LOG.warn("Unable to delete temporary file {}", tempFile);
            }
        }
    }

    private MessageType createParquetSchema(final ParquetMetadata parquetMetadata) {
        return parquetMetadata.getFileMetaData().getSchema();
    }

}
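
And a short sketch of exercising the parquet codec end to end; the example class and the local file path are illustrative, and each emitted event carries the row rendered as text under the "message" key:

package org.opensearch.dataprepper.parquetInputCodec; // hypothetical example class placed alongside the codec

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class ParquetInputCodecExample {
    public static void main(String[] args) throws IOException {
        final List<Record<Event>> records = new ArrayList<>();
        try (InputStream inputStream = new FileInputStream("data.parquet")) { // illustrative local parquet file
            new ParquetInputCodec().parse(inputStream, records::add);
        }
        // each event stores the row's string form under the "message" key
        records.forEach(record -> System.out.println(record.getData().get("message", String.class)));
    }
}
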
38 changes: 38 additions & 0 deletions ParquetInputCodecTest.java
@@ -0,0 +1,38 @@
package org.opensearch.dataprepper.parquetInputCodec;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class ParquetInputCodecTest {

    @Mock
    private Consumer<Record<Event>> eventConsumer;

    // writes records with a fixed two-column schema to a temporary file and streams it back
    static InputStream createParquetRandomStream(final int numberOfRecords) throws IOException {
        final MessageType schema = MessageTypeParser.parseMessageType(
                "message schema { required int32 column1; required double column2; }");

        final File tempFile = File.createTempFile("test-parquet", ".parquet");
        tempFile.deleteOnExit();

        final GroupFactory groupFactory = new SimpleGroupFactory(schema);
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(tempFile.getAbsolutePath()))
                .withType(schema)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build()) {
            for (int i = 0; i < numberOfRecords; i++) {
                writer.write(groupFactory.newGroup()
                        .append("column1", i)
                        .append("column2", ThreadLocalRandom.current().nextDouble()));
            }
        }
        return new FileInputStream(tempFile);
    }

    @Test
    void parse_emits_one_record_per_parquet_row() throws IOException {
        final InputStream inputStream = createParquetRandomStream(10);

        new ParquetInputCodec().parse(inputStream, eventConsumer);

        verify(eventConsumer, times(10)).accept(any());
    }

}
4 changes: 4 additions & 0 deletions settings.gradle
@@ -58,4 +58,8 @@ include 'release:docker'
include 'release:maven'
include 'e2e-test:peerforwarder'
include 'rss-source'
include 'data-prepper-plugins:json-codec'
findProject(':data-prepper-plugins:json-codec')?.name = 'json-codec'
include 'data-prepper-plugins:parquet-codecs'
findProject(':data-prepper-plugins:parquet-codecs')?.name = 'parquet-codecs'
