ARROW-12480: [Java][Dataset] FileSystemDataset: Support reading from a directory

Closes apache#10114 from zhztheplayer/ARROW-12480

Authored-by: Hongze Zhang <[email protected]>
Signed-off-by: Krisztián Szűcs <[email protected]>
zhztheplayer authored and kszucs committed Jan 17, 2022
1 parent bbbe668 commit e12a454
Showing 5 changed files with 97 additions and 21 deletions.
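
The user-visible effect of the change: the URI handed to FileSystemDatasetFactory may now name a directory, in which case every file found under it (recursively) becomes part of the dataset, whereas previously only a single-file URI was accepted. Below is a minimal Java sketch of the new behaviour; it mirrors the testParquetDirectoryRead test added in this commit and assumes the same test-class context, so TMP, AVRO_SCHEMA_USER, rootAllocator(), inferResultSchemaFromFactory() and collectResultFromFactory() are the test fixtures and helpers visible further down, not public API, and the method name readParquetDirectoryExample is made up for illustration.

  // Hypothetical extra test method, reusing the fixtures of TestFileSystemDataset shown below.
  public void readParquetDirectoryExample() throws Exception {
    // Write two small Parquet files into one folder (helper shown later in this diff).
    File dataFolder = TMP.newFolder();
    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, dataFolder, 1, "a", 2, "b");
    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, dataFolder, 3, "c");

    // Point the factory at the folder URI rather than at one file's URI.
    ScanOptions options = new ScanOptions(new String[0], 100);
    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(),
        NativeMemoryPool.getDefault(), FileFormat.PARQUET, dataFolder.toURI().toString());

    // Both files are discovered through the single factory and scanned as one dataset.
    Schema schema = inferResultSchemaFromFactory(factory, options);
    List<ArrowRecordBatch> batches = collectResultFromFactory(factory, options);
    AutoCloseables.close(batches);
  }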
13 changes: 12 additions & 1 deletion cpp/src/arrow/dataset/discovery.cc
@@ -210,10 +210,21 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
    std::string uri, std::shared_ptr<FileFormat> format,
    FileSystemFactoryOptions options) {
  // TODO Partitioning support. Dictionary support should be done before that. See
  // ARROW-12481.
  std::string internal_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &internal_path))
                        fs::FileSystemFromUri(uri, &internal_path))
  ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path))
  if (file_info.IsDirectory()) {
    fs::FileSelector selector;
    selector.base_dir = file_info.path();
    selector.recursive = true;
    return arrow::dataset::FileSystemDatasetFactory::Make(
        std::move(filesystem), std::move(selector), std::move(format),
        std::move(options));
  }
  // is a single file
  return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
      {file_info}, std::move(filesystem), std::move(format), std::move(options)));
}
5 changes: 5 additions & 0 deletions java/dataset/pom.xml
@@ -98,6 +98,11 @@
      <version>${dep.guava.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <resources>
JniWrapper.java
@@ -37,7 +37,8 @@ private JniWrapper() {
  /**
   * Create FileSystemDatasetFactory and return its native pointer. The pointer points to an
   * intermediate shared_ptr of the factory instance.
   * @param uri file uri to read
   *
   * @param uri file uri to read, either a file or a directory
   * @param fileFormat file format ID
   * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
   * @see FileFormat
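
To make the updated contract concrete, both URI forms below are now valid inputs for the public FileSystemDatasetFactory wrapper that ultimately calls this native method. This is a sketch only: the constructor call is copied from the tests later in this diff, while the variable names and the file:// paths are invented for illustration; 'allocator' stands for any BufferAllocator (the tests use rootAllocator()).

  // Existing behaviour: a URI naming a single Parquet file.
  FileSystemDatasetFactory fromFile = new FileSystemDatasetFactory(allocator,
      NativeMemoryPool.getDefault(), FileFormat.PARQUET, "file:///tmp/data/part-0.parquet");

  // New with this commit: a URI naming a directory; files under it are discovered recursively.
  FileSystemDatasetFactory fromDirectory = new FileSystemDatasetFactory(allocator,
      NativeMemoryPool.getDefault(), FileFormat.PARQUET, "file:///tmp/data/");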
ParquetWriteSupport.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import org.apache.arrow.util.Preconditions;
import org.apache.avro.Schema;
@@ -42,13 +43,15 @@ public class ParquetWriteSupport implements AutoCloseable {
  private final Schema avroSchema;
  private final List<GenericRecord> writtenRecords = new ArrayList<>();
  private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder();
  private final Random random = new Random();


  public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception {
    avroSchema = readSchemaFromFile(schemaName);
    path = outputFolder.getPath() + File.separator + "generated.parquet";
    path = outputFolder.getPath() + File.separator + "generated-" + random.nextLong() + ".parquet";
    uri = "file://" + path;
    writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
    writer = AvroParquetWriter
        .<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
        .withSchema(avroSchema)
        .build();
  }
TestFileSystemDataset.java
@@ -22,11 +22,16 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -55,6 +60,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TemporaryFolder;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Primitives;

public class TestFileSystemDataset extends TestNativeDataset {

  @ClassRule
@@ -129,6 +137,29 @@ public void testParquetBatchSize() throws Exception {
    AutoCloseables.close(datum);
  }

  @Test
  public void testParquetDirectoryRead() throws Exception {
    final File outputFolder = TMP.newFolder();
    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
        1, "a", 2, "b", 3, "c");
    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
        4, "e", 5, "f", 6, "g", 7, "h");
    String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"],[3,\"c\"],[4,\"e\"],[5,\"f\"],[6,\"g\"],[7,\"h\"]]";

    ScanOptions options = new ScanOptions(new String[0], 1);
    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, outputFolder.toURI().toString());
    Schema schema = inferResultSchemaFromFactory(factory, options);
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

    assertSingleTaskProduced(factory, options);
    assertEquals(7, datum.size());
    datum.forEach(batch -> assertEquals(1, batch.getLength()));
    checkParquetReadResult(schema, expectedJsonUnordered, datum);

    AutoCloseables.close(datum);
  }

  @Test
  public void testEmptyProjectSelectsZeroColumns() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
@@ -295,6 +326,35 @@ public void testMemoryAllocationOnAssociatedAllocator() throws Exception {
    Assert.assertEquals(-expected_diff, finalReservation - reservation);
  }

  private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
      throws IOException {
    final ObjectMapper json = new ObjectMapper();
    final Set<?> expectedSet = json.readValue(expectedJson, Set.class);
    final Set<List<Object>> actualSet = new HashSet<>();
    final int fieldCount = schema.getFields().size();
    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
      VectorLoader loader = new VectorLoader(vsr);
      for (ArrowRecordBatch batch : actual) {
        loader.load(batch);
        int batchRowCount = vsr.getRowCount();
        for (int i = 0; i < batchRowCount; i++) {
          List<Object> row = new ArrayList<>();
          for (int j = 0; j < fieldCount; j++) {
            Object object = vsr.getVector(j).getObject(i);
            if (Primitives.isWrapperType(object.getClass())) {
              row.add(object);
            } else {
              row.add(object.toString());
            }
          }
          actualSet.add(row);
        }
      }
    }
    Assert.assertEquals("Mismatched data read from Parquet, actual: " + json.writeValueAsString(actualSet) + ";",
        expectedSet, actualSet);
  }

  private void checkParquetReadResult(Schema schema, List<GenericRecord> expected, List<ArrowRecordBatch> actual) {
    assertEquals(expected.size(), actual.stream()
        .mapToInt(ArrowRecordBatch::getLength)
@@ -304,24 +364,20 @@ private void checkParquetReadResult(Schema schema, List<GenericRecord> expected,
    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
      VectorLoader loader = new VectorLoader(vsr);
      for (ArrowRecordBatch batch : actual) {
        try {
          assertEquals(fieldCount, batch.getNodes().size());
          loader.load(batch);
          int batchRowCount = vsr.getRowCount();
          for (int i = 0; i < fieldCount; i++) {
            FieldVector vector = vsr.getVector(i);
            for (int j = 0; j < batchRowCount; j++) {
              Object object = vector.getObject(j);
              Object expectedObject = expectedRemovable.get(j).get(i);
              assertEquals(Objects.toString(expectedObject),
                  Objects.toString(object));
            }
          }
          for (int i = 0; i < batchRowCount; i++) {
            expectedRemovable.poll();
        assertEquals(fieldCount, batch.getNodes().size());
        loader.load(batch);
        int batchRowCount = vsr.getRowCount();
        for (int i = 0; i < fieldCount; i++) {
          FieldVector vector = vsr.getVector(i);
          for (int j = 0; j < batchRowCount; j++) {
            Object object = vector.getObject(j);
            Object expectedObject = expectedRemovable.get(j).get(i);
            assertEquals(Objects.toString(expectedObject),
                Objects.toString(object));
          }
        } finally {
          batch.close();
        }
        for (int i = 0; i < batchRowCount; i++) {
          expectedRemovable.poll();
        }
      }
      assertTrue(expectedRemovable.isEmpty());
