Skip to content

Commit

Permalink
GH-2956: Use avro SchemaBuilder API to convert record (#2957)
Browse files Browse the repository at this point in the history
The avro schema builder API is cleaned and more stable. It decreases
chance of using newly introduced avro API in case user run with legacy
avro version

As OPTIONAL converted fields sets null as default, increase consistency
by using [] as default for REPEATED converted fields.
  • Loading branch information
RustedBones authored and Fokko committed Aug 14, 2024
1 parent ca572cb commit 0f3a615
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
Expand Down Expand Up @@ -58,6 +57,7 @@
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
Expand Down Expand Up @@ -296,21 +296,24 @@ Schema convert(GroupType parquetSchema) {
}

private Schema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
String ns = namespace(name, names);
List<Schema.Field> fields = new ArrayList<Schema.Field>();
SchemaBuilder.FieldAssembler<Schema> builder =
SchemaBuilder.builder(namespace(name, names)).record(name).fields();
for (Type parquetType : parquetFields) {
Schema fieldSchema = convertField(parquetType, names);
if (parquetType.isRepetition(REPEATED)) { // If a repeated field is ungrouped, treat as REQUIRED per spec
fields.add(new Schema.Field(parquetType.getName(), Schema.createArray(fieldSchema)));
builder.name(parquetType.getName())
.type()
.array()
.items()
.type(fieldSchema)
.arrayDefault(new ArrayList<>());
} else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
builder.name(parquetType.getName()).type().optional().type(fieldSchema);
} else { // REQUIRED
fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, (Object) null));
builder.name(parquetType.getName()).type(fieldSchema).noDefault();
}
}
Schema schema = Schema.createRecord(name, null, ns, false);
schema.setFields(fields);
return schema;
return builder.endRecord();
}

private Schema convertField(final Type parquetType, Map<String, Integer> names) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,43 +72,43 @@ public static void setupConf() {
NEW_BEHAVIOR.setBoolean("parquet.avro.write-old-list-structure", false);
}

public static final String ALL_PARQUET_SCHEMA =
"message org.apache.parquet.avro.myrecord {\n" + " required boolean myboolean;\n"
+ " required int32 myint;\n"
+ " required int64 mylong;\n"
+ " required float myfloat;\n"
+ " required double mydouble;\n"
+ " required binary mybytes;\n"
+ " required binary mystring (UTF8);\n"
+ " required group mynestedrecord {\n"
+ " required int32 mynestedint;\n"
+ " }\n"
+ " required binary myenum (ENUM);\n"
+ " required group myarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " optional group myoptionalarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " required group myarrayofoptional (LIST) {\n"
+ " repeated group list {\n"
+ " optional int32 element;\n"
+ " }\n"
+ " }\n"
+ " required group myrecordarray (LIST) {\n"
+ " repeated group array {\n"
+ " required int32 a;\n"
+ " required int32 b;\n"
+ " }\n"
+ " }\n"
+ " required group mymap (MAP) {\n"
+ " repeated group map (MAP_KEY_VALUE) {\n"
+ " required binary key (UTF8);\n"
+ " required int32 value;\n"
+ " }\n"
+ " }\n"
+ " required fixed_len_byte_array(1) myfixed;\n"
+ "}\n";
public static final String ALL_PARQUET_SCHEMA = "message org.apache.parquet.avro.myrecord {\n"
+ " required boolean myboolean;\n"
+ " required int32 myint;\n"
+ " required int64 mylong;\n"
+ " required float myfloat;\n"
+ " required double mydouble;\n"
+ " required binary mybytes;\n"
+ " required binary mystring (UTF8);\n"
+ " required group mynestedrecord {\n"
+ " required int32 mynestedint;\n"
+ " }\n"
+ " required binary myenum (ENUM);\n"
+ " required group myarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " optional group myoptionalarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " required group myarrayofoptional (LIST) {\n"
+ " repeated group list {\n"
+ " optional int32 element;\n"
+ " }\n"
+ " }\n"
+ " required group myrecordarray (LIST) {\n"
+ " repeated group array {\n"
+ " required int32 a;\n"
+ " required int32 b;\n"
+ " }\n"
+ " }\n"
+ " required group mymap (MAP) {\n"
+ " repeated group map (MAP_KEY_VALUE) {\n"
+ " required binary key (UTF8);\n"
+ " required int32 value;\n"
+ " }\n"
+ " }\n"
+ " required fixed_len_byte_array(1) myfixed;\n"
+ "}\n";

private void testAvroToParquetConversion(Schema avroSchema, String schemaString) throws Exception {
testAvroToParquetConversion(new Configuration(false), avroSchema, schemaString);
Expand Down Expand Up @@ -432,7 +432,8 @@ public void testConvertUngroupedRepeatedField() throws Exception {
+ " \"name\": \"SchemaWithRepeatedField\","
+ " \"fields\": [{"
+ " \"name\": \"repeatedField\","
+ " \"type\": {\"type\": \"array\",\"items\": \"int\"}"
+ " \"type\": {\"type\": \"array\",\"items\": \"int\"},"
+ " \"default\": []"
+ " }]"
+ "}"),
"message SchemaWithRepeatedField { repeated int32 repeatedField; }");
Expand Down

0 comments on commit 0f3a615

Please sign in to comment.