Skip to content

Commit

Permalink
[arrow] Introduce paimon-arrow module to provide utils to convert pai…
Browse files Browse the repository at this point in the history
…mon Row to Arrow FieldVector
  • Loading branch information
yuzelin committed Aug 15, 2024
1 parent 0af4feb commit f11768e
Show file tree
Hide file tree
Showing 10 changed files with 1,450 additions and 0 deletions.
64 changes: 64 additions & 0 deletions paimon-arrow/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.9-SNAPSHOT</version>
</parent>

<artifactId>paimon-arrow</artifactId>
<name>Paimon : Arrow</name>

<properties>
<arrow.version>14.0.0</arrow.version>
</properties>


<dependencies>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.arrow;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.utils.IntArrayList;

import org.apache.arrow.vector.VectorSchemaRoot;

import javax.annotation.Nullable;

/** To convert {@link VectorizedColumnBatch} to Arrow format. */
public class ArrowBatchWriter extends ArrowWriter {

private VectorizedColumnBatch batch;
private @Nullable int[] pickedInColumn;
private int totalNumRows;
private int startIndex;

public ArrowBatchWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
super(root, fieldWriters);
}

@Override
public void doWrite(int maxBatchRows) {
int batchRows = Math.min(maxBatchRows, totalNumRows - startIndex);
ColumnVector[] columns = batch.columns;
for (int i = 0; i < columns.length; i++) {
fieldWriters[i].write(columns[i], pickedInColumn, startIndex, batchRows);
}
root.setRowCount(batchRows);

startIndex += batchRows;
if (startIndex >= totalNumRows) {
releaseIterator();
}
}

public void reset(ApplyDeletionFileRecordIterator iterator) {
this.iterator = iterator;

FileRecordIterator<InternalRow> innerIterator = iterator.iterator();
this.batch = ((VectorizedRecordIterator) innerIterator).batch();

long firstReturnedPosition = innerIterator.returnedPosition() + 1;
DeletionVector deletionVector = iterator.deletionVector();
int originNumRows = this.batch.getNumRows();
IntArrayList picked = new IntArrayList(originNumRows);
for (int i = 0; i < originNumRows; i++) {
long returnedPosition = firstReturnedPosition + i;
if (!deletionVector.isDeleted(returnedPosition)) {
picked.add(i);
}
}
if (picked.size() == originNumRows) {
this.pickedInColumn = null;
this.totalNumRows = originNumRows;
} else {
this.pickedInColumn = picked.toArray();
this.totalNumRows = this.pickedInColumn.length;
}

this.startIndex = 0;
}

public void reset(VectorizedRecordIterator iterator) {
this.iterator = iterator;
this.batch = iterator.batch();
this.pickedInColumn = null;
this.totalNumRows = this.batch.getNumRows();
this.startIndex = 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.arrow;

import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.columnar.ColumnVector;

import org.apache.arrow.vector.FieldVector;

import javax.annotation.Nullable;

/** A reusable writer to convert a field into Arrow {@link FieldVector}. */
public abstract class ArrowFieldWriter {

// reusable
protected final FieldVector fieldVector;

public ArrowFieldWriter(FieldVector fieldVector) {
this.fieldVector = fieldVector;
}

/** Reset the state of the writer to write the next batch of fields. */
public void reset() {
fieldVector.reset();
}

/**
* Write all data of a {@link ColumnVector}.
*
* @param columnVector Which holds the paimon data.
* @param pickedInColumn Which rows is picked to write. Pick all if null. This is used to adapt
* deletion vector.
* @param startIndex From where to start writing.
* @param batchRows How many rows to write.
*/
public void write(
ColumnVector columnVector,
@Nullable int[] pickedInColumn,
int startIndex,
int batchRows) {
doWrite(columnVector, pickedInColumn, startIndex, batchRows);
fieldVector.setValueCount(batchRows);
}

protected abstract void doWrite(
ColumnVector columnVector,
@Nullable int[] pickedInColumn,
int startIndex,
int batchRows);

/** Get the value from the row at the given position and write to specified row index. */
public void write(int rowIndex, DataGetters getters, int pos) {
if (getters.isNullAt(pos)) {
fieldVector.setNull(rowIndex);
} else {
doWrite(rowIndex, getters, pos);
}
fieldVector.setValueCount(fieldVector.getValueCount() + 1);
}

protected abstract void doWrite(int rowIndex, DataGetters getters, int pos);

protected int getRowNumber(int startIndex, int currentIndex, @Nullable int[] pickedInColumn) {
int row = currentIndex + startIndex;
if (pickedInColumn != null) {
row = pickedInColumn[row];
}
return row;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.arrow;

import org.apache.arrow.vector.FieldVector;

/** Factory to create {@link ArrowFieldWriter}. */
@FunctionalInterface
public interface ArrowFieldWriterFactory {

ArrowFieldWriter create(FieldVector fieldVector);
}
Loading

0 comments on commit f11768e

Please sign in to comment.