Skip to content

Commit

Permalink
Support parquet write from scanner to file (apache#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored Sep 9, 2022
1 parent 37cac97 commit d52909c
Show file tree
Hide file tree
Showing 7 changed files with 473 additions and 2 deletions.
168 changes: 166 additions & 2 deletions cpp/src/jni/dataset/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,32 @@
#include "arrow/array.h"
#include "arrow/array/concatenate.h"
#include "arrow/dataset/api.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/ipc/api.h"
#include "arrow/c/helpers.h"
#include "arrow/util/iterator.h"
#include "jni/dataset/jni_util.h"
#include "org_apache_arrow_dataset_file_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"

#include <iostream>

namespace {

jclass illegal_access_exception_class;
jclass illegal_argument_exception_class;
jclass runtime_exception_class;

jclass java_reservation_listener_class;
jclass native_record_batch_iterator_class;

jmethodID reserve_memory_method;
jmethodID unreserve_memory_method;
jmethodID native_record_batch_iterator_hasNext;
jmethodID native_record_batch_iterator_next;

jlong default_memory_pool_id = -1L;

Expand Down Expand Up @@ -151,6 +158,116 @@ class DisposableScannerAdaptor {
}
};

/// \brief Simple fragment implementation that is constructed directly
/// from a record batch iterator.
class SimpleIteratorFragment : public arrow::dataset::Fragment {
public:
explicit SimpleIteratorFragment(arrow::RecordBatchIterator itr)
: arrow::dataset::Fragment() {
itr_ = std::move(itr);
}

static arrow::Result<std::shared_ptr<SimpleIteratorFragment>> Make(
arrow::RecordBatchIterator itr) {
return std::make_shared<SimpleIteratorFragment>(std::move(itr));
}

arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<arrow::dataset::ScanOptions>& options) override {
struct State {
State(std::shared_ptr<SimpleIteratorFragment> fragment)
: fragment(std::move(fragment)) {}

std::shared_ptr<arrow::RecordBatch> Next() { return cur_rb; }

bool Finished() {
arrow::Result<std::shared_ptr<arrow::RecordBatch>> next = fragment->itr_.Next();
if (IsIterationEnd(next)) {
cur_rb = nullptr;

return true;
} else {
cur_rb = next.ValueOrDie();
return false;
}
}

std::shared_ptr<SimpleIteratorFragment> fragment;
std::shared_ptr<arrow::RecordBatch> cur_rb = nullptr;
};

struct Generator {
Generator(std::shared_ptr<SimpleIteratorFragment> fragment)
: state(std::make_shared<State>(std::move(fragment))) {}

arrow::Future<std::shared_ptr<arrow::RecordBatch>> operator()() {
while (!state->Finished()) {
auto next = state->Next();
if (next) {
return arrow::Future<std::shared_ptr<arrow::RecordBatch>>::MakeFinished(
std::move(next));
}
}
return arrow::AsyncGeneratorEnd<std::shared_ptr<arrow::RecordBatch>>();
}

std::shared_ptr<State> state;
};
return Generator(arrow::internal::checked_pointer_cast<SimpleIteratorFragment>(
shared_from_this()));
}

std::string type_name() const override { return "simple_iterator"; }

arrow::Result<std::shared_ptr<arrow::Schema>> ReadPhysicalSchemaImpl() override {
return arrow::Status::NotImplemented("No physical schema is readable");
}

private:
arrow::RecordBatchIterator itr_;
bool used_ = false;
};

