Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop leaking Avro objects from parser #12828

Merged
merged 4 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@

import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -164,7 +166,7 @@ private Object transformValue(final Object field)
} else if (field instanceof Utf8) {
return field.toString();
} else if (field instanceof List) {
return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
return ((List<?>) field).stream().filter(Objects::nonNull).map(this::transformValue).collect(Collectors.toList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could possibly generate new output for a list that has a ByteBuffer in it. But I think if anyone is relying on this, their data probably looks absurd already, since calling toString on a ByteBuffer doesn't return the backing bytes.

} else if (field instanceof GenericEnumSymbol) {
return field.toString();
} else if (field instanceof GenericFixed) {
Expand All @@ -173,6 +175,20 @@ private Object transformValue(final Object field)
} else {
return ((GenericFixed) field).bytes();
}
} else if (field instanceof Map) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should use something like if (avroJsonProvider.isArray(field)) ... and if (avroJsonProvider.isMap(field)) ...
There are also methods like avroJsonProvider.length, avroJsonProvider.getArrayIndex, avroJsonProvider.getPropertyKeys, avroJsonProvider.getMapValue etc. which could be used to extract stuff, though the array one doesn't seem that useful since it just handles List anyway.

LinkedHashMap<String, Object> retVal = new LinkedHashMap<>();
Map<?, ?> fieldMap = (Map<?, ?>) field;
for (Map.Entry<?, ?> entry : fieldMap.entrySet()) {
retVal.put(String.valueOf(entry.getKey()), transformValue(entry.getValue()));
}
return retVal;
} else if (field instanceof GenericRecord) {
LinkedHashMap<String, Object> retVal = new LinkedHashMap<>();
GenericRecord record = (GenericRecord) field;
for (Schema.Field key : record.getSchema().getFields()) {
retVal.put(key.name(), transformValue(record.get(key.pos())));
}
return retVal;
}
return field;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -64,6 +65,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -325,7 +327,46 @@ static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimens
inputRow.getDimension("someStringArray")
);

final Object someRecordArrayObj = inputRow.getRaw("someRecordArray");
Assert.assertNotNull(someRecordArrayObj);
Assert.assertTrue(someRecordArrayObj instanceof List);
Assert.assertEquals(1, ((List) someRecordArrayObj).size());
final Object recordArrayElementObj = ((List) someRecordArrayObj).get(0);
Assert.assertNotNull(recordArrayElementObj);
Assert.assertTrue(recordArrayElementObj instanceof LinkedHashMap);
LinkedHashMap recordArrayElement = (LinkedHashMap) recordArrayElementObj;
Assert.assertEquals("string in record", recordArrayElement.get("nestedString"));
}

final Object someIntValueMapObj = inputRow.getRaw("someIntValueMap");
Assert.assertNotNull(someIntValueMapObj);
Assert.assertTrue(someIntValueMapObj instanceof LinkedHashMap);
LinkedHashMap someIntValueMap = (LinkedHashMap) someIntValueMapObj;
Assert.assertEquals(4, someIntValueMap.size());
Assert.assertEquals(1, someIntValueMap.get("1"));
Assert.assertEquals(2, someIntValueMap.get("2"));
Assert.assertEquals(4, someIntValueMap.get("4"));
Assert.assertEquals(8, someIntValueMap.get("8"));


final Object someStringValueMapObj = inputRow.getRaw("someStringValueMap");
Assert.assertNotNull(someStringValueMapObj);
Assert.assertTrue(someStringValueMapObj instanceof LinkedHashMap);
LinkedHashMap someStringValueMap = (LinkedHashMap) someStringValueMapObj;
Assert.assertEquals(4, someStringValueMap.size());
Assert.assertEquals("1", someStringValueMap.get("1"));
Assert.assertEquals("2", someStringValueMap.get("2"));
Assert.assertEquals("4", someStringValueMap.get("4"));
Assert.assertEquals("8", someStringValueMap.get("8"));


