Dataset Java API: Allow passing dataset read options when creating file format (apache#30)
zhztheplayer authored Aug 9, 2021
1 parent 816d602 commit 397ef98
Showing 12 changed files with 331 additions and 63 deletions.
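In short, a file format is now constructed as a configured native instance and passed by reference, instead of being selected by an enum ID. A minimal sketch of the new call pattern, modeled on the updated tests at the end of this diff (the file URI and dictionary columns are illustrative):

import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.file.format.ParquetFileFormat;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class ReadOptionsSketch {
  public static void main(String[] args) throws Exception {
    BufferAllocator allocator = new RootAllocator();
    // Before: new FileSystemDatasetFactory(..., FileFormat.PARQUET, uri).
    // After: the format object carries read options (here: dictionary-encoded
    // columns) and owns a native instance that close() releases.
    try (ParquetFileFormat format = new ParquetFileFormat(new String[] {"id", "name"})) {
      FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
          allocator, NativeMemoryPool.getDefault(), format, "file:///tmp/sample.parquet");
      // ... factory.finish(), dataset.newScan(...), as in the tests below ...
    }
  }
}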
64 changes: 49 additions & 15 deletions cpp/src/jni/dataset/jni_wrapper.cc
@@ -76,17 +76,9 @@ void JniAssertOkOrThrow(arrow::Status status) {
void JniThrow(std::string message) { ThrowPendingException(message); }

arrow::Result<std::shared_ptr<arrow::dataset::FileFormat>> GetFileFormat(
-    jint file_format_id) {
-  switch (file_format_id) {
-    case 0:
-      return std::make_shared<arrow::dataset::ParquetFileFormat>();
-    case 1:
-      return std::make_shared<arrow::dataset::CsvFileFormat>();
-    default:
-      std::string error_message =
-          "illegal file format id: " + std::to_string(file_format_id);
-      return arrow::Status::Invalid(error_message);
-  }
+    jlong file_format_id) {
+  return arrow::jniutil::RetrieveNativeInstance<arrow::dataset::FileFormat>(
+      file_format_id);
}

class ReserveFromJava : public arrow::jniutil::ReservationListener {
@@ -710,11 +702,11 @@ Java_org_apache_arrow_dataset_file_JniWrapper_newJniMethodReference(JNIEnv* env,
/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: makeFileSystemDatasetFactory
- * Signature: (Ljava/lang/String;II)J
+ * Signature: (Ljava/lang/String;JJJ)J
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
-    JNIEnv* env, jobject, jstring uri, jint file_format_id,
+    JNIEnv* env, jobject, jstring uri, jlong file_format_id,
jlong start_offset, jlong length) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
@@ -731,11 +723,11 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: writeFromScannerToFile
* Signature:
- * (Lorg/apache/arrow/dataset/jni/NativeSerializedRecordBatchIterator;[BILjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V
+ * (Lorg/apache/arrow/dataset/jni/NativeSerializedRecordBatchIterator;[BJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V
*/
JNIEXPORT void JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
-    JNIEnv* env, jobject, jobject itr, jbyteArray schema_bytes, jint file_format_id,
+    JNIEnv* env, jobject, jobject itr, jbyteArray schema_bytes, jlong file_format_id,
jstring uri, jobjectArray partition_columns, jint max_partitions,
jstring base_name_template) {
JNI_METHOD_START
@@ -763,3 +755,45 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
JNI_METHOD_END()
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: createParquetFileFormat
* Signature: ([Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_createParquetFileFormat
(JNIEnv* env, jobject, jobjectArray dict_columns) {
JNI_METHOD_START
auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
auto dict_column_vector = ToStringVector(env, dict_columns);
format->reader_options.dict_columns = std::unordered_set<std::string>(
dict_column_vector.begin(), dict_column_vector.end());
return arrow::jniutil::CreateNativeRef(format);
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: createCsvFileFormat
* Signature: (C)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_createCsvFileFormat
(JNIEnv* env, jobject, jchar delimiter) {
JNI_METHOD_START
auto format = std::make_shared<arrow::dataset::CsvFileFormat>();
format->parse_options.delimiter = static_cast<char>(delimiter);
return arrow::jniutil::CreateNativeRef(format);
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: releaseFileFormatInstance
* Signature: (J)V
*/
JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_releaseFileFormatInstance
(JNIEnv* env, jobject, jlong instance_id) {
JNI_METHOD_START
arrow::jniutil::ReleaseNativeRef<arrow::dataset::FileFormat>(instance_id);
JNI_METHOD_END()
}
@@ -17,6 +17,7 @@

package org.apache.arrow.dataset.file;

+import org.apache.arrow.dataset.file.format.FileFormat;
import org.apache.arrow.dataset.jni.NativeSerializedRecordBatchIterator;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.util.AutoCloseables;
@@ -17,6 +17,7 @@

package org.apache.arrow.dataset.file;

+import org.apache.arrow.dataset.file.format.FileFormat;
import org.apache.arrow.dataset.jni.NativeDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.memory.BufferAllocator;
@@ -57,13 +57,13 @@ public native long newJniMethodReference(String classSignature, String methodNam
* intermediate shared_ptr of the factory instance.
*
* @param uri file uri to read
- * @param fileFormat file format ID
+ * @param fileFormat file format instance ID
* @param startOffset random read position. -1 for reading from start.
* @param length reading length. -1 for reading all bytes of the file.
* @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
- * @see FileFormat
+ * @see org.apache.arrow.dataset.file.format.FileFormat
*/
-public native long makeFileSystemDatasetFactory(String uri, int fileFormat, long startOffset, long length);
+public native long makeFileSystemDatasetFactory(String uri, long fileFormat, long startOffset, long length);

/**
* Write all record batches in a {@link NativeSerializedRecordBatchIterator} into files. This internally
@@ -79,6 +79,12 @@ public native long newJniMethodReference(String classSignatur
* ID around all written files.
*/
public native void writeFromScannerToFile(NativeSerializedRecordBatchIterator itr, byte[] schema,
-    int fileFormat, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate);
+    long fileFormat, String uri, String[] partitionColumns, int maxPartitions,
+    String baseNameTemplate);


/** Create a native Parquet format instance; columns listed in dictColumns are read dictionary-encoded. */
public native long createParquetFileFormat(String[] dictColumns);

/** Create a native CSV format instance using the given field delimiter. */
public native long createCsvFileFormat(char delimiter);

/** Release a native format instance created by one of the methods above. */
public native void releaseFileFormatInstance(long nativeInstanceId);
}
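Taken together, the three new natives and the widened long fileFormat parameters give JniWrapper a create/use/release lifecycle for native format instances. A minimal sketch of that raw flow (application code would normally go through the FileFormat wrappers introduced below rather than calling JniWrapper directly; the URI and delimiter are illustrative):

import org.apache.arrow.dataset.file.JniWrapper;

public class RawFormatLifecycleSketch {
  public static void main(String[] args) {
    // 1. Create: the native side news up an arrow::dataset::CsvFileFormat and
    //    returns a native-reference ID in place of the old enum ordinal.
    long formatId = JniWrapper.get().createCsvFileFormat('|');
    try {
      // 2. Use: pass the ID wherever an int format ID used to go;
      //    -1/-1 mean "read from start" / "read all bytes" per the javadoc above.
      long factoryId = JniWrapper.get().makeFileSystemDatasetFactory(
          "file:///tmp/data.csv", formatId, -1L, -1L);
      // ... finish the factory, scan, etc. ...
    } finally {
      // 3. Release: drops the native reference created in step 1.
      JniWrapper.get().releaseFileFormatInstance(formatId);
    }
  }
}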
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file.format;

import org.apache.arrow.dataset.file.JniWrapper;
import org.apache.arrow.util.Preconditions;

import java.util.Collections;
import java.util.Map;

public class CsvFileFormat extends FileFormatBase {
public CsvFileFormat(char delimiter) {
super(createCsvFileFormat(delimiter));
}

// Typically for Spark config parsing
public static CsvFileFormat create(Map<String, String> options) {
String delimiter = getOptionValue(options, "delimiter", ",");
Preconditions.checkArgument(delimiter.length() == 1, "Parameter \"delimiter\" must have length 1");
return new CsvFileFormat(delimiter.charAt(0));
}

public static CsvFileFormat createDefault() {
return create(Collections.emptyMap());
}

private static long createCsvFileFormat(char delimiter) {
return JniWrapper.get().createCsvFileFormat(delimiter);
}
}
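A usage sketch for the options-map path (the "|" delimiter is illustrative; create mirrors what a Spark-style config layer would pass):

import java.util.Collections;

import org.apache.arrow.dataset.file.format.CsvFileFormat;

public class CsvFormatSketch {
  public static void main(String[] args) throws Exception {
    // Build a CSV format from a string-keyed options map.
    try (CsvFileFormat csv = CsvFileFormat.create(Collections.singletonMap("delimiter", "|"))) {
      // pass `csv` to FileSystemDatasetFactory or DatasetFileWriter here
    } // close() releases the native arrow::dataset::CsvFileFormat
    // "delimiter" must be exactly one character, or Preconditions.checkArgument throws;
    // unset keys fall back, so createDefault() equals create(Collections.emptyMap()).
  }
}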
@@ -15,23 +15,10 @@
* limitations under the License.
*/

-package org.apache.arrow.dataset.file;
+package org.apache.arrow.dataset.file.format;

+import java.io.Closeable;

/**
 * File format definitions.
 */
-public enum FileFormat {
-  PARQUET(0),
-  CSV(1),
-  NONE(-1);
-
-  private final int id;
-
-  FileFormat(int id) {
-    this.id = id;
-  }
-
-  public int id() {
-    return id;
-  }
-}
+public interface FileFormat extends Closeable {
+  long id();
+}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file.format;

import org.apache.arrow.dataset.file.JniWrapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

public abstract class FileFormatBase implements FileFormat {

private final long nativeInstanceId;
private boolean closed = false;

public FileFormatBase(long nativeInstanceId) {
this.nativeInstanceId = nativeInstanceId;
}

protected static String getOptionValue(Map<String, String> options, String key, String fallbackValue) {
return options.getOrDefault(key, fallbackValue);
}

protected static String[] parseStringArray(String commaSeparated) {
final StringTokenizer tokenizer = new StringTokenizer(commaSeparated, ",");
final List<String> tokens = new ArrayList<>();
while (tokenizer.hasMoreTokens()) {
tokens.add(tokenizer.nextToken());
}
return tokens.toArray(new String[0]);
}

@Override
public long id() {
return nativeInstanceId;
}

@Override
public void close() throws IOException {
if (closed) {
return;
}
closed = true;
JniWrapper.get().releaseFileFormatInstance(nativeInstanceId);
}
}
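FileFormatBase owns the native-reference bookkeeping (ID storage and idempotent close()), so a new format subclass only needs to supply a native constructor. A hypothetical sketch — there is no ORC support or createOrcFileFormat native in this PR; it stands in for whatever factory method a new format would add to JniWrapper:

package org.apache.arrow.dataset.file.format;

import java.util.Map;

// Hypothetical subclass showing the extension point; the native method it
// would delegate to (e.g. JniWrapper.get().createOrcFileFormat()) does not exist yet.
public class OrcFileFormat extends FileFormatBase {
  public OrcFileFormat() {
    super(createOrcFileFormat());
  }

  // Typically for Spark config parsing, mirroring the CSV/Parquet classes.
  public static OrcFileFormat create(Map<String, String> options) {
    return new OrcFileFormat(); // parse ORC-specific options here
  }

  private static long createOrcFileFormat() {
    // Would return a native instance ID once such a native exists.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}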
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file.format;

import org.apache.arrow.dataset.file.JniWrapper;

import java.util.Collections;
import java.util.Map;

public class ParquetFileFormat extends FileFormatBase {
private final String[] dictColumns;

public ParquetFileFormat(String[] dictColumns) {
super(createParquetFileFormat(dictColumns));
this.dictColumns = dictColumns;
}

// Typically for Spark config parsing
public static ParquetFileFormat create(Map<String, String> options) {
return new ParquetFileFormat(parseStringArray(getOptionValue(options, "dictColumns", "")));
}

public static ParquetFileFormat createDefault() {
return create(Collections.emptyMap());
}

private static long createParquetFileFormat(String[] dictColumns) {
return JniWrapper.get().createParquetFileFormat(dictColumns);
}
}
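And the corresponding options-map sketch for Parquet (the column names are illustrative; "dictColumns" is the key read by create above):

import java.util.Collections;

import org.apache.arrow.dataset.file.format.ParquetFileFormat;

public class ParquetFormatSketch {
  public static void main(String[] args) throws Exception {
    // parseStringArray splits "dictColumns" on commas; the named columns are
    // read dictionary-encoded (reader_options.dict_columns on the native side).
    try (ParquetFileFormat parquet =
             ParquetFileFormat.create(Collections.singletonMap("dictColumns", "id,name"))) {
      // pass `parquet` to FileSystemDatasetFactory / DatasetFileWriter ...
    }
  }
}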
@@ -30,6 +30,7 @@

import org.apache.arrow.dataset.ParquetWriteSupport;
import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.format.ParquetFileFormat;
import org.apache.arrow.dataset.filter.Filter;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
@@ -58,14 +59,14 @@ public void testParquetWriteSimple() throws Exception {
1, "a", 2, "b", 3, "c", 2, "d");
String sampleParquet = writeSupport.getOutputURI();
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
-        FileFormat.PARQUET, sampleParquet);
+        ParquetFileFormat.createDefault(), sampleParquet);
ScanOptions options = new ScanOptions(new String[0], Filter.EMPTY, 100);
final Dataset dataset = factory.finish();
final Scanner scanner = dataset.newScan(options);
final File writtenFolder = TMP.newFolder();
final String writtenParquet = writtenFolder.toURI().toString();
try {
-      DatasetFileWriter.write(scanner, FileFormat.PARQUET, writtenParquet);
+      DatasetFileWriter.write(scanner, ParquetFileFormat.createDefault(), writtenParquet);
assertParquetFileEquals(sampleParquet, Objects.requireNonNull(writtenFolder.listFiles())[0].toURI().toString());
} finally {
AutoCloseables.close(factory, scanner, dataset);
@@ -78,14 +79,14 @@ public void testParquetWriteWithPartitions() throws Exception {
1, "a", 2, "b", 3, "c", 2, "d");
String sampleParquet = writeSupport.getOutputURI();
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
-        FileFormat.PARQUET, sampleParquet);
+        ParquetFileFormat.createDefault(), sampleParquet);
ScanOptions options = new ScanOptions(new String[0], Filter.EMPTY, 100);
final Dataset dataset = factory.finish();
final Scanner scanner = dataset.newScan(options);
final File writtenFolder = TMP.newFolder();
final String writtenParquet = writtenFolder.toURI().toString();
try {
-      DatasetFileWriter.write(scanner, FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"}, 100, "dat_{i}");
+      DatasetFileWriter.write(scanner, ParquetFileFormat.createDefault(), writtenParquet, new String[]{"id", "name"}, 100, "dat_{i}");
final Set<String> expectedOutputFiles = new HashSet<>(
Arrays.asList("id=1/name=a/dat_0", "id=2/name=b/dat_1", "id=3/name=c/dat_2", "id=2/name=d/dat_3"));
final Set<String> outputFiles = FileUtils.listFiles(writtenFolder, null, true)
@@ -102,11 +103,11 @@ private void assertParquetFileEquals(String expectedURI, String actualURI) throws Exception {

private void assertParquetFileEquals(String expectedURI, String actualURI) throws Exception {
final FileSystemDatasetFactory expectedFactory = new FileSystemDatasetFactory(
-        rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, expectedURI);
+        rootAllocator(), NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), expectedURI);
List<ArrowRecordBatch> expectedBatches = collectResultFromFactory(expectedFactory,
new ScanOptions(new String[0], Filter.EMPTY, 100));
final FileSystemDatasetFactory actualFactory = new FileSystemDatasetFactory(
-        rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, actualURI);
+        rootAllocator(), NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), actualURI);
List<ArrowRecordBatch> actualBatches = collectResultFromFactory(actualFactory,
new ScanOptions(new String[0], Filter.EMPTY, 100));
// fast-fail by comparing metadata