Skip to content

Commit

Permalink
apacheGH-35033: [Java] [Datasets] Add support for multi-file datasets…
Browse files Browse the repository at this point in the history
… from Java (apache#35034)

I'd like to add support for multi-file datasets (in different directories) in the Java bindings.
This PR adds to the existing Datasets API to allow an array of URIs to be passed in. The logic for whether the URIs passed are valid or not exists in the JNI layer.
* Closes: apache#35033

Lead-authored-by: NoahFournier <[email protected]>
Co-authored-by: NoahFournier <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
noahfrn authored Jun 2, 2023
1 parent 018e7d3 commit cd42895
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 2 deletions.
48 changes: 47 additions & 1 deletion java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <mutex>
#include <utility>
#include <unordered_map>

#include "arrow/array.h"
Expand All @@ -25,6 +26,7 @@
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/engine/substrait/util.h"
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"
Expand Down Expand Up @@ -569,7 +571,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe
* Signature: (Ljava/lang/String;II)J
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I(
JNIEnv* env, jobject, jstring uri, jint file_format_id) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
Expand All @@ -582,6 +584,50 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: makeFileSystemDatasetFactory
* Signature: ([Ljava/lang/String;II)J
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I(
JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) {
JNI_METHOD_START

std::shared_ptr<arrow::dataset::FileFormat> file_format =
JniGetOrThrow(GetFileFormat(file_format_id));
arrow::dataset::FileSystemFactoryOptions options;

std::vector<std::string> uri_vec = ToStringVector(env, uris);
if (uri_vec.size() == 0) {
JniThrow("No URIs provided.");
}

// If not all URIs, throw exception
if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri);
elem != uri_vec.end()) {
JniThrow("Unrecognized file type in URI: " + *elem);
}

std::vector<std::string> output_paths;
std::string first_path;
// We know that uri_vec isn't empty, from the conditional above
auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(uri_vec[0], &first_path));
output_paths.push_back(first_path);

std::transform(uri_vec.begin() + 1, uri_vec.end(), std::back_inserter(output_paths),
[&](const auto& s) -> std::string {
auto result = JniGetOrThrow(fs->PathFromUri(s));
return std::move(result);
});

std::shared_ptr<arrow::dataset::DatasetFactory> d =
JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
std::move(fs), std::move(output_paths), file_format, options));
return CreateNativeRef(d);
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: writeFromScannerToFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,17 @@ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memo
super(allocator, memoryPool, createNative(format, uri));
}

public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
String[] uris) {
super(allocator, memoryPool, createNative(format, uris));
}

private static long createNative(FileFormat format, String uri) {
return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id());
}

private static long createNative(FileFormat format, String[] uris) {
return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class JniWrapper {

private static final JniWrapper INSTANCE = new JniWrapper();

public static JniWrapper get() {
JniLoader.get().ensureLoaded();
return INSTANCE;
Expand All @@ -45,6 +45,17 @@ private JniWrapper() {
*/
public native long makeFileSystemDatasetFactory(String uri, int fileFormat);

/**
* Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
* intermediate shared_ptr of the factory instance.
*
* @param uris List of file uris to read, each path pointing to an individual file
* @param fileFormat file format ID
* @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
* @see FileFormat
*/
public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat);

/**
* Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally
* depends on C++ write API: FileSystemDataset::Write.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,49 @@ public void testBaseParquetRead() throws Exception {
AutoCloseables.close(factory);
}

@Test
public void testMultipleParquetReadFromUris() throws Exception {
ParquetWriteSupport writeSupport1 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(),
1, "a");
ParquetWriteSupport writeSupport2 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(),
2, "b");
String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"]]";

ScanOptions options = new ScanOptions(1);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, new String[]{writeSupport1.getOutputURI(), writeSupport2.getOutputURI()});
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertScanBatchesProduced(factory, options);
assertEquals(2, datum.size());
datum.forEach(batch -> assertEquals(1, batch.getLength()));
checkParquetReadResult(schema, expectedJsonUnordered, datum);

AutoCloseables.close(datum);
AutoCloseables.close(factory);
}


@Test
public void testMultipleParquetInvalidUri() throws Exception {
RuntimeException exc = assertThrows(RuntimeException.class,
() -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, new String[]{"https://example.com", "file:///test/location"}));
Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", exc.getMessage());
}

@Test
public void testMultipleParquetMultipleFilesystemTypes() throws Exception {
RuntimeException exc = assertThrows(RuntimeException.class,
() -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, new String[]{"file:///test/location", "s3:///test/bucket/file" }));
Assertions.assertTrue(
exc.getMessage().startsWith("The filesystem expected a URI with one of the schemes (file) but received s3"
)
);
}

@Test
public void testParquetProjectSingleColumn() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
Expand Down

0 comments on commit cd42895

Please sign in to comment.