final Object someRecordObj = inputRow.getRaw("someRecord");
Assert.assertNotNull(someRecordObj);
Assert.assertTrue(someRecordObj instanceof LinkedHashMap);
LinkedHashMap someRecord = (LinkedHashMap) someRecordObj;
Assert.assertEquals(4892, someRecord.get("subInt"));
Assert.assertEquals(1543698L, someRecord.get("subLong"));

// towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality
Assert.assertEquals(1, inputRow.getDimension("someIntValueMap").size());
Assert.assertEquals(
Expand Down Expand Up @@ -369,7 +410,7 @@ public Integer apply(@Nullable String input)
);
Assert.assertEquals(Collections.singletonList(String.valueOf(MyEnum.ENUM1)), inputRow.getDimension("someEnum"));
Assert.assertEquals(
Collections.singletonList(String.valueOf(SOME_RECORD_VALUE)),
Collections.singletonList(ImmutableMap.of("subInt", 4892, "subLong", 1543698L).toString()),
inputRow.getDimension("someRecord")
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.data.input.avro;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
Expand All @@ -29,10 +30,12 @@
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AvroFlattenerMakerTest
{
Expand Down Expand Up @@ -214,8 +217,13 @@ private void getRootField_common(final SomeAvroDatum record, final AvroFlattener
record.getSomeEnum().toString(),
flattener.getRootField(record, "someEnum")
);
Map<String, Object> map = new HashMap<>();
record.getSomeRecord()
.getSchema()
.getFields()
.forEach(field -> map.put(field.name(), record.getSomeRecord().get(field.name())));
Assert.assertEquals(
record.getSomeRecord(),
map,
flattener.getRootField(record, "someRecord")
);
Assert.assertEquals(
Expand All @@ -230,8 +238,17 @@ private void getRootField_common(final SomeAvroDatum record, final AvroFlattener
record.getSomeFloat(),
flattener.getRootField(record, "someFloat")
);
Assert.assertEquals(
record.getSomeRecordArray(),
List<Map<String, Object>> list = new ArrayList<>();
for (GenericRecord genericRecord : record.getSomeRecordArray()) {
Map<String, Object> map1 = new HashMap<>();
genericRecord
.getSchema()
.getFields()
.forEach(field -> map1.put(field.name(), genericRecord.get(field.name())));
list.add(map1);
}
Assert.assertEquals(
list,
flattener.getRootField(record, "someRecordArray")
);
}
Expand Down Expand Up @@ -328,8 +345,13 @@ private void makeJsonPathExtractor_common(final SomeAvroDatum record, final Avro
record.getSomeEnum().toString(),
flattener.makeJsonPathExtractor("$.someEnum").apply(record)
);
Map<String, Object> map = new HashMap<>();
record.getSomeRecord()
.getSchema()
.getFields()
.forEach(field -> map.put(field.name(), record.getSomeRecord().get(field.name())));
Assert.assertEquals(
record.getSomeRecord(),
map,
flattener.makeJsonPathExtractor("$.someRecord").apply(record)
);
Assert.assertEquals(
Expand All @@ -344,8 +366,19 @@ private void makeJsonPathExtractor_common(final SomeAvroDatum record, final Avro
record.getSomeFloat(),
flattener.makeJsonPathExtractor("$.someFloat").apply(record)
);

List<Map<String, Object>> list = new ArrayList<>();
for (GenericRecord genericRecord : record.getSomeRecordArray()) {
Map<String, Object> map1 = new HashMap<>();
genericRecord
.getSchema()
.getFields()
.forEach(field -> map1.put(field.name(), genericRecord.get(field.name())));
list.add(map1);
}

Assert.assertEquals(
record.getSomeRecordArray(),
list,
flattener.makeJsonPathExtractor("$.someRecordArray").apply(record)
);

Expand All @@ -355,7 +388,7 @@ private void makeJsonPathExtractor_common(final SomeAvroDatum record, final Avro
);

Assert.assertEquals(
record.getSomeRecordArray(),
list,
flattener.makeJsonPathExtractor("$.someRecordArray[?(@.nestedString)]").apply(record)
);

Expand Down