From e12a4545bdc5a8683c8dfdbb0468922d444c0500 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 17 Jan 2022 15:42:03 +0100
Subject: [PATCH] ARROW-12480: [Java][Dataset] FileSystemDataset: Support
 reading from a directory
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #10114 from zhztheplayer/ARROW-12480

Authored-by: Hongze Zhang
Signed-off-by: Krisztián Szűcs
---
 cpp/src/arrow/dataset/discovery.cc                 | 13 ++-
 java/dataset/pom.xml                               |  5 ++
 .../apache/arrow/dataset/file/JniWrapper.java      |  3 +-
 .../arrow/dataset/ParquetWriteSupport.java         |  7 +-
 .../dataset/file/TestFileSystemDataset.java        | 90 +++++++++++++++----
 5 files changed, 97 insertions(+), 21 deletions(-)

diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc
index 0f9d479b9d6b1..f8382aa405203 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -210,10 +210,21 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
 Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
     std::string uri, std::shared_ptr<FileFormat> format,
     FileSystemFactoryOptions options) {
+  // TODO Partitioning support. Dictionary support should be done before that. See
+  // ARROW-12481.
   std::string internal_path;
   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<fs::FileSystem> filesystem,
-                        arrow::fs::FileSystemFromUri(uri, &internal_path))
+                        fs::FileSystemFromUri(uri, &internal_path))
   ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path))
+  if (file_info.IsDirectory()) {
+    fs::FileSelector selector;
+    selector.base_dir = file_info.path();
+    selector.recursive = true;
+    return arrow::dataset::FileSystemDatasetFactory::Make(
+        std::move(filesystem), std::move(selector), std::move(format),
+        std::move(options));
+  }
+  // is a single file
   return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
       {file_info}, std::move(filesystem), std::move(format), std::move(options)));
 }
diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml
index 513aeac868e9e..fb80c8750a51f 100644
--- a/java/dataset/pom.xml
+++ b/java/dataset/pom.xml
@@ -98,6 +98,11 @@
       <version>${dep.guava.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index 1af307aac3816..f69d8205192c0 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -37,7 +37,8 @@ private JniWrapper() {
   /**
    * Create FileSystemDatasetFactory and return its native pointer. The pointer points to an
    * intermediate shared_ptr of the factory instance.
-   * @param uri file uri to read
+   *
+   * @param uri file uri to read, either a file or a directory
    * @param fileFormat file format ID
   * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
   * @see FileFormat
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java
index c6299d135a030..efdaf46dd562a 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.arrow.util.Preconditions;
 import org.apache.avro.Schema;
@@ -42,13 +43,15 @@ public class ParquetWriteSupport implements AutoCloseable {
   private final Schema avroSchema;
   private final List<GenericRecord> writtenRecords = new ArrayList<>();
   private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder();
+  private final Random random = new Random();
 
   public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception {
     avroSchema = readSchemaFromFile(schemaName);
-    path = outputFolder.getPath() + File.separator + "generated.parquet";
+    path = outputFolder.getPath() + File.separator + "generated-" + random.nextLong() + ".parquet";
     uri = "file://" + path;
-    writer = AvroParquetWriter.builder(new org.apache.hadoop.fs.Path(path))
+    writer = AvroParquetWriter
+        .<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
         .withSchema(avroSchema)
         .build();
   }
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
index 2b99f8283deb0..83d57c7421bcf 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -22,11 +22,16 @@
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -55,6 +60,9 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.rules.TemporaryFolder;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Primitives;
+
 public class TestFileSystemDataset extends TestNativeDataset {
 
   @ClassRule
@@ -129,6 +137,29 @@ public void testParquetBatchSize() throws Exception {
     AutoCloseables.close(datum);
   }
 
+  @Test
+  public void testParquetDirectoryRead() throws Exception {
+    final File outputFolder = TMP.newFolder();
+    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
+        1, "a", 2, "b", 3, "c");
+    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
+        4, "e", 5, "f", 6, "g", 7, "h");
+    String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"],[3,\"c\"],[4,\"e\"],[5,\"f\"],[6,\"g\"],[7,\"h\"]]";
+
+    ScanOptions options = new ScanOptions(new String[0], 1);
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, outputFolder.toURI().toString());
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(7, datum.size());
+    datum.forEach(batch -> assertEquals(1, batch.getLength()));
+    checkParquetReadResult(schema, expectedJsonUnordered, datum);
+
+    AutoCloseables.close(datum);
+  }
+
   @Test
   public void testEmptyProjectSelectsZeroColumns() throws Exception {
     ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
@@ -295,6 +326,35 @@ public void testMemoryAllocationOnAssociatedAllocator() throws Exception {
     Assert.assertEquals(-expected_diff, finalReservation - reservation);
   }
 
+  private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
+      throws IOException {
+    final ObjectMapper json = new ObjectMapper();
+    final Set<?> expectedSet = json.readValue(expectedJson, Set.class);
+    final Set<List<Object>> actualSet = new HashSet<>();
+    final int fieldCount = schema.getFields().size();
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
+      VectorLoader loader = new VectorLoader(vsr);
+      for (ArrowRecordBatch batch : actual) {
+        loader.load(batch);
+        int batchRowCount = vsr.getRowCount();
+        for (int i = 0; i < batchRowCount; i++) {
+          List<Object> row = new ArrayList<>();
+          for (int j = 0; j < fieldCount; j++) {
+            Object object = vsr.getVector(j).getObject(i);
+            if (Primitives.isWrapperType(object.getClass())) {
+              row.add(object);
+            } else {
+              row.add(object.toString());
+            }
+          }
+          actualSet.add(row);
+        }
+      }
+    }
+    Assert.assertEquals("Mismatched data read from Parquet, actual: " + json.writeValueAsString(actualSet) + ";",
+        expectedSet, actualSet);
+  }
+
   private void checkParquetReadResult(Schema schema, List<GenericRecord> expected, List<ArrowRecordBatch> actual) {
     assertEquals(expected.size(), actual.stream()
         .mapToInt(ArrowRecordBatch::getLength)
@@ -304,24 +364,20 @@ private void checkParquetReadResult(Schema schema, List<GenericRecord> expected,
     try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
       VectorLoader loader = new VectorLoader(vsr);
       for (ArrowRecordBatch batch : actual) {
-        try {
-          assertEquals(fieldCount, batch.getNodes().size());
-          loader.load(batch);
-          int batchRowCount = vsr.getRowCount();
-          for (int i = 0; i < fieldCount; i++) {
-            FieldVector vector = vsr.getVector(i);
-            for (int j = 0; j < batchRowCount; j++) {
-              Object object = vector.getObject(j);
-              Object expectedObject = expectedRemovable.get(j).get(i);
-              assertEquals(Objects.toString(expectedObject),
-                  Objects.toString(object));
-            }
-          }
-          for (int i = 0; i < batchRowCount; i++) {
-            expectedRemovable.poll();
+        assertEquals(fieldCount, batch.getNodes().size());
+        loader.load(batch);
+        int batchRowCount = vsr.getRowCount();
+        for (int i = 0; i < fieldCount; i++) {
+          FieldVector vector = vsr.getVector(i);
+          for (int j = 0; j < batchRowCount; j++) {
+            Object object = vector.getObject(j);
+            Object expectedObject = expectedRemovable.get(j).get(i);
+            assertEquals(Objects.toString(expectedObject),
+                Objects.toString(object));
           }
-        } finally {
-          batch.close();
+        }
+        for (int i = 0; i < batchRowCount; i++) {
+          expectedRemovable.poll();
         }
       }
       assertTrue(expectedRemovable.isEmpty());
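
Usage note (a sketch, not part of the patch): after this change, a directory URI is accepted anywhere the Java FileSystemDatasetFactory previously required a single-file URI. The sketch below shows the minimal call sequence; the directory "file:///tmp/parquet-dir" is a hypothetical example, and the constructor arguments are the same ones used by testParquetDirectoryRead above.

// Sketch only: exercising the new directory read path from Java.
// "file:///tmp/parquet-dir" is a hypothetical directory of Parquet files.
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;

public class DirectoryReadExample {
  public static void main(String[] args) throws Exception {
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      // A directory URI is now accepted where a file URI was before: the C++
      // factory calls GetFileInfo, sees a directory, and scans it with a
      // recursive FileSelector instead of treating it as a single file.
      FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
          allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET,
          "file:///tmp/parquet-dir");
      Schema schema = factory.inspect(); // unified schema across all files found
      System.out.println(schema);
      factory.close();
      // Scanning then proceeds exactly as in the single-file case; see the
      // helpers used by testParquetDirectoryRead above.
    }
  }
}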