diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt index 432fd2f9a800e..e11b3bd0ab25e 100644 --- a/cpp/examples/arrow/CMakeLists.txt +++ b/cpp/examples/arrow/CMakeLists.txt @@ -23,8 +23,6 @@ endif() if(ARROW_SUBSTRAIT) add_arrow_example(engine_substrait_consumption EXTRA_LINK_LIBS arrow_substrait_shared) - add_arrow_example(tpch_q6_demo EXTRA_LINK_LIBS arrow_substrait_shared) - add_arrow_example(tpch_q1_demo EXTRA_LINK_LIBS arrow_substrait_shared) endif() if(ARROW_COMPUTE AND ARROW_CSV) diff --git a/cpp/examples/arrow/tpch_q1_demo.cc b/cpp/examples/arrow/tpch_q1_demo.cc deleted file mode 100644 index 5a546b6a10f59..0000000000000 --- a/cpp/examples/arrow/tpch_q1_demo.cc +++ /dev/null @@ -1,1525 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include - -#include -#include -#include -#include -#include -#include "arrow/util/async_generator.h" - -namespace eng = arrow::engine; -namespace cp = arrow::compute; - -#define ABORT_ON_FAILURE(expr) \ - do { \ - arrow::Status status_ = (expr); \ - if (!status_.ok()) { \ - std::cerr << status_.message() << std::endl; \ - abort(); \ - } \ - } while (0); - -arrow::Future> GetSubstraitPlanStage0() { - std::string substrait_json = R"({ - "extensions": [ - { - "extensionFunction": { - "functionAnchor": 6, - "name": "sum:opt_fp64" - } - }, - { - "extensionFunction": { - "name": "is_not_null:date" - } - }, - { - "extensionFunction": { - "functionAnchor": 1, - "name": "lte:date_date" - } - }, - { - "extensionFunction": { - "functionAnchor": 3, - "name": "subtract:opt_fp64_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 2, - "name": "and:bool_bool" - } - }, - { - "extensionFunction": { - "functionAnchor": 5, - "name": "add:opt_fp64_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 7, - "name": "avg:opt_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 4, - "name": "multiply:opt_fp64_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 8, - "name": "count:opt_i32" - } - } - ], - "relations": [ - { - "root": { - "input": { - "aggregate": { - "common": { - "direct": {} - }, - "input": { - "project": { - "common": { - "direct": {} - }, - "input": { - "project": { - "common": { - "direct": {} - }, - "input": { - "filter": { - "common": { - "direct": {} - }, - "input": { - "read": { - "common": { - "direct": {} - }, - "baseSchema": { - "names": [ - "l_quantity", - "l_extendedprice", - "l_discount", - "l_tax", - "l_returnflag", - "l_linestatus", - "l_shipdate" - ], - "struct": { - "types": [ - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "string": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "string": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "date": { - "nullability": "NULLABILITY_NULLABLE" - } - } - ] - } - }, - "filter": { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 6 - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 6 - } - } - } - }, - { - "literal": { - "date": 10470 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - "localFiles": { - "items": [ - { - "uriFile": "file:///home/rong/benchmark/data/tpch1_d4d/lineitem/part-00000-678b839d-2286-4750-923f-653adc66ed74-c000.snappy.parquet", - "format": "FILE_FORMAT_PARQUET", - "length": "206349871" - } - ] - } - } - }, - "condition": { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 6 - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 6 - } - } - } - }, - { - "literal": { - "date": 10470 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - } - }, - "expressions": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 4 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 5 - } - } - } - } - ] - } - }, - "expressions": [ - { - "selection": { - "directReference": { - "structField": { - "field": 4 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 5 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - }, - { - "scalarFunction": { - "functionReference": 4, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "literal": { - "fp64": 1 - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 4, - "args": [ - { - "scalarFunction": { - "functionReference": 4, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "literal": { - "fp64": 1 - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 5, - "args": [ - { - "literal": { - "fp64": 1 - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - }, - { - "literal": { - "i32": 1 - } - } - ] - } - }, - "groupings": [ - { - "groupingExpressions": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - } - ] - } - ], - "measures": [ - { - "measure": { - "functionReference": 6, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 6, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 6, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 4 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 6, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 5 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 7, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 6 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 7, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 7 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 7, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 8 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 8, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 9 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "i64": { - "nullability": "NULLABILITY_REQUIRED" - } - } - } - } - ] - } - }, - "names": [ - "l_returnflag", - "l_linestatus", - "sum", - "sum", - "sum", - "sum", - "sum", - "count", - "sum", - "count", - "sum", - "count", - "count" - ] - } - } - ] - })"; - return eng::internal::SubstraitFromJSON("Plan", substrait_json); -} - -arrow::Future> GetSubstraitPlanStage1() { - std::string substrait_json = R"({ - "extensions": [ - { - "extensionFunction": { - "name": "sum:opt_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 3, - "name": "alias:fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 4, - "name": "alias:i64" - } - }, - { - "extensionFunction": { - "functionAnchor": 1, - "name": "avg:opt_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 2, - "name": "count:opt_i32" - } - } - ], - "relations": [ - { - "root": { - "input": { - "project": { - "common": { - "direct": {} - }, - "input": { - "aggregate": { - "common": { - "direct": {} - }, - "input": { - "read": { - "common": { - "direct": {} - }, - "baseSchema": { - "names": [ - "l_returnflag", - "l_linestatus", - "sum", - "sum", - "sum", - "sum", - "sum", - "count", - "sum", - "count", - "sum", - "count", - "count" - ], - "struct": { - "types": [ - { - "string": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "string": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "i64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "i64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "i64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "i64": { - "nullability": "NULLABILITY_REQUIRED" - } - } - ] - } - }, - "localFiles": { - "items": [ - { - "uriFile": "iterator:0" - } - ] - } - } - }, - "groupings": [ - { - "groupingExpressions": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - } - ] - } - ], - "measures": [ - { - "measure": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 4 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 5 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 6 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 7 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 8 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 9 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 10 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 11 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "measure": { - "functionReference": 2, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 12 - } - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "i64": { - "nullability": "NULLABILITY_REQUIRED" - } - } - } - } - ] - } - }, - "expressions": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 4 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 5 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 6 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 7 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 8 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 4, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 9 - } - } - } - } - ], - "outputType": { - "i64": { - "nullability": "NULLABILITY_REQUIRED" - } - } - } - } - ] - } - } - } - } - ] - })"; - return eng::internal::SubstraitFromJSON("Plan", substrait_json); -} - -void ReplaceSourceDecls(std::vector source_decls, - arrow::compute::Declaration* decl) { - std::vector visited; - std::vector source_indexes; - visited.push_back(decl); - - while (!visited.empty()) { - auto top = visited.back(); - visited.pop_back(); - for (auto& input : top->inputs) { - auto& input_decl = arrow::util::get(input); - if (input_decl.factory_name == "source_index") { - source_indexes.push_back(&input_decl); - } else { - visited.push_back(&input_decl); - } - } - } - - assert(source_indexes.size() == source_decls.size()); - for (auto& source_index : source_indexes) { - auto index = - arrow::internal::checked_pointer_cast( - source_index->options) - ->index; - *source_index = std::move(source_decls[index]); - } -} - -void ReadSink( - std::shared_ptr output_schema, - arrow::AsyncGenerator> sink_gen) { - // Call sink_reader Iterator interface - std::shared_ptr sink_reader = cp::MakeGeneratorReader( - std::move(output_schema), std::move(sink_gen), arrow::default_memory_pool()); - std::cout << std::string(50, '#') << " consuming batches:" << std::endl; - int batch_cnt = 0; - while (true) { - auto maybe_batch = sink_reader->Next(); - ABORT_ON_FAILURE(maybe_batch.status()); - auto batch = std::move(maybe_batch).ValueOrDie(); - if (!batch) break; - std::cout << "Batch #" << batch_cnt++ << " Num output rows: " << batch->num_rows() - << std::endl; - std::cout << "Result: " << batch->ToString() << std::endl; - } -} - -int main(int argc, char** argv) { - std::shared_ptr input_schema; - arrow::AsyncGenerator> gen; - - std::vector decls0; - // ------------------------ Stage 0 ------------------------- - { - auto maybe_serialized_plan = GetSubstraitPlanStage0().result(); - ABORT_ON_FAILURE(maybe_serialized_plan.status()); - std::shared_ptr serialized_plan = - std::move(maybe_serialized_plan).ValueOrDie(); - - // Print the received plan to stdout as JSON - arrow::Result maybe_plan_json = - eng::internal::SubstraitToJSON("Plan", *serialized_plan); - ABORT_ON_FAILURE(maybe_plan_json.status()); - std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl; - std::cout << maybe_plan_json.ValueOrDie() << std::endl; - - // Deserialize each relation tree in the substrait plan to an Arrow compute - // Declaration - arrow::Result> maybe_decls = - eng::DeserializePlan(*serialized_plan); - ABORT_ON_FAILURE(maybe_decls.status()); - // std::vector decls = std::move(maybe_decls).ValueOrDie(); - decls0 = std::move(maybe_decls).ValueOrDie(); - - // It's safe to drop the serialized plan; we don't leave references to its memory - serialized_plan.reset(); - - auto& decl = decls0[0]; - - // Construct an empty plan (note: configure Function registry and ThreadPool here) - arrow::Result> maybe_plan = cp::ExecPlan::Make(); - ABORT_ON_FAILURE(maybe_plan.status()); - std::shared_ptr plan = std::move(maybe_plan).ValueOrDie(); - - auto maybe_node = decl.AddToPlan(plan.get()); - ABORT_ON_FAILURE(maybe_node.status()); - auto node = std::move(maybe_node).ValueOrDie(); - std::cout << "schema: " << node->output_schema()->ToString() << std::endl; - - input_schema = node->output_schema(); - - // add sink node - auto sink_node = arrow::compute::MakeExecNode("sink", plan.get(), {node}, - arrow::compute::SinkNodeOptions{&gen}); - ABORT_ON_FAILURE(sink_node.status()); - - // Validate the plan and print it to stdout - ABORT_ON_FAILURE(plan->Validate()); - std::cout << std::string(50, '#') << " produced arrow::ExecPlan:" << std::endl; - std::cout << plan->ToString() << std::endl; - - // Start the plan... - ABORT_ON_FAILURE(plan->StartProducing()); - - // ReadSink(node->output_schema(), std::move(gen)); - - ABORT_ON_FAILURE(plan->finished().status()); - } - - // ------------------------ Stage 1 ------------------------- - - { - auto maybe_serialized_plan = GetSubstraitPlanStage1().result(); - ABORT_ON_FAILURE(maybe_serialized_plan.status()); - std::shared_ptr serialized_plan = - std::move(maybe_serialized_plan).ValueOrDie(); - - // Print the received plan to stdout as JSON - arrow::Result maybe_plan_json = - eng::internal::SubstraitToJSON("Plan", *serialized_plan); - ABORT_ON_FAILURE(maybe_plan_json.status()); - std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl; - std::cout << maybe_plan_json.ValueOrDie() << std::endl; - - // Deserialize each relation tree in the substrait plan to an Arrow compute - // Declaration - arrow::Result> maybe_decls = - eng::DeserializePlan(*serialized_plan); - ABORT_ON_FAILURE(maybe_decls.status()); - std::vector decls = std::move(maybe_decls).ValueOrDie(); - - // It's safe to drop the serialized plan; we don't leave references to its memory - serialized_plan.reset(); - - // Add source node - std::vector source = { - arrow::compute::Declaration{"source", cp::SourceNodeOptions{input_schema, gen}}}; - ReplaceSourceDecls(std::move(source), &decls[0]); - auto& decl = decls[0]; - - // Construct an empty plan (note: configure Function registry and ThreadPool here) - arrow::Result> maybe_plan = cp::ExecPlan::Make(); - ABORT_ON_FAILURE(maybe_plan.status()); - std::shared_ptr plan = std::move(maybe_plan).ValueOrDie(); - - // Add decls to plan (note: configure ExecNode registry before this point) - auto* node = decl.AddToPlan(plan.get()).ValueOrDie(); - std::cout << "schema: " << node->output_schema()->ToString() << std::endl; - - auto output_schema = node->output_schema(); - - // add sink node - arrow::AsyncGenerator> sink_gen; - auto sink_node = arrow::compute::MakeExecNode( - "sink", plan.get(), {node}, arrow::compute::SinkNodeOptions{&sink_gen}); - ABORT_ON_FAILURE(sink_node.status()); - - // Validate the plan and print it to stdout - ABORT_ON_FAILURE(plan->Validate()); - std::cout << std::string(50, '#') << " produced arrow::ExecPlan:" << std::endl; - std::cout << plan->ToString() << std::endl; - - // Start the plan... - std::cout << std::string(50, '#') << " consuming batches:" << std::endl; - ABORT_ON_FAILURE(plan->StartProducing()); - - ReadSink(std::move(output_schema), std::move(sink_gen)); - - // ... and wait for it to finish - ABORT_ON_FAILURE(plan->finished().status()); - } - - return EXIT_SUCCESS; -} diff --git a/cpp/examples/arrow/tpch_q6_demo.cc b/cpp/examples/arrow/tpch_q6_demo.cc deleted file mode 100644 index 63db191159038..0000000000000 --- a/cpp/examples/arrow/tpch_q6_demo.cc +++ /dev/null @@ -1,1088 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include - -#include -#include -#include -#include -#include -#include "arrow/util/async_generator.h" -#include "arrow/util/checked_cast.h" - -namespace eng = arrow::engine; -namespace cp = arrow::compute; - -#define ABORT_ON_FAILURE(expr) \ - do { \ - arrow::Status status_ = (expr); \ - if (!status_.ok()) { \ - std::cerr << status_.message() << std::endl; \ - abort(); \ - } \ - } while (0); - -arrow::Future> GetSubstraitPlanStage0() { - std::string substrait_json = R"({ - "extensions": [ - { - "extensionFunction": { - "functionAnchor": 6, - "name": "lte:fp64_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 9, - "name": "sum:opt_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 4, - "name": "lt:date_date" - } - }, - { - "extensionFunction": { - "name": "is_not_null:date" - } - }, - { - "extensionFunction": { - "functionAnchor": 3, - "name": "gte:date_date" - } - }, - { - "extensionFunction": { - "functionAnchor": 7, - "name": "lt:fp64_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 1, - "name": "is_not_null:fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 2, - "name": "and:bool_bool" - } - }, - { - "extensionFunction": { - "functionAnchor": 5, - "name": "gte:fp64_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 8, - "name": "multiply:opt_fp64_fp64" - } - } - ], - "relations": [ - { - "root": { - "input": { - "aggregate": { - "common": { - "direct": {} - }, - "input": { - "project": { - "common": { - "direct": {} - }, - "input": { - "project": { - "common": { - "direct": {} - }, - "input": { - "filter": { - "common": { - "direct": {} - }, - "input": { - "read": { - "common": { - "direct": {} - }, - "baseSchema": { - "names": [ - "l_quantity", - "l_extendedprice", - "l_discount", - "l_shipdate" - ], - "struct": { - "types": [ - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - }, - { - "date": { - "nullability": "NULLABILITY_NULLABLE" - } - } - ] - } - }, - "filter": { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - }, - { - "literal": { - "date": 8766 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 4, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - }, - { - "literal": { - "date": 9131 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 5, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - }, - { - "literal": { - "fp64": 0.05 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 6, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - }, - { - "literal": { - "fp64": 0.07 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 7, - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "literal": { - "fp64": 24 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - "localFiles": { - "items": [ - { - "uriFile": "file:///home/rong/benchmark/data/tpch1_d4d/lineitem/part-00000-678b839d-2286-4750-923f-653adc66ed74-c000.snappy.parquet", - "format": "FILE_FORMAT_PARQUET", - "length": "206349871" - } - ] - } - } - }, - "condition": { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "functionReference": 2, - "args": [ - { - "scalarFunction": { - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 3, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - }, - { - "literal": { - "date": 8766 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 4, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 3 - } - } - } - }, - { - "literal": { - "date": 9131 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 5, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - }, - { - "literal": { - "fp64": 0.05 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 6, - "args": [ - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - }, - { - "literal": { - "fp64": 0.07 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - }, - { - "scalarFunction": { - "functionReference": 7, - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "literal": { - "fp64": 24 - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ], - "outputType": { - "bool": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - } - }, - "expressions": [ - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 2 - } - } - } - } - ] - } - }, - "expressions": [ - { - "scalarFunction": { - "functionReference": 8, - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - }, - { - "selection": { - "directReference": { - "structField": { - "field": 1 - } - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ] - } - }, - "groupings": [ - {} - ], - "measures": [ - { - "measure": { - "functionReference": 9, - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - } - ], - "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ] - } - }, - "names": [ - "sum" - ] - } - } - ] - })"; - return eng::internal::SubstraitFromJSON("Plan", substrait_json); -} - -arrow::Future> GetSubstraitPlanStage1() { - std::string substrait_json = R"({ - "extensions": [ - { - "extensionFunction": { - "name": "sum:opt_fp64" - } - }, - { - "extensionFunction": { - "functionAnchor": 1, - "name": "alias:fp64" - } - } - ], - "relations": [ - { - "root": { - "input": { - "project": { - "common": { - "direct": {} - }, - "input": { - "aggregate": { - "common": { - "direct": {} - }, - "input": { - "read": { - "common": { - "direct": {} - }, - "baseSchema": { - "names": [ - "sum" - ], - "struct": { - "types": [ - { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - ] - } - }, - "localFiles": { - "items": [ - { - "uriFile": "iterator:0" - } - ] - } - } - }, - "groupings": [ - {} - ], - "measures": [ - { - "measure": { - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - } - ], - "phase": "AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT", - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ] - } - }, - "expressions": [ - { - "scalarFunction": { - "functionReference": 1, - "args": [ - { - "selection": { - "directReference": { - "structField": {} - } - } - } - ], - "outputType": { - "fp64": { - "nullability": "NULLABILITY_NULLABLE" - } - } - } - } - ] - } - }, - "names": [ - "revenue" - ] - } - } - ] - })"; - return eng::internal::SubstraitFromJSON("Plan", substrait_json); -} - -void ReplaceSourceDecls(std::vector source_decls, - arrow::compute::Declaration* decl) { - std::vector visited; - std::vector source_indexes; - visited.push_back(decl); - - while (!visited.empty()) { - auto top = visited.back(); - visited.pop_back(); - for (auto& input : top->inputs) { - auto& input_decl = arrow::util::get(input); - if (input_decl.factory_name == "source_index") { - source_indexes.push_back(&input_decl); - } else { - visited.push_back(&input_decl); - } - } - } - - assert(source_indexes.size() == source_decls.size()); - for (auto& source_index : source_indexes) { - auto index = - arrow::internal::checked_pointer_cast( - source_index->options) - ->index; - *source_index = std::move(source_decls[index]); - } -} - -int main(int argc, char** argv) { - std::shared_ptr input_schema; - arrow::AsyncGenerator> gen; - - // ------------------------ Stage 0 ------------------------- - { - auto maybe_serialized_plan = GetSubstraitPlanStage0().result(); - ABORT_ON_FAILURE(maybe_serialized_plan.status()); - std::shared_ptr serialized_plan = - std::move(maybe_serialized_plan).ValueOrDie(); - - // Print the received plan to stdout as JSON - arrow::Result maybe_plan_json = - eng::internal::SubstraitToJSON("Plan", *serialized_plan); - ABORT_ON_FAILURE(maybe_plan_json.status()); - std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl; - std::cout << maybe_plan_json.ValueOrDie() << std::endl; - - // Deserialize each relation tree in the substrait plan to an Arrow compute - // Declaration - arrow::Result> maybe_decls = - eng::DeserializePlan(*serialized_plan); - ABORT_ON_FAILURE(maybe_decls.status()); - std::vector decls = std::move(maybe_decls).ValueOrDie(); - - // It's safe to drop the serialized plan; we don't leave references to its memory - serialized_plan.reset(); - - auto& decl = decls[0]; - - // Construct an empty plan (note: configure Function registry and ThreadPool here) - arrow::Result> maybe_plan = cp::ExecPlan::Make(); - ABORT_ON_FAILURE(maybe_plan.status()); - std::shared_ptr plan = std::move(maybe_plan).ValueOrDie(); - - auto* node = decl.AddToPlan(plan.get()).ValueOrDie(); - std::cout << "schema: " << node->output_schema()->ToString() << std::endl; - - input_schema = node->output_schema(); - - // add sink node - arrow::compute::MakeExecNode("sink", plan.get(), {node}, - arrow::compute::SinkNodeOptions{&gen}); - - // Validate the plan and print it to stdout - ABORT_ON_FAILURE(plan->Validate()); - std::cout << std::string(50, '#') << " produced arrow::ExecPlan:" << std::endl; - std::cout << plan->ToString() << std::endl; - - // Start the plan... - std::cout << std::string(50, '#') << " consuming batches:" << std::endl; - ABORT_ON_FAILURE(plan->StartProducing()); - ABORT_ON_FAILURE(plan->finished().status()); - } - - // ------------------------ Stage 1 ------------------------- - - { - auto maybe_serialized_plan = GetSubstraitPlanStage1().result(); - ABORT_ON_FAILURE(maybe_serialized_plan.status()); - std::shared_ptr serialized_plan = - std::move(maybe_serialized_plan).ValueOrDie(); - - // Print the received plan to stdout as JSON - arrow::Result maybe_plan_json = - eng::internal::SubstraitToJSON("Plan", *serialized_plan); - ABORT_ON_FAILURE(maybe_plan_json.status()); - std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl; - std::cout << maybe_plan_json.ValueOrDie() << std::endl; - - // Deserialize each relation tree in the substrait plan to an Arrow compute - // Declaration - arrow::Result> maybe_decls = - eng::DeserializePlan(*serialized_plan); - ABORT_ON_FAILURE(maybe_decls.status()); - std::vector decls = std::move(maybe_decls).ValueOrDie(); - - // It's safe to drop the serialized plan; we don't leave references to its memory - serialized_plan.reset(); - - auto& decl = decls[0]; - // Add decls to plan (note: configure ExecNode registry before this point) - // Add source node - std::vector source = { - arrow::compute::Declaration{"source", cp::SourceNodeOptions{input_schema, gen}}}; - ReplaceSourceDecls(std::move(source), &decl); - - // Construct an empty plan (note: configure Function registry and ThreadPool here) - arrow::Result> maybe_plan = cp::ExecPlan::Make(); - ABORT_ON_FAILURE(maybe_plan.status()); - std::shared_ptr plan = std::move(maybe_plan).ValueOrDie(); - - auto* node = decl.AddToPlan(plan.get()).ValueOrDie(); - std::cout << "schema: " << node->output_schema()->ToString() << std::endl; - - auto output_schema = node->output_schema(); - - // add sink node - arrow::AsyncGenerator> sink_gen; - arrow::compute::MakeExecNode("sink", plan.get(), {node}, - arrow::compute::SinkNodeOptions{&sink_gen}); - - // Validate the plan and print it to stdout - ABORT_ON_FAILURE(plan->Validate()); - std::cout << std::string(50, '#') << " produced arrow::ExecPlan:" << std::endl; - std::cout << plan->ToString() << std::endl; - - // Start the plan... - std::cout << std::string(50, '#') << " consuming batches:" << std::endl; - ABORT_ON_FAILURE(plan->StartProducing()); - - std::shared_ptr sink_reader = cp::MakeGeneratorReader( - std::move(output_schema), std::move(sink_gen), arrow::default_memory_pool()); - - // Call sink_reader Iterator interface - int batch_cnt = 0; - while (true) { - auto maybe_batch = sink_reader->Next(); - ABORT_ON_FAILURE(maybe_batch.status()); - auto batch = std::move(maybe_batch).ValueOrDie(); - if (!batch) break; - std::cout << "Batch #" << batch_cnt++ << " Num output rows: " << batch->num_rows() - << std::endl; - ARROW_CHECK_EQ(batch->num_rows(), 1); - std::cout << "Result: " << batch->ToString() << std::endl; - } - // ... and wait for it to finish - ABORT_ON_FAILURE(plan->finished().status()); - } - - return EXIT_SUCCESS; -} diff --git a/cpp/src/arrow/engine/substrait/expression_internal.cc b/cpp/src/arrow/engine/substrait/expression_internal.cc index 7a85345bae60a..2dd3725f7bcab 100644 --- a/cpp/src/arrow/engine/substrait/expression_internal.cc +++ b/cpp/src/arrow/engine/substrait/expression_internal.cc @@ -167,10 +167,19 @@ Result FromProto(const substrait::Expression& expr, if (decoded_function.name.to_string() == "alias") { if (scalar_fn.args_size() != 1) { - return arrow::Status::Invalid("Alias should have exact 1 arg, but got " + std::to_string(scalar_fn.args_size())); + return arrow::Status::Invalid("Alias should have exact 1 arg, but got " + + std::to_string(scalar_fn.args_size())); } return FromProto(scalar_fn.args().at(0), ext_set); } + if (decoded_function.name.to_string() == "is_in") { + const auto& in_list = + std::static_pointer_cast(arguments[1].literal()->scalar()); + auto value = in_list->value; + return compute::call( + "is_in", std::vector{std::move(arguments[0])}, + arrow::compute::SetLookupOptions(*value)); + } return compute::call(decoded_function.name.to_string(), std::move(arguments)); } diff --git a/cpp/src/arrow/engine/substrait/extension_set.cc b/cpp/src/arrow/engine/substrait/extension_set.cc index da15bd9c73a5b..deba09f3c8c82 100644 --- a/cpp/src/arrow/engine/substrait/extension_set.cc +++ b/cpp/src/arrow/engine/substrait/extension_set.cc @@ -160,6 +160,7 @@ Result ExtensionSet::Make(std::vector uris, if (function_ids[i].empty()) continue; RETURN_NOT_OK(set.impl_->CheckHasUri(function_ids[i].uri)); + // Remove the types in function name. function_ids[i].name = function_ids[i].name.substr(0, function_ids[i].name.find(':')); if (auto rec = registry->GetFunction(function_ids[i])) { set.functions_[i] = {rec->id, rec->function_name}; @@ -222,19 +223,13 @@ ExtensionIdRegistry* default_extension_id_registry() { // The type (variation) mappings listed below need to be kept in sync // with the YAML at substrait/format/extension_types.yaml manually; // see ARROW-15535. - for (TypeName e : { - TypeName{uint8(), "u8"}, - TypeName{uint16(), "u16"}, - TypeName{uint32(), "u32"}, - TypeName{uint64(), "u64"}, - TypeName{int8(), "i8"}, - TypeName{int16(), "i16"}, - TypeName{int32(), "i32"}, - TypeName{int64(), "i64"}, - TypeName{float16(), "fp16"}, - TypeName{float64(), "fp64"}, - TypeName{date64(), "date"}, - }) { + for (TypeName e : + {TypeName{uint8(), "u8"}, TypeName{uint16(), "u16"}, TypeName{uint32(), "u32"}, + TypeName{uint64(), "u64"}, TypeName{int8(), "i8"}, TypeName{int16(), "i16"}, + TypeName{int32(), "i32"}, TypeName{int64(), "i64"}, + TypeName{float16(), "fp16"}, TypeName{float32(), "fp32"}, + TypeName{float64(), "fp64"}, TypeName{date64(), "date"}, + TypeName{utf8(), "string"}}) { DCHECK_OK(RegisterType({kArrowExtTypesUri, e.name}, std::move(e.type), /*is_variation=*/true)); } @@ -260,11 +255,15 @@ ExtensionIdRegistry* default_extension_id_registry() { {"avg", "mean"}, {"multiply", "multiply"}, {"is_not_null", "is_valid"}, - {"and", "and"}, + {"is_null", "is_null"}, + {"and", "and_kleene"}, + {"or", "or_kleene"}, + {"equal", "equal"}, {"lt", "less"}, {"lte", "less_equal"}, {"gte", "greater_equal"}, - {"alias", "alias"}}) { + {"alias", "alias"}, + {"in", "is_in"}}) { DCHECK_OK(RegisterFunction({kArrowExtTypesUri, name.first}, name.second)); } } diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 1eb1b3bd43d9b..094eac2656970 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -26,7 +26,6 @@ #include "arrow/engine/substrait/expression_internal.h" #include "arrow/engine/substrait/type_internal.h" #include "arrow/filesystem/localfs.h" -#include "arrow/util/uri.h" namespace arrow { namespace engine { @@ -67,10 +66,11 @@ Result FromProto(const substrait::Rel& rel, auto scan_options = std::make_shared(); - // FIXME: FieldPath is not supported in scan filter. See ARROW-14658 - if (read.has_filter()) { - // ARROW_ASSIGN_OR_RAISE(scan_options->filter, FromProto(read.filter(), ext_set)); - } + // FieldPath is not supported in scan filter. See ARROW-14658 + // Ignore the filter in ReadRel and use the push-down filter from Filter operator. + // if (read.has_filter()) { + // ARROW_ASSIGN_OR_RAISE(scan_options->filter, FromProto(read.filter(), ext_set)); + // } if (read.has_projection()) { // NOTE: scan_options->projection is not used by the scanner and thus can't be @@ -88,16 +88,18 @@ Result FromProto(const substrait::Rel& rel, "substrait::ReadRel::LocalFiles::advanced_extension"); } + // Check whether the input is iterator. auto head = read.local_files().items().at(0); if (head.path_type_case() == substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile && util::string_view{head.uri_file()}.starts_with("iterator:")) { const auto& index = head.uri_file().substr(9); + // Construct decl with the index of input iterator. return compute::Declaration{"source_index", compute::SourceIndexOptions{std::stoi(index)}}; } - ARROW_ASSIGN_OR_RAISE(auto filesystem, fs::FileSystemFromUri(head.uri_file())); std::shared_ptr format; + auto filesystem = std::make_shared(); std::vector> fragments; for (const auto& item : read.local_files().items()) { @@ -121,16 +123,19 @@ Result FromProto(const substrait::Rel& rel, "other than FILE_FORMAT_PARQUET"); } - auto uri = arrow::internal::Uri(); - RETURN_NOT_OK(uri.Parse(item.uri_file())); - auto path = uri.path(); - - // TODO: partition index. - if (item.partition_index() != 0) { - // return Status::NotImplemented( - // "non-default - // substrait::ReadRel::LocalFiles::FileOrFiles::partition_index"); + if (!util::string_view{item.uri_file()}.starts_with("file:///")) { + return Status::NotImplemented( + "substrait::ReadRel::LocalFiles::FileOrFiles::uri_file " + "with other than local filesystem (file:///)"); } + auto path = item.uri_file().substr(7); + + // Ignore partition index and use start and length to locate file fragment. + // if (item.partition_index() != 0) { + // return Status::NotImplemented( + // "non-default + // substrait::ReadRel::LocalFiles::FileOrFiles::partition_index"); + // } // Read all row groups if both start and length are not specified. int64_t start_offset = item.length() == 0 && item.start() == 0 @@ -194,142 +199,6 @@ Result FromProto(const substrait::Rel& rel, }); } - case substrait::Rel::RelTypeCase::kAggregate: { - const auto& aggregate = rel.aggregate(); - RETURN_NOT_OK(CheckRelCommon(aggregate)); - - if (!aggregate.has_input()) { - return Status::Invalid("substrait::AggregateRel with no input relation"); - } - ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set)); - - compute::AggregateNodeOptions opts{{}, {}, {}}; - - if (aggregate.groupings_size() > 1) { - return Status::Invalid("substrait::AggregateRel has " + - std::to_string(aggregate.groupings_size()) + " groupings"); - } - - for (const auto& grouping : aggregate.groupings()) { - for (const auto& expr : grouping.grouping_expressions()) { - ARROW_ASSIGN_OR_RAISE(auto key_expr, FromProto(expr, ext_set)); - if (auto field = key_expr.field_ref()) { - opts.keys.emplace_back(*field); - } else { - return Status::Invalid( - "substrait::AggregateRel grouping key is not a field reference: " + - key_expr.ToString()); - } - } - } - - // group by will first output targets then keys - // We need a post projector to first output keys then targets - // TODO: use options to control this behavior - std::vector reordered_fields(opts.keys.size()); - int32_t reordered_field_pos = 0; - - for (const auto& measure : aggregate.measures()) { - if (measure.has_filter()) { - // invalid - return Status::Invalid("substrait::AggregateRel has filter."); - } - - auto agg_func = measure.measure(); - ARROW_ASSIGN_OR_RAISE(auto decoded_function, - ext_set.DecodeFunction(agg_func.function_reference())); - - if (!agg_func.sorts().empty()) { - return Status::Invalid("substrait::AggregateRel aggregate function #" + - decoded_function.name.to_string() + " has sort."); - } - - std::vector target_fields; - target_fields.reserve(agg_func.args_size()); - for (const auto& arg : agg_func.args()) { - ARROW_ASSIGN_OR_RAISE(auto target_expr, FromProto(arg, ext_set)); - if (auto target_field = target_expr.field_ref()) { - target_fields.emplace_back(*target_field); - } else { - return Status::Invalid( - "substrait::AggregateRel measure's arg is not a field reference: " + - target_expr.ToString()); - } - } - - int32_t target_field_idx = 0; - if (decoded_function.name == "mean") { - switch (agg_func.phase()) { - case ::substrait::AggregationPhase:: - AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE: { - static std::vector function_names = {"hash_sum", "hash_count"}; - for (const std::string& func : function_names) { - opts.aggregates.push_back({func, nullptr}); - opts.targets.emplace_back(target_fields[target_field_idx]); - opts.names.emplace_back(func + " " + opts.targets.back().ToString()); - reordered_fields.emplace_back(compute::field_ref(reordered_field_pos++)); - } - target_field_idx++; - break; - } - case ::substrait::AggregationPhase:: - AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT: { - static std::vector function_names = {"hash_sum", "hash_sum"}; - for (const std::string& func : function_names) { - opts.aggregates.push_back({func, nullptr}); - opts.targets.emplace_back(std::move(target_fields[target_field_idx])); - opts.names.emplace_back(func + " " + opts.targets.back().ToString()); - target_field_idx++; - } - reordered_fields.emplace_back( - compute::call("divide", {compute::field_ref(reordered_field_pos++), - compute::field_ref(reordered_field_pos++)})); - break; - } - default: - return Status::Invalid("substrait::AggregateRel unsupported phase " + - std::to_string(agg_func.phase())); - } - } else if (decoded_function.name == "count" && - agg_func.phase() == ::substrait::AggregationPhase:: - AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT) { - std::string func = "hash_sum"; - opts.aggregates.push_back({func, nullptr}); - opts.targets.emplace_back(std::move(target_fields[target_field_idx])); - opts.names.emplace_back(func + " " + opts.targets.back().ToString()); - target_field_idx++; - reordered_fields.emplace_back(compute::field_ref(reordered_field_pos++)); - } else { - std::string func = opts.keys.empty() - ? decoded_function.name.to_string() - : "hash_" + decoded_function.name.to_string(); - opts.aggregates.push_back({func, nullptr}); - opts.targets.emplace_back(std::move(target_fields[target_field_idx])); - opts.names.emplace_back(func + " " + opts.targets.back().ToString()); - target_field_idx++; - reordered_fields.emplace_back(compute::field_ref(reordered_field_pos++)); - } - - if (target_field_idx != agg_func.args_size()) { - return Status::Invalid("substrait::AggregateRel aggregate function #" + - decoded_function.name.to_string() + - " not all arguments are consumed."); - } - } - - for (size_t i = 0; i < opts.keys.size(); ++i) { - reordered_fields[i] = compute::field_ref(reordered_field_pos++); - } - - auto aggregate_decl = compute::Declaration{"aggregate", std::move(opts)}; - - auto post_project_decl = compute::Declaration{ - "project", compute::ProjectNodeOptions{std::move(reordered_fields)}}; - - return compute::Declaration::Sequence( - {std::move(input), std::move(aggregate_decl), std::move(post_project_decl)}); - } - default: break; }