Source Codecs | Avro Codec follow-on PR #2715

Merged: 5 commits, Jun 3, 2023

7 changes: 0 additions & 7 deletions data-prepper-plugins/avro-codecs/build.gradle
@@ -2,13 +2,6 @@
  * Copyright OpenSearch Contributors
  * SPDX-License-Identifier: Apache-2.0
  */
-plugins {
-    id 'java'
-}
-
-repositories {
-    mavenCentral()
-}

 dependencies {
     implementation project(path: ':data-prepper-api')
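
Note: removing these blocks is presumably safe because the root Gradle build in this repository appears to apply the java plugin and the mavenCentral() repository to every subproject, so the per-module declarations were redundant.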

data-prepper-plugins/avro-codecs/.../AvroInputCodec.java

@@ -9,7 +9,6 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericFixed;

 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.util.Utf8;
@@ -60,14 +59,10 @@ private void parseAvroStream(final InputStream inputStream, final Consumer<Recor

         while (stream.hasNext()) {

-            final Map<String, Object> eventData = new HashMap<>();
             GenericRecord avroRecord = stream.next();

-            for (Schema.Field field : schema.getFields()) {
-                Object value = decodeValueIfEncoded(avroRecord.get(field.name()));
-                eventData.put(field.name(), value);
-            }
+            final Map<String, Object> eventData = convertRecordToMap(avroRecord, schema);

             final Event event = JacksonLog.builder().withData(eventData).build();
             eventConsumer.accept(new Record<>(event));
         }
@@ -78,20 +73,31 @@
         }
     }

-    private static Object decodeValueIfEncoded(Object rawValue) {
-        try {
-            if (rawValue instanceof Utf8) {
-                byte[] utf8Bytes = rawValue.toString().getBytes("UTF-8");
-                return new String(utf8Bytes, "UTF-8");
-            }
-            else if (rawValue instanceof GenericEnumSymbol || rawValue instanceof GenericData.EnumSymbol || rawValue instanceof GenericFixed || rawValue instanceof GenericRecord) {
-                throw new Exception("The Avro codec does not support this data type presently");
-            }
-            return rawValue;
-        }
-        catch (Exception e) {
-            return rawValue;
-        }
-    }
+    private static Map<String, Object> convertRecordToMap(GenericRecord record, Schema schema) throws Exception {
+
+        final Map<String, Object> eventData = new HashMap<>();
+
+        for (Schema.Field field : schema.getFields()) {
+
+            Object value = record.get(field.name());
+
+            if (value instanceof GenericRecord) {
+                Schema schemaOfNestedRecord = ((GenericRecord) value).getSchema();
+                value = convertRecordToMap((GenericRecord) value, schemaOfNestedRecord);
+            }
+
+            else if (value instanceof GenericEnumSymbol || value instanceof GenericData.EnumSymbol) {
+                value = value.toString();
+            }
+
+            else if (value instanceof Utf8) {
+                byte[] utf8Bytes = value.toString().getBytes("UTF-8");
+                value = new String(utf8Bytes, "UTF-8");
+            }
+
+            eventData.put(field.name(), value);
+        }
+        return eventData;
+    }

 }
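
For context, here is a minimal standalone sketch of the behavior the new convertRecordToMap enables: a nested GenericRecord is now converted into a nested Map instead of being rejected as an unsupported type. This is not code from the PR; the names NestedRecordSketch, Inner, city, and address are illustrative only.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.HashMap;
import java.util.Map;

public class NestedRecordSketch {

    public static void main(String[] args) {
        // Outer schema with one nested record field, mirroring the test schema in this PR.
        Schema inner = SchemaBuilder.record("Inner").fields()
                .name("city").type().stringType().noDefault()
                .endRecord();
        Schema outer = SchemaBuilder.record("Person").fields()
                .name("name").type().stringType().noDefault()
                .name("address").type(inner).noDefault()
                .endRecord();

        GenericRecord address = new GenericData.Record(inner);
        address.put("city", "Seattle");
        GenericRecord person = new GenericData.Record(outer);
        person.put("name", "Person0");
        person.put("address", address);

        // Prints a nested map, e.g. {name=Person0, address={city=Seattle}}.
        System.out.println(toMap(person));
    }

    // The same recursion as the codec's new convertRecordToMap, reduced to its core.
    private static Map<String, Object> toMap(GenericRecord record) {
        Map<String, Object> eventData = new HashMap<>();
        for (Schema.Field field : record.getSchema().getFields()) {
            Object value = record.get(field.name());
            if (value instanceof GenericRecord) {
                value = toMap((GenericRecord) value);  // recurse into nested records
            } else if (value instanceof CharSequence) {
                value = value.toString();              // normalizes Avro's Utf8 wrapper to String
            }
            eventData.put(field.name(), value);
        }
        return eventData;
    }
}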

data-prepper-plugins/avro-codecs/.../AvroInputCodecTest.java

@@ -22,16 +22,13 @@
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.event.EventType;
-import org.opensearch.dataprepper.model.log.JacksonLog;
 import org.opensearch.dataprepper.model.record.Record;
-import org.json.JSONObject;

 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.FileInputStream;
 import java.io.ByteArrayInputStream;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -139,45 +136,28 @@ public void test_HappyCaseAvroInputStream_then_callsConsumerWithParsedEvents(fin
             assertThat(actualRecord.getData().getMetadata(), notNullValue());
             assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString()));

-            Map expectedMap = getEvent(index).toMap();
-            assertThat(actualRecord.getData().toMap(), equalTo(expectedMap));
+            Map<String, Object> expectedMap = new HashMap<>();
+            GenericRecord record = generateRecords(parseSchema(), numberOfRecords).get(index);
+            for (Schema.Field field : record.getSchema().getFields()) {
+                expectedMap.put(field.name(), record.get(field.name()));
+            }

-            for (Object key : actualRecord.getData().toMap().keySet()) {
-                Object decodedOutput = decodeOutputIfEncoded(actualRecord.getData().toMap(), key);
-                Object expectedOutput = getEvent(index).toMap().get(key.toString());
-                assertThat(decodedOutput, equalTo(expectedOutput));
+            for (String key : expectedMap.keySet()) {
+                Object actualRecordValue = actualRecord.getData().toMap().get(key);
+                if (!(actualRecordValue instanceof Map))
+                    assertThat(actualRecord.getData().toMap().get(key), equalTo(expectedMap.get(key)));
+                else {
+                    GenericRecord expectedInnerRecord = (GenericRecord) expectedMap.get(key);
+                    Schema innerSchema = expectedInnerRecord.getSchema();
+                    for (Schema.Field innerField : innerSchema.getFields()) {
+                        assertThat(((Map) actualRecordValue).get(innerField.name()), equalTo(expectedInnerRecord.get(innerField.name())));
+                    }
+                }
             }
             index++;
         }
         fileInputStream.close();
         Files.delete(path);

     }

-    private static Object decodeOutputIfEncoded(Map encodedOutput, Object key) {
-        try {
-            JSONObject outputJson = new JSONObject(encodedOutput);
-            Map innerJson = (Map) outputJson.get(key.toString());
-            byte[] encodedString = (byte[]) innerJson.get("bytes");
-            return new String(encodedString, StandardCharsets.UTF_8);
-        } catch (Exception e) {
-            return encodedOutput.get(key);
-        }
-    }
-
-    private static Event getEvent(int index) {
-        List<GenericRecord> recordList = generateRecords(parseSchema(), numberOfRecords);
-        GenericRecord record = recordList.get(index);
-        Schema schema = parseSchema();
-        final Map<String, Object> eventData = new HashMap<>();
-        for (Schema.Field field : schema.getFields()) {
-            eventData.put(field.name(), record.get(field.name()));
-        }
-        final Event event = JacksonLog.builder().withData(eventData).build();
-        return event;
-    }


@@ -211,9 +191,13 @@ private static List<GenericRecord> generateRecords(Schema schema, int numberOfRe
     for (int rows = 0; rows < numberOfRecords; rows++) {

         GenericRecord record = new GenericData.Record(schema);
+        GenericRecord innerRecord = new GenericData.Record(parseInnerSchemaForNestedRecord());
+        innerRecord.put("firstFieldInNestedRecord", "testString" + rows);
+        innerRecord.put("secondFieldInNestedRecord", rows);

         record.put("name", "Person" + rows);
         record.put("age", rows);
+        record.put("nestedRecord", innerRecord);
         recordList.add((record));

     }
@@ -224,14 +208,29 @@
     private static Schema parseSchema() {

+        Schema innerSchema = parseInnerSchemaForNestedRecord();
         return SchemaBuilder.record("Person")
                 .fields()
                 .name("name").type().stringType().noDefault()
                 .name("age").type().intType().noDefault()
+                .name("nestedRecord").type(innerSchema).noDefault()
                 .endRecord();

     }

+    private static Schema parseInnerSchemaForNestedRecord() {
+        return SchemaBuilder
+                .record("InnerRecord")
+                .fields()
+                .name("firstFieldInNestedRecord")
+                .type(Schema.create(Schema.Type.STRING))
+                .noDefault()
+                .name("secondFieldInNestedRecord")
+                .type(Schema.create(Schema.Type.INT))
+                .noDefault()
+                .endRecord();
+    }
+
     private static InputStream createInvalidAvroStream() {
         return new ByteArrayInputStream(INVALID_AVRO_INPUT_STREAM.getBytes());
     }
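
For completeness, a sketch of the file round trip this test relies on: records are written with DataFileWriter and read back with DataFileStream, which is how the codec ends up receiving nested fields as GenericRecord instances. This is an assumption about the elided test setup, not the PR's exact code; AvroRoundTripSketch and roundTrip are illustrative names.

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;

public class AvroRoundTripSketch {

    // Writes the given records to an Avro data file, then streams them back and prints them.
    public static void roundTrip(Schema schema, List<GenericRecord> records, File file) throws IOException {
        try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.create(schema, file);
            for (GenericRecord record : records) {
                writer.append(record);
            }
        }

        try (DataFileStream<GenericRecord> stream =
                     new DataFileStream<>(new FileInputStream(file), new GenericDatumReader<>())) {
            while (stream.hasNext()) {
                // Nested fields come back as GenericRecord, which the codec converts to nested maps.
                System.out.println(stream.next());
            }
        }
    }
}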