ARROW-12480: [Java][Dataset] FileSystemDataset: Support reading from a directory #10114

Closed · wants to merge 6 commits
13 changes: 12 additions & 1 deletion cpp/src/arrow/dataset/discovery.cc
@@ -210,10 +210,21 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
    std::string uri, std::shared_ptr<FileFormat> format,
    FileSystemFactoryOptions options) {
+ // TODO Partitioning support. Dictionary support should be done before that. See
+ // ARROW-12481.
  std::string internal_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<fs::FileSystem> filesystem,
-                       arrow::fs::FileSystemFromUri(uri, &internal_path))
+                       fs::FileSystemFromUri(uri, &internal_path))
  ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path))
+ if (file_info.IsDirectory()) {
+   fs::FileSelector selector;
+   selector.base_dir = file_info.path();
+   selector.recursive = true;
+   return arrow::dataset::FileSystemDatasetFactory::Make(
+       std::move(filesystem), std::move(selector), std::move(format),
+       std::move(options));
+ }
+ // is a single file
  return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
      {file_info}, std::move(filesystem), std::move(format), std::move(options)));
}
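
For orientation, a minimal Java-side sketch of what this change enables, mirroring the constructor call used in the new test further down. This is a sketch, not part of the PR: the directory path is illustrative, and inspect() is assumed from the dataset factory interface of the Java dataset module.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.memory.RootAllocator;

public class DirectoryReadSketch {
  public static void main(String[] args) throws Exception {
    // A directory URI; previously the factory treated every URI as a single file.
    String uri = "file:///path/to/parquet/dir";
    try (RootAllocator allocator = new RootAllocator()) {
      FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
          allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
      // The native side now expands the directory recursively via fs::FileSelector,
      // so the inferred schema covers every file found underneath it.
      System.out.println(factory.inspect());
    }
  }
}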
5 changes: 5 additions & 0 deletions java/dataset/pom.xml
@@ -98,6 +98,11 @@
      <version>${dep.guava.version}</version>
      <scope>test</scope>
    </dependency>
+   <dependency>
+     <groupId>com.fasterxml.jackson.core</groupId>
+     <artifactId>jackson-databind</artifactId>
+     <scope>test</scope>
+   </dependency>
</dependencies>
<build>
<resources>
@@ -37,7 +37,8 @@ private JniWrapper() {
/**
* Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
* intermediate shared_ptr of the factory instance.
-  * @param uri file uri to read
+  *
+  * @param uri file uri to read, either a file or a directory
* @param fileFormat file format ID
* @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
* @see FileFormat
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+ import java.util.Random;

import org.apache.arrow.util.Preconditions;
import org.apache.avro.Schema;
@@ -42,13 +43,15 @@ public class ParquetWriteSupport implements AutoCloseable {
private final Schema avroSchema;
private final List<GenericRecord> writtenRecords = new ArrayList<>();
private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder();
+ private final Random random = new Random();


public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception {
avroSchema = readSchemaFromFile(schemaName);
- path = outputFolder.getPath() + File.separator + "generated.parquet";
+ path = outputFolder.getPath() + File.separator + "generated-" + random.nextLong() + ".parquet";

Review comment (Member):
I think this change wants to get a unique name for a short period. How about using System.currentTimeMillis()?

Reply (Member Author):
Doesn't millisecond resolution cause collisions? ParquetWriteSupport is instantiated rapidly during unit-test execution. By the way, is there any special reason to use time rather than random? The files can be dropped once the unit tests finish; users should not need to be aware of them.
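
For reference, a hedged sketch of two collision-resistant alternatives in the spirit of this thread; neither is what the PR adopts, and the class and method names are illustrative. Files.createTempFile lets the JDK guarantee a unique file atomically, while a UUID-based name sidesteps both millisecond-clock granularity and Random seeding concerns:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

class UniqueNameSketch {
  // Option 1: let the JDK create a uniquely named file in the target folder.
  static File byTempFile(File outputFolder) throws IOException {
    return Files.createTempFile(outputFolder.toPath(), "generated-", ".parquet").toFile();
  }

  // Option 2: embed a UUID; collisions are practically impossible.
  static File byUuid(File outputFolder) {
    return new File(outputFolder, "generated-" + UUID.randomUUID() + ".parquet");
  }
}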

uri = "file://" + path;
- writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
+ writer = AvroParquetWriter

Review comment (Member):
nit: Do we need this format change?

Reply (Member Author):
It's just a trivial change to keep the code readable.

+       .<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
.withSchema(avroSchema)
.build();
}
@@ -22,11 +22,16 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -55,6 +60,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TemporaryFolder;

+ import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.google.common.primitives.Primitives;

public class TestFileSystemDataset extends TestNativeDataset {

@ClassRule
@@ -129,6 +137,29 @@ public void testParquetBatchSize() throws Exception {
AutoCloseables.close(datum);
}

@Test
public void testParquetDirectoryRead() throws Exception {
final File outputFolder = TMP.newFolder();
ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
1, "a", 2, "b", 3, "c");
ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, outputFolder,
4, "e", 5, "f", 6, "g", 7, "h");
String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"],[3,\"c\"],[4,\"e\"],[5,\"f\"],[6,\"g\"],[7,\"h\"]]";

ScanOptions options = new ScanOptions(new String[0], 1);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, outputFolder.toURI().toString());
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertSingleTaskProduced(factory, options);
assertEquals(7, datum.size());
datum.forEach(batch -> assertEquals(1, batch.getLength()));
checkParquetReadResult(schema, expectedJsonUnordered, datum);

AutoCloseables.close(datum);

Review comment (Member):
Can we use try (... datum = ...) { ... } at line 153? Then we could remove this line.

Reply (Member Author, @zhztheplayer, Dec 15, 2021):
It seems that datum is of type List, which is not AutoCloseable.

Reply (Contributor):
Maybe place this in a finally block? (A sketch of that pattern follows this method.)
}
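
As a follow-up to the thread above, a sketch of the finally-based cleanup the reviewer suggests; this is not what the PR merged. Since collectResultFromFactory returns a plain List (not AutoCloseable), try-with-resources does not apply directly, but a finally block still guarantees the batches are released even when an assertion fails:

// Sketch only: the same test tail, with cleanup moved into finally.
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
try {
  assertSingleTaskProduced(factory, options);
  assertEquals(7, datum.size());
  datum.forEach(batch -> assertEquals(1, batch.getLength()));
  checkParquetReadResult(schema, expectedJsonUnordered, datum);
} finally {
  AutoCloseables.close(datum);
}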

@Test
public void testEmptyProjectSelectsZeroColumns() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
@@ -295,6 +326,35 @@ public void testMemoryAllocationOnAssociatedAllocator() throws Exception {
Assert.assertEquals(-expected_diff, finalReservation - reservation);
}

private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
throws IOException {
final ObjectMapper json = new ObjectMapper();
final Set<?> expectedSet = json.readValue(expectedJson, Set.class);
final Set<List<Object>> actualSet = new HashSet<>();
final int fieldCount = schema.getFields().size();
try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
VectorLoader loader = new VectorLoader(vsr);
for (ArrowRecordBatch batch : actual) {
loader.load(batch);
int batchRowCount = vsr.getRowCount();
for (int i = 0; i < batchRowCount; i++) {
List<Object> row = new ArrayList<>();
for (int j = 0; j < fieldCount; j++) {
Object object = vsr.getVector(j).getObject(i);
if (Primitives.isWrapperType(object.getClass())) {
row.add(object);
} else {
row.add(object.toString());
}
}
actualSet.add(row);
}
}
}
Assert.assertEquals("Mismatched data read from Parquet, actual: " + json.writeValueAsString(actualSet) + ";",
expectedSet, actualSet);
}
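
Note that this overload collects rows into a Set before comparing, presumably because a directory scan makes no guarantee about the order in which files (and therefore rows) are produced; the ordered overload below remains for single-file tests.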

private void checkParquetReadResult(Schema schema, List<GenericRecord> expected, List<ArrowRecordBatch> actual) {
assertEquals(expected.size(), actual.stream()
.mapToInt(ArrowRecordBatch::getLength)
@@ -304,24 +364,20 @@ private void checkParquetReadResult(Schema schema, List<GenericRecord> expected,
    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
      VectorLoader loader = new VectorLoader(vsr);
      for (ArrowRecordBatch batch : actual) {
-       try {
-         assertEquals(fieldCount, batch.getNodes().size());
-         loader.load(batch);
-         int batchRowCount = vsr.getRowCount();
-         for (int i = 0; i < fieldCount; i++) {
-           FieldVector vector = vsr.getVector(i);
-           for (int j = 0; j < batchRowCount; j++) {
-             Object object = vector.getObject(j);
-             Object expectedObject = expectedRemovable.get(j).get(i);
-             assertEquals(Objects.toString(expectedObject),
-                 Objects.toString(object));
-           }
-         }
-         for (int i = 0; i < batchRowCount; i++) {
-           expectedRemovable.poll();
-         }
-       } finally {
-         batch.close();
-       }
+       assertEquals(fieldCount, batch.getNodes().size());
+       loader.load(batch);
+       int batchRowCount = vsr.getRowCount();
+       for (int i = 0; i < fieldCount; i++) {
+         FieldVector vector = vsr.getVector(i);
+         for (int j = 0; j < batchRowCount; j++) {
+           Object object = vector.getObject(j);
+           Object expectedObject = expectedRemovable.get(j).get(i);
+           assertEquals(Objects.toString(expectedObject),
+               Objects.toString(object));
+         }
+       }
+       for (int i = 0; i < batchRowCount; i++) {
+         expectedRemovable.poll();
+       }
      }
      assertTrue(expectedRemovable.isEmpty());