/// \brief Create scanner that scans over Java dataset API's components.
///
/// Currently, we use a NativeRecordBatchIterator as the underlying
/// Java object to do scanning. Which means, only one single task will
/// be produced from C++ code.
arrow::Result<std::shared_ptr<arrow::dataset::Scanner>> MakeJavaDatasetScanner(
JavaVM* vm, jobject iter, std::shared_ptr<arrow::Schema> schema) {
arrow::RecordBatchIterator itr = arrow::MakeFunctionIterator(
[vm, iter, schema]() -> arrow::Result<std::shared_ptr<arrow::RecordBatch>> {
JNIEnv* env;
if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
return arrow::Status::Invalid("JNIEnv was not attached to current thread");
}
if (!env->CallBooleanMethod(iter, native_record_batch_iterator_hasNext)) {
return nullptr; // stream ended
}

auto bytes =
(jbyteArray)env->CallObjectMethod(iter, native_record_batch_iterator_next);
auto byte_array = env->GetByteArrayElements(bytes, 0);
int64_t memory_address;
std::memcpy(&memory_address, byte_array, sizeof(int64_t));

std::shared_ptr<arrow::RecordBatch> rb = JniGetOrThrow(
arrow::dataset::jni::ImportRecordBatch(env, schema, memory_address));
// Release the ArrowArray
auto c_array = reinterpret_cast<struct ArrowArray*>(memory_address);
ArrowArrayRelease(c_array);
return rb;
});

ARROW_ASSIGN_OR_RAISE(auto fragment, SimpleIteratorFragment::Make(std::move(itr)))

arrow::dataset::ScannerBuilder scanner_builder(
std::move(schema), fragment, std::make_shared<arrow::dataset::ScanOptions>());
// Use default memory pool is enough as native allocation is ideally
// not being called during scanning Java-based fragments.
RETURN_NOT_OK(scanner_builder.Pool(arrow::default_memory_pool()));
return scanner_builder.Finish();
}
} // namespace

using arrow::dataset::jni::CreateGlobalClassReference;
Expand Down Expand Up @@ -199,6 +316,15 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
unreserve_memory_method = JniGetOrThrow(
GetMethodID(env, java_reservation_listener_class, "unreserve", "(J)V"));

