Add JNI support for converting Arrow buffers to CUDF ColumnVectors [skip ci] #7222

Merged · 17 commits · Jan 28, 2021 · Changes shown from 4 commits
7 changes: 7 additions & 0 deletions java/pom.xml
@@ -132,6 +132,12 @@
<version>2.25.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<properties>
@@ -151,6 +157,7 @@
<GPU_ARCHS>ALL</GPU_ARCHS>
<native.build.path>${project.build.directory}/cmake-build</native.build.path>
<slf4j.version>1.7.30</slf4j.version>
<arrow.version>0.15.1</arrow.version>
</properties>

<profiles>
114 changes: 114 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ArrowColumnBuilder.java
@@ -0,0 +1,114 @@
/*
*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

/**
 * Column builder from Arrow data. This builder takes in pointers to the Arrow off-heap
 * memory and allows efficient building of CUDF ColumnVectors from that Arrow data.
 * The caller can add multiple batches, where each batch corresponds to Arrow data,
 * and those batches get concatenated together after being converted to CUDF
 * ColumnVectors.
 */
public final class ArrowColumnBuilder implements AutoCloseable {
private DType type;
private ArrayList<Long> data = new ArrayList<>();
Contributor: all these container fields can be declared final

Author: will update
private ArrayList<Long> dataLength = new ArrayList<>();
private ArrayList<Long> validity = new ArrayList<>();
private ArrayList<Long> validityLength = new ArrayList<>();
private ArrayList<Long> offsets = new ArrayList<>();
Contributor: Speaking of things getting out of sync, would it be better for these to be an array of objects instead of an object of arrays? How many of these do you expect a user to pass in? And even if it is large, is the access pattern for the metadata going to be one batch at a time, or all of the offsets followed by all of the nullCounts, ...?

Author: I had briefly considered making a class to hold each Arrow batch's data so we didn't have as many lists, but honestly didn't think it all the way through and kind of forgot about it, so glad you brought it up. You are going to get one of these whenever you hit the row limit while iterating the ColumnBatches in HostColumnToGpu; that is on the Spark side at least. I'm not sure I follow your last question; you can see how it's used below: currently we build a column per entry here and then concatenate all of the column vectors at the end. One thing about putting this into another class and making it an array of objects is that we could then just extend that class to support nested types, and this API shouldn't have to change (hopefully?).

Contributor: I was just trying to understand if there was a performance reason to use an object of arrays vs an array of objects. From what I have seen, because of the access pattern, an array of objects should not be a performance problem, and it hopefully will make the code a bit more readable.

Author: Actually, I could just have this take in the Arrow ValueVector and have this do the work internally.

Contributor: If we do that then we have to depend on Arrow as a dependency for the API.

Contributor: I'm fine with how it is, it really was just a nit.
private ArrayList<Long> offsetsLength = new ArrayList<>();
private ArrayList<Long> nullCount = new ArrayList<>();
private ArrayList<Long> rows = new ArrayList<>();
private int numBatches = 0;
private String colName;

public ArrowColumnBuilder(HostColumnVector.DataType type, String name) {
this.type = type.getType();
this.colName = name;
}

public void addBatch(long rows, long nullCount, long data, long dataLength, long valid,
long validLength, long offsets, long offsetsLength) {
this.numBatches += 1;
this.rows.add(rows);
this.nullCount.add(nullCount);
this.data.add(data);
this.dataLength.add(dataLength);
this.validity.add(valid);
this.validityLength.add(validLength);
this.offsets.add(offsets);
this.offsetsLength.add(offsetsLength);
}

/**
* Create the immutable ColumnVector, copied to the device based on the Arrow data.
*/
public final ColumnVector buildAndPutOnDevice() {
ArrayList<ColumnVector> allVecs = new ArrayList<>(this.numBatches);
ColumnVector vecRet;
try {
for (int i = 0; i < this.numBatches; i++) {
allVecs.add(ColumnVector.fromArrow(type, colName, rows.get(i), nullCount.get(i),
data.get(i), dataLength.get(i), validity.get(i), validityLength.get(i),
offsets.get(i), offsetsLength.get(i)));
}
if (this.numBatches == 1) {
vecRet = allVecs.get(0);
} else if (this.numBatches > 1) {
vecRet = ColumnVector.concatenate(allVecs.toArray(new ColumnVector[0]));
} else {
throw new IllegalStateException("Can't build a ColumnVector when no Arrow batches specified");
}
} finally {
// close the vectors that were concatenated
if (this.numBatches > 1) {
for (ColumnVector cv : allVecs) {
cv.close();
}
Contributor: nit: could use forEach method allVecs.forEach(v -> v.close());
}
}
return vecRet;
}
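The reviewer's nit in `buildAndPutOnDevice` is to replace the explicit close loop with `forEach`. A toy illustration with a stand-in `AutoCloseable` (the counter and class names are invented for the demo); note that `AutoCloseable.close()` declares a checked `Exception`, so a generic lambda needs a try/catch, whereas cudf's `ColumnVector.close()` does not, which is what lets the one-liner in the review comment compile as written:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ForEachCloseDemo {
    public static void main(String[] args) {
        AtomicInteger closed = new AtomicInteger();
        // Stand-in for the list of ColumnVectors built per batch.
        List<AutoCloseable> vecs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            vecs.add(() -> closed.incrementAndGet());
        }
        // Equivalent to: for (AutoCloseable v : vecs) v.close();
        vecs.forEach(v -> {
            try {
                v.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        System.out.println(closed.get()); // prints 3
    }
}
```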

@Override
public void close() {
// memory buffers owned outside of this
}

@Override
public String toString() {
StringJoiner sj = new StringJoiner(",");
Contributor: sj is unused

Author: thanks, will update
return "ArrowColumnBuilder{" +
"type=" + type +
", data=" + data +
", dataLength=" + dataLength +
", validity=" + validity +
", validityLength=" + validityLength +
", offsets=" + offsets +
", offsetsLength=" + offsetsLength +
", nullCount=" + nullCount +
", rows=" + rows +
", populatedRows=" + rows +
'}';
}
}
35 changes: 35 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ColumnVector.java
@@ -310,6 +310,37 @@ public BaseDeviceMemoryBuffer getDeviceBufferFor(BufferType type) {
return srcBuffer;
}

/**
* Create a ColumnVector from the off heap Apache Arrow buffers passed in.
* @param type - type of the column
* @param colName - Name of the column
* @param numRows - Number of rows in the arrow column
* @param nullCount - Null count
* @param data - address of the Arrow data buffer
* @param dataLength - size of the Arrow data buffer
* @param validity - address of the Arrow validity buffer
* @param validityLength - size of the Arrow validity buffer
* @param offsets - address of the Arrow offsets buffer
* @param offsetsLength - size of the Arrow offsets buffer
* @return - new ColumnVector
*/
public static ColumnVector fromArrow(
DType type,
String colName,
long numRows,
long nullCount,
long data,
Member: This API would be more Java-like if it took ByteBuffer instances (which can be DirectByteBuffer for off-heap). Not sure if that makes it easier to wield for the intended use case.

Author: Thanks for pointing me to the JNI ByteBuffer API; I wasn't aware of a way to get the address and size.
long dataLength,
long validity,
long validityLength,
long offsets,
long offsetsLength) {
long columnHandle = fromArrow(type.typeId.getNativeId(), colName, numRows, nullCount, data,
dataLength, validity, validityLength, offsets, offsetsLength);
ColumnVector vec = new ColumnVector(columnHandle);
return vec;
}
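The Member's suggestion above is to accept direct `ByteBuffer`s instead of raw address/length pairs; on the native side, JNI's `GetDirectBufferAddress` and `GetDirectBufferCapacity` can then recover exactly the values `fromArrow` currently takes as longs. A small Java-side illustration, assuming nothing beyond the JDK:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // A direct buffer lives off the Java heap, so native code can read it
        // in place via JNI GetDirectBufferAddress/GetDirectBufferCapacity
        // instead of being handed a raw address and size as two longs.
        ByteBuffer buf = ByteBuffer.allocateDirect(64).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(0, 42L);
        System.out.println(buf.isDirect());  // prints true
        System.out.println(buf.capacity());  // prints 64
    }
}
```

The trade-off flagged in the thread still applies: a `ByteBuffer` overload is safer and more idiomatic, while the raw-long form avoids wrapping buffers the caller already tracks by address.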

/**
* Create a new vector of length rows, where each row is filled with the Scalar's
* value
@@ -615,6 +646,10 @@ public ColumnVector castTo(DType type) {

private static native long sequence(long initialValue, long step, int rows);

private static native long fromArrow(int type, String col_name, long col_length,
long null_count, long data, long data_size, long validity, long validity_size,
long offsets, long offsets_size) throws CudfException;

private static native long fromScalar(long scalarHandle, int rowCount) throws CudfException;

private static native long makeList(long[] handles, long typeHandle, int scale, long rows)
64 changes: 64 additions & 0 deletions java/src/main/native/src/ColumnVectorJni.cpp
@@ -14,12 +14,15 @@
* limitations under the License.
*/

#include <arrow/api.h>
#include <cudf/column/column_factories.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/filling.hpp>
#include <cudf/interop.hpp>
#include <cudf/hashing.hpp>
#include <cudf/reshape.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/detail/interop.hpp>
#include <cudf/lists/detail/concatenate.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/scalar/scalar_factories.hpp>
@@ -50,6 +53,67 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnVector_sequence(JNIEnv *env, j
CATCH_STD(env, 0);
}

JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnVector_fromArrow(JNIEnv *env, jclass,
jint j_type,
jstring j_col_name,
jlong j_col_length,
jlong j_null_count,
jlong j_data,
jlong j_data_size,
jlong j_validity,
jlong j_validity_size,
jlong j_offsets,
jlong j_offsets_size) {
try {
cudf::jni::auto_set_device(env);
cudf::type_id n_type = static_cast<cudf::type_id>(j_type);

auto data_buffer = arrow::Buffer::Wrap(reinterpret_cast<const char *>(j_data), static_cast<int>(j_data_size));
auto null_buffer = arrow::Buffer::Wrap(reinterpret_cast<const char *>(j_validity), static_cast<int>(j_validity_size));
// offsets buffer only used for certain types, can be 0
auto offsets_buffer = arrow::Buffer::Wrap(reinterpret_cast<const char *>(j_offsets), static_cast<int>(j_offsets_size));

cudf::jni::native_jlongArray outcol_handles(env, 1);
std::shared_ptr<arrow::Array> arrow_array;
switch (n_type) {
case cudf::type_id::DECIMAL32:
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Don't support converting DECIMAL32 yet", 0);
break;
case cudf::type_id::DECIMAL64:
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Don't support converting DECIMAL64 yet", 0);
break;
case cudf::type_id::STRUCT:
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Don't support converting STRUCT yet", 0);
break;
case cudf::type_id::LIST:
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Don't support converting LIST yet", 0);
break;
case cudf::type_id::DICTIONARY32:
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Don't support converting DICTIONARY32 yet", 0);
break;
case cudf::type_id::STRING:
arrow_array = std::make_shared<arrow::StringArray>(j_col_length, offsets_buffer, data_buffer, null_buffer, j_null_count);
break;
default:
// this handles the primitive types
arrow_array = cudf::detail::to_arrow_array(n_type, j_col_length, data_buffer, null_buffer, j_null_count);
}
cudf::jni::native_jstring col_name(env, j_col_name);
auto struct_meta = cudf::column_metadata{col_name.get()};
auto name_and_type = arrow::field(struct_meta.name, arrow_array->type());
std::vector<std::shared_ptr<arrow::Field>> fields = {name_and_type};
std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
auto arrow_table = arrow::Table::Make(schema, std::vector<std::shared_ptr<arrow::Array>>{arrow_array});
std::unique_ptr<cudf::table> table_result = cudf::from_arrow(*(arrow_table));
std::vector<std::unique_ptr<cudf::column>> retCols = table_result->release();
if (retCols.size() != 1) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Must result in one column", 0);
}
return reinterpret_cast<jlong>(retCols[0].release());
}
CATCH_STD(env, 0);
}

JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnVector_makeList(JNIEnv *env, jobject j_object,
jlongArray handles,
jlong j_type,