From 47628154cb98958dd87b532c46d66c68f7c98534 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 21 Apr 2021 01:03:26 +0800 Subject: [PATCH 1/6] ARROW-12480: [Java][Dataset] FileSystemDataset: Support reading from a directory --- cpp/src/arrow/dataset/discovery.cc | 16 ++++- java/dataset/pom.xml | 5 ++ .../apache/arrow/dataset/file/JniWrapper.java | 3 +- .../arrow/dataset/ParquetWriteSupport.java | 7 +- .../dataset/file/TestFileSystemDataset.java | 64 +++++++++++++++++++ 5 files changed, 90 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 0f9d479b9d6b1..a51be745b59e0 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -210,10 +210,22 @@ Result> FileSystemDatasetFactory::Make( Result> FileSystemDatasetFactory::Make( std::string uri, std::shared_ptr format, FileSystemFactoryOptions options) { + // TODO Partitioning support. Dictionary support should be done before that. See + // ARROW-12481. std::string internal_path; - ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, + ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, arrow::fs::FileSystemFromUri(uri, &internal_path)) - ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path)) + ARROW_ASSIGN_OR_RAISE(arrow::fs::FileInfo file_info, + filesystem->GetFileInfo(internal_path)); + if (file_info.IsDirectory()) { + arrow::fs::FileSelector selector; + selector.base_dir = file_info.path(); + selector.recursive = true; + return arrow::dataset::FileSystemDatasetFactory::Make( + std::move(filesystem), std::move(selector), std::move(format), + std::move(options)); + } + // is a single file return std::shared_ptr(new FileSystemDatasetFactory( {file_info}, std::move(filesystem), std::move(format), std::move(options))); } diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml index 4eeadb1df3b50..2a0719200cc20 100644 --- a/java/dataset/pom.xml +++ b/java/dataset/pom.xml @@ -98,6 +98,11 @@ ${dep.guava.version} test + + com.fasterxml.jackson.core + jackson-databind + test + diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 1af307aac3816..f69d8205192c0 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -37,7 +37,8 @@ private JniWrapper() { /** * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a * intermediate shared_ptr of the factory instance. - * @param uri file uri to read + * + * @param uri file uri to read, either a file or a directory * @param fileFormat file format ID * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. * @see FileFormat diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java index c6299d135a030..efdaf46dd562a 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Random; import org.apache.arrow.util.Preconditions; import org.apache.avro.Schema; @@ -42,13 +43,15 @@ public class ParquetWriteSupport implements AutoCloseable { private final Schema avroSchema; private final List writtenRecords = new ArrayList<>(); private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder(); + private final Random random = new Random(); public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception { avroSchema = readSchemaFromFile(schemaName); - path = outputFolder.getPath() + File.separator + "generated.parquet"; + path = outputFolder.getPath() + File.separator + "generated-" + random.nextLong() + ".parquet"; uri = "file://" + path; - writer = AvroParquetWriter.builder(new org.apache.hadoop.fs.Path(path)) + writer = AvroParquetWriter + .builder(new org.apache.hadoop.fs.Path(path)) .withSchema(avroSchema) .build(); } diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 2b99f8283deb0..af3338aff0305 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -22,11 +22,16 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -55,6 +60,9 @@ import org.junit.jupiter.api.Assertions; import org.junit.rules.TemporaryFolder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.primitives.Primitives; + public class TestFileSystemDataset extends TestNativeDataset { @ClassRule @@ -129,6 +137,29 @@ public void testParquetBatchSize() throws Exception { AutoCloseables.close(datum); } + @Test + public void testParquetDirectoryRead() throws Exception { + final File outputFolder = TMP.newFolder(); + ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder, + 1, "a", 2, "b", 3, "c"); + ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder, + 4, "e", 5, "f", 6, "g", 7, "h"); + String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"],[3,\"c\"],[4,\"e\"],[5,\"f\"],[6,\"g\"],[7,\"h\"]]"; + + ScanOptions options = new ScanOptions(new String[0], 1); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, outputFolder.toURI().toString()); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertSingleTaskProduced(factory, options); + assertEquals(7, datum.size()); + datum.forEach(batch -> assertEquals(1, batch.getLength())); + checkParquetReadResult(schema, expectedJsonUnordered, datum); + + AutoCloseables.close(datum); + } + @Test public void testEmptyProjectSelectsZeroColumns() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); @@ -295,6 +326,39 @@ public void testMemoryAllocationOnAssociatedAllocator() throws Exception { Assert.assertEquals(-expected_diff, finalReservation - reservation); } + private void checkParquetReadResult(Schema schema, String expectedJson, List actual) + throws IOException { + final ObjectMapper json = new ObjectMapper(); + final Set expectedSet = json.readValue(expectedJson, Set.class); + final Set> actualSet = new HashSet<>(); + final int fieldCount = schema.getFields().size(); + try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) { + VectorLoader loader = new VectorLoader(vsr); + for (ArrowRecordBatch batch : actual) { + try { + loader.load(batch); + int batchRowCount = vsr.getRowCount(); + for (int i = 0; i < batchRowCount; i++) { + List row = new ArrayList<>(); + for (int j = 0; j < fieldCount; j++) { + Object object = vsr.getVector(j).getObject(i); + if (Primitives.isWrapperType(object.getClass())) { + row.add(object); + } else { + row.add(object.toString()); + } + } + actualSet.add(row); + } + } finally { + batch.close(); + } + } + } + Assert.assertEquals("Mismatched data read from Parquet, actual: " + json.writeValueAsString(actualSet) + ";", + expectedSet, actualSet); + } + private void checkParquetReadResult(Schema schema, List expected, List actual) { assertEquals(expected.size(), actual.stream() .mapToInt(ArrowRecordBatch::getLength) From 0234893cecda1e897827b529a862c2cb434fdd7d Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 3 Dec 2021 15:26:53 +0800 Subject: [PATCH 2/6] style --- cpp/src/arrow/dataset/discovery.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index a51be745b59e0..6bea4b8393103 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -213,8 +213,8 @@ Result> FileSystemDatasetFactory::Make( // TODO Partitioning support. Dictionary support should be done before that. See // ARROW-12481. std::string internal_path; - ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, - arrow::fs::FileSystemFromUri(uri, &internal_path)) + ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, + fs::FileSystemFromUri(uri, &internal_path)) ARROW_ASSIGN_OR_RAISE(arrow::fs::FileInfo file_info, filesystem->GetFileInfo(internal_path)); if (file_info.IsDirectory()) { From ecf6a9891a5529984fe8e7949287da0903eedbc0 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 15 Dec 2021 11:29:32 +0800 Subject: [PATCH 3/6] style --- cpp/src/arrow/dataset/discovery.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 6bea4b8393103..1f049e8a75648 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -215,10 +215,10 @@ Result> FileSystemDatasetFactory::Make( std::string internal_path; ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, fs::FileSystemFromUri(uri, &internal_path)) - ARROW_ASSIGN_OR_RAISE(arrow::fs::FileInfo file_info, + ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path)); if (file_info.IsDirectory()) { - arrow::fs::FileSelector selector; + fs::FileSelector selector; selector.base_dir = file_info.path(); selector.recursive = true; return arrow::dataset::FileSystemDatasetFactory::Make( From 1a6353a3de342a580e1dcf74b892632231fd32ed Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 17 Dec 2021 11:43:47 +0800 Subject: [PATCH 4/6] fix --- .../dataset/file/TestFileSystemDataset.java | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index af3338aff0305..0e0d0b784ff93 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -368,24 +368,20 @@ private void checkParquetReadResult(Schema schema, List expected, try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) { VectorLoader loader = new VectorLoader(vsr); for (ArrowRecordBatch batch : actual) { - try { - assertEquals(fieldCount, batch.getNodes().size()); - loader.load(batch); - int batchRowCount = vsr.getRowCount(); - for (int i = 0; i < fieldCount; i++) { - FieldVector vector = vsr.getVector(i); - for (int j = 0; j < batchRowCount; j++) { - Object object = vector.getObject(j); - Object expectedObject = expectedRemovable.get(j).get(i); - assertEquals(Objects.toString(expectedObject), - Objects.toString(object)); - } + assertEquals(fieldCount, batch.getNodes().size()); + loader.load(batch); + int batchRowCount = vsr.getRowCount(); + for (int i = 0; i < fieldCount; i++) { + FieldVector vector = vsr.getVector(i); + for (int j = 0; j < batchRowCount; j++) { + Object object = vector.getObject(j); + Object expectedObject = expectedRemovable.get(j).get(i); + assertEquals(Objects.toString(expectedObject), + Objects.toString(object)); } - for (int i = 0; i < batchRowCount; i++) { - expectedRemovable.poll(); - } - } finally { - batch.close(); + } + for (int i = 0; i < batchRowCount; i++) { + expectedRemovable.poll(); } } assertTrue(expectedRemovable.isEmpty()); From f4544163ca7d6b858e6c7da7b55b5a8225b9fc00 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 17 Dec 2021 12:13:15 +0800 Subject: [PATCH 5/6] style --- cpp/src/arrow/dataset/discovery.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 1f049e8a75648..f8382aa405203 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -215,8 +215,7 @@ Result> FileSystemDatasetFactory::Make( std::string internal_path; ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, fs::FileSystemFromUri(uri, &internal_path)) - ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, - filesystem->GetFileInfo(internal_path)); + ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path)) if (file_info.IsDirectory()) { fs::FileSelector selector; selector.base_dir = file_info.path(); From 192ea1d68168d30b332dc69145ad3d46f85a2bc0 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 17 Dec 2021 22:03:55 +0800 Subject: [PATCH 6/6] fix --- .../dataset/file/TestFileSystemDataset.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 0e0d0b784ff93..83d57c7421bcf 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -335,23 +335,19 @@ private void checkParquetReadResult(Schema schema, String expectedJson, List row = new ArrayList<>(); - for (int j = 0; j < fieldCount; j++) { - Object object = vsr.getVector(j).getObject(i); - if (Primitives.isWrapperType(object.getClass())) { - row.add(object); - } else { - row.add(object.toString()); - } + loader.load(batch); + int batchRowCount = vsr.getRowCount(); + for (int i = 0; i < batchRowCount; i++) { + List row = new ArrayList<>(); + for (int j = 0; j < fieldCount; j++) { + Object object = vsr.getVector(j).getObject(i); + if (Primitives.isWrapperType(object.getClass())) { + row.add(object); + } else { + row.add(object.toString()); } - actualSet.add(row); } - } finally { - batch.close(); + actualSet.add(row); } } }