diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index 8526a04c9125..3a57d033343d 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -36,7 +36,9 @@ import java.nio.ByteBuffer; import java.util.EnumSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Function; @@ -164,7 +166,7 @@ private Object transformValue(final Object field) } else if (field instanceof Utf8) { return field.toString(); } else if (field instanceof List) { - return ((List) field).stream().filter(Objects::nonNull).collect(Collectors.toList()); + return ((List) field).stream().filter(Objects::nonNull).map(this::transformValue).collect(Collectors.toList()); } else if (field instanceof GenericEnumSymbol) { return field.toString(); } else if (field instanceof GenericFixed) { @@ -173,6 +175,20 @@ private Object transformValue(final Object field) } else { return ((GenericFixed) field).bytes(); } + } else if (field instanceof Map) { + LinkedHashMap retVal = new LinkedHashMap<>(); + Map fieldMap = (Map) field; + for (Map.Entry entry : fieldMap.entrySet()) { + retVal.put(String.valueOf(entry.getKey()), transformValue(entry.getValue())); + } + return retVal; + } else if (field instanceof GenericRecord) { + LinkedHashMap retVal = new LinkedHashMap<>(); + GenericRecord record = (GenericRecord) field; + for (Schema.Field key : record.getSchema().getFields()) { + retVal.put(key.name(), transformValue(record.get(key.pos()))); + } + return retVal; } return field; } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java index 6a37a0bbb548..b04bcca61bbe 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java @@ -24,6 +24,7 @@ import com.google.common.base.Function; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.avro.Schema; @@ -64,6 +65,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -325,7 +327,46 @@ static void assertInputRowCorrect(InputRow inputRow, List expectedDimens inputRow.getDimension("someStringArray") ); + final Object someRecordArrayObj = inputRow.getRaw("someRecordArray"); + Assert.assertNotNull(someRecordArrayObj); + Assert.assertTrue(someRecordArrayObj instanceof List); + Assert.assertEquals(1, ((List) someRecordArrayObj).size()); + final Object recordArrayElementObj = ((List) someRecordArrayObj).get(0); + Assert.assertNotNull(recordArrayElementObj); + Assert.assertTrue(recordArrayElementObj instanceof LinkedHashMap); + LinkedHashMap recordArrayElement = (LinkedHashMap) recordArrayElementObj; + Assert.assertEquals("string in record", recordArrayElement.get("nestedString")); } + + final Object someIntValueMapObj = inputRow.getRaw("someIntValueMap"); + Assert.assertNotNull(someIntValueMapObj); + Assert.assertTrue(someIntValueMapObj instanceof LinkedHashMap); + LinkedHashMap someIntValueMap = (LinkedHashMap) someIntValueMapObj; + Assert.assertEquals(4, someIntValueMap.size()); + Assert.assertEquals(1, someIntValueMap.get("1")); + Assert.assertEquals(2, someIntValueMap.get("2")); + Assert.assertEquals(4, someIntValueMap.get("4")); + Assert.assertEquals(8, someIntValueMap.get("8")); + + + final Object someStringValueMapObj = inputRow.getRaw("someStringValueMap"); + Assert.assertNotNull(someStringValueMapObj); + Assert.assertTrue(someStringValueMapObj instanceof LinkedHashMap); + LinkedHashMap someStringValueMap = (LinkedHashMap) someStringValueMapObj; + Assert.assertEquals(4, someStringValueMap.size()); + Assert.assertEquals("1", someStringValueMap.get("1")); + Assert.assertEquals("2", someStringValueMap.get("2")); + Assert.assertEquals("4", someStringValueMap.get("4")); + Assert.assertEquals("8", someStringValueMap.get("8")); + + + final Object someRecordObj = inputRow.getRaw("someRecord"); + Assert.assertNotNull(someRecordObj); + Assert.assertTrue(someRecordObj instanceof LinkedHashMap); + LinkedHashMap someRecord = (LinkedHashMap) someRecordObj; + Assert.assertEquals(4892, someRecord.get("subInt")); + Assert.assertEquals(1543698L, someRecord.get("subLong")); + // towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality Assert.assertEquals(1, inputRow.getDimension("someIntValueMap").size()); Assert.assertEquals( @@ -369,7 +410,7 @@ public Integer apply(@Nullable String input) ); Assert.assertEquals(Collections.singletonList(String.valueOf(MyEnum.ENUM1)), inputRow.getDimension("someEnum")); Assert.assertEquals( - Collections.singletonList(String.valueOf(SOME_RECORD_VALUE)), + Collections.singletonList(ImmutableMap.of("subInt", 4892, "subLong", 1543698L).toString()), inputRow.getDimension("someRecord") ); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java index e7d503ab9715..1174d2684fad 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.avro; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; @@ -29,10 +30,12 @@ import org.junit.Test; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; public class AvroFlattenerMakerTest { @@ -214,8 +217,13 @@ private void getRootField_common(final SomeAvroDatum record, final AvroFlattener record.getSomeEnum().toString(), flattener.getRootField(record, "someEnum") ); + Map map = new HashMap<>(); + record.getSomeRecord() + .getSchema() + .getFields() + .forEach(field -> map.put(field.name(), record.getSomeRecord().get(field.name()))); Assert.assertEquals( - record.getSomeRecord(), + map, flattener.getRootField(record, "someRecord") ); Assert.assertEquals( @@ -230,8 +238,17 @@ private void getRootField_common(final SomeAvroDatum record, final AvroFlattener record.getSomeFloat(), flattener.getRootField(record, "someFloat") ); - Assert.assertEquals( - record.getSomeRecordArray(), + List> list = new ArrayList<>(); + for (GenericRecord genericRecord : record.getSomeRecordArray()) { + Map map1 = new HashMap<>(); + genericRecord + .getSchema() + .getFields() + .forEach(field -> map1.put(field.name(), genericRecord.get(field.name()))); + list.add(map1); + } + Assert.assertEquals( + list, flattener.getRootField(record, "someRecordArray") ); } @@ -328,8 +345,13 @@ private void makeJsonPathExtractor_common(final SomeAvroDatum record, final Avro record.getSomeEnum().toString(), flattener.makeJsonPathExtractor("$.someEnum").apply(record) ); + Map map = new HashMap<>(); + record.getSomeRecord() + .getSchema() + .getFields() + .forEach(field -> map.put(field.name(), record.getSomeRecord().get(field.name()))); Assert.assertEquals( - record.getSomeRecord(), + map, flattener.makeJsonPathExtractor("$.someRecord").apply(record) ); Assert.assertEquals( @@ -344,8 +366,19 @@ private void makeJsonPathExtractor_common(final SomeAvroDatum record, final Avro record.getSomeFloat(), flattener.makeJsonPathExtractor("$.someFloat").apply(record) ); + + List> list = new ArrayList<>(); + for (GenericRecord genericRecord : record.getSomeRecordArray()) { + Map map1 = new HashMap<>(); + genericRecord + .getSchema() + .getFields() + .forEach(field -> map1.put(field.name(), genericRecord.get(field.name()))); + list.add(map1); + } + Assert.assertEquals( - record.getSomeRecordArray(), + list, flattener.makeJsonPathExtractor("$.someRecordArray").apply(record) ); @@ -355,7 +388,7 @@ private void makeJsonPathExtractor_common(final SomeAvroDatum record, final Avro ); Assert.assertEquals( - record.getSomeRecordArray(), + list, flattener.makeJsonPathExtractor("$.someRecordArray[?(@.nestedString)]").apply(record) );