From 7430bbdfcda5d23cf18d7222da8419eb594d9f3f Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 29 Apr 2021 20:42:29 -0400 Subject: [PATCH] ARROW-11929: [C++][Dataset][Compute] Promote expression to the compute namespace Moves Expression and its test and benchmark into the compute/exec/ directory. I haven't introduced an exec namespace. Closes #10166 from bkietz/11929-Promote-Expression-to-the Authored-by: Benjamin Kietzman Signed-off-by: David Li --- .../arrow/dataset_documentation_example.cc | 21 ++--- .../arrow/dataset_parquet_scan_example.cc | 10 ++- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/compute/CMakeLists.txt | 2 + cpp/src/arrow/compute/exec/CMakeLists.txt | 22 +++++ .../{dataset => compute/exec}/expression.cc | 35 ++++++-- .../{dataset => compute/exec}/expression.h | 64 +++++++-------- .../exec}/expression_benchmark.cc | 31 ++++--- .../exec}/expression_internal.h | 6 +- .../exec}/expression_test.cc | 26 ++++-- cpp/src/arrow/dataset/CMakeLists.txt | 5 -- cpp/src/arrow/dataset/api.h | 2 +- cpp/src/arrow/dataset/dataset.cc | 17 ++-- cpp/src/arrow/dataset/dataset.h | 31 ++++--- cpp/src/arrow/dataset/dataset_internal.h | 14 ++-- cpp/src/arrow/dataset/discovery.cc | 2 +- cpp/src/arrow/dataset/discovery.h | 6 +- cpp/src/arrow/dataset/file_base.cc | 22 ++--- cpp/src/arrow/dataset/file_base.h | 16 ++-- cpp/src/arrow/dataset/file_benchmark.cc | 7 +- cpp/src/arrow/dataset/file_parquet.cc | 44 +++++----- cpp/src/arrow/dataset/file_parquet.h | 14 ++-- cpp/src/arrow/dataset/file_parquet_test.cc | 6 +- cpp/src/arrow/dataset/file_test.cc | 10 +-- cpp/src/arrow/dataset/partition.cc | 34 ++++---- cpp/src/arrow/dataset/partition.h | 22 ++--- cpp/src/arrow/dataset/partition_test.cc | 30 +++---- cpp/src/arrow/dataset/projector.h | 2 +- cpp/src/arrow/dataset/scanner.cc | 10 +-- cpp/src/arrow/dataset/scanner.h | 10 +-- cpp/src/arrow/dataset/scanner_internal.h | 38 +++++---- cpp/src/arrow/dataset/scanner_test.cc | 2 +- cpp/src/arrow/dataset/test_util.h | 54 ++++++------ cpp/src/arrow/dataset/type_fwd.h | 3 +- python/pyarrow/includes/libarrow_dataset.pxd | 27 +++--- r/NAMESPACE | 1 + r/R/arrowExports.R | 20 ++--- r/R/expression.R | 10 +-- r/man/FileFormat.Rd | 5 +- r/man/FragmentScanOptions.Rd | 11 +++ r/man/arrow-package.Rd | 2 + r/src/arrowExports.cpp | 82 +++++++++---------- r/src/arrow_types.h | 1 + r/src/dataset.cpp | 7 +- r/src/expression.cpp | 56 ++++++------- 45 files changed, 473 insertions(+), 368 deletions(-) create mode 100644 cpp/src/arrow/compute/exec/CMakeLists.txt rename cpp/src/arrow/{dataset => compute/exec}/expression.cc (97%) rename cpp/src/arrow/{dataset => compute/exec}/expression.h (85%) rename cpp/src/arrow/{dataset => compute/exec}/expression_benchmark.cc (81%) rename cpp/src/arrow/{dataset => compute/exec}/expression_internal.h (99%) rename cpp/src/arrow/{dataset => compute/exec}/expression_test.cc (98%) diff --git a/cpp/examples/arrow/dataset_documentation_example.cc b/cpp/examples/arrow/dataset_documentation_example.cc index 6954460d413b4..0fb4ad2f627a6 100644 --- a/cpp/examples/arrow/dataset_documentation_example.cc +++ b/cpp/examples/arrow/dataset_documentation_example.cc @@ -20,9 +20,9 @@ #include #include +#include #include #include -#include #include #include #include @@ -37,6 +37,7 @@ namespace ds = arrow::dataset; namespace fs = arrow::fs; +namespace cp = arrow::compute; #define ABORT_ON_FAILURE(expr) \ do { \ @@ -185,7 +186,7 @@ std::shared_ptr FilterAndSelectDataset( // Read specified columns with a row filter auto scan_builder = dataset->NewScan().ValueOrDie(); ABORT_ON_FAILURE(scan_builder->Project({"b"})); - ABORT_ON_FAILURE(scan_builder->Filter(ds::less(ds::field_ref("b"), ds::literal(4)))); + ABORT_ON_FAILURE(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4)))); auto scanner = scan_builder->Finish().ValueOrDie(); return scanner->ToTable().ValueOrDie(); } @@ -210,12 +211,12 @@ std::shared_ptr ProjectDataset( ABORT_ON_FAILURE(scan_builder->Project( { // Leave column "a" as-is. - ds::field_ref("a"), + cp::field_ref("a"), // Cast column "b" to float32. - ds::call("cast", {ds::field_ref("b")}, + cp::call("cast", {cp::field_ref("b")}, arrow::compute::CastOptions::Safe(arrow::float32())), // Derive a boolean column from "c". - ds::equal(ds::field_ref("c"), ds::literal(1)), + cp::equal(cp::field_ref("c"), cp::literal(1)), }, {"a_renamed", "b_as_float32", "c_1"})); auto scanner = scan_builder->Finish().ValueOrDie(); @@ -239,15 +240,15 @@ std::shared_ptr SelectAndProjectDataset( // Read specified columns with a row filter auto scan_builder = dataset->NewScan().ValueOrDie(); std::vector names; - std::vector exprs; + std::vector exprs; // Read all the original columns. for (const auto& field : dataset->schema()->fields()) { names.push_back(field->name()); - exprs.push_back(ds::field_ref(field->name())); + exprs.push_back(cp::field_ref(field->name())); } // Also derive a new column. - names.push_back("b_large"); - exprs.push_back(ds::greater(ds::field_ref("b"), ds::literal(1))); + names.emplace_back("b_large"); + exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1))); ABORT_ON_FAILURE(scan_builder->Project(exprs, names)); auto scanner = scan_builder->Finish().ValueOrDie(); return scanner->ToTable().ValueOrDie(); @@ -295,7 +296,7 @@ std::shared_ptr FilterPartitionedDataset( // Filter based on the partition values. This will mean that we won't even read the // files whose partition expressions don't match the filter. ABORT_ON_FAILURE( - scan_builder->Filter(ds::equal(ds::field_ref("part"), ds::literal("b")))); + scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b")))); auto scanner = scan_builder->Finish().ValueOrDie(); return scanner->ToTable().ValueOrDie(); } diff --git a/cpp/examples/arrow/dataset_parquet_scan_example.cc b/cpp/examples/arrow/dataset_parquet_scan_example.cc index 197ca5aa4c688..cd9b89fe3800e 100644 --- a/cpp/examples/arrow/dataset_parquet_scan_example.cc +++ b/cpp/examples/arrow/dataset_parquet_scan_example.cc @@ -16,9 +16,9 @@ // under the License. #include +#include #include #include -#include #include #include #include @@ -37,6 +37,8 @@ namespace fs = arrow::fs; namespace ds = arrow::dataset; +namespace cp = arrow::compute; + #define ABORT_ON_FAILURE(expr) \ do { \ arrow::Status status_ = (expr); \ @@ -60,8 +62,8 @@ struct Configuration { // Indicates the filter by which rows will be filtered. This optimization can // make use of partition information and/or file metadata if possible. - ds::Expression filter = - ds::greater(ds::field_ref("total_amount"), ds::literal(1000.0f)); + cp::Expression filter = + cp::greater(cp::field_ref("total_amount"), cp::literal(1000.0f)); ds::InspectOptions inspect_options{}; ds::FinishOptions finish_options{}; @@ -146,7 +148,7 @@ std::shared_ptr GetDatasetFromPath( std::shared_ptr GetScannerFromDataset(std::shared_ptr dataset, std::vector columns, - ds::Expression filter, + cp::Expression filter, bool use_threads) { auto scanner_builder = dataset->NewScan().ValueOrDie(); diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index d95c3b55efff8..62ea94b8d0206 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -367,6 +367,7 @@ if(ARROW_COMPUTE) compute/api_vector.cc compute/cast.cc compute/exec.cc + compute/exec/expression.cc compute/function.cc compute/kernel.cc compute/registry.cc diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index e781dff90e2a1..897dc32f35707 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -68,3 +68,5 @@ add_arrow_compute_test(internals_test add_arrow_benchmark(function_benchmark PREFIX "arrow-compute") add_subdirectory(kernels) + +add_subdirectory(exec) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt new file mode 100644 index 0000000000000..a10c1dad469a0 --- /dev/null +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +arrow_install_all_headers("arrow/compute/exec") + +add_arrow_compute_test(expression_test PREFIX "arrow-compute") + +add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/dataset/expression.cc b/cpp/src/arrow/compute/exec/expression.cc similarity index 97% rename from cpp/src/arrow/dataset/expression.cc rename to cpp/src/arrow/compute/exec/expression.cc index cc126fcc7fb25..1f819cf3d0457 100644 --- a/cpp/src/arrow/dataset/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/dataset/expression.h" +#include "arrow/compute/exec/expression.h" #include #include #include "arrow/chunked_array.h" #include "arrow/compute/api_vector.h" +#include "arrow/compute/exec/expression_internal.h" #include "arrow/compute/exec_internal.h" -#include "arrow/dataset/expression_internal.h" #include "arrow/io/memory.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" @@ -39,7 +39,7 @@ namespace arrow { using internal::checked_cast; using internal::checked_pointer_cast; -namespace dataset { +namespace compute { Expression::Expression(Call call) : impl_(std::make_shared(std::move(call))) {} @@ -198,7 +198,7 @@ std::string Expression::ToString() const { if (auto options = GetStrptimeOptions(*call)) { return out + "format=" + options->format + - ", unit=" + internal::ToString(options->unit) + ")"; + ", unit=" + arrow::internal::ToString(options->unit) + ")"; } return out + "{NON-REPRESENTABLE OPTIONS})"; @@ -304,8 +304,9 @@ size_t Expression::hash() const { } std::shared_ptr> expected = nullptr; - internal::atomic_compare_exchange_strong(&const_cast(call)->hash, &expected, - std::make_shared>(out)); + ::arrow::internal::atomic_compare_exchange_strong( + &const_cast(call)->hash, &expected, + std::make_shared>(out)); return out; } @@ -525,6 +526,23 @@ Result ExecuteScalarExpression(const Expression& expr, const Datum& input "ExecuteScalarExpression cannot Execute non-scalar expression ", expr.ToString()); } + if (input.kind() == Datum::TABLE) { + TableBatchReader reader(*input.table()); + std::shared_ptr batch; + + while (true) { + RETURN_NOT_OK(reader.ReadNext(&batch)); + if (batch != nullptr) { + break; + } + ARROW_ASSIGN_OR_RAISE(Datum res, ExecuteScalarExpression(expr, batch)); + if (res.is_scalar()) { + ARROW_ASSIGN_OR_RAISE(res, MakeArrayFromScalar(*res.scalar(), batch->num_rows(), + exec_context->memory_pool())); + } + } + } + if (auto lit = expr.literal()) return *lit; if (auto ref = expr.field_ref()) { @@ -1156,7 +1174,8 @@ Result Deserialize(std::shared_ptr buffer) { Result> GetScalar(const std::string& i) { int32_t column_index; - if (!internal::ParseValue(i.data(), i.length(), &column_index)) { + if (!::arrow::internal::ParseValue(i.data(), i.length(), + &column_index)) { return Status::Invalid("Couldn't parse column_index"); } if (column_index >= batch_.num_columns()) { @@ -1279,5 +1298,5 @@ Expression operator||(Expression lhs, Expression rhs) { return or_(std::move(lhs), std::move(rhs)); } -} // namespace dataset +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/expression.h b/cpp/src/arrow/compute/exec/expression.h similarity index 85% rename from cpp/src/arrow/dataset/expression.h rename to cpp/src/arrow/compute/exec/expression.h index 8bdcb4a0ffa6f..3d3ce99c257ac 100644 --- a/cpp/src/arrow/dataset/expression.h +++ b/cpp/src/arrow/compute/exec/expression.h @@ -34,14 +34,14 @@ #include "arrow/util/variant.h" namespace arrow { -namespace dataset { +namespace compute { /// An unbound expression which maps a single Datum to another Datum. /// An expression is one of /// - A literal Datum. /// - A reference to a single (potentially nested) field of the input Datum. /// - A call to a compute function, with arguments specified by other Expressions. -class ARROW_DS_EXPORT Expression { +class ARROW_EXPORT Expression { public: struct Call { std::string function_name; @@ -122,9 +122,9 @@ class ARROW_DS_EXPORT Expression { using Impl = util::Variant; std::shared_ptr impl_; - ARROW_DS_EXPORT friend bool Identical(const Expression& l, const Expression& r); + ARROW_EXPORT friend bool Identical(const Expression& l, const Expression& r); - ARROW_DS_EXPORT friend void PrintTo(const Expression&, std::ostream*); + ARROW_EXPORT friend void PrintTo(const Expression&, std::ostream*); }; inline bool operator==(const Expression& l, const Expression& r) { return l.Equals(r); } @@ -132,7 +132,7 @@ inline bool operator!=(const Expression& l, const Expression& r) { return !l.Equ // Factories -ARROW_DS_EXPORT +ARROW_EXPORT Expression literal(Datum lit); template @@ -140,10 +140,10 @@ Expression literal(Arg&& arg) { return literal(Datum(std::forward(arg))); } -ARROW_DS_EXPORT +ARROW_EXPORT Expression field_ref(FieldRef ref); -ARROW_DS_EXPORT +ARROW_EXPORT Expression call(std::string function, std::vector arguments, std::shared_ptr options = NULLPTR); @@ -156,11 +156,11 @@ Expression call(std::string function, std::vector arguments, } /// Assemble a list of all fields referenced by an Expression at any depth. -ARROW_DS_EXPORT +ARROW_EXPORT std::vector FieldsInExpression(const Expression&); /// Assemble a mapping from field references to known values. -ARROW_DS_EXPORT +ARROW_EXPORT Result> ExtractKnownFieldValues( const Expression& guaranteed_true_predicate); @@ -179,17 +179,17 @@ Result> ExtractKnownFieldVal /// Weak canonicalization which establishes guarantees for subsequent passes. Even /// equivalent Expressions may result in different canonicalized expressions. /// TODO this could be a strong canonicalization -ARROW_DS_EXPORT +ARROW_EXPORT Result Canonicalize(Expression, compute::ExecContext* = NULLPTR); /// Simplify Expressions based on literal arguments (for example, add(null, x) will always /// be null so replace the call with a null literal). Includes early evaluation of all /// calls whose arguments are entirely literal. -ARROW_DS_EXPORT +ARROW_EXPORT Result FoldConstants(Expression); /// Simplify Expressions by replacing with known values of the fields which it references. -ARROW_DS_EXPORT +ARROW_EXPORT Result ReplaceFieldsWithKnownValues( const std::unordered_map& known_values, Expression); @@ -197,7 +197,7 @@ Result ReplaceFieldsWithKnownValues( /// a boolean expression which is guaranteed to evaluate to `true`. For example, this is /// used to remove redundant function calls from a filter expression or to replace a /// reference to a constant-value field with a literal. -ARROW_DS_EXPORT +ARROW_EXPORT Result SimplifyWithGuarantee(Expression, const Expression& guaranteed_true_predicate); @@ -207,44 +207,44 @@ Result SimplifyWithGuarantee(Expression, /// Execute a scalar expression against the provided state and input Datum. This /// expression must be bound. -ARROW_DS_EXPORT +ARROW_EXPORT Result ExecuteScalarExpression(const Expression&, const Datum& input, compute::ExecContext* = NULLPTR); // Serialization -ARROW_DS_EXPORT +ARROW_EXPORT Result> Serialize(const Expression&); -ARROW_DS_EXPORT +ARROW_EXPORT Result Deserialize(std::shared_ptr); // Convenience aliases for factories -ARROW_DS_EXPORT Expression project(std::vector values, - std::vector names); +ARROW_EXPORT Expression project(std::vector values, + std::vector names); -ARROW_DS_EXPORT Expression equal(Expression lhs, Expression rhs); +ARROW_EXPORT Expression equal(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression not_equal(Expression lhs, Expression rhs); +ARROW_EXPORT Expression not_equal(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression less(Expression lhs, Expression rhs); +ARROW_EXPORT Expression less(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression less_equal(Expression lhs, Expression rhs); +ARROW_EXPORT Expression less_equal(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression greater(Expression lhs, Expression rhs); +ARROW_EXPORT Expression greater(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression greater_equal(Expression lhs, Expression rhs); +ARROW_EXPORT Expression greater_equal(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression is_null(Expression lhs); +ARROW_EXPORT Expression is_null(Expression lhs); -ARROW_DS_EXPORT Expression is_valid(Expression lhs); +ARROW_EXPORT Expression is_valid(Expression lhs); -ARROW_DS_EXPORT Expression and_(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression and_(const std::vector&); -ARROW_DS_EXPORT Expression or_(Expression lhs, Expression rhs); -ARROW_DS_EXPORT Expression or_(const std::vector&); -ARROW_DS_EXPORT Expression not_(Expression operand); +ARROW_EXPORT Expression and_(Expression lhs, Expression rhs); +ARROW_EXPORT Expression and_(const std::vector&); +ARROW_EXPORT Expression or_(Expression lhs, Expression rhs); +ARROW_EXPORT Expression or_(const std::vector&); +ARROW_EXPORT Expression not_(Expression operand); -} // namespace dataset +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/expression_benchmark.cc b/cpp/src/arrow/compute/exec/expression_benchmark.cc similarity index 81% rename from cpp/src/arrow/dataset/expression_benchmark.cc rename to cpp/src/arrow/compute/exec/expression_benchmark.cc index 24870f38c143c..1899b7caab6df 100644 --- a/cpp/src/arrow/dataset/expression_benchmark.cc +++ b/cpp/src/arrow/compute/exec/expression_benchmark.cc @@ -18,23 +18,16 @@ #include "benchmark/benchmark.h" #include "arrow/compute/cast.h" -#include "arrow/dataset/expression.h" +#include "arrow/compute/exec/expression.h" #include "arrow/dataset/partition.h" #include "arrow/testing/gtest_util.h" #include "arrow/type.h" namespace arrow { -namespace dataset { +namespace compute { -static Expression GetPartitionExpression(const std::string& path, bool infer_dictionary) { - auto options = HivePartitioningFactoryOptions(); - options.infer_dictionary = infer_dictionary; - auto factory = HivePartitioning::MakeFactory(options); - ASSIGN_OR_ABORT(auto schema, factory->Inspect({path})); - ASSIGN_OR_ABORT(auto partitioning, factory->Finish(schema)); - ASSIGN_OR_ABORT(auto expr, partitioning->Parse(path)); - return expr; -} +std::shared_ptr ninety_nine_dict = + DictionaryScalar::Make(MakeScalar(0), ArrayFromJSON(int64(), "[99]")); // A benchmark of SimplifyWithGuarantee using expressions arising from partitioning. static void SimplifyFilterWithGuarantee(benchmark::State& state, Expression filter, @@ -61,11 +54,15 @@ auto filter_cast_negative = auto filter_cast_positive = and_(equal(call("cast", {field_ref("a")}, to_int64), literal(99)), equal(call("cast", {field_ref("b")}, to_int64), literal(99))); -// A fully simplified partition expression. -auto guarantee = GetPartitionExpression("a=99/b=99", /*infer_dictionary=*/false); -// A partition expression that uses dictionaries, which are inferred by default. -auto guarantee_dictionary = - GetPartitionExpression("a=99/b=99", /*infer_dictionary=*/true); + +// An unencoded partition expression for "a=99/b=99". +auto guarantee = and_(equal(field_ref("a"), literal(int64_t(99))), + equal(field_ref("b"), literal(int64_t(99)))); + +// A partition expression for "a=99/b=99" that uses dictionaries (inferred by default). +auto guarantee_dictionary = and_(equal(field_ref("a"), literal(ninety_nine_dict)), + equal(field_ref("b"), literal(ninety_nine_dict))); + // Negative queries (partition expressions that fail the filter) BENCHMARK_CAPTURE(SimplifyFilterWithGuarantee, negative_filter_simple_guarantee_simple, filter_simple_negative, guarantee); @@ -87,5 +84,5 @@ BENCHMARK_CAPTURE(SimplifyFilterWithGuarantee, BENCHMARK_CAPTURE(SimplifyFilterWithGuarantee, positive_filter_cast_guarantee_dictionary, filter_cast_positive, guarantee_dictionary); -} // namespace dataset +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/expression_internal.h b/cpp/src/arrow/compute/exec/expression_internal.h similarity index 99% rename from cpp/src/arrow/dataset/expression_internal.h rename to cpp/src/arrow/compute/exec/expression_internal.h index 24e60377f5a53..7b0cc758f57d7 100644 --- a/cpp/src/arrow/dataset/expression_internal.h +++ b/cpp/src/arrow/compute/exec/expression_internal.h @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/dataset/expression.h" +#include "arrow/compute/exec/expression.h" #include #include @@ -32,7 +32,7 @@ namespace arrow { using internal::checked_cast; -namespace dataset { +namespace compute { inline const Expression::Call* CallNotNull(const Expression& expr) { auto call = expr.call(); @@ -338,5 +338,5 @@ Result Modify(Expression expr, const PreVisit& pre, return post_call(std::move(expr), nullptr); } -} // namespace dataset +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/expression_test.cc b/cpp/src/arrow/compute/exec/expression_test.cc similarity index 98% rename from cpp/src/arrow/dataset/expression_test.cc rename to cpp/src/arrow/compute/exec/expression_test.cc index 2ab796b052f62..ab3fbb4d19651 100644 --- a/cpp/src/arrow/dataset/expression_test.cc +++ b/cpp/src/arrow/compute/exec/expression_test.cc @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/dataset/expression.h" +#include "arrow/compute/exec/expression.h" #include #include @@ -26,9 +26,8 @@ #include #include +#include "arrow/compute/exec/expression_internal.h" #include "arrow/compute/registry.h" -#include "arrow/dataset/expression_internal.h" -#include "arrow/dataset/test_util.h" #include "arrow/testing/gtest_util.h" using testing::HasSubstr; @@ -39,7 +38,24 @@ namespace arrow { using internal::checked_cast; using internal::checked_pointer_cast; -namespace dataset { +namespace compute { + +const std::shared_ptr kBoringSchema = schema({ + field("bool", boolean()), + field("i8", int8()), + field("i32", int32()), + field("i32_req", int32(), /*nullable=*/false), + field("u32", uint32()), + field("i64", int64()), + field("f32", float32()), + field("f32_req", float32(), /*nullable=*/false), + field("f64", float64()), + field("date64", date64()), + field("str", utf8()), + field("dict_str", dictionary(int32(), utf8())), + field("dict_i32", dictionary(int32(), int32())), + field("ts_ns", timestamp(TimeUnit::NANO)), +}); #define EXPECT_OK ARROW_EXPECT_OK @@ -1278,5 +1294,5 @@ TEST(Projection, AugmentWithKnownValues) { })); } -} // namespace dataset +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index 14acbc73a4882..f2fde70305e69 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -22,7 +22,6 @@ arrow_install_all_headers("arrow/dataset") set(ARROW_DATASET_SRCS dataset.cc discovery.cc - expression.cc file_base.cc file_ipc.cc partition.cc @@ -106,7 +105,6 @@ endfunction() add_arrow_dataset_test(dataset_test) add_arrow_dataset_test(discovery_test) -add_arrow_dataset_test(expression_test) add_arrow_dataset_test(file_ipc_test) add_arrow_dataset_test(file_test) add_arrow_dataset_test(partition_test) @@ -121,14 +119,11 @@ if(ARROW_PARQUET) endif() if(ARROW_BUILD_BENCHMARKS) - add_arrow_benchmark(expression_benchmark PREFIX "arrow-dataset") add_arrow_benchmark(file_benchmark PREFIX "arrow-dataset") if(ARROW_BUILD_STATIC) - target_link_libraries(arrow-dataset-expression-benchmark PUBLIC arrow_dataset_static) target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_static) else() - target_link_libraries(arrow-dataset-expression-benchmark PUBLIC arrow_dataset_shared) target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_shared) endif() endif() diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h index da9f5ed371e31..f3672a0eff56d 100644 --- a/cpp/src/arrow/dataset/api.h +++ b/cpp/src/arrow/dataset/api.h @@ -19,9 +19,9 @@ #pragma once +#include "arrow/compute/exec/expression.h" #include "arrow/dataset/dataset.h" #include "arrow/dataset/discovery.h" -#include "arrow/dataset/expression.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/file_csv.h" #include "arrow/dataset/file_ipc.h" diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 60d9bd730730c..ab0600dd1a8c7 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -31,7 +31,7 @@ namespace arrow { namespace dataset { -Fragment::Fragment(Expression partition_expression, +Fragment::Fragment(compute::Expression partition_expression, std::shared_ptr physical_schema) : partition_expression_(std::move(partition_expression)), physical_schema_(std::move(physical_schema)) {} @@ -58,14 +58,14 @@ Result> InMemoryFragment::ReadPhysicalSchemaImpl() { InMemoryFragment::InMemoryFragment(std::shared_ptr schema, RecordBatchVector record_batches, - Expression partition_expression) + compute::Expression partition_expression) : Fragment(std::move(partition_expression), std::move(schema)), record_batches_(std::move(record_batches)) { DCHECK_NE(physical_schema_, nullptr); } InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, - Expression partition_expression) + compute::Expression partition_expression) : Fragment(std::move(partition_expression), /*schema=*/nullptr), record_batches_(std::move(record_batches)) { // Order of argument evaluation is undefined, so compute physical_schema here @@ -144,7 +144,7 @@ Result InMemoryFragment::ScanBatchesAsync( options->batch_size); } -Dataset::Dataset(std::shared_ptr schema, Expression partition_expression) +Dataset::Dataset(std::shared_ptr schema, compute::Expression partition_expression) : schema_(std::move(schema)), partition_expression_(std::move(partition_expression)) {} @@ -158,11 +158,10 @@ Result> Dataset::NewScan() { } Result Dataset::GetFragments() { - ARROW_ASSIGN_OR_RAISE(auto predicate, literal(true).Bind(*schema_)); - return GetFragments(std::move(predicate)); + return GetFragments(compute::literal(true)); } -Result Dataset::GetFragments(Expression predicate) { +Result Dataset::GetFragments(compute::Expression predicate) { ARROW_ASSIGN_OR_RAISE( predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_)); return predicate.IsSatisfiable() ? GetFragmentsImpl(std::move(predicate)) @@ -228,7 +227,7 @@ Result> InMemoryDataset::ReplaceSchema( return std::make_shared(std::move(schema), get_batches_); } -Result InMemoryDataset::GetFragmentsImpl(Expression) { +Result InMemoryDataset::GetFragmentsImpl(compute::Expression) { auto schema = this->schema(); auto create_fragment = @@ -269,7 +268,7 @@ Result> UnionDataset::ReplaceSchema( new UnionDataset(std::move(schema), std::move(children))); } -Result UnionDataset::GetFragmentsImpl(Expression predicate) { +Result UnionDataset::GetFragmentsImpl(compute::Expression predicate) { return GetFragmentsFromDatasets(children_, predicate); } diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index c5c22d731fc60..40a60ffd48e05 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -25,7 +25,7 @@ #include #include -#include "arrow/dataset/expression.h" +#include "arrow/compute/exec/expression.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/macros.h" @@ -75,19 +75,21 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { /// \brief An expression which evaluates to true for all data viewed by this /// Fragment. - const Expression& partition_expression() const { return partition_expression_; } + const compute::Expression& partition_expression() const { + return partition_expression_; + } virtual ~Fragment() = default; protected: Fragment() = default; - explicit Fragment(Expression partition_expression, + explicit Fragment(compute::Expression partition_expression, std::shared_ptr physical_schema); virtual Result> ReadPhysicalSchemaImpl() = 0; util::Mutex physical_schema_mutex_; - Expression partition_expression_ = literal(true); + compute::Expression partition_expression_ = compute::literal(true); std::shared_ptr physical_schema_; }; @@ -115,8 +117,9 @@ class ARROW_DS_EXPORT FragmentScanOptions { class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: InMemoryFragment(std::shared_ptr schema, RecordBatchVector record_batches, - Expression = literal(true)); - explicit InMemoryFragment(RecordBatchVector record_batches, Expression = literal(true)); + compute::Expression = compute::literal(true)); + explicit InMemoryFragment(RecordBatchVector record_batches, + compute::Expression = compute::literal(true)); Result Scan(std::shared_ptr options) override; Result ScanBatchesAsync( @@ -144,14 +147,16 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { Result> NewScan(); /// \brief GetFragments returns an iterator of Fragments given a predicate. - Result GetFragments(Expression predicate); + Result GetFragments(compute::Expression predicate); Result GetFragments(); const std::shared_ptr& schema() const { return schema_; } /// \brief An expression which evaluates to true for all data viewed by this Dataset. /// May be null, which indicates no information is available. - const Expression& partition_expression() const { return partition_expression_; } + const compute::Expression& partition_expression() const { + return partition_expression_; + } /// \brief The name identifying the kind of Dataset virtual std::string type_name() const = 0; @@ -168,12 +173,12 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { protected: explicit Dataset(std::shared_ptr schema) : schema_(std::move(schema)) {} - Dataset(std::shared_ptr schema, Expression partition_expression); + Dataset(std::shared_ptr schema, compute::Expression partition_expression); - virtual Result GetFragmentsImpl(Expression predicate) = 0; + virtual Result GetFragmentsImpl(compute::Expression predicate) = 0; std::shared_ptr schema_; - Expression partition_expression_ = literal(true); + compute::Expression partition_expression_ = compute::literal(true); }; /// \addtogroup dataset-implementations @@ -209,7 +214,7 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset { std::shared_ptr schema) const override; protected: - Result GetFragmentsImpl(Expression predicate) override; + Result GetFragmentsImpl(compute::Expression predicate) override; std::shared_ptr get_batches_; }; @@ -233,7 +238,7 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset { std::shared_ptr schema) const override; protected: - Result GetFragmentsImpl(Expression predicate) override; + Result GetFragmentsImpl(compute::Expression predicate) override; explicit UnionDataset(std::shared_ptr schema, DatasetVector children) : Dataset(std::move(schema)), children_(std::move(children)) {} diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index a5ac474754bfd..6527eac07dd02 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -38,7 +38,7 @@ namespace dataset { /// \brief GetFragmentsFromDatasets transforms a vector into a /// flattened FragmentIterator. inline Result GetFragmentsFromDatasets(const DatasetVector& datasets, - Expression predicate) { + compute::Expression predicate) { // Iterator auto datasets_it = MakeVectorIterator(datasets); @@ -108,12 +108,13 @@ struct SubtreeImpl { expression_codes partition_expression; }; - std::unordered_map expr_to_code_; - std::vector code_to_expr_; + std::unordered_map + expr_to_code_; + std::vector code_to_expr_; std::unordered_set subtree_exprs_; // Encode a subexpression (returning the existing code if possible). - expression_code GetOrInsert(const Expression& expr) { + expression_code GetOrInsert(const compute::Expression& expr) { auto next_code = static_cast(expr_to_code_.size()); auto it_success = expr_to_code_.emplace(expr, next_code); @@ -124,7 +125,8 @@ struct SubtreeImpl { } // Encode an expression (recursively breaking up conjunction members if possible). - void EncodeConjunctionMembers(const Expression& expr, expression_codes* codes) { + void EncodeConjunctionMembers(const compute::Expression& expr, + expression_codes* codes) { if (auto call = expr.call()) { if (call->function_name == "and_kleene") { // expr is a conjunction, encode its arguments @@ -138,7 +140,7 @@ struct SubtreeImpl { } // Convert an encoded subtree or fragment back into an expression. - Expression GetSubtreeExpression(const Encoded& encoded_subtree) { + compute::Expression GetSubtreeExpression(const Encoded& encoded_subtree) { // Filters will already be simplified by all of a subtree's ancestors, so // we only need to simplify the filter by the trailing conjunction member // of each subtree. diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index a146bd6185e0c..70b6930bf2f2a 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -34,7 +34,7 @@ namespace arrow { namespace dataset { -DatasetFactory::DatasetFactory() : root_partition_(literal(true)) {} +DatasetFactory::DatasetFactory() : root_partition_(compute::literal(true)) {} Result> DatasetFactory::Inspect(InspectOptions options) { ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas(std::move(options))); diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index cfe741a5d17a0..5559638448fe3 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -95,9 +95,9 @@ class ARROW_DS_EXPORT DatasetFactory { virtual Result> Finish(FinishOptions options) = 0; /// \brief Optional root partition for the resulting Dataset. - const Expression& root_partition() const { return root_partition_; } + const compute::Expression& root_partition() const { return root_partition_; } /// \brief Set the root partition for the resulting Dataset. - Status SetRootPartition(Expression partition) { + Status SetRootPartition(compute::Expression partition) { root_partition_ = std::move(partition); return Status::OK(); } @@ -107,7 +107,7 @@ class ARROW_DS_EXPORT DatasetFactory { protected: DatasetFactory(); - Expression root_partition_; + compute::Expression root_partition_; }; /// @} diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index d02b094bb43ab..1f47fc8ae8693 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -86,16 +86,17 @@ Result> FileSource::OpenCompressed( Result> FileFormat::MakeFragment( FileSource source, std::shared_ptr physical_schema) { - return MakeFragment(std::move(source), literal(true), std::move(physical_schema)); + return MakeFragment(std::move(source), compute::literal(true), + std::move(physical_schema)); } Result> FileFormat::MakeFragment( - FileSource source, Expression partition_expression) { + FileSource source, compute::Expression partition_expression) { return MakeFragment(std::move(source), std::move(partition_expression), nullptr); } Result> FileFormat::MakeFragment( - FileSource source, Expression partition_expression, + FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema) { return std::shared_ptr( new FileFragment(std::move(source), shared_from_this(), @@ -172,11 +173,11 @@ struct FileSystemDataset::FragmentSubtrees { // Forest for skipping fragments based on extracted subtree expressions Forest forest; // fragment indices and subtree expressions in forest order - std::vector> fragments_and_subtrees; + std::vector> fragments_and_subtrees; }; Result> FileSystemDataset::Make( - std::shared_ptr schema, Expression root_partition, + std::shared_ptr schema, compute::Expression root_partition, std::shared_ptr format, std::shared_ptr filesystem, std::vector> fragments) { std::shared_ptr out( @@ -215,7 +216,7 @@ std::string FileSystemDataset::ToString() const { repr += "\n" + fragment->source().path(); const auto& partition = fragment->partition_expression(); - if (partition != literal(true)) { + if (partition != compute::literal(true)) { repr += ": " + partition.ToString(); } } @@ -264,15 +265,16 @@ void FileSystemDataset::SetupSubtreePruning() { }); } -Result FileSystemDataset::GetFragmentsImpl(Expression predicate) { - if (predicate == literal(true)) { +Result FileSystemDataset::GetFragmentsImpl( + compute::Expression predicate) { + if (predicate == compute::literal(true)) { // trivial predicate; skip subtree pruning return MakeVectorIterator(FragmentVector(fragments_.begin(), fragments_.end())); } std::vector fragment_indices; - std::vector predicates{predicate}; + std::vector predicates{predicate}; RETURN_NOT_OK(subtrees_->forest.Visit( [&](Forest::Ref ref) -> Result { if (auto fragment_index = @@ -282,7 +284,7 @@ Result FileSystemDataset::GetFragmentsImpl(Expression predicat } const auto& subtree_expr = - util::get(subtrees_->fragments_and_subtrees[ref.i]); + util::get(subtrees_->fragments_and_subtrees[ref.i]); ARROW_ASSIGN_OR_RAISE(auto simplified, SimplifyWithGuarantee(predicates.back(), subtree_expr)); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 08359881a207c..41f7933aca9b9 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -158,12 +158,12 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> MakeFragment( - FileSource source, Expression partition_expression, + FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema); /// \brief Create a FileFragment for a FileSource. - Result> MakeFragment(FileSource source, - Expression partition_expression); + Result> MakeFragment( + FileSource source, compute::Expression partition_expression); /// \brief Create a FileFragment for a FileSource. Result> MakeFragment( @@ -193,7 +193,8 @@ class ARROW_DS_EXPORT FileFragment : public Fragment { protected: FileFragment(FileSource source, std::shared_ptr format, - Expression partition_expression, std::shared_ptr physical_schema) + compute::Expression partition_expression, + std::shared_ptr physical_schema) : Fragment(std::move(partition_expression), std::move(physical_schema)), source_(std::move(source)), format_(std::move(format)) {} @@ -226,7 +227,7 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { /// /// \return A constructed dataset. static Result> Make( - std::shared_ptr schema, Expression root_partition, + std::shared_ptr schema, compute::Expression root_partition, std::shared_ptr format, std::shared_ptr filesystem, std::vector> fragments); @@ -258,10 +259,11 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { explicit FileSystemDataset(std::shared_ptr schema) : Dataset(std::move(schema)) {} - FileSystemDataset(std::shared_ptr schema, Expression partition_expression) + FileSystemDataset(std::shared_ptr schema, + compute::Expression partition_expression) : Dataset(std::move(schema), partition_expression) {} - Result GetFragmentsImpl(Expression predicate) override; + Result GetFragmentsImpl(compute::Expression predicate) override; void SetupSubtreePruning(); diff --git a/cpp/src/arrow/dataset/file_benchmark.cc b/cpp/src/arrow/dataset/file_benchmark.cc index 238a83bdc1a39..5caea18511dda 100644 --- a/cpp/src/arrow/dataset/file_benchmark.cc +++ b/cpp/src/arrow/dataset/file_benchmark.cc @@ -17,8 +17,8 @@ #include "benchmark/benchmark.h" +#include "arrow/compute/exec/expression.h" #include "arrow/dataset/discovery.h" -#include "arrow/dataset/expression.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/file_ipc.h" #include "arrow/dataset/partition.h" @@ -62,7 +62,7 @@ static void GetAllFragments(benchmark::State& state) { } } -static void GetFilteredFragments(benchmark::State& state, Expression filter) { +static void GetFilteredFragments(benchmark::State& state, compute::Expression filter) { auto dataset = GetDataset(); ASSERT_OK_AND_ASSIGN(filter, filter.Bind(*dataset->schema())); for (auto _ : state) { @@ -71,6 +71,9 @@ static void GetFilteredFragments(benchmark::State& state, Expression filter) { } } +using compute::field_ref; +using compute::literal; + BENCHMARK(GetAllFragments); // Drill down to a subtree. BENCHMARK_CAPTURE(GetFilteredFragments, single_dir, equal(field_ref("a"), literal(90))); diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 497e4128fdf6e..8dbc18059b36f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -163,7 +163,7 @@ static Result> GetSchemaManifest( return manifest; } -static util::optional ColumnChunkStatisticsAsExpression( +static util::optional ColumnChunkStatisticsAsExpression( const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { // For the remaining of this function, failure to extract/parse statistics // are ignored by returning nullptr. The goal is two fold. First @@ -182,11 +182,11 @@ static util::optional ColumnChunkStatisticsAsExpression( } const auto& field = schema_field.field; - auto field_expr = field_ref(field->name()); + auto field_expr = compute::field_ref(field->name()); // Optimize for corner case where all values are nulls if (statistics->num_values() == statistics->null_count()) { - return equal(std::move(field_expr), literal(MakeNullScalar(field->type()))); + return equal(std::move(field_expr), compute::literal(MakeNullScalar(field->type()))); } std::shared_ptr min, max; @@ -197,10 +197,13 @@ static util::optional ColumnChunkStatisticsAsExpression( auto maybe_min = min->CastTo(field->type()); auto maybe_max = max->CastTo(field->type()); if (maybe_min.ok() && maybe_max.ok()) { - min = maybe_min.MoveValueUnsafe(); - max = maybe_max.MoveValueUnsafe(); - return and_(greater_equal(field_expr, literal(min)), - less_equal(field_expr, literal(max))); + auto lower_bound = + compute::greater_equal(field_expr, compute::literal(maybe_min.MoveValueUnsafe())); + + auto upper_bound = compute::less_equal(std::move(field_expr), + compute::literal(maybe_max.MoveValueUnsafe())); + + return compute::and_(std::move(lower_bound), std::move(upper_bound)); } return util::nullopt; @@ -383,7 +386,7 @@ Result ParquetFileFormat::ScanFile( } Result> ParquetFileFormat::MakeFragment( - FileSource source, Expression partition_expression, + FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema, std::vector row_groups) { return std::shared_ptr(new ParquetFileFragment( std::move(source), shared_from_this(), std::move(partition_expression), @@ -391,7 +394,7 @@ Result> ParquetFileFormat::MakeFragment( } Result> ParquetFileFormat::MakeFragment( - FileSource source, Expression partition_expression, + FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema) { return std::shared_ptr(new ParquetFileFragment( std::move(source), shared_from_this(), std::move(partition_expression), @@ -447,7 +450,7 @@ Status ParquetFileWriter::FinishInternal() { return parquet_writer_->Close(); } ParquetFileFragment::ParquetFileFragment(FileSource source, std::shared_ptr format, - Expression partition_expression, + compute::Expression partition_expression, std::shared_ptr physical_schema, util::optional> row_groups) : FileFragment(std::move(source), std::move(format), std::move(partition_expression), @@ -494,7 +497,7 @@ Status ParquetFileFragment::SetMetadata( metadata_ = std::move(metadata); manifest_ = std::move(manifest); - statistics_expressions_.resize(row_groups_->size(), literal(true)); + statistics_expressions_.resize(row_groups_->size(), compute::literal(true)); statistics_expressions_complete_.resize(physical_schema_->num_fields(), false); for (int row_group : *row_groups_) { @@ -509,7 +512,8 @@ Status ParquetFileFragment::SetMetadata( return Status::OK(); } -Result ParquetFileFragment::SplitByRowGroup(Expression predicate) { +Result ParquetFileFragment::SplitByRowGroup( + compute::Expression predicate) { RETURN_NOT_OK(EnsureCompleteMetadata()); ARROW_ASSIGN_OR_RAISE(auto row_groups, FilterRowGroups(predicate)); @@ -527,7 +531,8 @@ Result ParquetFileFragment::SplitByRowGroup(Expression predicate return fragments; } -Result> ParquetFileFragment::Subset(Expression predicate) { +Result> ParquetFileFragment::Subset( + compute::Expression predicate) { RETURN_NOT_OK(EnsureCompleteMetadata()); ARROW_ASSIGN_OR_RAISE(auto row_groups, FilterRowGroups(predicate)); return Subset(std::move(row_groups)); @@ -544,15 +549,16 @@ Result> ParquetFileFragment::Subset( return new_fragment; } -inline void FoldingAnd(Expression* l, Expression r) { - if (*l == literal(true)) { +inline void FoldingAnd(compute::Expression* l, compute::Expression r) { + if (*l == compute::literal(true)) { *l = std::move(r); } else { *l = and_(std::move(*l), std::move(r)); } } -Result> ParquetFileFragment::FilterRowGroups(Expression predicate) { +Result> ParquetFileFragment::FilterRowGroups( + compute::Expression predicate) { auto lock = physical_schema_mutex_.Lock(); DCHECK_NE(metadata_, nullptr); @@ -724,7 +730,7 @@ ParquetDatasetFactory::CollectParquetFragments(const Partitioning& partitioning) auto partition_expression = partitioning.Parse(StripPrefixAndFilename(path, options_.partition_base_dir)) - .ValueOr(literal(true)); + .ValueOr(compute::literal(true)); ARROW_ASSIGN_OR_RAISE( auto fragment, @@ -775,8 +781,8 @@ Result> ParquetDatasetFactory::Finish(FinishOptions opt } ARROW_ASSIGN_OR_RAISE(auto fragments, CollectParquetFragments(*partitioning)); - return FileSystemDataset::Make(std::move(schema), literal(true), format_, filesystem_, - std::move(fragments)); + return FileSystemDataset::Make(std::move(schema), compute::literal(true), format_, + filesystem_, std::move(fragments)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 790e89c24c23c..99e18337ad725 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -103,12 +103,12 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// \brief Create a Fragment targeting all RowGroups. Result> MakeFragment( - FileSource source, Expression partition_expression, + FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema) override; /// \brief Create a Fragment, restricted to the specified row groups. Result> MakeFragment( - FileSource source, Expression partition_expression, + FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema, std::vector row_groups); /// \brief Return a FileReader on the given source. @@ -136,7 +136,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// significant performance boost when scanning high latency file systems. class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { public: - Result SplitByRowGroup(Expression predicate); + Result SplitByRowGroup(compute::Expression predicate); /// \brief Return the RowGroups selected by this fragment. const std::vector& row_groups() const { @@ -152,12 +152,12 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR); /// \brief Return fragment which selects a filtered subset of this fragment's RowGroups. - Result> Subset(Expression predicate); + Result> Subset(compute::Expression predicate); Result> Subset(std::vector row_group_ids); private: ParquetFileFragment(FileSource source, std::shared_ptr format, - Expression partition_expression, + compute::Expression partition_expression, std::shared_ptr physical_schema, util::optional> row_groups); @@ -171,7 +171,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { } /// Return a filtered subset of row group indices. - Result> FilterRowGroups(Expression predicate); + Result> FilterRowGroups(compute::Expression predicate); ParquetFileFormat& parquet_format_; @@ -179,7 +179,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { /// or util::nullopt if all row groups are selected. util::optional> row_groups_; - std::vector statistics_expressions_; + std::vector statistics_expressions_; std::vector statistics_expressions_complete_; std::shared_ptr metadata_; std::shared_ptr manifest_; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 04e61c30d41fb..599c6240c1c5b 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -157,7 +157,8 @@ class TestParquetFileFormat : public FileFormatFixtureMixin } void CountRowGroupsInFragment(const std::shared_ptr& fragment, - std::vector expected_row_groups, Expression filter) { + std::vector expected_row_groups, + compute::Expression filter) { SetFilter(filter); auto parquet_fragment = checked_pointer_cast(fragment); @@ -271,7 +272,8 @@ class TestParquetFileFormatScan : public FileFormatScanMixin& fragment, - std::vector expected_row_groups, Expression filter) { + std::vector expected_row_groups, + compute::Expression filter) { SetFilter(filter); auto parquet_fragment = checked_pointer_cast(fragment); diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index dbddb5b385b34..0c8954e6b7b92 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -170,7 +170,7 @@ TEST_F(TestFileSystemDataset, RootPartitionPruning) { auto root_partition = equal(field_ref("i32"), literal(5)); MakeDataset({fs::File("a"), fs::File("b")}, root_partition); - auto GetFragments = [&](Expression filter) { + auto GetFragments = [&](compute::Expression filter) { return *dataset_->GetFragments(*filter.Bind(*dataset_->schema())); }; @@ -204,7 +204,7 @@ TEST_F(TestFileSystemDataset, TreePartitionPruning) { fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"), }; - std::vector partitions = { + std::vector partitions = { equal(field_ref("state"), literal("NY")), and_(equal(field_ref("state"), literal("NY")), @@ -234,7 +234,7 @@ TEST_F(TestFileSystemDataset, TreePartitionPruning) { // Default filter should always return all data. AssertFragmentsAreFromPath(*dataset_->GetFragments(), all_cities); - auto GetFragments = [&](Expression filter) { + auto GetFragments = [&](compute::Expression filter) { return *dataset_->GetFragments(*filter.Bind(*dataset_->schema())); }; @@ -260,7 +260,7 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) { fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"), }; - std::vector partitions = { + std::vector partitions = { equal(field_ref("state"), literal("NY")), and_(equal(field_ref("state"), literal("NY")), @@ -566,7 +566,7 @@ TEST(Subtree, EncodeFragments) { auto encoded = tree.EncodeFragments(fragments); EXPECT_THAT( tree.code_to_expr_, - ContainerEq(std::vector{ + ContainerEq(std::vector{ equal(field_ref("a"), literal("1")), equal(field_ref("b"), literal("2")), equal(field_ref("a"), literal("2")), equal(field_ref("b"), literal("3"))})); EXPECT_THAT( diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 43ccd777cf2e2..f6e7b9a0d2893 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -53,18 +53,18 @@ std::shared_ptr Partitioning::Default() { std::string type_name() const override { return "default"; } - Result Parse(const std::string& path) const override { - return literal(true); + Result Parse(const std::string& path) const override { + return compute::literal(true); } - Result Format(const Expression& expr) const override { + Result Format(const compute::Expression& expr) const override { return Status::NotImplemented("formatting paths from ", type_name(), " Partitioning"); } Result Partition( const std::shared_ptr& batch) const override { - return PartitionedBatches{{batch}, {literal(true)}}; + return PartitionedBatches{{batch}, {compute::literal(true)}}; } }; @@ -103,7 +103,7 @@ Result KeyValuePartitioning::Partition( if (key_indices.empty()) { // no fields to group by; return the whole batch - return PartitionedBatches{{batch}, {literal(true)}}; + return PartitionedBatches{{batch}, {compute::literal(true)}}; } // assemble an ExecBatch of the key columns @@ -132,14 +132,15 @@ Result KeyValuePartitioning::Partition( // assemble partition expressions from the unique keys out.expressions.resize(grouper->num_groups()); for (uint32_t group = 0; group < grouper->num_groups(); ++group) { - std::vector exprs(num_keys); + std::vector exprs(num_keys); for (int i = 0; i < num_keys; ++i) { ARROW_ASSIGN_OR_RAISE(auto val, unique_arrays[i]->GetScalar(group)); const auto& name = batch->schema()->field(key_indices[i])->name(); - exprs[i] = val->is_valid ? equal(field_ref(name), literal(std::move(val))) - : is_null(field_ref(name)); + exprs[i] = val->is_valid ? compute::equal(compute::field_ref(name), + compute::literal(std::move(val))) + : compute::is_null(compute::field_ref(name)); } out.expressions[group] = and_(std::move(exprs)); } @@ -157,10 +158,10 @@ Result KeyValuePartitioning::Partition( return out; } -Result KeyValuePartitioning::ConvertKey(const Key& key) const { +Result KeyValuePartitioning::ConvertKey(const Key& key) const { ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(*schema_)); if (match.empty()) { - return literal(true); + return compute::literal(true); } auto field_index = match[0]; @@ -169,7 +170,7 @@ Result KeyValuePartitioning::ConvertKey(const Key& key) const { std::shared_ptr converted; if (!key.value.has_value()) { - return is_null(field_ref(field->name())); + return compute::is_null(compute::field_ref(field->name())); } else if (field->type()->id() == Type::DICTIONARY) { if (dictionaries_.empty() || dictionaries_[field_index] == nullptr) { return Status::Invalid("No dictionary provided for dictionary field ", @@ -201,22 +202,23 @@ Result KeyValuePartitioning::ConvertKey(const Key& key) const { ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(field->type(), *key.value)); } - return equal(field_ref(field->name()), literal(std::move(converted))); + return compute::equal(compute::field_ref(field->name()), + compute::literal(std::move(converted))); } -Result KeyValuePartitioning::Parse(const std::string& path) const { - std::vector expressions; +Result KeyValuePartitioning::Parse(const std::string& path) const { + std::vector expressions; for (const Key& key : ParseKeys(path)) { ARROW_ASSIGN_OR_RAISE(auto expr, ConvertKey(key)); - if (expr == literal(true)) continue; + if (expr == compute::literal(true)) continue; expressions.push_back(std::move(expr)); } return and_(std::move(expressions)); } -Result KeyValuePartitioning::Format(const Expression& expr) const { +Result KeyValuePartitioning::Format(const compute::Expression& expr) const { ScalarVector values{static_cast(schema_->num_fields()), nullptr}; ARROW_ASSIGN_OR_RAISE(auto known_values, ExtractKnownFieldValues(expr)); diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 6330725b7a63b..36276e7a3b1f4 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -26,7 +26,7 @@ #include #include -#include "arrow/dataset/expression.h" +#include "arrow/compute/exec/expression.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/optional.h" @@ -67,15 +67,15 @@ class ARROW_DS_EXPORT Partitioning { /// produce sub-batches which satisfy mutually exclusive Expressions. struct PartitionedBatches { RecordBatchVector batches; - std::vector expressions; + std::vector expressions; }; virtual Result Partition( const std::shared_ptr& batch) const = 0; /// \brief Parse a path into a partition expression - virtual Result Parse(const std::string& path) const = 0; + virtual Result Parse(const std::string& path) const = 0; - virtual Result Format(const Expression& expr) const = 0; + virtual Result Format(const compute::Expression& expr) const = 0; /// \brief A default Partitioning which always yields scalar(true) static std::shared_ptr Default(); @@ -142,9 +142,9 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning { Result Partition( const std::shared_ptr& batch) const override; - Result Parse(const std::string& path) const override; + Result Parse(const std::string& path) const override; - Result Format(const Expression& expr) const override; + Result Format(const compute::Expression& expr) const override; protected: KeyValuePartitioning(std::shared_ptr schema, ArrayVector dictionaries) @@ -159,7 +159,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning { virtual Result FormatValues(const ScalarVector& values) const = 0; /// Convert a Key to a full expression. - Result ConvertKey(const Key& key) const; + Result ConvertKey(const Key& key) const; ArrayVector dictionaries_; }; @@ -234,9 +234,9 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning { /// \brief Implementation provided by lambda or other callable class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning { public: - using ParseImpl = std::function(const std::string&)>; + using ParseImpl = std::function(const std::string&)>; - using FormatImpl = std::function(const Expression&)>; + using FormatImpl = std::function(const compute::Expression&)>; FunctionPartitioning(std::shared_ptr schema, ParseImpl parse_impl, FormatImpl format_impl = NULLPTR, std::string name = "function") @@ -247,11 +247,11 @@ class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning { std::string type_name() const override { return name_; } - Result Parse(const std::string& path) const override { + Result Parse(const std::string& path) const override { return parse_impl_(path); } - Result Format(const Expression& expr) const override { + Result Format(const compute::Expression& expr) const override { if (format_impl_) { return format_impl_(expr); } diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 06c3cc676742a..1c776f1832928 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -46,17 +46,17 @@ class TestPartitioning : public ::testing::Test { ASSERT_RAISES(Invalid, partitioning_->Parse(path)); } - void AssertParse(const std::string& path, Expression expected) { + void AssertParse(const std::string& path, compute::Expression expected) { ASSERT_OK_AND_ASSIGN(auto parsed, partitioning_->Parse(path)); ASSERT_EQ(parsed, expected); } template - void AssertFormatError(Expression expr) { + void AssertFormatError(compute::Expression expr) { ASSERT_EQ(partitioning_->Format(expr).status().code(), code); } - void AssertFormat(Expression expr, const std::string& expected) { + void AssertFormat(compute::Expression expr, const std::string& expected) { // formatted partition expressions are bound to the schema of the dataset being // written ASSERT_OK_AND_ASSIGN(auto formatted, partitioning_->Format(expr)); @@ -64,7 +64,8 @@ class TestPartitioning : public ::testing::Test { // ensure the formatted path round trips the relevant components of the partition // expression: roundtripped should be a subset of expr - ASSERT_OK_AND_ASSIGN(Expression roundtripped, partitioning_->Parse(formatted)); + ASSERT_OK_AND_ASSIGN(compute::Expression roundtripped, + partitioning_->Parse(formatted)); ASSERT_OK_AND_ASSIGN(roundtripped, roundtripped.Bind(*written_schema_)); ASSERT_OK_AND_ASSIGN(auto simplified, SimplifyWithGuarantee(roundtripped, expr)); @@ -81,7 +82,7 @@ class TestPartitioning : public ::testing::Test { void AssertPartition(const std::shared_ptr partitioning, const std::shared_ptr full_batch, const RecordBatchVector& expected_batches, - const std::vector& expected_expressions) { + const std::vector& expected_expressions) { ASSERT_OK_AND_ASSIGN(auto partition_results, partitioning->Partition(full_batch)); std::shared_ptr rest = full_batch; ASSERT_EQ(partition_results.batches.size(), expected_batches.size()); @@ -91,7 +92,8 @@ class TestPartitioning : public ::testing::Test { std::shared_ptr actual_batch = partition_results.batches[partition_index]; AssertBatchesEqual(*expected_batches[partition_index], *actual_batch); - Expression actual_expression = partition_results.expressions[partition_index]; + compute::Expression actual_expression = + partition_results.expressions[partition_index]; ASSERT_EQ(expected_expressions[partition_index], actual_expression); } } @@ -101,7 +103,7 @@ class TestPartitioning : public ::testing::Test { const std::string& record_batch_json, const std::shared_ptr partitioned_schema, const std::vector& expected_record_batch_strs, - const std::vector& expected_expressions) { + const std::vector& expected_expressions) { auto record_batch = RecordBatchFromJSON(schema, record_batch_json); RecordBatchVector expected_batches; for (const auto& expected_record_batch_str : expected_record_batch_strs) { @@ -161,7 +163,7 @@ TEST_F(TestPartitioning, Partition) { R"([{"c": 4}])", }; - std::vector expected_expressions = { + std::vector expected_expressions = { and_(equal(field_ref("a"), literal(3)), equal(field_ref("b"), literal("x"))), and_(equal(field_ref("a"), literal(1)), is_null(field_ref("b"))), and_(is_null(field_ref("a")), is_null(field_ref("b"))), @@ -562,7 +564,7 @@ TEST_F(TestPartitioning, EtlThenHive) { field("hour", int8()), field("alpha", int32()), field("beta", float32())}); partitioning_ = std::make_shared( - schm, [&](const std::string& path) -> Result { + schm, [&](const std::string& path) -> Result { auto segments = fs::internal::SplitAbstractPath(path); if (segments.size() < etl_fields.size() + alphabeta_fields.size()) { return Status::Invalid("path ", path, " can't be parsed"); @@ -604,8 +606,8 @@ TEST_F(TestPartitioning, Set) { // An adhoc partitioning which parses segments like "/x in [1 4 5]" // into (field_ref("x") == 1 or field_ref("x") == 4 or field_ref("x") == 5) partitioning_ = std::make_shared( - schm, [&](const std::string& path) -> Result { - std::vector subexpressions; + schm, [&](const std::string& path) -> Result { + std::vector subexpressions; for (auto segment : fs::internal::SplitAbstractPath(path)) { std::smatch matches; @@ -643,8 +645,8 @@ class RangePartitioning : public Partitioning { std::string type_name() const override { return "range"; } - Result Parse(const std::string& path) const override { - std::vector ranges; + Result Parse(const std::string& path) const override { + std::vector ranges; for (auto segment : fs::internal::SplitAbstractPath(path)) { auto key = HivePartitioning::ParseKey(segment, ""); @@ -688,7 +690,7 @@ class RangePartitioning : public Partitioning { return Status::OK(); } - Result Format(const Expression&) const override { return ""; } + Result Format(const compute::Expression&) const override { return ""; } Result Partition( const std::shared_ptr&) const override { return Status::OK(); diff --git a/cpp/src/arrow/dataset/projector.h b/cpp/src/arrow/dataset/projector.h index d3171fbfb3d43..86d38f0af2352 100644 --- a/cpp/src/arrow/dataset/projector.h +++ b/cpp/src/arrow/dataset/projector.h @@ -25,7 +25,7 @@ namespace arrow { namespace dataset { -// FIXME this is superceded by Expression::Bind +// FIXME this is superceded by compute::Expression::Bind ARROW_DS_EXPORT Status CheckProjectable(const Schema& from, const Schema& to); } // namespace dataset diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 70f1529465015..41fa7ec5c7730 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -45,7 +45,7 @@ using FragmentGenerator = std::function>()>; std::vector ScanOptions::MaterializedFields() const { std::vector fields; - for (const Expression* expr : {&filter, &projection}) { + for (const compute::Expression* expr : {&filter, &projection}) { for (const FieldRef& ref : FieldsInExpression(*expr)) { DCHECK(ref.name()); fields.push_back(*ref.name()); @@ -406,7 +406,7 @@ namespace { inline Result DoFilterAndProjectRecordBatchAsync( const std::shared_ptr& scanner, const EnumeratedRecordBatch& in) { - ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_filter, SimplifyWithGuarantee(scanner->options()->filter, in.fragment.value->partition_expression())); @@ -431,7 +431,7 @@ inline Result DoFilterAndProjectRecordBatchAsync( compute::FilterOptions::Defaults(), &exec_context)); } - ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_projection, SimplifyWithGuarantee(scanner->options()->projection, in.fragment.value->partition_expression())); ARROW_ASSIGN_OR_RAISE( @@ -685,12 +685,12 @@ Status ScannerBuilder::Project(std::vector columns) { return SetProjection(scan_options_.get(), std::move(columns)); } -Status ScannerBuilder::Project(std::vector exprs, +Status ScannerBuilder::Project(std::vector exprs, std::vector names) { return SetProjection(scan_options_.get(), std::move(exprs), std::move(names)); } -Status ScannerBuilder::Filter(const Expression& filter) { +Status ScannerBuilder::Filter(const compute::Expression& filter) { return SetFilter(scan_options_.get(), filter); } diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 79e5986a4def0..15bd27ab4f359 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -25,8 +25,8 @@ #include #include +#include "arrow/compute/exec/expression.h" #include "arrow/dataset/dataset.h" -#include "arrow/dataset/expression.h" #include "arrow/dataset/projector.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" @@ -55,9 +55,9 @@ constexpr int32_t kDefaultFragmentReadahead = 8; /// Scan-specific options, which can be changed between scans of the same dataset. struct ARROW_DS_EXPORT ScanOptions { /// A row filter (which will be pushed down to partitioning/reading if supported). - Expression filter = literal(true); + compute::Expression filter = compute::literal(true); /// A projection expression (which can add/remove/rename columns). - Expression projection; + compute::Expression projection; /// Schema with which batches will be read from fragments. This is also known as the /// "reader schema" it will be used (for example) in constructing CSV file readers to @@ -333,7 +333,7 @@ class ARROW_DS_EXPORT ScannerBuilder { /// /// \return Failure if any referenced column does not exists in the dataset's /// Schema. - Status Project(std::vector exprs, std::vector names); + Status Project(std::vector exprs, std::vector names); /// \brief Set the filter expression to return only rows matching the filter. /// @@ -346,7 +346,7 @@ class ARROW_DS_EXPORT ScannerBuilder { /// /// \return Failure if any referenced columns does not exist in the dataset's /// Schema. - Status Filter(const Expression& filter); + Status Filter(const compute::Expression& filter); /// \brief Indicate if the Scanner should make use of the available /// ThreadPool found in ScanOptions; diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 507bf82a735db..56065d9983e8a 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -39,7 +39,8 @@ using internal::Executor; namespace dataset { inline Result> FilterSingleBatch( - const std::shared_ptr& in, const Expression& filter, MemoryPool* pool) { + const std::shared_ptr& in, const compute::Expression& filter, + MemoryPool* pool) { compute::ExecContext exec_context{pool}; ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(filter, Datum(in), &exec_context)); @@ -58,7 +59,8 @@ inline Result> FilterSingleBatch( return filtered.record_batch(); } -inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression filter, +inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, + compute::Expression filter, MemoryPool* pool) { return MakeMaybeMapIterator( [=](std::shared_ptr in) -> Result> { @@ -68,7 +70,7 @@ inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression } inline Result> ProjectSingleBatch( - const std::shared_ptr& in, const Expression& projection, + const std::shared_ptr& in, const compute::Expression& projection, MemoryPool* pool) { compute::ExecContext exec_context{pool}; ARROW_ASSIGN_OR_RAISE(Datum projected, @@ -88,7 +90,8 @@ inline Result> ProjectSingleBatch( } inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, - Expression projection, MemoryPool* pool) { + compute::Expression projection, + MemoryPool* pool) { return MakeMaybeMapIterator( [=](std::shared_ptr in) -> Result> { return ProjectSingleBatch(in, projection, pool); @@ -98,7 +101,8 @@ inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, class FilterAndProjectScanTask : public ScanTask { public: - explicit FilterAndProjectScanTask(std::shared_ptr task, Expression partition) + explicit FilterAndProjectScanTask(std::shared_ptr task, + compute::Expression partition) : ScanTask(task->options(), task->fragment()), task_(std::move(task)), partition_(std::move(partition)) {} @@ -106,10 +110,10 @@ class FilterAndProjectScanTask : public ScanTask { Result Execute() override { ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); - ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_filter, SimplifyWithGuarantee(options()->filter, partition_)); - ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_projection, SimplifyWithGuarantee(options()->projection, partition_)); RecordBatchIterator filter_it = @@ -122,10 +126,10 @@ class FilterAndProjectScanTask : public ScanTask { Result ToFilteredAndProjectedIterator( const RecordBatchVector& rbs) { auto it = MakeVectorIterator(rbs); - ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_filter, SimplifyWithGuarantee(options()->filter, partition_)); - ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_projection, SimplifyWithGuarantee(options()->projection, partition_)); RecordBatchIterator filter_it = @@ -137,10 +141,10 @@ class FilterAndProjectScanTask : public ScanTask { Result> FilterAndProjectBatch( const std::shared_ptr& batch) { - ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_filter, SimplifyWithGuarantee(options()->filter, partition_)); - ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, + ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_projection, SimplifyWithGuarantee(options()->projection, partition_)); ARROW_ASSIGN_OR_RAISE(auto filtered, FilterSingleBatch(batch, simplified_filter, options_->pool)); @@ -174,7 +178,7 @@ class FilterAndProjectScanTask : public ScanTask { private: std::shared_ptr task_; - Expression partition_; + compute::Expression partition_; }; /// \brief GetScanTaskIterator transforms an Iterator in a @@ -209,7 +213,7 @@ inline Status NestedFieldRefsNotImplemented() { return Status::NotImplemented("Nested field references in scans."); } -inline Status SetProjection(ScanOptions* options, const Expression& projection) { +inline Status SetProjection(ScanOptions* options, const compute::Expression& projection) { ARROW_ASSIGN_OR_RAISE(options->projection, projection.Bind(*options->dataset_schema)); if (options->projection.type()->id() != Type::STRUCT) { @@ -223,7 +227,7 @@ inline Status SetProjection(ScanOptions* options, const Expression& projection) return Status::OK(); } -inline Status SetProjection(ScanOptions* options, std::vector exprs, +inline Status SetProjection(ScanOptions* options, std::vector exprs, std::vector names) { compute::ProjectOptions project_options{std::move(names)}; @@ -243,14 +247,14 @@ inline Status SetProjection(ScanOptions* options, std::vector exprs, } inline Status SetProjection(ScanOptions* options, std::vector names) { - std::vector exprs(names.size()); + std::vector exprs(names.size()); for (size_t i = 0; i < exprs.size(); ++i) { - exprs[i] = field_ref(names[i]); + exprs[i] = compute::field_ref(names[i]); } return SetProjection(options, std::move(exprs), std::move(names)); } -inline Status SetFilter(ScanOptions* options, const Expression& filter) { +inline Status SetFilter(ScanOptions* options, const compute::Expression& filter) { for (const auto& ref : FieldsInExpression(filter)) { if (!ref.name()) return NestedFieldRefsNotImplemented(); diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index a83210fdd3bc7..17f4e079ae40b 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -566,7 +566,7 @@ class ControlledDataset : public Dataset { void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); } protected: - Result GetFragmentsImpl(Expression predicate) override { + Result GetFragmentsImpl(compute::Expression predicate) override { std::vector> casted_fragments(fragments_.begin(), fragments_.end()); return MakeVectorIterator(std::move(casted_fragments)); diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 0a65099ce07e8..83ae4bbf1e87c 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -31,6 +31,7 @@ #include #include +#include "arrow/compute/exec/expression.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" @@ -54,22 +55,22 @@ namespace arrow { namespace dataset { -const std::shared_ptr kBoringSchema = schema({ - field("bool", boolean()), - field("i8", int8()), - field("i32", int32()), - field("i32_req", int32(), /*nullable=*/false), - field("u32", uint32()), - field("i64", int64()), - field("f32", float32()), - field("f32_req", float32(), /*nullable=*/false), - field("f64", float64()), - field("date64", date64()), - field("str", utf8()), - field("dict_str", dictionary(int32(), utf8())), - field("dict_i32", dictionary(int32(), int32())), - field("ts_ns", timestamp(TimeUnit::NANO)), -}); +using compute::call; +using compute::field_ref; +using compute::literal; + +using compute::and_; +using compute::equal; +using compute::greater; +using compute::greater_equal; +using compute::is_null; +using compute::is_valid; +using compute::less; +using compute::less_equal; +using compute::not_; +using compute::not_equal; +using compute::or_; +using compute::project; using fs::internal::GetAbstractPathExtension; using internal::checked_cast; @@ -125,13 +126,15 @@ class FragmentDataset : public Dataset { public: FragmentDataset(std::shared_ptr schema, FragmentVector fragments) : Dataset(std::move(schema)), fragments_(std::move(fragments)) {} + std::string type_name() const override { return "fragment"; } + Result> ReplaceSchema(std::shared_ptr) const override { return Status::NotImplemented(""); } protected: - Result GetFragmentsImpl(Expression predicate) override { + Result GetFragmentsImpl(compute::Expression predicate) override { return MakeVectorIterator(fragments_); } FragmentVector fragments_; @@ -288,7 +291,7 @@ class DatasetFixtureMixin : public ::testing::Test { SetFilter(literal(true)); } - void SetFilter(Expression filter) { + void SetFilter(compute::Expression filter) { ASSERT_OK_AND_ASSIGN(options_->filter, filter.Bind(*schema_)); } @@ -381,7 +384,7 @@ class FileFormatFixtureMixin : public ::testing::Test { ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names())); } - void SetFilter(Expression filter) { + void SetFilter(compute::Expression filter) { ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema)); } @@ -740,9 +743,9 @@ struct MakeFileSystemDatasetMixin { } void MakeDataset(const std::vector& infos, - Expression root_partition = literal(true), - std::vector partitions = {}, - std::shared_ptr s = kBoringSchema) { + compute::Expression root_partition = literal(true), + std::vector partitions = {}, + std::shared_ptr s = schema({})) { auto n_fragments = infos.size(); if (partitions.empty()) { partitions.resize(n_fragments, literal(true)); @@ -801,8 +804,9 @@ void AssertFragmentsAreFromPath(FragmentIterator it, std::vector ex testing::UnorderedElementsAreArray(expected)); } -static std::vector PartitionExpressionsOf(const FragmentVector& fragments) { - std::vector partition_expressions; +static std::vector PartitionExpressionsOf( + const FragmentVector& fragments) { + std::vector partition_expressions; std::transform(fragments.begin(), fragments.end(), std::back_inserter(partition_expressions), [](const std::shared_ptr& fragment) { @@ -812,7 +816,7 @@ static std::vector PartitionExpressionsOf(const FragmentVector& frag } void AssertFragmentsHavePartitionExpressions(std::shared_ptr dataset, - std::vector expected) { + std::vector expected) { ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset->GetFragments()); for (auto& expr : expected) { ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*dataset->schema())); diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 6ba65a63afdc8..0a4148c49e1a4 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -30,6 +30,7 @@ namespace arrow { namespace compute { class ExecContext; +class Expression; } // namespace compute @@ -73,8 +74,6 @@ class ParquetFragmentScanOptions; class ParquetFileWriter; class ParquetFileWriteOptions; -class Expression; - class Partitioning; class PartitioningFactory; class PartitioningOrFactory; diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 82e1c8f13a2b3..bff1a2bbb540e 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -32,28 +32,37 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: pass -cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: +cdef extern from "arrow/compute/exec/expression.h" \ + namespace "arrow::compute" nogil: - cdef cppclass CExpression "arrow::dataset::Expression": + cdef cppclass CExpression "arrow::compute::Expression": c_bool Equals(const CExpression& other) const c_string ToString() const CResult[CExpression] Bind(const CSchema&) cdef CExpression CMakeScalarExpression \ - "arrow::dataset::literal"(shared_ptr[CScalar] value) + "arrow::compute::literal"(shared_ptr[CScalar] value) cdef CExpression CMakeFieldExpression \ - "arrow::dataset::field_ref"(c_string name) + "arrow::compute::field_ref"(c_string name) cdef CExpression CMakeCallExpression \ - "arrow::dataset::call"(c_string function, + "arrow::compute::call"(c_string function, vector[CExpression] arguments, shared_ptr[CFunctionOptions] options) cdef CResult[shared_ptr[CBuffer]] CSerializeExpression \ - "arrow::dataset::Serialize"(const CExpression&) + "arrow::compute::Serialize"(const CExpression&) + cdef CResult[CExpression] CDeserializeExpression \ - "arrow::dataset::Deserialize"(shared_ptr[CBuffer]) + "arrow::compute::Deserialize"(shared_ptr[CBuffer]) + + cdef CResult[unordered_map[CFieldRef, CDatum, CFieldRefHash]] \ + CExtractKnownFieldValues "arrow::compute::ExtractKnownFieldValues"( + const CExpression& partition_expression) + + +cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CScanOptions "arrow::dataset::ScanOptions": @staticmethod @@ -331,10 +340,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CPartitioning] partitioning() const shared_ptr[CPartitioningFactory] factory() const - cdef CResult[unordered_map[CFieldRef, CDatum, CFieldRefHash]] \ - CExtractKnownFieldValues "arrow::dataset::ExtractKnownFieldValues"( - const CExpression& partition_expression) - cdef cppclass CFileSystemFactoryOptions \ "arrow::dataset::FileSystemFactoryOptions": CPartitioningOrFactory partitioning diff --git a/r/NAMESPACE b/r/NAMESPACE index 117e3de5c22bb..607177235e977 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -162,6 +162,7 @@ export(ParquetArrowReaderProperties) export(ParquetFileFormat) export(ParquetFileReader) export(ParquetFileWriter) +export(ParquetFragmentScanOptions) export(ParquetVersionType) export(ParquetWriterProperties) export(Partitioning) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 51cdcf85df08d..b8d72c30ed63d 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -756,24 +756,24 @@ FixedSizeListType__list_size <- function(type){ .Call(`_arrow_FixedSizeListType__list_size`, type) } -dataset___expr__call <- function(func_name, argument_list, options){ - .Call(`_arrow_dataset___expr__call`, func_name, argument_list, options) +compute___expr__call <- function(func_name, argument_list, options){ + .Call(`_arrow_compute___expr__call`, func_name, argument_list, options) } -dataset___expr__field_ref <- function(name){ - .Call(`_arrow_dataset___expr__field_ref`, name) +compute___expr__field_ref <- function(name){ + .Call(`_arrow_compute___expr__field_ref`, name) } -dataset___expr__get_field_ref_name <- function(ref){ - .Call(`_arrow_dataset___expr__get_field_ref_name`, ref) +compute___expr__get_field_ref_name <- function(x){ + .Call(`_arrow_compute___expr__get_field_ref_name`, x) } -dataset___expr__scalar <- function(x){ - .Call(`_arrow_dataset___expr__scalar`, x) +compute___expr__scalar <- function(x){ + .Call(`_arrow_compute___expr__scalar`, x) } -dataset___expr__ToString <- function(x){ - .Call(`_arrow_dataset___expr__ToString`, x) +compute___expr__ToString <- function(x){ + .Call(`_arrow_compute___expr__ToString`, x) } ipc___WriteFeather__Table <- function(stream, table, version, chunk_size, compression, compression_level){ diff --git a/r/R/expression.R b/r/R/expression.R index b3fdd52a5d05f..1e104677d8bfa 100644 --- a/r/R/expression.R +++ b/r/R/expression.R @@ -253,7 +253,7 @@ print.array_expression <- function(x, ...) { #' @export Expression <- R6Class("Expression", inherit = ArrowObject, public = list( - ToString = function() dataset___expr__ToString(self), + ToString = function() compute___expr__ToString(self), cast = function(to_type, safe = TRUE, ...) { opts <- list( to_type = to_type, @@ -265,7 +265,7 @@ Expression <- R6Class("Expression", inherit = ArrowObject, } ), active = list( - field_name = function() dataset___expr__get_field_ref_name(self) + field_name = function() compute___expr__get_field_ref_name(self) ) ) Expression$create <- function(function_name, @@ -273,14 +273,14 @@ Expression$create <- function(function_name, args = list(...), options = empty_named_list()) { assert_that(is.string(function_name)) - dataset___expr__call(function_name, args, options) + compute___expr__call(function_name, args, options) } Expression$field_ref <- function(name) { assert_that(is.string(name)) - dataset___expr__field_ref(name) + compute___expr__field_ref(name) } Expression$scalar <- function(x) { - dataset___expr__scalar(Scalar$create(x)) + compute___expr__scalar(Scalar$create(x)) } build_dataset_expression <- function(FUN, diff --git a/r/man/FileFormat.Rd b/r/man/FileFormat.Rd index 795027e1f24a0..b8d4dc01badf0 100644 --- a/r/man/FileFormat.Rd +++ b/r/man/FileFormat.Rd @@ -28,11 +28,8 @@ delimiter for text files `format = "parquet"``: \itemize{ -\item \code{use_buffered_stream}: Read files through buffered input streams rather than -loading entire row groups at once. This may be enabled -to reduce memory overhead. Disabled by default. -\item \code{buffer_size}: Size of buffered stream, if enabled. Default is 8KB. \item \code{dict_columns}: Names of columns which should be read as dictionaries. +\item Any Parquet options from \link{FragmentScanOptions}. } \code{format = "text"}: see \link{CsvParseOptions}. Note that you can specify them either diff --git a/r/man/FragmentScanOptions.Rd b/r/man/FragmentScanOptions.Rd index 8bafbb0b21c87..103d0589505aa 100644 --- a/r/man/FragmentScanOptions.Rd +++ b/r/man/FragmentScanOptions.Rd @@ -3,6 +3,7 @@ \name{FragmentScanOptions} \alias{FragmentScanOptions} \alias{CsvFragmentScanOptions} +\alias{ParquetFragmentScanOptions} \title{Format-specific scan options} \description{ A \code{FragmentScanOptions} holds options specific to a \code{FileFormat} and a scan @@ -14,14 +15,24 @@ operation. \itemize{ \item \code{format}: A string identifier of the file format. Currently supported values: \itemize{ +\item "parquet" \item "csv"/"text", aliases for the same format. } \item \code{...}: Additional format-specific options +`format = "parquet"``: +\itemize{ +\item \code{use_buffered_stream}: Read files through buffered input streams rather than +loading entire row groups at once. This may be enabled +to reduce memory overhead. Disabled by default. +\item \code{buffer_size}: Size of buffered stream, if enabled. Default is 8KB. +\item \code{pre_buffer}: Pre-buffer the raw Parquet data. This can improve performance +on high-latency filesystems. Disabled by default. \code{format = "text"}: see \link{CsvConvertOptions}. Note that options can only be specified with the Arrow C++ library naming. Also, "block_size" from \link{CsvReadOptions} may be given. } +} It returns the appropriate subclass of \code{FragmentScanOptions} (e.g. \code{CsvFragmentScanOptions}). diff --git a/r/man/arrow-package.Rd b/r/man/arrow-package.Rd index 0c19402a045d2..ca6d32a895a4d 100644 --- a/r/man/arrow-package.Rd +++ b/r/man/arrow-package.Rd @@ -26,6 +26,8 @@ Useful links: Authors: \itemize{ + \item Ian Cook \email{ianmcook@gmail.com} + \item Jonathan Keane \email{jkeane@gmail.com} \item Romain François \email{romain@rstudio.com} (\href{https://orcid.org/0000-0002-2444-4226}{ORCID}) \item Jeroen Ooms \email{jeroen@berkeley.edu} \item Apache Arrow \email{dev@arrow.apache.org} [copyright holder] diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index c5ef6343ced03..3feef14a87358 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1855,11 +1855,11 @@ extern "C" SEXP _arrow_dataset___ScannerBuilder__ProjectNames(SEXP sb_sexp, SEXP // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -void dataset___ScannerBuilder__ProjectExprs(const std::shared_ptr& sb, const std::vector>& exprs, const std::vector& names); +void dataset___ScannerBuilder__ProjectExprs(const std::shared_ptr& sb, const std::vector>& exprs, const std::vector& names); extern "C" SEXP _arrow_dataset___ScannerBuilder__ProjectExprs(SEXP sb_sexp, SEXP exprs_sexp, SEXP names_sexp){ BEGIN_CPP11 arrow::r::Input&>::type sb(sb_sexp); - arrow::r::Input>&>::type exprs(exprs_sexp); + arrow::r::Input>&>::type exprs(exprs_sexp); arrow::r::Input&>::type names(names_sexp); dataset___ScannerBuilder__ProjectExprs(sb, exprs, names); return R_NilValue; @@ -1873,11 +1873,11 @@ extern "C" SEXP _arrow_dataset___ScannerBuilder__ProjectExprs(SEXP sb_sexp, SEXP // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -void dataset___ScannerBuilder__Filter(const std::shared_ptr& sb, const std::shared_ptr& expr); +void dataset___ScannerBuilder__Filter(const std::shared_ptr& sb, const std::shared_ptr& expr); extern "C" SEXP _arrow_dataset___ScannerBuilder__Filter(SEXP sb_sexp, SEXP expr_sexp){ BEGIN_CPP11 arrow::r::Input&>::type sb(sb_sexp); - arrow::r::Input&>::type expr(expr_sexp); + arrow::r::Input&>::type expr(expr_sexp); dataset___ScannerBuilder__Filter(sb, expr); return R_NilValue; END_CPP11 @@ -2927,79 +2927,79 @@ extern "C" SEXP _arrow_FixedSizeListType__list_size(SEXP type_sexp){ #endif // expression.cpp -#if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___expr__call(std::string func_name, cpp11::list argument_list, cpp11::list options); -extern "C" SEXP _arrow_dataset___expr__call(SEXP func_name_sexp, SEXP argument_list_sexp, SEXP options_sexp){ +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr compute___expr__call(std::string func_name, cpp11::list argument_list, cpp11::list options); +extern "C" SEXP _arrow_compute___expr__call(SEXP func_name_sexp, SEXP argument_list_sexp, SEXP options_sexp){ BEGIN_CPP11 arrow::r::Input::type func_name(func_name_sexp); arrow::r::Input::type argument_list(argument_list_sexp); arrow::r::Input::type options(options_sexp); - return cpp11::as_sexp(dataset___expr__call(func_name, argument_list, options)); + return cpp11::as_sexp(compute___expr__call(func_name, argument_list, options)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___expr__call(SEXP func_name_sexp, SEXP argument_list_sexp, SEXP options_sexp){ - Rf_error("Cannot call dataset___expr__call(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_compute___expr__call(SEXP func_name_sexp, SEXP argument_list_sexp, SEXP options_sexp){ + Rf_error("Cannot call compute___expr__call(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif // expression.cpp -#if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___expr__field_ref(std::string name); -extern "C" SEXP _arrow_dataset___expr__field_ref(SEXP name_sexp){ +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr compute___expr__field_ref(std::string name); +extern "C" SEXP _arrow_compute___expr__field_ref(SEXP name_sexp){ BEGIN_CPP11 arrow::r::Input::type name(name_sexp); - return cpp11::as_sexp(dataset___expr__field_ref(name)); + return cpp11::as_sexp(compute___expr__field_ref(name)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___expr__field_ref(SEXP name_sexp){ - Rf_error("Cannot call dataset___expr__field_ref(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_compute___expr__field_ref(SEXP name_sexp){ + Rf_error("Cannot call compute___expr__field_ref(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif // expression.cpp -#if defined(ARROW_R_WITH_DATASET) -std::string dataset___expr__get_field_ref_name(const std::shared_ptr& ref); -extern "C" SEXP _arrow_dataset___expr__get_field_ref_name(SEXP ref_sexp){ +#if defined(ARROW_R_WITH_ARROW) +std::string compute___expr__get_field_ref_name(const std::shared_ptr& x); +extern "C" SEXP _arrow_compute___expr__get_field_ref_name(SEXP x_sexp){ BEGIN_CPP11 - arrow::r::Input&>::type ref(ref_sexp); - return cpp11::as_sexp(dataset___expr__get_field_ref_name(ref)); + arrow::r::Input&>::type x(x_sexp); + return cpp11::as_sexp(compute___expr__get_field_ref_name(x)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___expr__get_field_ref_name(SEXP ref_sexp){ - Rf_error("Cannot call dataset___expr__get_field_ref_name(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_compute___expr__get_field_ref_name(SEXP x_sexp){ + Rf_error("Cannot call compute___expr__get_field_ref_name(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif // expression.cpp -#if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___expr__scalar(const std::shared_ptr& x); -extern "C" SEXP _arrow_dataset___expr__scalar(SEXP x_sexp){ +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr compute___expr__scalar(const std::shared_ptr& x); +extern "C" SEXP _arrow_compute___expr__scalar(SEXP x_sexp){ BEGIN_CPP11 arrow::r::Input&>::type x(x_sexp); - return cpp11::as_sexp(dataset___expr__scalar(x)); + return cpp11::as_sexp(compute___expr__scalar(x)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___expr__scalar(SEXP x_sexp){ - Rf_error("Cannot call dataset___expr__scalar(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_compute___expr__scalar(SEXP x_sexp){ + Rf_error("Cannot call compute___expr__scalar(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif // expression.cpp -#if defined(ARROW_R_WITH_DATASET) -std::string dataset___expr__ToString(const std::shared_ptr& x); -extern "C" SEXP _arrow_dataset___expr__ToString(SEXP x_sexp){ +#if defined(ARROW_R_WITH_ARROW) +std::string compute___expr__ToString(const std::shared_ptr& x); +extern "C" SEXP _arrow_compute___expr__ToString(SEXP x_sexp){ BEGIN_CPP11 - arrow::r::Input&>::type x(x_sexp); - return cpp11::as_sexp(dataset___expr__ToString(x)); + arrow::r::Input&>::type x(x_sexp); + return cpp11::as_sexp(compute___expr__ToString(x)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___expr__ToString(SEXP x_sexp){ - Rf_error("Cannot call dataset___expr__ToString(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_compute___expr__ToString(SEXP x_sexp){ + Rf_error("Cannot call compute___expr__ToString(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -6793,11 +6793,11 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_FixedSizeListType__value_field", (DL_FUNC) &_arrow_FixedSizeListType__value_field, 1}, { "_arrow_FixedSizeListType__value_type", (DL_FUNC) &_arrow_FixedSizeListType__value_type, 1}, { "_arrow_FixedSizeListType__list_size", (DL_FUNC) &_arrow_FixedSizeListType__list_size, 1}, - { "_arrow_dataset___expr__call", (DL_FUNC) &_arrow_dataset___expr__call, 3}, - { "_arrow_dataset___expr__field_ref", (DL_FUNC) &_arrow_dataset___expr__field_ref, 1}, - { "_arrow_dataset___expr__get_field_ref_name", (DL_FUNC) &_arrow_dataset___expr__get_field_ref_name, 1}, - { "_arrow_dataset___expr__scalar", (DL_FUNC) &_arrow_dataset___expr__scalar, 1}, - { "_arrow_dataset___expr__ToString", (DL_FUNC) &_arrow_dataset___expr__ToString, 1}, + { "_arrow_compute___expr__call", (DL_FUNC) &_arrow_compute___expr__call, 3}, + { "_arrow_compute___expr__field_ref", (DL_FUNC) &_arrow_compute___expr__field_ref, 1}, + { "_arrow_compute___expr__get_field_ref_name", (DL_FUNC) &_arrow_compute___expr__get_field_ref_name, 1}, + { "_arrow_compute___expr__scalar", (DL_FUNC) &_arrow_compute___expr__scalar, 1}, + { "_arrow_compute___expr__ToString", (DL_FUNC) &_arrow_compute___expr__ToString, 1}, { "_arrow_ipc___WriteFeather__Table", (DL_FUNC) &_arrow_ipc___WriteFeather__Table, 6}, { "_arrow_ipc___feather___Reader__version", (DL_FUNC) &_arrow_ipc___feather___Reader__version, 1}, { "_arrow_ipc___feather___Reader__Read", (DL_FUNC) &_arrow_ipc___feather___Reader__Read, 2}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index b94ab76472965..5f7c725ffecc7 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -55,6 +55,7 @@ namespace ds = ::arrow::dataset; #endif +namespace compute = ::arrow::compute; namespace fs = ::arrow::fs; SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array); diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index f4d7746eb10c6..7d8ccae6eeecb 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -31,6 +31,7 @@ namespace ds = ::arrow::dataset; namespace fs = ::arrow::fs; +namespace compute = ::arrow::compute; namespace cpp11 { @@ -370,10 +371,10 @@ void dataset___ScannerBuilder__ProjectNames(const std::shared_ptr& sb, - const std::vector>& exprs, + const std::vector>& exprs, const std::vector& names) { // We have shared_ptrs of expressions but need the Expressions - std::vector expressions; + std::vector expressions; for (auto expr : exprs) { expressions.push_back(*expr); } @@ -382,7 +383,7 @@ void dataset___ScannerBuilder__ProjectExprs( // [[dataset::export]] void dataset___ScannerBuilder__Filter(const std::shared_ptr& sb, - const std::shared_ptr& expr) { + const std::shared_ptr& expr) { StopIfNotOk(sb->Filter(*expr)); } diff --git a/r/src/expression.cpp b/r/src/expression.cpp index 0e8fd52034d10..798853edd720d 100644 --- a/r/src/expression.cpp +++ b/r/src/expression.cpp @@ -17,54 +17,54 @@ #include "./arrow_types.h" -#if defined(ARROW_R_WITH_DATASET) +#if defined(ARROW_R_WITH_ARROW) #include -#include -namespace ds = ::arrow::dataset; +#include -std::shared_ptr make_compute_options( - std::string func_name, cpp11::list options); +namespace compute = ::arrow::compute; -// [[dataset::export]] -std::shared_ptr dataset___expr__call(std::string func_name, - cpp11::list argument_list, - cpp11::list options) { - std::vector arguments; +std::shared_ptr make_compute_options(std::string func_name, + cpp11::list options); + +// [[arrow::export]] +std::shared_ptr compute___expr__call(std::string func_name, + cpp11::list argument_list, + cpp11::list options) { + std::vector arguments; for (SEXP argument : argument_list) { - auto argument_ptr = cpp11::as_cpp>(argument); + auto argument_ptr = cpp11::as_cpp>(argument); arguments.push_back(*argument_ptr); } auto options_ptr = make_compute_options(func_name, options); - return std::make_shared( - ds::call(std::move(func_name), std::move(arguments), std::move(options_ptr))); + return std::make_shared( + compute::call(std::move(func_name), std::move(arguments), std::move(options_ptr))); } -// [[dataset::export]] -std::shared_ptr dataset___expr__field_ref(std::string name) { - return std::make_shared(ds::field_ref(std::move(name))); +// [[arrow::export]] +std::shared_ptr compute___expr__field_ref(std::string name) { + return std::make_shared(compute::field_ref(std::move(name))); } -// [[dataset::export]] -std::string dataset___expr__get_field_ref_name( - const std::shared_ptr& ref) { - auto field_ref = ref->field_ref(); - if (field_ref == nullptr) { - return ""; +// [[arrow::export]] +std::string compute___expr__get_field_ref_name( + const std::shared_ptr& x) { + if (auto field_ref = x->field_ref()) { + return *field_ref->name(); } - return *field_ref->name(); + return ""; } -// [[dataset::export]] -std::shared_ptr dataset___expr__scalar( +// [[arrow::export]] +std::shared_ptr compute___expr__scalar( const std::shared_ptr& x) { - return std::make_shared(ds::literal(std::move(x))); + return std::make_shared(compute::literal(std::move(x))); } -// [[dataset::export]] -std::string dataset___expr__ToString(const std::shared_ptr& x) { +// [[arrow::export]] +std::string compute___expr__ToString(const std::shared_ptr& x) { return x->ToString(); }