diff --git a/.travis.yml b/.travis.yml index 57646246c4a54..2cf70cca982ff 100644 --- a/.travis.yml +++ b/.travis.yml @@ -125,7 +125,8 @@ jobs: JDK: 11 allow_failures: - - arch: s390x + - name: "Go on s390x" + - name: "Java on s390x" before_install: - eval "$(python ci/detect-changes.py)" diff --git a/c_glib/arrow-dataset-glib/scanner.cpp b/c_glib/arrow-dataset-glib/scanner.cpp index 68c3e98ea0880..5a8bdf2171488 100644 --- a/c_glib/arrow-dataset-glib/scanner.cpp +++ b/c_glib/arrow-dataset-glib/scanner.cpp @@ -265,7 +265,7 @@ gad_scan_options_class_init(GADScanOptionsClass *klass) gobject_class->set_property = gad_scan_options_set_property; gobject_class->get_property = gad_scan_options_get_property; - auto scan_options = arrow::dataset::ScanOptions::Make(arrow::schema({})); + auto scan_options = std::make_shared(); spec = g_param_spec_pointer("scan-options", "ScanOptions", @@ -307,7 +307,8 @@ GADScanOptions * gad_scan_options_new(GArrowSchema *schema) { auto arrow_schema = garrow_schema_get_raw(schema); - auto arrow_scan_options = arrow::dataset::ScanOptions::Make(arrow_schema); + auto arrow_scan_options = std::make_shared(); + arrow_scan_options->dataset_schema = arrow_schema; return gad_scan_options_new_raw(&arrow_scan_options); } @@ -323,30 +324,10 @@ GArrowSchema * gad_scan_options_get_schema(GADScanOptions *scan_options) { auto priv = GAD_SCAN_OPTIONS_GET_PRIVATE(scan_options); - auto arrow_schema = priv->scan_options->schema(); + auto arrow_schema = priv->scan_options->dataset_schema; return garrow_schema_new_raw(&arrow_schema); } -/** - * gad_scan_options_replace_schema: - * @scan_options: A #GADScanOptions. - * @schema: A #GArrowSchema. - * - * Returns: (transfer full): - * A copy of the #GADScanOptions with the given #GArrowSchema. - * - * Since: 1.0.0 - */ -GADScanOptions * -gad_scan_options_replace_schema(GADScanOptions *scan_options, - GArrowSchema *schema) -{ - auto priv = GAD_SCAN_OPTIONS_GET_PRIVATE(scan_options); - auto arrow_schema = garrow_schema_get_raw(schema); - auto arrow_scan_options_copy = priv->scan_options->ReplaceSchema(arrow_schema); - return gad_scan_options_new_raw(&arrow_scan_options_copy); -} - /* arrow::dataset::ScanTask */ typedef struct GADScanTaskPrivate_ { diff --git a/c_glib/arrow-dataset-glib/scanner.h b/c_glib/arrow-dataset-glib/scanner.h index 75d212b180834..1c53343369373 100644 --- a/c_glib/arrow-dataset-glib/scanner.h +++ b/c_glib/arrow-dataset-glib/scanner.h @@ -57,9 +57,6 @@ GARROW_AVAILABLE_IN_1_0 GADScanOptions *gad_scan_options_new(GArrowSchema *schema); GARROW_AVAILABLE_IN_1_0 GArrowSchema *gad_scan_options_get_schema(GADScanOptions *scan_options); -GARROW_AVAILABLE_IN_1_0 -GADScanOptions *gad_scan_options_replace_schema(GADScanOptions *scan_options, - GArrowSchema *schema); /* arrow::dataset::ScanTask */ diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp index 17100e76a3c12..db6fa544069b1 100644 --- a/c_glib/arrow-glib/reader.cpp +++ b/c_glib/arrow-glib/reader.cpp @@ -1591,8 +1591,7 @@ garrow_csv_reader_new(GArrowInputStream *input, } auto arrow_reader = - arrow::csv::TableReader::Make(arrow::default_memory_pool(), - arrow::io::AsyncContext(), + arrow::csv::TableReader::Make(arrow::io::default_io_context(), arrow_input, read_options, parse_options, diff --git a/c_glib/test/dataset/test-scan-options.rb b/c_glib/test/dataset/test-scan-options.rb index 1f5b77f2e9ff0..a8bcd12afded6 100644 --- a/c_glib/test/dataset/test-scan-options.rb +++ b/c_glib/test/dataset/test-scan-options.rb @@ -34,11 +34,4 @@ def test_batch_size assert_equal(42, @scan_options.batch_size) end - - def test_replace_schema - other_schema = Arrow::Schema.new([Arrow::Field.new("visible", Arrow::BooleanDataType.new)]) - other_scan_options = @scan_options.replace_schema(other_schema) - assert_not_equal(@schema, other_scan_options.schema) - assert_equal(other_schema, other_scan_options.schema) - end end diff --git a/c_glib/test/test-decimal128-data-type.rb b/c_glib/test/test-decimal128-data-type.rb index a02e3badca051..b27e1cad1ea3f 100644 --- a/c_glib/test/test-decimal128-data-type.rb +++ b/c_glib/test/test-decimal128-data-type.rb @@ -23,12 +23,12 @@ def test_type def test_name data_type = Arrow::Decimal128DataType.new(2, 0) - assert_equal("decimal", data_type.name) + assert_equal("decimal128", data_type.name) end def test_to_s data_type = Arrow::Decimal128DataType.new(2, 0) - assert_equal("decimal(2, 0)", data_type.to_s) + assert_equal("decimal128(2, 0)", data_type.to_s) end def test_precision diff --git a/cpp/examples/minimal_build/example.cc b/cpp/examples/minimal_build/example.cc index 8f58de5777a49..e1b5c123a85fb 100644 --- a/cpp/examples/minimal_build/example.cc +++ b/cpp/examples/minimal_build/example.cc @@ -39,7 +39,7 @@ Status RunMain(int argc, char** argv) { ARROW_ASSIGN_OR_RAISE( auto csv_reader, arrow::csv::TableReader::Make(arrow::default_memory_pool(), - arrow::io::AsyncContext(), + arrow::io::default_io_context(), input_file, arrow::csv::ReadOptions::Defaults(), arrow::csv::ParseOptions::Defaults(), diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 4403def994932..abd5428b3d775 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -349,6 +349,9 @@ if(ARROW_CSV) csv/options.cc csv/parser.cc csv/reader.cc) + if(ARROW_COMPUTE) + list(APPEND ARROW_SRCS csv/writer.cc) + endif() list(APPEND ARROW_TESTING_SRCS csv/test_common.cc) endif() @@ -366,6 +369,7 @@ if(ARROW_COMPUTE) compute/kernels/aggregate_basic.cc compute/kernels/aggregate_mode.cc compute/kernels/aggregate_quantile.cc + compute/kernels/aggregate_tdigest.cc compute/kernels/aggregate_var_std.cc compute/kernels/codegen_internal.cc compute/kernels/scalar_arithmetic.cc diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 586eac2eeaec3..5afa104896085 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -68,5 +68,10 @@ Result Quantile(const Datum& value, const QuantileOptions& options, return CallFunction("quantile", {value}, &options, ctx); } +Result TDigest(const Datum& value, const TDigestOptions& options, + ExecContext* ctx) { + return CallFunction("tdigest", {value}, &options, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 335186122fdc8..eef1587bb732b 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -127,6 +127,28 @@ struct ARROW_EXPORT QuantileOptions : public FunctionOptions { enum Interpolation interpolation; }; +/// \brief Control TDigest approximate quantile kernel behavior +/// +/// By default, returns the median value. +struct ARROW_EXPORT TDigestOptions : public FunctionOptions { + explicit TDigestOptions(double q = 0.5, uint32_t delta = 100, + uint32_t buffer_size = 500) + : q{q}, delta{delta}, buffer_size{buffer_size} {} + + explicit TDigestOptions(std::vector q, uint32_t delta = 100, + uint32_t buffer_size = 500) + : q{std::move(q)}, delta{delta}, buffer_size{buffer_size} {} + + static TDigestOptions Defaults() { return TDigestOptions{}; } + + /// quantile must be between 0 and 1 inclusive + std::vector q; + /// compression parameter, default 100 + uint32_t delta; + /// input buffer size, default 500 + uint32_t buffer_size; +}; + /// @} /// \brief Count non-null (or null) values in an array. @@ -270,5 +292,19 @@ Result Quantile(const Datum& value, const QuantileOptions& options = QuantileOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Calculate the approximate quantiles of a numeric array with T-Digest algorithm +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see TDigestOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return resulting datum as an array +/// +/// \since 4.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result TDigest(const Datum& value, + const TDigestOptions& options = TDigestOptions::Defaults(), + ExecContext* ctx = NULLPTR); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_scalar.h b/cpp/src/arrow/compute/api_scalar.h index 37f3077e4bd91..0d95092c95bf4 100644 --- a/cpp/src/arrow/compute/api_scalar.h +++ b/cpp/src/arrow/compute/api_scalar.h @@ -115,10 +115,25 @@ struct CompareOptions : public FunctionOptions { }; struct ARROW_EXPORT ProjectOptions : public FunctionOptions { - explicit ProjectOptions(std::vector n) : field_names(std::move(n)) {} + ProjectOptions(std::vector n, std::vector r, + std::vector> m) + : field_names(std::move(n)), + field_nullability(std::move(r)), + field_metadata(std::move(m)) {} + + explicit ProjectOptions(std::vector n) + : field_names(std::move(n)), + field_nullability(field_names.size(), true), + field_metadata(field_names.size(), NULLPTR) {} /// Names for wrapped columns std::vector field_names; + + /// Nullability bits for wrapped columns + std::vector field_nullability; + + /// Metadata attached to wrapped columns + std::vector> field_metadata; }; /// @} diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index f5ab46ac603c3..0082d48112dc1 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -74,8 +74,9 @@ Result> Unique(const Datum& value, ExecContext* ctx) { return result.make_array(); } -Result DictionaryEncode(const Datum& value, ExecContext* ctx) { - return CallFunction("dictionary_encode", {value}, ctx); +Result DictionaryEncode(const Datum& value, const DictionaryEncodeOptions& options, + ExecContext* ctx) { + return CallFunction("dictionary_encode", {value}, &options, ctx); } const char kValuesFieldName[] = "values"; diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 9e9cad9e5d9bf..d67568e15671a 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -63,6 +63,24 @@ enum class SortOrder { Descending, }; +/// \brief Options for the dictionary encode function +struct DictionaryEncodeOptions : public FunctionOptions { + /// Configure how null values will be encoded + enum NullEncodingBehavior { + /// the null value will be added to the dictionary with a proper index + ENCODE, + /// the null value will be masked in the indices array + MASK + }; + + explicit DictionaryEncodeOptions(NullEncodingBehavior null_encoding = MASK) + : null_encoding_behavior(null_encoding) {} + + static DictionaryEncodeOptions Defaults() { return DictionaryEncodeOptions(); } + + NullEncodingBehavior null_encoding_behavior = MASK; +}; + /// \brief One sort key for PartitionNthIndices (TODO) and SortIndices struct ARROW_EXPORT SortKey { explicit SortKey(std::string name, SortOrder order = SortOrder::Ascending) @@ -289,14 +307,29 @@ Result> ValueCounts(const Datum& value, ExecContext* ctx = NULLPTR); /// \brief Dictionary-encode values in an array-like object +/// +/// Any nulls encountered in the dictionary will be handled according to the +/// specified null encoding behavior. +/// +/// For example, given values ["a", "b", null, "a", null] the output will be +/// (null_encoding == ENCODE) Indices: [0, 1, 2, 0, 2] / Dict: ["a", "b", null] +/// (null_encoding == MASK) Indices: [0, 1, null, 0, null] / Dict: ["a", "b"] +/// +/// If the input is already dictionary encoded this function is a no-op unless +/// it needs to modify the null_encoding (TODO) +/// /// \param[in] data array-like input /// \param[in] ctx the function execution context, optional +/// \param[in] options configures null encoding behavior /// \return result with same shape and type as input /// /// \since 1.0.0 /// \note API not yet finalized ARROW_EXPORT -Result DictionaryEncode(const Datum& data, ExecContext* ctx = NULLPTR); +Result DictionaryEncode( + const Datum& data, + const DictionaryEncodeOptions& options = DictionaryEncodeOptions::Defaults(), + ExecContext* ctx = NULLPTR); // ---------------------------------------------------------------------- // Deprecated functions diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index ecf3d6962f552..6443c96e91868 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -473,6 +473,9 @@ class KernelExecutorImpl : public KernelExecutor { if (validity_preallocated_) { ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_->AllocateBitmap(length)); } + if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) { + out->null_count = 0; + } for (size_t i = 0; i < data_preallocated_.size(); ++i) { const auto& prealloc = data_preallocated_[i]; if (prealloc.bit_width >= 0) { diff --git a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc index db5db543013d8..c90dd03c06ed7 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc @@ -461,11 +461,23 @@ VARIANCE_KERNEL_BENCHMARK(VarianceKernelDouble, DoubleType); // Quantile // +static std::vector deciles() { + return {0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}; +} + +static std::vector centiles() { + std::vector q(101); + for (int i = 0; i <= 100; ++i) { + q[i] = i / 100.0; + } + return q; +} + template -void QuantileKernelBench(benchmark::State& state, int min, int max) { +void QuantileKernel(benchmark::State& state, int min, int max, std::vector q) { using CType = typename TypeTraits::CType; - QuantileOptions options; + QuantileOptions options(std::move(q)); RegressionArgs args(state); const int64_t array_size = args.size / sizeof(CType); auto rand = random::RandomArrayGenerator(1926); @@ -474,29 +486,90 @@ void QuantileKernelBench(benchmark::State& state, int min, int max) { for (auto _ : state) { ABORT_NOT_OK(Quantile(array, options).status()); } + state.SetItemsProcessed(state.iterations() * array_size); +} + +template +void QuantileKernelMedian(benchmark::State& state, int min, int max) { + QuantileKernel(state, min, max, {0.5}); +} + +template +void QuantileKernelMedianWide(benchmark::State& state) { + QuantileKernel(state, 0, 1 << 24, {0.5}); +} + +template +void QuantileKernelMedianNarrow(benchmark::State& state) { + QuantileKernel(state, -30000, 30000, {0.5}); +} + +template +void QuantileKernelDecilesWide(benchmark::State& state) { + QuantileKernel(state, 0, 1 << 24, deciles()); +} + +template +void QuantileKernelDecilesNarrow(benchmark::State& state) { + QuantileKernel(state, -30000, 30000, deciles()); +} + +template +void QuantileKernelCentilesWide(benchmark::State& state) { + QuantileKernel(state, 0, 1 << 24, centiles()); +} + +template +void QuantileKernelCentilesNarrow(benchmark::State& state) { + QuantileKernel(state, -30000, 30000, centiles()); } -static void QuantileKernelBenchArgs(benchmark::internal::Benchmark* bench) { +static void QuantileKernelArgs(benchmark::internal::Benchmark* bench) { BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); } -#define QUANTILE_KERNEL_BENCHMARK_WIDE(FuncName, Type) \ - static void FuncName(benchmark::State& state) { \ - QuantileKernelBench(state, 0, 1 << 24); \ - } \ - BENCHMARK(FuncName)->Apply(QuantileKernelBenchArgs) - -#define QUANTILE_KERNEL_BENCHMARK_NARROW(FuncName, Type) \ - static void FuncName(benchmark::State& state) { \ - QuantileKernelBench(state, -30000, 30000); \ - } \ - BENCHMARK(FuncName)->Apply(QuantileKernelBenchArgs) - -QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelInt32Wide, Int32Type); -QUANTILE_KERNEL_BENCHMARK_NARROW(QuantileKernelInt32Narrow, Int32Type); -QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelInt64Wide, Int64Type); -QUANTILE_KERNEL_BENCHMARK_NARROW(QuantileKernelInt64Narrow, Int64Type); -QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelDouble, DoubleType); +BENCHMARK_TEMPLATE(QuantileKernelMedianNarrow, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianWide, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianNarrow, Int64Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianWide, Int64Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianWide, DoubleType)->Apply(QuantileKernelArgs); + +BENCHMARK_TEMPLATE(QuantileKernelDecilesNarrow, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelDecilesWide, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelDecilesWide, DoubleType)->Apply(QuantileKernelArgs); + +BENCHMARK_TEMPLATE(QuantileKernelCentilesNarrow, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelCentilesWide, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelCentilesWide, DoubleType)->Apply(QuantileKernelArgs); + +static void TDigestKernelDouble(benchmark::State& state, std::vector q) { + TDigestOptions options{std::move(q)}; + RegressionArgs args(state); + const int64_t array_size = args.size / sizeof(double); + auto rand = random::RandomArrayGenerator(1926); + auto array = rand.Numeric(array_size, 0, 1 << 24, args.null_proportion); + + for (auto _ : state) { + ABORT_NOT_OK(TDigest(array, options).status()); + } + state.SetItemsProcessed(state.iterations() * array_size); +} + +static void TDigestKernelDoubleMedian(benchmark::State& state) { + TDigestKernelDouble(state, {0.5}); +} + +static void TDigestKernelDoubleDeciles(benchmark::State& state) { + TDigestKernelDouble(state, deciles()); +} + +static void TDigestKernelDoubleCentiles(benchmark::State& state) { + TDigestKernelDouble(state, centiles()); +} + +BENCHMARK(TDigestKernelDoubleMedian)->Apply(QuantileKernelArgs); +BENCHMARK(TDigestKernelDoubleDeciles)->Apply(QuantileKernelArgs); +BENCHMARK(TDigestKernelDoubleCentiles)->Apply(QuantileKernelArgs); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc new file mode 100644 index 0000000000000..fc8f43b0ae2d5 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/api_aggregate.h" +#include "arrow/compute/kernels/aggregate_internal.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/tdigest.h" + +namespace arrow { +namespace compute { +namespace internal { + +namespace { + +using arrow::internal::TDigest; +using arrow::internal::VisitSetBitRunsVoid; + +template +struct TDigestImpl : public ScalarAggregator { + using ThisType = TDigestImpl; + using ArrayType = typename TypeTraits::ArrayType; + using CType = typename ArrowType::c_type; + + explicit TDigestImpl(const TDigestOptions& options) + : q{options.q}, tdigest{options.delta, options.buffer_size} {} + + void Consume(KernelContext*, const ExecBatch& batch) override { + const ArrayData& data = *batch[0].array(); + const CType* values = data.GetValues(1); + + if (data.length > data.GetNullCount()) { + VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + [&](int64_t pos, int64_t len) { + for (int64_t i = 0; i < len; ++i) { + this->tdigest.NanAdd(values[pos + i]); + } + }); + } + } + + void MergeFrom(KernelContext*, KernelState&& src) override { + auto& other = checked_cast(src); + std::vector other_tdigest; + other_tdigest.push_back(std::move(other.tdigest)); + this->tdigest.Merge(&other_tdigest); + } + + void Finalize(KernelContext* ctx, Datum* out) override { + const int64_t out_length = this->tdigest.is_empty() ? 0 : this->q.size(); + auto out_data = ArrayData::Make(float64(), out_length, 0); + out_data->buffers.resize(2, nullptr); + + if (out_length > 0) { + KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx, + ctx->Allocate(out_length * sizeof(double))); + double* out_buffer = out_data->template GetMutableValues(1); + for (int64_t i = 0; i < out_length; ++i) { + out_buffer[i] = this->tdigest.Quantile(this->q[i]); + } + } + + *out = Datum(std::move(out_data)); + } + + const std::vector& q; + TDigest tdigest; +}; + +struct TDigestInitState { + std::unique_ptr state; + KernelContext* ctx; + const DataType& in_type; + const TDigestOptions& options; + + TDigestInitState(KernelContext* ctx, const DataType& in_type, + const TDigestOptions& options) + : ctx(ctx), in_type(in_type), options(options) {} + + Status Visit(const DataType&) { + return Status::NotImplemented("No tdigest implemented"); + } + + Status Visit(const HalfFloatType&) { + return Status::NotImplemented("No tdigest implemented"); + } + + template + enable_if_t::value, Status> Visit(const Type&) { + state.reset(new TDigestImpl(options)); + return Status::OK(); + } + + std::unique_ptr Create() { + ctx->SetStatus(VisitTypeInline(in_type, this)); + return std::move(state); + } +}; + +std::unique_ptr TDigestInit(KernelContext* ctx, const KernelInitArgs& args) { + TDigestInitState visitor(ctx, *args.inputs[0].type, + static_cast(*args.options)); + return visitor.Create(); +} + +void AddTDigestKernels(KernelInit init, + const std::vector>& types, + ScalarAggregateFunction* func) { + for (const auto& ty : types) { + auto sig = KernelSignature::Make({InputType::Array(ty)}, float64()); + AddAggKernel(std::move(sig), init, func); + } +} + +const FunctionDoc tdigest_doc{ + "Approximate quantiles of a numeric array with T-Digest algorithm", + ("By default, 0.5 quantile (median) is returned.\n" + "Nulls and NaNs are ignored.\n" + "An empty array is returned if there is no valid data point."), + {"array"}, + "TDigestOptions"}; + +std::shared_ptr AddTDigestAggKernels() { + static auto default_tdigest_options = TDigestOptions::Defaults(); + auto func = std::make_shared( + "tdigest", Arity::Unary(), &tdigest_doc, &default_tdigest_options); + AddTDigestKernels(TDigestInit, NumericTypes(), func.get()); + return func; +} + +} // namespace + +void RegisterScalarAggregateTDigest(FunctionRegistry* registry) { + DCHECK_OK(registry->AddFunction(AddTDigestAggKernels())); +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 47872565ab66b..e772d474909df 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1292,7 +1292,7 @@ TYPED_TEST(TestVarStdKernelRandom, Basics) { double var_population, var_sample; using ArrayType = typename TypeTraits::ArrayType; - auto typed_array = std::static_pointer_cast(array->Slice(0, total_size)); + auto typed_array = checked_pointer_cast(array->Slice(0, total_size)); std::tie(var_population, var_sample) = WelfordVar(*typed_array); this->AssertVarStdIs(chunked, VarianceOptions{0}, var_population); @@ -1313,7 +1313,7 @@ TEST_F(TestVarStdKernelIntegerLength, Basics) { // auto array = rand.Numeric(4000000000, min, min + 100000, 0.1); double var_population, var_sample; - auto int32_array = std::static_pointer_cast(array); + auto int32_array = checked_pointer_cast(array); std::tie(var_population, var_sample) = WelfordVar(*int32_array); this->AssertVarStdIs(*array, VarianceOptions{0}, var_population); @@ -1343,22 +1343,22 @@ class TestPrimitiveQuantileKernel : public ::testing::Test { ASSERT_OK(out_array->ValidateFull()); ASSERT_EQ(out_array->length(), options.q.size()); ASSERT_EQ(out_array->null_count(), 0); - ASSERT_EQ(out_array->type(), expected[0][i].type()); + AssertTypeEqual(out_array->type(), expected[0][i].type()); - if (out_array->type() == float64()) { + if (out_array->type()->Equals(float64())) { const double* quantiles = out_array->data()->GetValues(1); for (int64_t j = 0; j < out_array->length(); ++j) { const auto& numeric_scalar = - std::static_pointer_cast(expected[j][i].scalar()); + checked_pointer_cast(expected[j][i].scalar()); ASSERT_TRUE((quantiles[j] == numeric_scalar->value) || (std::isnan(quantiles[j]) && std::isnan(numeric_scalar->value))); } } else { - ASSERT_EQ(out_array->type(), type_singleton()); + AssertTypeEqual(out_array->type(), type_singleton()); const CType* quantiles = out_array->data()->GetValues(1); for (int64_t j = 0; j < out_array->length(); ++j) { const auto& numeric_scalar = - std::static_pointer_cast>(expected[j][i].scalar()); + checked_pointer_cast>(expected[j][i].scalar()); ASSERT_EQ(quantiles[j], numeric_scalar->value); } } @@ -1530,42 +1530,86 @@ TEST_F(TestInt64QuantileKernel, Int64) { #undef O #ifndef __MINGW32__ -class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { +class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { public: void CheckQuantiles(int64_t array_size, int64_t num_quantiles) { - auto rand = random::RandomArrayGenerator(0x5487658); + std::shared_ptr array; + std::vector quantiles; // small value range to exercise input array with equal values and histogram approach - const auto array = rand.Numeric(array_size, -100, 200, 0.1); + GenerateTestData(array_size, num_quantiles, -100, 200, &array, &quantiles); + this->AssertQuantilesAre(array, QuantileOptions{quantiles}, + NaiveQuantile(*array, quantiles, interpolations_)); + } + + void CheckTDigests(const std::vector& chunk_sizes, int64_t num_quantiles) { + int total_size = 0; + for (int size : chunk_sizes) { + total_size += size; + } + std::shared_ptr array; std::vector quantiles; - random_real(num_quantiles, 0x5487658, 0.0, 1.0, &quantiles); - // make sure to exercise 0 and 1 quantiles - *std::min_element(quantiles.begin(), quantiles.end()) = 0; - *std::max_element(quantiles.begin(), quantiles.end()) = 1; + GenerateTestData(total_size, num_quantiles, 100, 123456789, &array, &quantiles); - this->AssertQuantilesAre(array, QuantileOptions{quantiles}, - NaiveQuantile(*array, quantiles)); + total_size = 0; + ArrayVector array_vector; + for (int size : chunk_sizes) { + array_vector.emplace_back(array->Slice(total_size, size)); + total_size += size; + } + auto chunked = *ChunkedArray::Make(array_vector); + + TDigestOptions options(quantiles); + ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, options)); + const auto& out_array = out.make_array(); + ASSERT_OK(out_array->ValidateFull()); + ASSERT_EQ(out_array->length(), quantiles.size()); + ASSERT_EQ(out_array->null_count(), 0); + AssertTypeEqual(out_array->type(), float64()); + + // linear interpolated exact quantile as reference + std::vector> exact = + NaiveQuantile(*array, quantiles, {QuantileOptions::LINEAR}); + const double* approx = out_array->data()->GetValues(1); + for (size_t i = 0; i < quantiles.size(); ++i) { + const auto& exact_scalar = checked_pointer_cast(exact[i][0].scalar()); + const double tolerance = std::fabs(exact_scalar->value) * 0.05; + EXPECT_NEAR(approx[i], exact_scalar->value, tolerance) << quantiles[i]; + } } private: - std::vector> NaiveQuantile(const Array& array, - const std::vector& quantiles) { + void GenerateTestData(int64_t array_size, int64_t num_quantiles, int min, int max, + std::shared_ptr* array, std::vector* quantiles) { + auto rand = random::RandomArrayGenerator(0x5487658); + *array = rand.Float64(array_size, min, max, /*null_prob=*/0.1, /*nan_prob=*/0.2); + + random_real(num_quantiles, 0x5487658, 0.0, 1.0, quantiles); + // make sure to exercise 0 and 1 quantiles + *std::min_element(quantiles->begin(), quantiles->end()) = 0; + *std::max_element(quantiles->begin(), quantiles->end()) = 1; + } + + std::vector> NaiveQuantile( + const Array& array, const std::vector& quantiles, + const std::vector& interpolations) { // copy and sort input array - std::vector input(array.length() - array.null_count()); - const int32_t* values = array.data()->GetValues(1); + std::vector input(array.length() - array.null_count()); + const double* values = array.data()->GetValues(1); const auto bitmap = array.null_bitmap_data(); int64_t index = 0; for (int64_t i = 0; i < array.length(); ++i) { - if (BitUtil::GetBit(bitmap, i)) { + if (BitUtil::GetBit(bitmap, i) && !std::isnan(values[i])) { input[index++] = values[i]; } } + input.resize(index); std::sort(input.begin(), input.end()); std::vector> output(quantiles.size(), - std::vector(interpolations_.size())); - for (uint64_t i = 0; i < interpolations_.size(); ++i) { - const auto interp = interpolations_[i]; + std::vector(interpolations.size())); + for (uint64_t i = 0; i < interpolations.size(); ++i) { + const auto interp = interpolations[i]; for (uint64_t j = 0; j < quantiles.size(); ++j) { output[j][i] = GetQuantile(input, quantiles[j], interp); } @@ -1573,7 +1617,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { return output; } - Datum GetQuantile(const std::vector& input, double q, + Datum GetQuantile(const std::vector& input, double q, enum QuantileOptions::Interpolation interp) { const double index = (input.size() - 1) * q; const uint64_t lower_index = static_cast(index); @@ -1594,14 +1638,14 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { } case QuantileOptions::LINEAR: if (fraction == 0) { - return Datum(static_cast(input[lower_index])); + return Datum(input[lower_index]); } else { return Datum(fraction * input[lower_index + 1] + (1 - fraction) * input[lower_index]); } case QuantileOptions::MIDPOINT: if (fraction == 0) { - return Datum(static_cast(input[lower_index])); + return Datum(input[lower_index]); } else { return Datum(input[lower_index] / 2.0 + input[lower_index + 1] / 2.0); } @@ -1625,7 +1669,30 @@ TEST_F(TestRandomQuantileKernel, Histogram) { // exercise histogram approach: size >= 65536, range <= 65536 this->CheckQuantiles(/*array_size=*/80000, /*num_quantiles=*/100); } + +TEST_F(TestRandomQuantileKernel, TDigest) { + this->CheckTDigests(/*chunk_sizes=*/{12345, 6789, 8765, 4321}, /*num_quantiles=*/100); +} #endif +class TestTDigestKernel : public ::testing::Test {}; + +TEST_F(TestTDigestKernel, AllNullsOrNaNs) { + const std::vector> tests = { + {"[]"}, + {"[null, null]", "[]", "[null]"}, + {"[NaN]", "[NaN, NaN]", "[]"}, + {"[null, NaN, null]"}, + {"[NaN, NaN]", "[]", "[null]"}, + }; + + for (const auto& json : tests) { + auto chunked = ChunkedArrayFromJSON(float64(), json); + ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, TDigestOptions())); + ASSERT_OK(out.make_array()->ValidateFull()); + ASSERT_EQ(out.array()->length, 0); + } +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.cc b/cpp/src/arrow/compute/kernels/codegen_internal.cc index b321ff3fc8b33..ad43b7a3aa981 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.cc +++ b/cpp/src/arrow/compute/kernels/codegen_internal.cc @@ -48,6 +48,7 @@ std::vector> g_numeric_types; std::vector> g_base_binary_types; std::vector> g_temporal_types; std::vector> g_primitive_types; +std::vector g_decimal_type_ids; static std::once_flag codegen_static_initialized; template @@ -71,6 +72,9 @@ static void InitStaticData() { // Floating point types g_floating_types = {float32(), float64()}; + // Decimal types + g_decimal_type_ids = {Type::DECIMAL128, Type::DECIMAL256}; + // Numeric types Extend(g_int_types, &g_numeric_types); Extend(g_floating_types, &g_numeric_types); @@ -132,6 +136,11 @@ const std::vector>& FloatingPointTypes() { return g_floating_types; } +const std::vector& DecimalTypeIds() { + std::call_once(codegen_static_initialized, InitStaticData); + return g_decimal_type_ids; +} + const std::vector& AllTimeUnits() { static std::vector units = {TimeUnit::SECOND, TimeUnit::MILLI, TimeUnit::MICRO, TimeUnit::NANO}; diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index 11e03bba2873a..9e2ed82a4267a 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -188,6 +188,16 @@ struct GetViewType { } }; +template <> +struct GetViewType { + using T = Decimal256; + using PhysicalType = util::string_view; + + static T LogicalValue(PhysicalType value) { + return Decimal256(reinterpret_cast(value.data())); + } +}; + template struct GetOutputType; @@ -206,6 +216,11 @@ struct GetOutputType { using T = Decimal128; }; +template <> +struct GetOutputType { + using T = Decimal256; +}; + // ---------------------------------------------------------------------- // Iteration / value access utilities @@ -396,6 +411,7 @@ const std::vector>& SignedIntTypes(); const std::vector>& UnsignedIntTypes(); const std::vector>& IntTypes(); const std::vector>& FloatingPointTypes(); +const std::vector& DecimalTypeIds(); ARROW_EXPORT const std::vector& AllTimeUnits(); @@ -663,11 +679,14 @@ struct ScalarUnaryNotNullStateful { static void Exec(const ThisType& functor, KernelContext* ctx, const ArrayData& arg0, Datum* out) { ArrayData* out_arr = out->mutable_array(); - auto out_data = out_arr->GetMutableValues(1); + // Decimal128 data buffers are not safely reinterpret_cast-able on big-endian + using endian_agnostic = std::array; + auto out_data = out_arr->GetMutableValues(1); VisitArrayValuesInline( arg0, [&](Arg0Value v) { - *out_data++ = functor.op.template Call(ctx, v); + functor.op.template Call(ctx, v).ToBytes( + out_data++->data()); }, [&]() { ++out_data; }); } @@ -1182,6 +1201,22 @@ ArrayKernelExec GenerateTemporal(detail::GetTypeId get_id) { } } +// Generate a kernel given a templated functor for decimal types +// +// See "Numeric" above for description of the generator functor +template