ARROW-12480: [Java][Dataset] FileSystemDataset: Support reading from a directory
zhztheplayer committed Oct 9, 2021
1 parent 60f49f1 commit e96e1d1
Showing 5 changed files with 95 additions and 12 deletions.
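For orientation, the sketch below shows roughly how the new behavior is used from Java: the same FileSystemDatasetFactory constructor now also accepts a directory URI and discovers every file under it recursively. This is not part of the commit; it is a minimal sketch that assumes the Java Dataset API of this branch (DatasetFactory#finish, Dataset#newScan, Scanner#scan, ScanTask#execute) and the released package locations, and the allocator, directory path, and class name DirectoryReadSketch are placeholders.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class DirectoryReadSketch {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator()) {
      // Before this change the URI had to point at a single file; it may now be a directory.
      String uri = "file:///tmp/parquet-dir";  // placeholder directory containing Parquet files
      FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
          allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
      // Empty projection array and a batch size of 100, mirroring the tests in this commit.
      ScanOptions options = new ScanOptions(new String[0], 100);
      try (Dataset dataset = factory.finish();
           Scanner scanner = dataset.newScan(options)) {
        for (ScanTask task : scanner.scan()) {
          // Each task yields ArrowRecordBatch instances covering the discovered files.
          task.execute().forEachRemaining(batch -> batch.close());
        }
      }
    }
  }
}

Under the hood this maps to the C++ change in discovery.cc below: when the resolved FileInfo is a directory, a recursive FileSelector is used instead of a single FileInfo.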
16 changes: 14 additions & 2 deletions cpp/src/arrow/dataset/discovery.cc
@@ -210,10 +210,22 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
 Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
     std::string uri, std::shared_ptr<FileFormat> format,
     FileSystemFactoryOptions options) {
+  // TODO Partitioning support. Dictionary support should be done before that. See
+  // ARROW-12481.
   std::string internal_path;
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<fs::FileSystem> filesystem,
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                         arrow::fs::FileSystemFromUri(uri, &internal_path))
-  ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path))
+  ARROW_ASSIGN_OR_RAISE(arrow::fs::FileInfo file_info,
+                        filesystem->GetFileInfo(internal_path));
+  if (file_info.IsDirectory()) {
+    arrow::fs::FileSelector selector;
+    selector.base_dir = file_info.path();
+    selector.recursive = true;
+    return arrow::dataset::FileSystemDatasetFactory::Make(
+        std::move(filesystem), std::move(selector), std::move(format),
+        std::move(options));
+  }
+  // is a single file
   return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
       {file_info}, std::move(filesystem), std::move(format), std::move(options)));
 }
5 changes: 5 additions & 0 deletions java/dataset/pom.xml
@@ -98,6 +98,11 @@
       <version>${dep.guava.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <resources>
JniWrapper.java
@@ -37,7 +37,8 @@ private JniWrapper() {
   /**
    * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
    * intermediate shared_ptr of the factory instance.
-   * @param uri file uri to read
+   *
+   * @param uri file uri to read, either a file or a directory
    * @param fileFormat file format ID
    * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
    * @see FileFormat
ParquetWriteSupport.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.arrow.util.Preconditions;
 import org.apache.avro.Schema;
@@ -42,13 +43,15 @@ public class ParquetWriteSupport implements AutoCloseable {
   private final Schema avroSchema;
   private final List<GenericRecord> writtenRecords = new ArrayList<>();
   private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder();
+  private final Random random = new Random();
 
 
   public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception {
     avroSchema = readSchemaFromFile(schemaName);
-    path = outputFolder.getPath() + File.separator + "generated.parquet";
+    path = outputFolder.getPath() + File.separator + "generated-" + random.nextLong() + ".parquet";
     uri = "file://" + path;
-    writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
+    writer = AvroParquetWriter
+        .<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
         .withSchema(avroSchema)
         .build();
   }
TestFileSystemDataset.java
@@ -22,10 +22,15 @@
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -40,7 +45,6 @@
 import org.apache.arrow.dataset.scanner.ScanOptions;
 import org.apache.arrow.dataset.scanner.ScanTask;
 import org.apache.arrow.util.AutoCloseables;
-import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
@@ -54,6 +58,9 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.rules.TemporaryFolder;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Primitives;
+
 public class TestFileSystemDataset extends TestNativeDataset {
 
   @ClassRule
@@ -128,6 +135,29 @@ public void testParquetBatchSize() throws Exception {
     AutoCloseables.close(datum);
   }
 
+  @Test
+  public void testParquetDirectoryRead() throws Exception {
+    final File outputFolder = TMP.newFolder();
+    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
+        1, "a", 2, "b", 3, "c");
+    ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
+        4, "e", 5, "f", 6, "g", 7, "h");
+    String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"],[3,\"c\"],[4,\"e\"],[5,\"f\"],[6,\"g\"],[7,\"h\"]]";
+
+    ScanOptions options = new ScanOptions(new String[0], 1);
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, outputFolder.toURI().toString());
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(7, datum.size());
+    datum.forEach(batch -> assertEquals(1, batch.getLength()));
+    checkParquetReadResult(schema, expectedJsonUnordered, datum);
+
+    AutoCloseables.close(datum);
+  }
+
   @Test
   public void testCloseAgain() throws Exception {
     ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
@@ -234,21 +264,54 @@ public void testScanAfterClose3() throws Exception {
   public void testMemoryAllocation() throws Exception {
     ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
     FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
-      FileFormat.PARQUET, writeSupport.getOutputURI());
+        FileFormat.PARQUET, writeSupport.getOutputURI());
     ScanOptions options = new ScanOptions(new String[0], 100);
     long initReservation = rootAllocator().getAllocatedMemory();
     List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
     final long expected_diff = datum.stream()
-      .flatMapToLong(batch -> batch.getBuffers()
-        .stream()
-        .mapToLong(buf -> buf.getReferenceManager().getAccountedSize())).sum();
+        .flatMapToLong(batch -> batch.getBuffers()
+            .stream()
+            .mapToLong(buf -> buf.getReferenceManager().getAccountedSize())).sum();
     long reservation = rootAllocator().getAllocatedMemory();
     AutoCloseables.close(datum);
     long finalReservation = rootAllocator().getAllocatedMemory();
     Assert.assertEquals(expected_diff, reservation - initReservation);
     Assert.assertEquals(-expected_diff, finalReservation - reservation);
   }
 
+  private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
+      throws IOException {
+    final ObjectMapper json = new ObjectMapper();
+    final Set<?> expectedSet = json.readValue(expectedJson, Set.class);
+    final Set<List<Object>> actualSet = new HashSet<>();
+    final int fieldCount = schema.getFields().size();
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
+      VectorLoader loader = new VectorLoader(vsr);
+      for (ArrowRecordBatch batch : actual) {
+        try {
+          loader.load(batch);
+          int batchRowCount = vsr.getRowCount();
+          for (int i = 0; i < batchRowCount; i++) {
+            List<Object> row = new ArrayList<>();
+            for (int j = 0; j < fieldCount; j++) {
+              Object object = vsr.getVector(j).getObject(i);
+              if (Primitives.isWrapperType(object.getClass())) {
+                row.add(object);
+              } else {
+                row.add(object.toString());
+              }
+            }
+            actualSet.add(row);
+          }
+        } finally {
+          batch.close();
+        }
+      }
+    }
+    Assert.assertEquals("Mismatched data read from Parquet, actual: " + json.writeValueAsString(actualSet) + ";",
+        expectedSet, actualSet);
+  }
+
   private void checkParquetReadResult(Schema schema, List<GenericRecord> expected, List<ArrowRecordBatch> actual) {
     assertEquals(expected.size(), actual.stream()
         .mapToInt(ArrowRecordBatch::getLength)
@@ -263,9 +326,8 @@ private void checkParquetReadResult(Schema schema, List<GenericRecord> expected,
         loader.load(batch);
         int batchRowCount = vsr.getRowCount();
         for (int i = 0; i < fieldCount; i++) {
-          FieldVector vector = vsr.getVector(i);
           for (int j = 0; j < batchRowCount; j++) {
-            Object object = vector.getObject(j);
+            Object object = vsr.getVector(i).getObject(j);
             Object expectedObject = expectedRemovable.get(j).get(i);
             assertEquals(Objects.toString(expectedObject),
                 Objects.toString(object));
