From cd42895a0a8b6fa21be5bc06d13d771bef0d4fb3 Mon Sep 17 00:00:00 2001 From: NoahFournier <63198198+NoahFournier@users.noreply.github.com> Date: Fri, 2 Jun 2023 13:49:55 +0100 Subject: [PATCH] GH-35033: [Java] [Datasets] Add support for multi-file datasets from Java (#35034) I'd like to add support for multi-file datasets (in different directories) in the Java bindings. This PR adds to the existing Datasets API to allow an array of URIs to be passed in. The logic for whether the URIs passed are valid or not exists in the JNI layer. * Closes: #35033 Lead-authored-by: NoahFournier Co-authored-by: NoahFournier <63198198+NoahFournier@users.noreply.github.com> Signed-off-by: David Li --- java/dataset/src/main/cpp/jni_wrapper.cc | 48 ++++++++++++++++++- .../file/FileSystemDatasetFactory.java | 9 ++++ .../apache/arrow/dataset/file/JniWrapper.java | 13 ++++- .../dataset/file/TestFileSystemDataset.java | 43 +++++++++++++++++ 4 files changed, 111 insertions(+), 2 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 48191eac495d0..cba2b4a0dbf42 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include "arrow/array.h" @@ -25,6 +26,7 @@ #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" #include "arrow/engine/substrait/util.h" #include "arrow/ipc/api.h" #include "arrow/util/iterator.h" @@ -569,7 +571,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe * Signature: (Ljava/lang/String;II)J */ JNIEXPORT jlong JNICALL -Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I( JNIEnv* env, jobject, jstring uri, jint file_format_id) { JNI_METHOD_START std::shared_ptr file_format = @@ -582,6 +584,50 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( JNI_METHOD_END(-1L) } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: makeFileSystemDatasetFactory + * Signature: ([Ljava/lang/String;II)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I( + JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { + JNI_METHOD_START + + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemFactoryOptions options; + + std::vector uri_vec = ToStringVector(env, uris); + if (uri_vec.size() == 0) { + JniThrow("No URIs provided."); + } + + // If not all URIs, throw exception + if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri); + elem != uri_vec.end()) { + JniThrow("Unrecognized file type in URI: " + *elem); + } + + std::vector output_paths; + std::string first_path; + // We know that uri_vec isn't empty, from the conditional above + auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(uri_vec[0], &first_path)); + output_paths.push_back(first_path); + + std::transform(uri_vec.begin() + 1, uri_vec.end(), std::back_inserter(output_paths), + [&](const auto& s) -> std::string { + auto result = JniGetOrThrow(fs->PathFromUri(s)); + return std::move(result); + }); + + std::shared_ptr d = + JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( + std::move(fs), std::move(output_paths), file_format, options)); + return CreateNativeRef(d); + JNI_METHOD_END(-1L) +} + /* * Class: org_apache_arrow_dataset_file_JniWrapper * Method: writeFromScannerToFile diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index 1268d11fe10a9..aa315690592ee 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -31,8 +31,17 @@ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memo super(allocator, memoryPool, createNative(format, uri)); } + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, + String[] uris) { + super(allocator, memoryPool, createNative(format, uris)); + } + private static long createNative(FileFormat format, String uri) { return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); } + private static long createNative(FileFormat format, String[] uris) { + return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); + } + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 18560a46a5c8d..c3a1a4e58a140 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -25,7 +25,7 @@ public class JniWrapper { private static final JniWrapper INSTANCE = new JniWrapper(); - + public static JniWrapper get() { JniLoader.get().ensureLoaded(); return INSTANCE; @@ -45,6 +45,17 @@ private JniWrapper() { */ public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + /** + * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a + * intermediate shared_ptr of the factory instance. + * + * @param uris List of file uris to read, each path pointing to an individual file + * @param fileFormat file format ID + * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. + * @see FileFormat + */ + public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); + /** * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally * depends on C++ write API: FileSystemDataset::Write. diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index b8a13937a8aad..735b3ae61106d 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -101,6 +101,49 @@ public void testBaseParquetRead() throws Exception { AutoCloseables.close(factory); } + @Test + public void testMultipleParquetReadFromUris() throws Exception { + ParquetWriteSupport writeSupport1 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 1, "a"); + ParquetWriteSupport writeSupport2 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 2, "b"); + String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"]]"; + + ScanOptions options = new ScanOptions(1); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{writeSupport1.getOutputURI(), writeSupport2.getOutputURI()}); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertScanBatchesProduced(factory, options); + assertEquals(2, datum.size()); + datum.forEach(batch -> assertEquals(1, batch.getLength())); + checkParquetReadResult(schema, expectedJsonUnordered, datum); + + AutoCloseables.close(datum); + AutoCloseables.close(factory); + } + + + @Test + public void testMultipleParquetInvalidUri() throws Exception { + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"https://example.com", "file:///test/location"})); + Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", exc.getMessage()); + } + + @Test + public void testMultipleParquetMultipleFilesystemTypes() throws Exception { + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"file:///test/location", "s3:///test/bucket/file" })); + Assertions.assertTrue( + exc.getMessage().startsWith("The filesystem expected a URI with one of the schemes (file) but received s3" + ) + ); + } + @Test public void testParquetProjectSingleColumn() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");