diff --git a/ci/scripts/java_jni_macos_build.sh b/ci/scripts/java_jni_macos_build.sh
index 97d6c4cf6cb1b..c38f072709718 100755
--- a/ci/scripts/java_jni_macos_build.sh
+++ b/ci/scripts/java_jni_macos_build.sh
@@ -72,6 +72,7 @@ cmake \
   -DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS} \
   -DARROW_CSV=${ARROW_DATASET} \
   -DARROW_DATASET=${ARROW_DATASET} \
+  -DARROW_SUBSTRAIT=${ARROW_DATASET} \
   -DARROW_DEPENDENCY_USE_SHARED=OFF \
   -DARROW_GANDIVA=${ARROW_GANDIVA} \
   -DARROW_GANDIVA_STATIC_LIBSTDCPP=ON \
diff --git a/ci/scripts/java_jni_manylinux_build.sh b/ci/scripts/java_jni_manylinux_build.sh
index 211dff8cf2bba..4e1192a4dbad2 100755
--- a/ci/scripts/java_jni_manylinux_build.sh
+++ b/ci/scripts/java_jni_manylinux_build.sh
@@ -72,6 +72,7 @@ cmake \
   -DARROW_BUILD_TESTS=ON \
   -DARROW_CSV=${ARROW_DATASET} \
   -DARROW_DATASET=${ARROW_DATASET} \
+  -DARROW_SUBSTRAIT=${ARROW_DATASET} \
   -DARROW_DEPENDENCY_SOURCE="VCPKG" \
   -DARROW_DEPENDENCY_USE_SHARED=OFF \
   -DARROW_GANDIVA_PC_CXX_FLAGS=${GANDIVA_CXX_FLAGS} \
diff --git a/ci/scripts/java_jni_windows_build.sh b/ci/scripts/java_jni_windows_build.sh
index 954c04050fa4a..778ee9696790e 100755
--- a/ci/scripts/java_jni_windows_build.sh
+++ b/ci/scripts/java_jni_windows_build.sh
@@ -61,6 +61,7 @@ cmake \
   -DARROW_BUILD_TESTS=ON \
   -DARROW_CSV=${ARROW_DATASET} \
   -DARROW_DATASET=${ARROW_DATASET} \
+  -DARROW_SUBSTRAIT=${ARROW_DATASET} \
   -DARROW_DEPENDENCY_USE_SHARED=OFF \
   -DARROW_ORC=${ARROW_ORC} \
   -DARROW_PARQUET=${ARROW_PARQUET} \
diff --git a/docs/source/java/index.rst b/docs/source/java/index.rst
index a1e924f9c0975..9b555e297b0f9 100644
--- a/docs/source/java/index.rst
+++ b/docs/source/java/index.rst
@@ -37,6 +37,7 @@ on the Arrow format and other language bindings see the :doc:`parent documentati
    flight_sql
    flight_sql_jdbc_driver
    dataset
+   substrait
    cdata
    jdbc
    Reference (javadoc) <reference/index>
diff --git a/docs/source/java/substrait.rst b/docs/source/java/substrait.rst
new file mode 100644
index 0000000000000..41effedbf01d9
--- /dev/null
+++ b/docs/source/java/substrait.rst
@@ -0,0 +1,107 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero <../cpp/streaming_execution>`
+query engine.
+
+Executing Substrait Plans
+=========================
+
+Plans can reference data in files via URIs, or "named tables" that must be provided along with the plan.
+
+Here is an example of a Java program that queries a Parquet file using the Java Substrait consumer
+(this example uses the `Substrait Java`_ project to compile a SQL query into a Substrait plan):
+
+.. code-block:: Java
+
+    import com.google.common.collect.ImmutableList;
+    import io.substrait.isthmus.SqlToSubstrait;
+    import io.substrait.proto.Plan;
+    import org.apache.arrow.dataset.file.FileFormat;
+    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+    import org.apache.arrow.dataset.jni.NativeMemoryPool;
+    import org.apache.arrow.dataset.scanner.ScanOptions;
+    import org.apache.arrow.dataset.scanner.Scanner;
+    import org.apache.arrow.dataset.source.Dataset;
+    import org.apache.arrow.dataset.source.DatasetFactory;
+    import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+    import org.apache.calcite.sql.parser.SqlParseException;
+
+    import java.nio.ByteBuffer;
+    import java.util.HashMap;
+    import java.util.Map;
+
+    public class ClientSubstrait {
+        public static void main(String[] args) {
+            String uri = "file:///data/tpch_parquet/nation.parquet";
+            ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+            try (
+                BufferAllocator allocator = new RootAllocator();
+                DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+                    FileFormat.PARQUET, uri);
+                Dataset dataset = datasetFactory.finish();
+                Scanner scanner = dataset.newScan(options);
+                ArrowReader reader = scanner.scanBatches()
+            ) {
+                // map table to reader
+                Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+                mapTableToArrowReader.put("NATION", reader);
+                // get binary plan
+                byte[] planBytes = getPlan().toByteArray();
+                ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
+                substraitPlan.put(planBytes);
+                // run query
+                try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
+                    substraitPlan,
+                    mapTableToArrowReader
+                )) {
+                    while (arrowReader.loadNextBatch()) {
+                        System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        static Plan getPlan() throws SqlParseException {
+            String sql = "SELECT * from nation";
+            String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
+                "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+            SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+            Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
+            return plan;
+        }
+    }
+
+.. code-block:: text
+
+    // Results example:
+    FieldPath(0)    FieldPath(1)    FieldPath(2)    FieldPath(3)
+    0    ALGERIA    0    haggle. carefully final deposits detect slyly agai
+    1    ARGENTINA    1    al foxes promise slyly according to the regular accounts. bold requests alon
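+Plans that read files directly through a ``local_files`` relation do not require named tables. As a
+minimal sketch of that path (``/data/plans/query_users.json`` is a hypothetical file holding a
+Substrait plan serialized as JSON, with the target file URI already substituted into the plan), the
+JSON text can be passed straight to ``runQuery(String)``:
+
+.. code-block:: Java
+
+    import java.nio.file.Files;
+    import java.nio.file.Paths;
+
+    import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+
+    public class ClientSubstraitJson {
+        public static void main(String[] args) throws Exception {
+            // Hypothetical path to a Substrait plan serialized as JSON.
+            String planJson = new String(Files.readAllBytes(Paths.get("/data/plans/query_users.json")));
+            try (BufferAllocator allocator = new RootAllocator();
+                 // No named tables needed: the plan's read relation points at the file itself.
+                 ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(planJson)) {
+                while (arrowReader.loadNextBatch()) {
+                    System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                }
+            }
+        }
+    }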
+.. _`Substrait`: https://substrait.io/
+.. _`Substrait Java`: https://github.com/substrait-io/substrait-java
+.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
\ No newline at end of file
diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt
index 315163a537cdd..ede3ee7330d21 100644
--- a/java/dataset/CMakeLists.txt
+++ b/java/dataset/CMakeLists.txt
@@ -16,6 +16,7 @@
 # under the License.
 
 find_package(ArrowDataset REQUIRED)
+find_package(ArrowSubstrait REQUIRED)
 
 include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR} ${JNI_INCLUDE_DIRS}
                     ${JNI_HEADERS_DIR})
@@ -26,14 +27,18 @@ add_jar(arrow_java_jni_dataset_jar
         src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
         src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
         src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
+        src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
         GENERATE_NATIVE_HEADERS
         arrow_java_jni_dataset_headers)
 
 add_library(arrow_java_jni_dataset SHARED src/main/cpp/jni_wrapper.cc src/main/cpp/jni_util.cc)
 set_property(TARGET arrow_java_jni_dataset PROPERTY OUTPUT_NAME "arrow_dataset_jni")
-target_link_libraries(arrow_java_jni_dataset arrow_java_jni_dataset_headers jni
-                      ArrowDataset::arrow_dataset_static)
+target_link_libraries(arrow_java_jni_dataset
+                      arrow_java_jni_dataset_headers
+                      jni
+                      ArrowDataset::arrow_dataset_static
+                      ArrowSubstrait::arrow_substrait_static)
 
 if(BUILD_TESTING)
   add_executable(arrow-java-jni-dataset-test src/main/cpp/jni_util_test.cc
diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index b3b5fe18c7960..48191eac495d0 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <mutex>
+#include <unordered_map>
 
 #include "arrow/array.h"
 #include "arrow/array/concatenate.h"
@@ -24,12 +25,14 @@
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/filesystem/localfs.h"
+#include "arrow/engine/substrait/util.h"
 #include "arrow/ipc/api.h"
 #include "arrow/util/iterator.h"
 #include "jni_util.h"
 #include "org_apache_arrow_dataset_file_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
+#include "org_apache_arrow_dataset_substrait_JniWrapper.h"
 
 namespace {
@@ -261,6 +264,52 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   default_memory_pool_id = -1L;
 }
 
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data. This function will eagerly read all
+/// tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(
+    JNIEnv* env, const jobjectArray& str_array) {
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader;
+  int length = env->GetArrayLength(str_array);
+  if (length % 2 != 0) {
+    JniThrow("Cannot map an odd number of array elements to key/value pairs");
+  }
+  std::shared_ptr<arrow::Table> output_table;
+  for (int pos = 0; pos < length; pos++) {
+    auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    pos++;
+    auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    uintptr_t memory_address = 0;
+    try {
+      // std::stoull rather than std::stol: `long` is only 32 bits on Windows
+      // (LLP64), which cannot hold a 64-bit stream address.
+      memory_address = std::stoull(JStringToCString(env, j_string_value));
+    } catch (const std::exception& ex) {
+      JniThrow("Failed to parse memory address from string value. Error: " + std::string(ex.what()));
+    } catch (...) {
+      JniThrow("Failed to parse memory address from string value.");
+    }
+    auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(memory_address);
+    std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+    output_table = JniGetOrThrow(readerIn->ToTable());
+    map_table_to_record_batch_reader[JStringToCString(env, j_string_key)] = output_table;
+  }
+  return map_table_to_record_batch_reader;
+}
+
+/// Find the arrow Table associated with a given table name
+std::shared_ptr<arrow::Table> GetTableByName(
+    const std::vector<std::string>& names,
+    const std::unordered_map<std::string, std::shared_ptr<arrow::Table>>& tables) {
+  if (names.size() != 1) {
+    JniThrow("Tables with hierarchical names are not supported");
+  }
+  const auto& it = tables.find(names[0]);
+  if (it == tables.end()) {
+    JniThrow("Table is referenced, but not provided: " + names[0]);
+  }
+  return it->second;
+}
+
 /*
  * Class: org_apache_arrow_dataset_jni_NativeMemoryPool
  * Method: getDefaultMemoryPool
@@ -578,3 +627,72 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class: org_apache_arrow_dataset_substrait_JniWrapper
+ * Method: executeSerializedPlan
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL
+    Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlan__Ljava_lang_String_2_3Ljava_lang_String_2J(
+        JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input,
+        jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader =
+      LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider =
+      [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) {
+        std::shared_ptr<arrow::Table> output_table = GetTableByName(names, map_table_to_reader);
+        std::shared_ptr<arrow::acero::TableSourceNodeOptions> options =
+            std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+        return arrow::acero::Declaration("table_source", {}, options, "java_source");
+      };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer =
+      JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader_out =
+      JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, conversion_options));
+  auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader_out, arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class: org_apache_arrow_dataset_substrait_JniWrapper
+ * Method: executeSerializedPlan
+ * Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL
+    Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlan__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J(
+        JNIEnv* env, jobject, jobject plan, jobjectArray table_to_memory_address_input,
+        jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader =
+      LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider =
+      [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) {
+        std::shared_ptr<arrow::Table> output_table = GetTableByName(names, map_table_to_reader);
+        std::shared_ptr<arrow::acero::TableSourceNodeOptions> options =
+            std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+        return arrow::acero::Declaration("table_source", {}, options, "java_source");
+      };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // copy the direct ByteBuffer contents into an arrow::Buffer
+  auto* buff = reinterpret_cast<uint8_t*>(env->GetDirectBufferAddress(plan));
+  if (buff == nullptr) {
+    // GetDirectBufferAddress returns NULL for non-direct buffers
+    JniThrow("The Substrait plan ByteBuffer must be allocated as a direct buffer");
+  }
+  jlong length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, length);
+  // execute plan
+  std::shared_ptr<arrow::RecordBatchReader> reader_out =
+      JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, conversion_options));
+  auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader_out, arrow_stream_out));
+  JNI_METHOD_END()
+}
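To make the JNI contract above concrete from the caller's side, the following is a minimal,
hypothetical Java sketch of how one name/address pair is produced (this is what
AceroSubstraitConsumer.getMapTableToMemoryAddress, below, does for every entry; the class and
method names here are illustrative only):

    import org.apache.arrow.c.ArrowArrayStream;
    import org.apache.arrow.c.Data;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.ipc.ArrowReader;

    final class NamedTableEncoding {
      /** Exports {@code reader} over the C Data Interface and returns the {name, address} pair. */
      static String[] encodeNamedTable(BufferAllocator allocator, String tableName,
                                       ArrowReader reader, ArrowArrayStream stream) {
        Data.exportArrayStream(allocator, reader, stream);
        // The native side parses this decimal string back into an ArrowArrayStream*.
        return new String[] {tableName, String.valueOf(stream.memoryAddress())};
      }
    }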
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java
new file mode 100644
index 0000000000000..d5a29ad4e93f3
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Exposes the Substrait API to Java end users. The only operation currently supported is consuming
+ * a Substrait plan, either in JSON format (String) or in binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Run a Substrait plan.
+   *
+   * @param plan the JSON Substrait plan.
+   * @return an ArrowReader used to iterate over the resulting record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Run a Substrait plan.
+   *
+   * @param plan the JSON Substrait plan.
+   * @param namedTables a mapping of named tables referenced by the plan to ArrowReaders providing
+   *     the data for those tables. The key is the table name and the value is the reader:
+   *     <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return an ArrowReader used to iterate over the resulting record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Run a Substrait plan.
+   *
+   * @param plan the binary Substrait plan; this must be a direct ByteBuffer, because the native
+   *     side reads it through GetDirectBufferAddress.
+   * @return an ArrowReader used to iterate over the resulting record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read a binary Substrait plan, execute it, and return an ArrowReader for the resulting Schema
+   * and ArrowRecordBatches.
+   *
+   * @param plan the binary Substrait plan; this must be a direct ByteBuffer.
+   * @param namedTables a mapping of named tables referenced by the plan to ArrowReaders providing
+   *     the data for those tables. The key is the table name and the value is the reader:
+   *     <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return an ArrowReader used to iterate over the resulting record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  private ArrowReader execute(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    List<ArrowArrayStream> arrowArrayStream = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = getMapTableToMemoryAddress(namedTables, arrowArrayStream);
+      JniWrapper.get().executeSerializedPlan(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      // The input streams can be closed as soon as the native call returns:
+      // the native side eagerly materializes every named table.
+      AutoCloseables.close(arrowArrayStream);
+    }
+  }
+
+  private ArrowReader execute(ByteBuffer plan, Map<String, ArrowReader> namedTables) throws Exception {
+    List<ArrowArrayStream> arrowArrayStream = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = getMapTableToMemoryAddress(namedTables, arrowArrayStream);
+      JniWrapper.get().executeSerializedPlan(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      AutoCloseables.close(arrowArrayStream);
+    }
+  }
+
+  private String[] getMapTableToMemoryAddress(Map<String, ArrowReader> mapTableToArrowReader,
+                                              List<ArrowArrayStream> listStreamInput) {
+    String[] mapTableToMemoryAddress = new String[mapTableToArrowReader.size() * 2];
+    ArrowArrayStream streamInput;
+    int pos = 0;
+    for (Map.Entry<String, ArrowReader> entries : mapTableToArrowReader.entrySet()) {
+      streamInput = ArrowArrayStream.allocateNew(this.allocator);
+      listStreamInput.add(streamInput);
+      Data.exportArrayStream(this.allocator, entries.getValue(), streamInput);
+      mapTableToMemoryAddress[pos] = entries.getKey();
+      mapTableToMemoryAddress[pos + 1] = String.valueOf(streamInput.memoryAddress());
+      pos += 2;
+    }
+    return mapTableToMemoryAddress;
+  }
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
new file mode 100644
index 0000000000000..236d1d5616061
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * Contains the native methods for calling the Acero C++ Substrait API. It relies internally on the
+ * C++ function arrow::engine::ExecuteSerializedPlan. The currently supported input parameters are:
+ * <pre>
+ * - arrow::Buffer: the Substrait plan (JSON or binary format).
+ * - arrow::engine::ConversionOptions: the mapping for arrow::engine::NamedTableProvider.
+ * </pre>
+ */
+final class JniWrapper {
+  private static final JniWrapper INSTANCE = new JniWrapper();
+
+  private JniWrapper() {
+  }
+
+  public static JniWrapper get() {
+    JniLoader.get().ensureLoaded();
+    return INSTANCE;
+  }
+
+  /**
+   * Consume the JSON Substrait plan, resolving its named tables, and export the resulting
+   * RecordBatchReader into a C Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param mapTableToMemoryAddressInput a flattened mapping of named tables: the table name at
+   *     position {@code i} and the string-encoded memory address of the ArrowArrayStream providing
+   *     its data at position {@code i + 1}.
+   *     <pre>{@code String[] mapTableToMemoryAddress = new String[2];
+   * mapTableToMemoryAddress[0]="NATION";
+   * mapTableToMemoryAddress[1]="140650250895360";}</pre>
+   * @param memoryAddressOutput the memory address of the ArrowArrayStream where the
+   *     RecordBatchReader is exported.
+   */
+  public native void executeSerializedPlan(String planInput, String[] mapTableToMemoryAddressInput,
+                                           long memoryAddressOutput);
+
+  /**
+   * Consume the binary Substrait plan, resolving its named tables, and export the resulting
+   * RecordBatchReader into a C Data Interface ArrowArrayStream.
+   *
+   * @param planInput the binary Substrait plan; must be a direct ByteBuffer.
+   * @param mapTableToMemoryAddressInput a flattened mapping of named tables: the table name at
+   *     position {@code i} and the string-encoded memory address of the ArrowArrayStream providing
+   *     its data at position {@code i + 1}.
+   *     <pre>{@code String[] mapTableToMemoryAddress = new String[2];
+   * mapTableToMemoryAddress[0]="NATION";
+   * mapTableToMemoryAddress[1]="140650250895360";}</pre>
+   * @param memoryAddressOutput the memory address of the ArrowArrayStream where the
+   *     RecordBatchReader is exported.
+   */
+  public native void executeSerializedPlan(ByteBuffer planInput, String[] mapTableToMemoryAddressInput,
+                                           long memoryAddressOutput);
+}
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
index 2516c409593ba..af2abeee2145f 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
@@ -31,6 +31,7 @@
 import org.apache.arrow.dataset.scanner.Scanner;
 import org.apache.arrow.dataset.source.Dataset;
 import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -41,8 +42,9 @@
 import org.junit.After;
 import org.junit.Before;
 
+
 public abstract class TestDataset {
-  private RootAllocator allocator = null;
+  private BufferAllocator allocator = null;
 
   @Before
   public void setUp() {
@@ -54,7 +56,7 @@ public void tearDown() {
     allocator.close();
   }
 
-  protected RootAllocator rootAllocator() {
+  protected BufferAllocator rootAllocator() {
     return allocator;
   }
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
new file mode 100644
index 0000000000000..c23b7e002880a
--- /dev/null
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    // Query:
+    // SELECT id, name FROM Users
+    // Isthmus:
+    // ./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    // VARCHAR(150) is mapped to {ARROW:extension:name=varchar, ARROW:extension:metadata=varchar{length:150}}
+    Map<String, String> metadataName = new HashMap<>();
+    metadataName.put("ARROW:extension:name", "varchar");
+    metadataName.put("ARROW:extension:metadata", "varchar{length:150}");
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        new Field("NAME", new FieldType(true, new ArrowType.Utf8(), null, metadataName), null)
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+        .runQuery(
+            new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+                .getResource("substrait/local_files_users.json").toURI()))).replace("FILENAME_PLACEHOLDER",
+                writeSupport.getOutputURI())
+        )
+    ) {
+      assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+      int rowcount = 0;
+      while (arrowReader.loadNextBatch()) {
+        rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    // Query:
+    // SELECT id, name FROM Users
+    // Isthmus:
+    // ./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS", reader);
+      try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI()))),
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testRunQueryNamedTableNationWithException() throws Exception {
+    // Query:
+    // SELECT id, name FROM Users
+    // Isthmus:
+    // ./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      // The plan references the table "USERS"; registering the reader under a
+      // different name must make the native consumer throw a RuntimeException.
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS_INVALID_MAP", reader);
+      try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI()))),
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  @Test
+  public void testRunBinaryQueryNamedTableNation() throws Exception {
+    // Query:
+    // SELECT id, name FROM Users
+    // Isthmus:
+    // ./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    // Generated with: Base64.getEncoder().encodeToString(plan.toByteArray());
+    String binaryPlan =
+        "Gl8SXQpROk8KBhIECgICAxIvCi0KAgoAEh4KAklECgROQU1FEhIKBCoCEAEKC" +
+        "LIBBQiWARgBGAI6BwoFVVNFUlMaCBIGCgISACIAGgoSCAoEEgIIASIAEgJJRBIETkFNRQ==";
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      // map table to reader
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS", reader);
+      // get binary plan
+      byte[] plan = Base64.getDecoder().decode(binaryPlan);
+      ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.length);
+      substraitPlan.put(plan);
+      // run query
+      try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(
+          substraitPlan,
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+}
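The hard-coded Base64 plan in testRunBinaryQueryNamedTableNation was produced ahead of time, as the
"Isthmus" comments note. A sketch of how such a constant could be regenerated with the Substrait
Java project (mirroring the getPlan() example in the documentation above; the class name and DDL
string here are assumptions for illustration):

    import java.util.Base64;

    import com.google.common.collect.ImmutableList;
    import io.substrait.isthmus.SqlToSubstrait;
    import io.substrait.proto.Plan;

    public class GeneratePlanConstant {
        public static void main(String[] args) throws Exception {
            String ddl = "CREATE TABLE USERS (ID INT NOT NULL, NAME VARCHAR(150))";
            Plan plan = new SqlToSubstrait().execute("SELECT id, name FROM users", ImmutableList.of(ddl));
            // Matches the comment in the test: Base64.getEncoder().encodeToString(plan.toByteArray())
            System.out.println(Base64.getEncoder().encodeToString(plan.toByteArray()));
        }
    }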
diff --git a/java/dataset/src/test/resources/avroschema/user.avsc b/java/dataset/src/test/resources/avroschema/user.avsc
index 072b643912b81..5a4635b6dce7d 100644
--- a/java/dataset/src/test/resources/avroschema/user.avsc
+++ b/java/dataset/src/test/resources/avroschema/user.avsc
@@ -18,7 +18,7 @@
 {
   "namespace": "org.apache.arrow.dataset",
   "type": "record",
-  "name": "User",
+  "name": "Users",
   "fields": [
     {"name": "id", "type": ["int", "null"]},
     {"name": "name", "type": ["string", "null"]}
diff --git a/java/dataset/src/test/resources/substrait/local_files_users.json b/java/dataset/src/test/resources/substrait/local_files_users.json
new file mode 100644
index 0000000000000..a2f5af1b3b80c
--- /dev/null
+++ b/java/dataset/src/test/resources/substrait/local_files_users.json
@@ -0,0 +1,75 @@
+{
+  "extensionUris": [],
+  "extensions": [],
+  "relations": [{
+    "root": {
+      "input": {
+        "project": {
+          "common": {
+            "emit": {
+              "outputMapping": [2, 3]
+            }
+          },
+          "input": {
+            "read": {
+              "common": {
+                "direct": {
+                }
+              },
+              "baseSchema": {
+                "names": ["ID", "NAME"],
+                "struct": {
+                  "types": [{
+                    "i32": {
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  }, {
+                    "varchar": {
+                      "length": 150,
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  }],
+                  "typeVariationReference": 0,
+                  "nullability": "NULLABILITY_REQUIRED"
+                }
+              },
+              "local_files": {
+                "items": [
+                  {
+                    "uri_file": "FILENAME_PLACEHOLDER",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "expressions": [{
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 0
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }, {
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 1
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }]
+        }
+      },
+      "names": ["ID", "NAME"]
+    }
+  }],
+  "expectedTypeUrls": []
+}
\ No newline at end of file
diff --git a/java/dataset/src/test/resources/substrait/named_table_users.json b/java/dataset/src/test/resources/substrait/named_table_users.json
new file mode 100644
index 0000000000000..629eebd059776
--- /dev/null
+++ b/java/dataset/src/test/resources/substrait/named_table_users.json
@@ -0,0 +1,70 @@
+{
+  "extensionUris": [],
+  "extensions": [],
+  "relations": [{
+    "root": {
+      "input": {
+        "project": {
+          "common": {
+            "emit": {
+              "outputMapping": [2, 3]
+            }
+          },
+          "input": {
+            "read": {
+              "common": {
+                "direct": {
+                }
+              },
+              "baseSchema": {
+                "names": ["ID", "NAME"],
+                "struct": {
+                  "types": [{
+                    "i32": {
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  }, {
+                    "varchar": {
+                      "length": 150,
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  }],
+                  "typeVariationReference": 0,
+                  "nullability": "NULLABILITY_REQUIRED"
+                }
+              },
+              "namedTable": {
+                "names": ["USERS"]
+              }
+            }
+          },
+          "expressions": [{
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 0
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }, {
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 1
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }]
+        }
+      },
+      "names": ["ID", "NAME"]
+    }
+  }],
+  "expectedTypeUrls": []
+}
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index 05a895d5d5a7e..2a7a3b49205e7 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -151,6 +151,7 @@
             <exclude>**/*.tbl</exclude>
            <exclude>**/*.iml</exclude>
            <exclude>**/flight.properties</exclude>
+           <exclude>**/*.idea/**</exclude>