From d52909c495e4bfd22fd68e2341b59d1f3c705f4b Mon Sep 17 00:00:00 2001 From: JiaKe Date: Fri, 9 Sep 2022 01:29:55 +0000 Subject: [PATCH] Support parquet write from scanner to file (#146) --- cpp/src/jni/dataset/jni_wrapper.cc | 168 +++++++++++++++++- java/dataset/CMakeLists.txt | 1 + .../arrow/dataset/file/DatasetFileWriter.java | 75 ++++++++ .../apache/arrow/dataset/file/JniWrapper.java | 18 ++ .../dataset/file/NativeScannerAdaptor.java | 35 ++++ .../file/NativeScannerAdaptorImpl.java | 146 +++++++++++++++ .../jni/NativeRecordBatchIterator.java | 32 ++++ 7 files changed, 473 insertions(+), 2 deletions(-) create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index ee2008c60e83b..964cf09194eae 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -20,15 +20,19 @@ #include "arrow/array.h" #include "arrow/array/concatenate.h" #include "arrow/dataset/api.h" +#include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" #include "arrow/ipc/api.h" +#include "arrow/c/helpers.h" #include "arrow/util/iterator.h" #include "jni/dataset/jni_util.h" #include "org_apache_arrow_dataset_file_JniWrapper.h" #include "org_apache_arrow_dataset_jni_JniWrapper.h" #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h" +#include + namespace { jclass illegal_access_exception_class; @@ -36,9 +40,12 @@ jclass illegal_argument_exception_class; jclass runtime_exception_class; jclass java_reservation_listener_class; +jclass native_record_batch_iterator_class; jmethodID reserve_memory_method; jmethodID unreserve_memory_method; +jmethodID native_record_batch_iterator_hasNext; +jmethodID native_record_batch_iterator_next; jlong default_memory_pool_id = -1L; @@ -151,6 +158,116 @@ class DisposableScannerAdaptor { } }; +/// \brief Simple fragment implementation that is constructed directly +/// from a record batch iterator. +class SimpleIteratorFragment : public arrow::dataset::Fragment { + public: + explicit SimpleIteratorFragment(arrow::RecordBatchIterator itr) + : arrow::dataset::Fragment() { + itr_ = std::move(itr); + } + + static arrow::Result> Make( + arrow::RecordBatchIterator itr) { + return std::make_shared(std::move(itr)); + } + + arrow::Result ScanBatchesAsync( + const std::shared_ptr& options) override { + struct State { + State(std::shared_ptr fragment) + : fragment(std::move(fragment)) {} + + std::shared_ptr Next() { return cur_rb; } + + bool Finished() { + arrow::Result> next = fragment->itr_.Next(); + if (IsIterationEnd(next)) { + cur_rb = nullptr; + + return true; + } else { + cur_rb = next.ValueOrDie(); + return false; + } + } + + std::shared_ptr fragment; + std::shared_ptr cur_rb = nullptr; + }; + + struct Generator { + Generator(std::shared_ptr fragment) + : state(std::make_shared(std::move(fragment))) {} + + arrow::Future> operator()() { + while (!state->Finished()) { + auto next = state->Next(); + if (next) { + return arrow::Future>::MakeFinished( + std::move(next)); + } + } + return arrow::AsyncGeneratorEnd>(); + } + + std::shared_ptr state; + }; + return Generator(arrow::internal::checked_pointer_cast( + shared_from_this())); + } + + std::string type_name() const override { return "simple_iterator"; } + + arrow::Result> ReadPhysicalSchemaImpl() override { + return arrow::Status::NotImplemented("No physical schema is readable"); + } + + private: + arrow::RecordBatchIterator itr_; + bool used_ = false; +}; + +/// \brief Create scanner that scans over Java dataset API's components. +/// +/// Currently, we use a NativeRecordBatchIterator as the underlying +/// Java object to do scanning. Which means, only one single task will +/// be produced from C++ code. +arrow::Result> MakeJavaDatasetScanner( + JavaVM* vm, jobject iter, std::shared_ptr schema) { + arrow::RecordBatchIterator itr = arrow::MakeFunctionIterator( + [vm, iter, schema]() -> arrow::Result> { + JNIEnv* env; + if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { + return arrow::Status::Invalid("JNIEnv was not attached to current thread"); + } + if (!env->CallBooleanMethod(iter, native_record_batch_iterator_hasNext)) { + return nullptr; // stream ended + } + + auto bytes = + (jbyteArray)env->CallObjectMethod(iter, native_record_batch_iterator_next); + auto byte_array = env->GetByteArrayElements(bytes, 0); + int64_t memory_address; + std::memcpy(&memory_address, byte_array, sizeof(int64_t)); + + std::shared_ptr rb = JniGetOrThrow( + arrow::dataset::jni::ImportRecordBatch(env, schema, memory_address)); + // Release the ArrowArray + auto c_array = reinterpret_cast(memory_address); + ArrowArrayRelease(c_array); + return rb; + }); + + ARROW_ASSIGN_OR_RAISE(auto fragment, SimpleIteratorFragment::Make(std::move(itr))) + + arrow::dataset::ScannerBuilder scanner_builder( + std::move(schema), fragment, std::make_shared()); + // Use default memory pool is enough as native allocation is ideally + // not being called during scanning Java-based fragments. + RETURN_NOT_OK(scanner_builder.Pool(arrow::default_memory_pool())); + return scanner_builder.Finish(); +} } // namespace using arrow::dataset::jni::CreateGlobalClassReference; @@ -199,6 +316,15 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { unreserve_memory_method = JniGetOrThrow( GetMethodID(env, java_reservation_listener_class, "unreserve", "(J)V")); + native_record_batch_iterator_class = + CreateGlobalClassReference(env, + "Lorg/apache/arrow/" + "dataset/jni/NativeRecordBatchIterator;"); + native_record_batch_iterator_hasNext = JniGetOrThrow( + GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z")); + native_record_batch_iterator_next = + JniGetOrThrow(GetMethodID(env, native_record_batch_iterator_class, "next", "()[B")); + default_memory_pool_id = reinterpret_cast(arrow::default_memory_pool()); return JNI_VERSION; @@ -212,6 +338,7 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(illegal_argument_exception_class); env->DeleteGlobalRef(runtime_exception_class); env->DeleteGlobalRef(java_reservation_listener_class); + env->DeleteGlobalRef(native_record_batch_iterator_class); default_memory_pool_id = -1L; } @@ -476,8 +603,8 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe */ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( - JNIEnv* env, jobject, jstring uri, jint file_format_id, - jlong start_offset, jlong length) { + JNIEnv* env, jobject, jstring uri, jint file_format_id, jlong start_offset, + jlong length) { JNI_METHOD_START std::shared_ptr file_format = JniGetOrThrow(GetFileFormat(file_format_id)); @@ -488,3 +615,40 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( return CreateNativeRef(d); JNI_METHOD_END(-1L) } + +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: writeFromScannerToFile + * Signature: + * (Lorg/apache/arrow/dataset/jni/NativeRecordBatchIterator;[BJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V + */ +JNIEXPORT void JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( + JNIEnv* env, jobject, jobject itr, jbyteArray schema_bytes, jlong file_format_id, + jstring uri, jobjectArray partition_columns, jint max_partitions, + jstring base_name_template) { + JNI_METHOD_START + JavaVM* vm; + if (env->GetJavaVM(&vm) != JNI_OK) { + JniThrow("Unable to get JavaVM instance"); + } + auto schema = JniGetOrThrow(FromSchemaByteArray(env, schema_bytes)); + auto scanner = JniGetOrThrow(MakeJavaDatasetScanner(vm, itr, schema)); + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemDatasetWriteOptions options; + std::string output_path; + auto filesystem = JniGetOrThrow( + arrow::fs::FileSystemFromUri(JStringToCString(env, uri), &output_path)); + std::vector partition_column_vector = + ToStringVector(env, partition_columns); + options.file_write_options = file_format->DefaultWriteOptions(); + options.filesystem = filesystem; + options.base_dir = output_path; + options.basename_template = JStringToCString(env, base_name_template); + options.partitioning = std::make_shared( + arrow::dataset::SchemaFromColumnNames(schema, partition_column_vector)); + options.max_partitions = max_partitions; + JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); + JNI_METHOD_END() +} diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt index 5b6e4a9ce241a..ddb150549f20f 100644 --- a/java/dataset/CMakeLists.txt +++ b/java/dataset/CMakeLists.txt @@ -36,6 +36,7 @@ add_jar(arrow_dataset_java src/main/java/org/apache/arrow/dataset/file/JniWrapper.java src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java + src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java GENERATE_NATIVE_HEADERS arrow_dataset_java-native DESTINATION diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java new file mode 100644 index 0000000000000..00da9d1ef3765 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.util.SchemaUtility; + +/** + * JNI-based utility to write datasets into files. It internally depends on C++ static method + * FileSystemDataset::Write. + */ +public class DatasetFileWriter { + + /** + * Scan over an input {@link Scanner} then write all record batches to file. + * + * @param scanner the source scanner for writing + * @param format target file format + * @param uri target file uri + * @param maxPartitions maximum partitions to be included in written files + * @param partitionColumns columns used to partition output files. Empty to disable partitioning + * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition + * ID around all written files. + */ + public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri, + String[] partitionColumns, int maxPartitions, String baseNameTemplate) { + final NativeScannerAdaptorImpl adaptor = new NativeScannerAdaptorImpl(scanner, allocator); + final NativeRecordBatchIterator itr = adaptor.scan(); + RuntimeException throwableWrapper = null; + try { + JniWrapper.get().writeFromScannerToFile(itr, SchemaUtility.serialize(scanner.schema()), + format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate); + } catch (Throwable t) { + throwableWrapper = new RuntimeException(t); + throw throwableWrapper; + } finally { + try { + AutoCloseables.close(itr); + } catch (Exception e) { + if (throwableWrapper != null) { + throwableWrapper.addSuppressed(e); + } + } + } + } + + /** + * Scan over an input {@link Scanner} then write all record batches to file, with default partitioning settings. + * + * @param scanner the source scanner for writing + * @param format target file format + * @param uri target file uri + */ + public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri) { + write(allocator, scanner, format, uri, new String[0], 1024, "dat_{i}"); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 9f387c18e330b..df4872750f4fe 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -18,6 +18,7 @@ package org.apache.arrow.dataset.file; import org.apache.arrow.dataset.jni.JniLoader; +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; /** * JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations. @@ -47,4 +48,21 @@ private JniWrapper() { */ public native long makeFileSystemDatasetFactory(String uri, int fileFormat, long startOffset, long length); + /** + * Write all record batches in a {@link NativeRecordBatchIterator} into files. This internally + * depends on C++ write API: FileSystemDataset::Write. + * + * @param itr iterator to be used for writing + * @param schema serialized schema of output files + * @param fileFormat target file format (ID) + * @param uri target file uri + * @param partitionColumns columns used to partition output files + * @param maxPartitions maximum partitions to be included in written files + * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition + * ID around all written files. + */ + public native void writeFromScannerToFile(NativeRecordBatchIterator itr, byte[] schema, + long fileFormat, String uri, String[] partitionColumns, int maxPartitions, + String baseNameTemplate); + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java new file mode 100644 index 0000000000000..da3ec75b2c295 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; + +/** + * A short path comparing to {@link org.apache.arrow.dataset.scanner.Scanner} for being called from C++ side + * via JNI, to minimize JNI call overhead. + */ +public interface NativeScannerAdaptor { + + /** + * Scan with the delegated scanner. + * + * @return a iterator outputting JNI-friendly flatbuffers-serialized + * {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch} instances. + */ + NativeRecordBatchIterator scan(); +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java new file mode 100644 index 0000000000000..d6c4d05e89554 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.Data; +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Default implementation of {@link NativeScannerAdaptor}. + */ +public class NativeScannerAdaptorImpl implements NativeScannerAdaptor, AutoCloseable { + + private final Scanner scanner; + private final BufferAllocator allocator; + + /** + * Constructor. + * + * @param scanner the delegated scanner. + */ + public NativeScannerAdaptorImpl(Scanner scanner, BufferAllocator allocator) { + this.scanner = scanner; + this.allocator = allocator; + } + + @Override + public NativeRecordBatchIterator scan() { + final Iterable tasks = scanner.scan(); + return new IteratorImpl(tasks, allocator); + } + + @Override + public void close() throws Exception { + scanner.close(); + } + + private static class IteratorImpl implements NativeRecordBatchIterator { + + private final Iterator taskIterator; + + private ScanTask currentTask = null; + private ArrowReader reader = null; + + private BufferAllocator allocator = null; + + public IteratorImpl(Iterable tasks, + BufferAllocator allocator) { + this.taskIterator = tasks.iterator(); + this.allocator = allocator; + } + + @Override + public void close() throws Exception { + closeCurrent(); + } + + private void closeCurrent() throws Exception { + if (currentTask == null) { + return; + } + currentTask.close(); + reader.close(); + } + + private boolean advance() { + if (!taskIterator.hasNext()) { + return false; + } + try { + closeCurrent(); + } catch (Exception e) { + throw new RuntimeException(e); + } + currentTask = taskIterator.next(); + reader = currentTask.execute(); + return true; + } + @Override + public boolean hasNext() { + + if (currentTask == null) { + if (!advance()) { + return false; + } + } + try { + if (!reader.loadNextBatch()) { + if (!advance()) { + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + byte[] longtoBytes(long data) { + return new byte[]{ + (byte) ((data >> 0) & 0xff), + (byte) ((data >> 8) & 0xff), + (byte) ((data >> 16) & 0xff), + (byte) ((data >> 24) & 0xff), + (byte) ((data >> 32) & 0xff), + (byte) ((data >> 40) & 0xff), + (byte) ((data >> 48) & 0xff), + (byte) ((data >> 56) & 0xff), + }; + } + + @Override + public byte[] next() { + ArrowArray arrowArray = ArrowArray.allocateNew(allocator); + try { + Data.exportVectorSchemaRoot(allocator, reader.getVectorSchemaRoot(), reader, arrowArray); + return longtoBytes(arrowArray.memoryAddress()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java new file mode 100644 index 0000000000000..cc4144677e666 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.util.Iterator; +/** + * Iterate on the memory address of ArrowArray + * next() should be called from C++ scanner to read the memory address of ArrowArray. + */ +public interface NativeRecordBatchIterator extends Iterator, AutoCloseable { + + /** + * Return next {@link org.apache.arrow.c.ArrowArray} memory address. + */ + @Override + byte[] next(); +}