Skip to content

Commit

Permalink
fix(java): Move schema caching to unsafe trait to avoid issues when u…
Browse files Browse the repository at this point in the history
…sing non-inferred schema. (#1944)



## What does this PR do?

This PR removes the Java-specific `ExtField` class from the schema and
moves the extData mechanism to the internal `UnsafeTrait` class. This is
necessary because `ExtField` is only created internally by the
`inferSchema` method used by Java, whereas we may also import or
derive schemas from other sources (e.g., XLANG, a handwritten schema, or an
arrow-native source) -- those schemas are incompatible when we
attempt to retrieve the cached schema. Removing schema caching would fix
the issue but would create extra allocations, so after some discussion we decided
to move the mechanism to the internal `UnsafeTrait` class.

This implementation makes the following changes to the internal API:
- Derived classes from `UnsafeTrait` need to initialize the `extData`
cache and define the number of extData slots needed.
- The internal `getStruct` method needs to define which slot we use to
retrieve `extData`.

Other:

- REVERTED: pom.xml for fury-format will automatically run tests with
the appropriate `--add-opens` flag for Arrow

## Related issues

<!--
Is there any related issue? Please attach here.

- #xxxx0
- #xxxx1
- #xxxx2
-->

## Does this PR introduce any user-facing change?
N/A

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?

## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
wywen authored Nov 19, 2024
1 parent fb2172b commit 5b22ccd
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public BinaryArray(Field field) {
} else {
this.elementSize = width;
}
initializeExtData(1); // Only require at most one slot to cache the schema for array type.
}

public void pointTo(MemoryBuffer buffer, int offset, int sizeInBytes) {
Expand Down Expand Up @@ -135,7 +136,7 @@ public BigDecimal getDecimal(int ordinal) {

@Override
public BinaryRow getStruct(int ordinal) {
return getStruct(ordinal, field.getChildren().get(0));
return getStruct(ordinal, field.getChildren().get(0), 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public BinaryRow(Schema schema) {
this.numFields = schema.getFields().size();
Preconditions.checkArgument(numFields > 0);
this.bitmapWidthInBytes = BitUtils.calculateBitmapWidthInBytes(numFields);
initializeExtData(numFields);
}

public void pointTo(MemoryBuffer buffer, int offset, int sizeInBytes) {
Expand Down Expand Up @@ -155,7 +156,7 @@ public BigDecimal getDecimal(int ordinal) {

@Override
public BinaryRow getStruct(int ordinal) {
return getStruct(ordinal, schema.getFields().get(ordinal));
return getStruct(ordinal, schema.getFields().get(ordinal), ordinal);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.DecimalUtility;
import org.apache.fury.format.row.Getters;
import org.apache.fury.format.row.Setters;
Expand All @@ -35,6 +36,7 @@

/** Internal to the binary row format for code reuse; don't use it anywhere else. */
abstract class UnsafeTrait implements Getters, Setters {
private Object[] extData;

abstract MemoryBuffer getBuffer();

Expand All @@ -55,6 +57,10 @@ public MemoryBuffer getBuffer(int ordinal) {

abstract int getOffset(int ordinal);

/**
 * Allocates the ext data slots for this instance. Each slot starts out {@code null}; {@code
 * getStruct} fills a slot on first use with the schema it creates for a struct field and reuses
 * it on later calls.
 *
 * @param numSlots the number of ext data slots to allocate.
 */
void initializeExtData(final int numSlots) {
  this.extData = new Object[numSlots];
}

// ###########################################################
// ####################### getters #######################
// ###########################################################
Expand Down Expand Up @@ -143,14 +149,25 @@ BigDecimal getDecimal(int ordinal, ArrowType.Decimal decimalType) {
return decimal;
}

BinaryRow getStruct(int ordinal, Field field) {
/**
* Gets the field at a specific ordinal as a struct.
*
* @param ordinal the ordinal position of this field.
* @param field the Arrow field corresponding to this struct.
* @param extDataSlot the ext data slot used to cache the schema for the struct.
* @return the binary row representation of the struct.
*/
BinaryRow getStruct(int ordinal, Field field, int extDataSlot) {
if (isNullAt(ordinal)) {
return null;
}
final long offsetAndSize = getInt64(ordinal);
final int relativeOffset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
BinaryRow row = new BinaryRow(DataTypes.createSchema(field));
if (extData[extDataSlot] == null) {
extData[extDataSlot] = DataTypes.createSchema(field);
}
BinaryRow row = new BinaryRow((Schema) extData[extDataSlot]);
row.pointTo(getBuffer(), getBaseOffset() + relativeOffset, size);
return row;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public static Field field(String name, ArrowType type, Field... children) {
}

public static Field field(String name, FieldType fieldType, List<Field> children) {
return new ExtField(name, fieldType, children);
return new Field(name, fieldType, children);
}

public static Field notNullField(String name, ArrowType type, Field... children) {
Expand Down Expand Up @@ -396,19 +396,11 @@ public static Field mapField(String name, Field keyField, Field itemField) {
}

public static Field keyFieldForMap(Field mapField) {
Field field = mapField.getChildren().get(0).getChildren().get(0);
if (field.getClass() != ExtField.class) {
return new ExtField(field.getName(), field.getFieldType(), field.getChildren());
}
return field;
return mapField.getChildren().get(0).getChildren().get(0);
}

public static Field itemFieldForMap(Field mapField) {
Field field = mapField.getChildren().get(0).getChildren().get(1);
if (field.getClass() != ExtField.class) {
return new ExtField(field.getName(), field.getFieldType(), field.getChildren());
}
return field;
return mapField.getChildren().get(0).getChildren().get(1);
}

public static Field keyArrayFieldForMap(Field mapField) {
Expand All @@ -425,24 +417,7 @@ public static Schema schemaFromStructField(Field structField) {
}

public static Schema createSchema(Field field) {
if (field.getClass() != ExtField.class) {
throw new IllegalArgumentException(
String.format("Field %s got wrong type %s", field, field.getClass()));
}
ExtField extField = (ExtField) field;
Object extData = extField.extData;
if (extData == null) {
extField.extData = extData = new Schema(field.getChildren(), field.getMetadata());
}
return (Schema) extData;
}

static class ExtField extends Field {
Object extData;

public ExtField(String name, FieldType fieldType, List<Field> children) {
super(name, fieldType, children);
}
return new Schema(field.getChildren(), field.getMetadata());
}

public static Field structField(boolean nullable, Field... fields) {
Expand Down

0 comments on commit 5b22ccd

Please sign in to comment.