Skip to content

Commit

Permalink
[parquet] Fix that cannot read parquet ROW<DECIMAL> data
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Nov 14, 2024
1 parent 9e4b28a commit f24adb2
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import java.util.Arrays;

/** Heap vector that nullable shared structure. */
public abstract class AbstractHeapVector extends AbstractWritableVector {
public abstract class AbstractHeapVector extends AbstractWritableVector
implements ElementCountable {

public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

Expand Down Expand Up @@ -116,6 +117,7 @@ public HeapIntVector getDictionaryIds() {
return dictionaryIds;
}

@Override
public int getLen() {
return this.len;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,10 @@
* limitations under the License.
*/

package org.apache.paimon.format.parquet.position;
package org.apache.paimon.data.columnar.heap;

import javax.annotation.Nullable;
/** Container with a known number of elements. */
public interface ElementCountable {

/** To represent struct's position in repeated type. */
public class RowPosition {
@Nullable private final boolean[] isNull;
private final int positionsCount;

public RowPosition(boolean[] isNull, int positionsCount) {
this.isNull = isNull;
this.positionsCount = positionsCount;
}

public boolean[] getIsNull() {
return isNull;
}

public int getPositionsCount() {
return positionsCount;
}
int getLen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -573,6 +574,24 @@ public void testCountStarPK() {
validateCount1NotPushDown(sql);
}

@Test
public void testParquetRowDecimalAndTimestamp() {
sql(
"CREATE TABLE parquet_row_decimal(`row` ROW<f0 DECIMAL(2,1)>) WITH ('file.format' = 'parquet')");
sql("INSERT INTO parquet_row VALUES ( (ROW(1.2)) )");

assertThat(sql("SELECT * FROM parquet_row_decimal"))
.containsExactly(Row.of(Row.of(new BigDecimal("1.2"))));

sql(
"CREATE TABLE parquet_row_timestamp(`row` ROW<f0 TIMESTAMP(0)>) WITH ('file.format' = 'parquet')");
sql("INSERT INTO parquet_row VALUES ( (ROW(TIMESTAMP'2024-11-13 18:00:00')) )");

assertThat(sql("SELECT * FROM parquet_row_timestamp"))
.containsExactly(
Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0))));
}

private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv, sql);
while (!transformation.getInputs().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
Expand Down Expand Up @@ -293,7 +294,10 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
for (int i = 0; i < writableVectors.length; i++) {
switch (projectedFields[i].type().getTypeRoot()) {
case DECIMAL:
vectors[i] = new ParquetDecimalVector(writableVectors[i]);
vectors[i] =
new ParquetDecimalVector(
writableVectors[i],
((ElementCountable) writableVectors[i]).getLen());
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.heap.AbstractHeapVector;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.heap.HeapArrayVector;
import org.apache.paimon.data.columnar.heap.HeapMapVector;
import org.apache.paimon.data.columnar.heap.HeapRowVector;
Expand Down Expand Up @@ -134,7 +135,7 @@ private Pair<LevelDelegation, WritableColumnVector> readRow(
String.format("Row field does not have any children: %s.", field));
}

int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen();
int len = ((ElementCountable) finalChildrenVectors[0]).getLen();
boolean[] isNull = new boolean[len];
Arrays.fill(isNull, true);
boolean hasNull = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
phiv.vector[i] = ((List<Integer>) valueList).get(i);
}
}
return new ParquetDecimalVector(phiv);
return new ParquetDecimalVector(phiv, total);
case INT64:
HeapLongVector phlv = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
Expand All @@ -505,10 +505,10 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
phlv.vector[i] = ((List<Long>) valueList).get(i);
}
}
return new ParquetDecimalVector(phlv);
return new ParquetDecimalVector(phlv, total);
default:
HeapBytesVector phbv = getHeapBytesVector(total, valueList);
return new ParquetDecimalVector(phbv);
return new ParquetDecimalVector(phbv, total);
}
default:
throw new RuntimeException("Unsupported type in the list: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.data.columnar.Dictionary;
import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.LongColumnVector;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
Expand All @@ -38,12 +39,18 @@
* {@link DecimalColumnVector} interface.
*/
public class ParquetDecimalVector
implements DecimalColumnVector, WritableLongVector, WritableIntVector, WritableBytesVector {
implements DecimalColumnVector,
WritableLongVector,
WritableIntVector,
WritableBytesVector,
ElementCountable {

private final ColumnVector vector;
private final int len;

public ParquetDecimalVector(ColumnVector vector) {
public ParquetDecimalVector(ColumnVector vector, int len) {
this.vector = vector;
this.len = len;
}

@Override
Expand Down Expand Up @@ -225,4 +232,9 @@ public void fill(long value) {
((WritableLongVector) vector).fill(value);
}
}

@Override
public int getLen() {
return len;
}
}

This file was deleted.

0 comments on commit f24adb2

Please sign in to comment.