diff --git a/cpp/src/arrow/array/builder_primitive.cc b/cpp/src/arrow/array/builder_primitive.cc index d4def92760027..3c899c068cb84 100644 --- a/cpp/src/arrow/array/builder_primitive.cc +++ b/cpp/src/arrow/array/builder_primitive.cc @@ -128,4 +128,11 @@ Status BooleanBuilder::AppendValues(const std::vector& values) { return Status::OK(); } +Status BooleanBuilder::AppendValues(int64_t length, bool value) { + RETURN_NOT_OK(Reserve(length)); + data_builder_.UnsafeAppend(length, value); + ArrayBuilder::UnsafeSetNotNull(length); + return Status::OK(); +} + } // namespace arrow diff --git a/cpp/src/arrow/array/builder_primitive.h b/cpp/src/arrow/array/builder_primitive.h index 3d566846d1947..8abbe029e1341 100644 --- a/cpp/src/arrow/array/builder_primitive.h +++ b/cpp/src/arrow/array/builder_primitive.h @@ -409,6 +409,8 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { return Status::OK(); } + Status AppendValues(int64_t length, bool value); + Status FinishInternal(std::shared_ptr* out) override; /// \cond FALSE diff --git a/cpp/src/arrow/compute/kernels/filter-test.cc b/cpp/src/arrow/compute/kernels/filter-test.cc index f4ad016f10f9d..7b349492b1daa 100644 --- a/cpp/src/arrow/compute/kernels/filter-test.cc +++ b/cpp/src/arrow/compute/kernels/filter-test.cc @@ -307,6 +307,8 @@ TEST_F(TestFilterKernelWithList, FilterListInt32) { std::string list_json = "[[], [1,2], null, [3]]"; this->AssertFilter(list(int32()), list_json, "[0, 0, 0, 0]", "[]"); this->AssertFilter(list(int32()), list_json, "[0, 1, 1, null]", "[[1,2], null, null]"); + this->AssertFilter(list(int32()), list_json, "[0, 0, 1, null]", "[null, null]"); + this->AssertFilter(list(int32()), list_json, "[1, 0, 0, 1]", "[[], [3]]"); this->AssertFilter(list(int32()), list_json, "[1, 1, 1, 1]", list_json); this->AssertFilter(list(int32()), list_json, "[0, 1, 0, 1]", "[[1,2], [3]]"); } @@ -318,6 +320,8 @@ TEST_F(TestFilterKernelWithFixedSizeList, FilterFixedSizeListInt32) { this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 0, 0, 0]", "[]"); this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 1, 1, null]", "[[1, null, 3], [4, 5, 6], null]"); + this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 0, 1, null]", + "[[4, 5, 6], null]"); this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[1, 1, 1, 1]", list_json); this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 1, 0, 1]", "[[1, null, 3], [7, 8, null]]"); diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 6a4e346511677..bc590e727b29e 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -19,7 +19,6 @@ #include #include -#include "arrow/array/concatenate.h" #include "arrow/builder.h" #include "arrow/compute/context.h" #include "arrow/compute/kernels/filter.h" @@ -46,7 +45,8 @@ class ARROW_EXPORT FilterKernel : public BinaryKernel { std::shared_ptr out_type() const override { return type_; } virtual Status Filter(FunctionContext* ctx, const Array& values, - const BooleanArray& filter, std::shared_ptr* out) = 0; + const BooleanArray& filter, int64_t length, + std::shared_ptr* out) = 0; static Status Make(const std::shared_ptr& value_type, std::unique_ptr* out); @@ -86,13 +86,11 @@ static Status UnsafeAppend(StringBuilder* builder, util::string_view value) { static int64_t OutputSize(const BooleanArray& filter) { auto offset = filter.offset(); auto length = filter.length(); - internal::BitmapReader filter_data(filter.values()->data(), offset, length); int64_t size = 0; for (auto i = offset; i < offset + length; ++i) { - if (filter.IsNull(i) || filter_data.IsSet()) { + if (filter.IsNull(i) || filter.Value(i)) { ++size; } - filter_data.Next(); } return size; } @@ -106,8 +104,8 @@ class FilterImpl : public FilterKernel { using FilterKernel::FilterKernel; Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - std::shared_ptr* out) override { - out->reset(new NullArray(OutputSize(checked_cast(filter)))); + int64_t length, std::shared_ptr* out) override { + out->reset(new NullArray(length)); return Status::OK(); } }; @@ -121,7 +119,7 @@ class FilterImpl : public FilterKernel { using FilterKernel::FilterKernel; Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - std::shared_ptr* out) override { + int64_t length, std::shared_ptr* out) override { std::unique_ptr builder; RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), type_, &builder)); RETURN_NOT_OK(builder->Resize(OutputSize(filter))); @@ -177,17 +175,16 @@ class FilterImpl : public FilterKernel { : FilterKernel(type), child_kernels_(std::move(child_kernels)) {} Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - std::shared_ptr* out) override { + int64_t length, std::shared_ptr* out) override { const auto& struct_array = checked_cast(values); - auto length = OutputSize(filter); TypedBufferBuilder null_bitmap_builder(ctx->memory_pool()); RETURN_NOT_OK(null_bitmap_builder.Resize(length)); ArrayVector fields(type_->num_children()); for (int i = 0; i < type_->num_children(); ++i) { - RETURN_NOT_OK( - child_kernels_[i]->Filter(ctx, *struct_array.field(i), filter, &fields[i])); + RETURN_NOT_OK(child_kernels_[i]->Filter(ctx, *struct_array.field(i), filter, length, + &fields[i])); } for (int64_t i = 0; i < filter.length(); ++i) { @@ -223,43 +220,48 @@ class FilterImpl : public FilterKernel { using FilterKernel::FilterKernel; Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - std::shared_ptr* out) override { + int64_t length, std::shared_ptr* out) override { const auto& list_array = checked_cast(values); TypedBufferBuilder null_bitmap_builder(ctx->memory_pool()); - auto length = OutputSize(filter); RETURN_NOT_OK(null_bitmap_builder.Resize(length)); - std::shared_ptr null_slice; - RETURN_NOT_OK(MakeArrayOfNull(list_array.value_type(), - list_array.list_type()->list_size(), &null_slice)); - ArrayVector value_slices(length, null_slice); + BooleanBuilder value_filter_builder(ctx->memory_pool()); + auto list_size = list_array.list_type()->list_size(); + RETURN_NOT_OK(value_filter_builder.Resize(list_size * length)); - for (int64_t filtered_i = 0, i = 0; i < filter.length(); ++i) { + for (int64_t i = 0; i < filter.length(); ++i) { if (filter.IsNull(i)) { null_bitmap_builder.UnsafeAppend(false); - ++filtered_i; + for (int64_t j = 0; j < list_size; ++j) { + value_filter_builder.UnsafeAppendNull(); + } continue; } if (!filter.Value(i)) { + for (int64_t j = 0; j < list_size; ++j) { + value_filter_builder.UnsafeAppend(false); + } continue; } if (values.IsNull(i)) { null_bitmap_builder.UnsafeAppend(false); - ++filtered_i; + for (int64_t j = 0; j < list_size; ++j) { + value_filter_builder.UnsafeAppendNull(); + } continue; } + for (int64_t j = 0; j < list_size; ++j) { + value_filter_builder.UnsafeAppend(true); + } null_bitmap_builder.UnsafeAppend(true); - value_slices[filtered_i] = list_array.value_slice(i); - ++filtered_i; } + std::shared_ptr value_filter; + RETURN_NOT_OK(value_filter_builder.Finish(&value_filter)); std::shared_ptr out_values; - if (length != 0) { - RETURN_NOT_OK(Concatenate(value_slices, ctx->memory_pool(), &out_values)); - } else { - out_values = list_array.values()->Slice(0, 0); - } + RETURN_NOT_OK( + arrow::compute::Filter(ctx, *list_array.values(), *value_filter, &out_values)); auto null_count = null_bitmap_builder.false_count(); std::shared_ptr null_bitmap; @@ -277,48 +279,50 @@ class FilterImpl : public FilterKernel { using FilterKernel::FilterKernel; Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - std::shared_ptr* out) override { + int64_t length, std::shared_ptr* out) override { const auto& list_array = checked_cast(values); TypedBufferBuilder null_bitmap_builder(ctx->memory_pool()); - auto length = OutputSize(filter); RETURN_NOT_OK(null_bitmap_builder.Resize(length)); + BooleanBuilder value_filter_builder(ctx->memory_pool()); + TypedBufferBuilder offset_builder(ctx->memory_pool()); RETURN_NOT_OK(offset_builder.Resize(length + 1)); int32_t offset = 0; offset_builder.UnsafeAppend(offset); - ArrayVector value_slices(length, list_array.values()->Slice(0, 0)); - for (int64_t filtered_i = 0, i = 0; i < filter.length(); ++i) { + for (int64_t i = 0; i < filter.length(); ++i) { if (filter.IsNull(i)) { null_bitmap_builder.UnsafeAppend(false); offset_builder.UnsafeAppend(offset); - ++filtered_i; + RETURN_NOT_OK( + value_filter_builder.AppendValues(list_array.value_length(i), false)); continue; } if (!filter.Value(i)) { + RETURN_NOT_OK( + value_filter_builder.AppendValues(list_array.value_length(i), false)); continue; } if (values.IsNull(i)) { null_bitmap_builder.UnsafeAppend(false); offset_builder.UnsafeAppend(offset); - ++filtered_i; + RETURN_NOT_OK( + value_filter_builder.AppendValues(list_array.value_length(i), false)); continue; } null_bitmap_builder.UnsafeAppend(true); - value_slices[filtered_i] = list_array.value_slice(i); - ++filtered_i; offset += list_array.value_length(i); offset_builder.UnsafeAppend(offset); + RETURN_NOT_OK(value_filter_builder.AppendValues(list_array.value_length(i), true)); } + std::shared_ptr value_filter; + RETURN_NOT_OK(value_filter_builder.Finish(&value_filter)); std::shared_ptr out_values; - if (length != 0) { - RETURN_NOT_OK(Concatenate(value_slices, ctx->memory_pool(), &out_values)); - } else { - out_values = list_array.values()->Slice(0, 0); - } + RETURN_NOT_OK( + arrow::compute::Filter(ctx, *list_array.values(), *value_filter, &out_values)); auto null_count = null_bitmap_builder.false_count(); std::shared_ptr offsets, null_bitmap; @@ -343,11 +347,12 @@ class FilterImpl : public FilterKernel { : FilterKernel(type), impl_(std::move(impl)) {} Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - std::shared_ptr* out) override { + int64_t length, std::shared_ptr* out) override { auto dict_array = checked_cast(&values); // To filter a dictionary, apply the current kernel to the dictionary's indices. std::shared_ptr taken_indices; - RETURN_NOT_OK(impl_->Filter(ctx, *dict_array->indices(), filter, &taken_indices)); + RETURN_NOT_OK( + impl_->Filter(ctx, *dict_array->indices(), filter, length, &taken_indices)); return DictionaryArray::FromArrays(values.type(), taken_indices, dict_array->dictionary(), out); } @@ -363,11 +368,12 @@ class FilterImpl : public FilterKernel { : FilterKernel(type), impl_(std::move(impl)) {} Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - std::shared_ptr* out) override { + int64_t length, std::shared_ptr* out) override { auto ext_array = checked_cast(&values); // To take from an extension array, apply the current kernel to storage. std::shared_ptr taken_storage; - RETURN_NOT_OK(impl_->Filter(ctx, *ext_array->storage(), filter, &taken_storage)); + RETURN_NOT_OK( + impl_->Filter(ctx, *ext_array->storage(), filter, length, &taken_storage)); *out = ext_array->extension_type()->MakeArray(taken_storage->data()); return Status::OK(); } @@ -448,10 +454,11 @@ Status FilterKernel::Call(FunctionContext* ctx, const Datum& values, const Datum if (!values.is_array() || !filter.is_array()) { return Status::Invalid("FilterKernel expects array values and filter"); } - std::shared_ptr out_array; - auto filter_array = checked_pointer_cast(filter.make_array()); auto values_array = values.make_array(); - RETURN_NOT_OK(this->Filter(ctx, *values_array, *filter_array, &out_array)); + auto filter_array = checked_pointer_cast(filter.make_array()); + const auto length = OutputSize(*filter_array); + std::shared_ptr out_array; + RETURN_NOT_OK(this->Filter(ctx, *values_array, *filter_array, length, &out_array)); *out = out_array; return Status::OK(); }