Source Codecs | Avro Codec follow-on PR #2715

Merged: 5 commits merged on Jun 3, 2023
Changes from 2 commits
7 changes: 0 additions & 7 deletions data-prepper-plugins/avro-codecs/build.gradle
@@ -2,13 +2,6 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
implementation project(path: ':data-prepper-api')
@@ -60,14 +60,10 @@ private void parseAvroStream(final InputStream inputStream, final Consumer<Recor

while (stream.hasNext()) {

final Map<String, Object> eventData = new HashMap<>();
GenericRecord avroRecord= stream.next();

for(Schema.Field field : schema.getFields()) {
Object value=decodeValueIfEncoded(avroRecord.get(field.name()));
eventData.put(field.name(), value);
final Map<String, Object> eventData = convertRecordToMap(avroRecord, schema);

}
final Event event = JacksonLog.builder().withData(eventData).build();
eventConsumer.accept(new Record<>(event));
}
@@ -78,20 +74,31 @@ private void parseAvroStream(final InputStream inputStream, final Consumer<Recor
}
}

private static Object decodeValueIfEncoded(Object rawValue){
try{
if(rawValue instanceof Utf8){
byte[] utf8Bytes = rawValue.toString().getBytes("UTF-8");
return new String(utf8Bytes, "UTF-8");
private static Map<String, Object> convertRecordToMap(GenericRecord record, Schema schema) throws Exception {

final Map<String, Object> eventData = new HashMap<>();

for(Schema.Field field : schema.getFields()){

Object value = record.get(field.name());

if(value instanceof GenericRecord){
Schema schemaOfNestedRecord = ((GenericRecord) value).getSchema();
value = convertRecordToMap((GenericRecord) value, schemaOfNestedRecord);
}
else if(rawValue instanceof GenericEnumSymbol || rawValue instanceof GenericData.EnumSymbol || rawValue instanceof GenericFixed || rawValue instanceof GenericRecord){
throw new Exception("The Avro codec does not support this data type presently");

else if(value instanceof GenericEnumSymbol || value instanceof GenericData.EnumSymbol){
value = value.toString();
}
return rawValue;
}
catch (Exception e){
return rawValue;

else if(value instanceof Utf8){
byte[] utf8Bytes = value.toString().getBytes("UTF-8");
value = new String(utf8Bytes, "UTF-8");
}

eventData.put(field.name(), value);
}
return eventData;
}

}
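For readers following the diff, below is a minimal, self-contained sketch (not part of this PR) of what the new recursive conversion produces for a record shaped like the ones the tests further down generate. The class name and the local toMap helper are hypothetical stand-ins that only mirror the codec's private convertRecordToMap logic.

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;

public class NestedAvroToMapSketch {

    public static void main(String[] args) {
        // Schemas shaped like the ones built in the tests below.
        final Schema innerSchema = SchemaBuilder.record("InnerRecord")
                .fields()
                .name("firstFieldInNestedRecord").type().stringType().noDefault()
                .name("secondFieldInNestedRecord").type().intType().noDefault()
                .endRecord();
        final Schema schema = SchemaBuilder.record("Person")
                .fields()
                .name("name").type().stringType().noDefault()
                .name("age").type().intType().noDefault()
                .name("nestedRecord").type(innerSchema).noDefault()
                .endRecord();

        final GenericRecord inner = new GenericData.Record(innerSchema);
        inner.put("firstFieldInNestedRecord", "testString0");
        inner.put("secondFieldInNestedRecord", 0);

        final GenericRecord person = new GenericData.Record(schema);
        person.put("name", "Person0");
        person.put("age", 0);
        person.put("nestedRecord", inner);

        // Prints something like:
        // {name=Person0, age=0, nestedRecord={firstFieldInNestedRecord=testString0, secondFieldInNestedRecord=0}}
        System.out.println(toMap(person, schema));
    }

    // Local stand-in mirroring the recursion this PR adds to the codec:
    // nested records become nested maps, enum symbols and Utf8 values become Strings.
    private static Map<String, Object> toMap(final GenericRecord record, final Schema schema) {
        final Map<String, Object> data = new HashMap<>();
        for (final Schema.Field field : schema.getFields()) {
            Object value = record.get(field.name());
            if (value instanceof GenericRecord) {
                value = toMap((GenericRecord) value, ((GenericRecord) value).getSchema());
            } else if (value instanceof GenericEnumSymbol) {
                value = value.toString();
            } else if (value instanceof Utf8) {
                value = value.toString();
            }
            data.put(field.name(), value);
        }
        return data;
    }
}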
@@ -8,9 +8,11 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -166,18 +168,40 @@ private static Object decodeOutputIfEncoded(Map encodedOutput, Object key){
}
}

private static Event getEvent(int index){
private static Event getEvent(int index) throws Exception {
List<GenericRecord> recordList=generateRecords(parseSchema(),numberOfRecords);
GenericRecord record=recordList.get(index);
Schema schema=parseSchema();
Schema schema = record.getSchema();
final Map<String, Object> eventData = convertRecordToMap(record, schema);
final Event event = JacksonLog.builder().withData(eventData).build();
return event;
}

private static Map<String, Object> convertRecordToMap(GenericRecord record, Schema schema) throws Exception {
Member:

You are duplicating the logic from AvroInputCodec in the test. This is not a valid test scenario. You should start with known data coming in and verify it against expected data going out.

Please remove this entire function - convertRecordToMap.

Contributor Author:

Okay...

Contributor Author:

It is done @dlvenable !

final Map<String, Object> eventData = new HashMap<>();
for(Schema.Field field : schema.getFields()) {

eventData.put(field.name(), record.get(field.name()));
for(Schema.Field field : schema.getFields()){

Object value = record.get(field.name());

if(value instanceof GenericRecord){
Schema schemaOfNestedRecord = ((GenericRecord) value).getSchema();
value = convertRecordToMap((GenericRecord) value, schemaOfNestedRecord);
}

else if(value instanceof GenericEnumSymbol){
value = value.toString();
}

else if(value instanceof Utf8){
byte[] utf8Bytes = value.toString().getBytes("UTF-8");
value = new String(utf8Bytes, "UTF-8");
}

eventData.put(field.name(), value);
}
final Event event = JacksonLog.builder().withData(eventData).build();
return event;
return eventData;
}
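To make the reviewer's suggestion above concrete, here is a rough sketch (not the PR's actual test code) of hand-built expected data that the parsed events could be asserted against instead of re-running the conversion logic inside the test. The class name is hypothetical, and the Event#toMap() call in the usage note is an assumption about the Data Prepper Event API.

import java.util.HashMap;
import java.util.Map;

class ExpectedAvroEventData {

    // Hand-built expectation for the rows-th generated Person record,
    // mirroring the literal values generateRecords() writes below.
    static Map<String, Object> expectedEventData(final int rows) {
        final Map<String, Object> nested = new HashMap<>();
        nested.put("firstFieldInNestedRecord", "testString" + rows);
        nested.put("secondFieldInNestedRecord", rows);

        final Map<String, Object> expected = new HashMap<>();
        expected.put("name", "Person" + rows);
        expected.put("age", rows);
        expected.put("nestedRecord", nested);
        return expected;
    }
}

// Usage idea inside the test (Assertions from the existing JUnit imports; Event#toMap() assumed):
//   Assertions.assertEquals(ExpectedAvroEventData.expectedEventData(index), actualEvent.toMap());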


@@ -211,9 +235,13 @@ private static List<GenericRecord> generateRecords(Schema schema, int numberOfRe
for(int rows = 0; rows < numberOfRecords; rows++){

GenericRecord record = new GenericData.Record(schema);
GenericRecord innerRecord = new GenericData.Record(parseInnerSchemaForNestedRecord());
innerRecord.put("firstFieldInNestedRecord", "testString"+rows);
innerRecord.put("secondFieldInNestedRecord", rows);

record.put("name", "Person"+rows);
record.put("age", rows);
record.put("nestedRecord", innerRecord);
recordList.add((record));

}
@@ -224,14 +252,29 @@ private static List<GenericRecord> generateRecords(Schema schema, int numberOfRe

private static Schema parseSchema() {

Schema innerSchema=parseInnerSchemaForNestedRecord();
return SchemaBuilder.record("Person")
.fields()
.name("name").type().stringType().noDefault()
.name("age").type().intType().noDefault()
.name("nestedRecord").type(innerSchema).noDefault()
.endRecord();

}

private static Schema parseInnerSchemaForNestedRecord(){
return SchemaBuilder
.record("InnerRecord")
.fields()
.name("firstFieldInNestedRecord")
.type(Schema.create(Schema.Type.STRING))
.noDefault()
.name("secondFieldInNestedRecord")
.type(Schema.create(Schema.Type.INT))
.noDefault()
.endRecord();
}

private static InputStream createInvalidAvroStream() {
return new ByteArrayInputStream(INVALID_AVRO_INPUT_STREAM.getBytes());
}