native_record_batch_iterator_class =
CreateGlobalClassReference(env,
"Lorg/apache/arrow/"
"dataset/jni/NativeRecordBatchIterator;");
native_record_batch_iterator_hasNext = JniGetOrThrow(
GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z"));
native_record_batch_iterator_next =
JniGetOrThrow(GetMethodID(env, native_record_batch_iterator_class, "next", "()[B"));

default_memory_pool_id = reinterpret_cast<jlong>(arrow::default_memory_pool());

return JNI_VERSION;
Expand All @@ -212,6 +338,7 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
env->DeleteGlobalRef(illegal_argument_exception_class);
env->DeleteGlobalRef(runtime_exception_class);
env->DeleteGlobalRef(java_reservation_listener_class);
env->DeleteGlobalRef(native_record_batch_iterator_class);

default_memory_pool_id = -1L;
}
Expand Down Expand Up @@ -476,8 +603,8 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
JNIEnv* env, jobject, jstring uri, jint file_format_id,
jlong start_offset, jlong length) {
JNIEnv* env, jobject, jstring uri, jint file_format_id, jlong start_offset,
jlong length) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
JniGetOrThrow(GetFileFormat(file_format_id));
Expand All @@ -488,3 +615,40 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
return CreateNativeRef(d);
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: writeFromScannerToFile
* Signature:
* (Lorg/apache/arrow/dataset/jni/NativeRecordBatchIterator;[BJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V
*/
JNIEXPORT void JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
JNIEnv* env, jobject, jobject itr, jbyteArray schema_bytes, jlong file_format_id,
jstring uri, jobjectArray partition_columns, jint max_partitions,
jstring base_name_template) {
JNI_METHOD_START
JavaVM* vm;
if (env->GetJavaVM(&vm) != JNI_OK) {
JniThrow("Unable to get JavaVM instance");
}
auto schema = JniGetOrThrow(FromSchemaByteArray(env, schema_bytes));
auto scanner = JniGetOrThrow(MakeJavaDatasetScanner(vm, itr, schema));
std::shared_ptr<arrow::dataset::FileFormat> file_format =
JniGetOrThrow(GetFileFormat(file_format_id));
arrow::dataset::FileSystemDatasetWriteOptions options;
std::string output_path;
auto filesystem = JniGetOrThrow(
arrow::fs::FileSystemFromUri(JStringToCString(env, uri), &output_path));
std::vector<std::string> partition_column_vector =
ToStringVector(env, partition_columns);
options.file_write_options = file_format->DefaultWriteOptions();
options.filesystem = filesystem;
options.base_dir = output_path;
options.basename_template = JStringToCString(env, base_name_template);
options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
arrow::dataset::SchemaFromColumnNames(schema, partition_column_vector));
options.max_partitions = max_partitions;
JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
JNI_METHOD_END()
}
1 change: 1 addition & 0 deletions java/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ add_jar(arrow_dataset_java
src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java
GENERATE_NATIVE_HEADERS
arrow_dataset_java-native
DESTINATION
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file;

import org.apache.arrow.dataset.jni.NativeRecordBatchIterator;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.util.SchemaUtility;

/**
* JNI-based utility to write datasets into files. It internally depends on C++ static method
* FileSystemDataset::Write.
*/
public class DatasetFileWriter {

/**
* Scan over an input {@link Scanner} then write all record batches to file.
*
* @param scanner the source scanner for writing
* @param format target file format
* @param uri target file uri
* @param maxPartitions maximum partitions to be included in written files
* @param partitionColumns columns used to partition output files. Empty to disable partitioning
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
* ID around all written files.
*/
public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri,
String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
final NativeScannerAdaptorImpl adaptor = new NativeScannerAdaptorImpl(scanner, allocator);
final NativeRecordBatchIterator itr = adaptor.scan();
RuntimeException throwableWrapper = null;
try {
JniWrapper.get().writeFromScannerToFile(itr, SchemaUtility.serialize(scanner.schema()),
format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
} catch (Throwable t) {
throwableWrapper = new RuntimeException(t);
throw throwableWrapper;
} finally {
try {
AutoCloseables.close(itr);
} catch (Exception e) {
if (throwableWrapper != null) {
throwableWrapper.addSuppressed(e);
}
}
}
}

/**
* Scan over an input {@link Scanner} then write all record batches to file, with default partitioning settings.
*
* @param scanner the source scanner for writing
* @param format target file format
* @param uri target file uri
*/
public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri) {
write(allocator, scanner, format, uri, new String[0], 1024, "dat_{i}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.arrow.dataset.file;

import org.apache.arrow.dataset.jni.JniLoader;
import org.apache.arrow.dataset.jni.NativeRecordBatchIterator;

/**
* JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations.
Expand Down Expand Up @@ -47,4 +48,21 @@ private JniWrapper() {
*/
public native long makeFileSystemDatasetFactory(String uri, int fileFormat, long startOffset, long length);

/**
* Write all record batches in a {@link NativeRecordBatchIterator} into files. This internally
* depends on C++ write API: FileSystemDataset::Write.
*
* @param itr iterator to be used for writing
* @param schema serialized schema of output files
* @param fileFormat target file format (ID)
* @param uri target file uri
* @param partitionColumns columns used to partition output files
* @param maxPartitions maximum partitions to be included in written files
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
* ID around all written files.
*/
public native void writeFromScannerToFile(NativeRecordBatchIterator itr, byte[] schema,
long fileFormat, String uri, String[] partitionColumns, int maxPartitions,
String baseNameTemplate);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file;

import org.apache.arrow.dataset.jni.NativeRecordBatchIterator;

/**
* A short path comparing to {@link org.apache.arrow.dataset.scanner.Scanner} for being called from C++ side
* via JNI, to minimize JNI call overhead.
*/
public interface NativeScannerAdaptor {

/**
* Scan with the delegated scanner.
*
* @return a iterator outputting JNI-friendly flatbuffers-serialized
* {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch} instances.
*/
NativeRecordBatchIterator scan();
}
Loading

0 comments on commit d52909c

Please sign in to comment.