Skip to content

Commit

Permalink
use expanded bitmap for FixedSizeList and List
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Jun 14, 2019
1 parent f1d27a5 commit b21736d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 48 deletions.
7 changes: 7 additions & 0 deletions cpp/src/arrow/array/builder_primitive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,11 @@ Status BooleanBuilder::AppendValues(const std::vector<bool>& values) {
return Status::OK();
}

Status BooleanBuilder::AppendValues(int64_t length, bool value) {
RETURN_NOT_OK(Reserve(length));
data_builder_.UnsafeAppend(length, value);
ArrayBuilder::UnsafeSetNotNull(length);
return Status::OK();
}

} // namespace arrow
2 changes: 2 additions & 0 deletions cpp/src/arrow/array/builder_primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
return Status::OK();
}

Status AppendValues(int64_t length, bool value);

Status FinishInternal(std::shared_ptr<ArrayData>* out) override;

/// \cond FALSE
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/kernels/filter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ TEST_F(TestFilterKernelWithList, FilterListInt32) {
std::string list_json = "[[], [1,2], null, [3]]";
this->AssertFilter(list(int32()), list_json, "[0, 0, 0, 0]", "[]");
this->AssertFilter(list(int32()), list_json, "[0, 1, 1, null]", "[[1,2], null, null]");
this->AssertFilter(list(int32()), list_json, "[0, 0, 1, null]", "[null, null]");
this->AssertFilter(list(int32()), list_json, "[1, 0, 0, 1]", "[[], [3]]");
this->AssertFilter(list(int32()), list_json, "[1, 1, 1, 1]", list_json);
this->AssertFilter(list(int32()), list_json, "[0, 1, 0, 1]", "[[1,2], [3]]");
}
Expand All @@ -318,6 +320,8 @@ TEST_F(TestFilterKernelWithFixedSizeList, FilterFixedSizeListInt32) {
this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 0, 0, 0]", "[]");
this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 1, 1, null]",
"[[1, null, 3], [4, 5, 6], null]");
this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 0, 1, null]",
"[[4, 5, 6], null]");
this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[1, 1, 1, 1]", list_json);
this->AssertFilter(fixed_size_list(int32(), 3), list_json, "[0, 1, 0, 1]",
"[[1, null, 3], [7, 8, null]]");
Expand Down
103 changes: 55 additions & 48 deletions cpp/src/arrow/compute/kernels/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <memory>
#include <utility>

#include "arrow/array/concatenate.h"
#include "arrow/builder.h"
#include "arrow/compute/context.h"
#include "arrow/compute/kernels/filter.h"
Expand All @@ -46,7 +45,8 @@ class ARROW_EXPORT FilterKernel : public BinaryKernel {
std::shared_ptr<DataType> out_type() const override { return type_; }

virtual Status Filter(FunctionContext* ctx, const Array& values,
const BooleanArray& filter, std::shared_ptr<Array>* out) = 0;
const BooleanArray& filter, int64_t length,
std::shared_ptr<Array>* out) = 0;

static Status Make(const std::shared_ptr<DataType>& value_type,
std::unique_ptr<FilterKernel>* out);
Expand Down Expand Up @@ -86,13 +86,11 @@ static Status UnsafeAppend(StringBuilder* builder, util::string_view value) {
static int64_t OutputSize(const BooleanArray& filter) {
auto offset = filter.offset();
auto length = filter.length();
internal::BitmapReader filter_data(filter.values()->data(), offset, length);
int64_t size = 0;
for (auto i = offset; i < offset + length; ++i) {
if (filter.IsNull(i) || filter_data.IsSet()) {
if (filter.IsNull(i) || filter.Value(i)) {
++size;
}
filter_data.Next();
}
return size;
}
Expand All @@ -106,8 +104,8 @@ class FilterImpl<NullType> : public FilterKernel {
using FilterKernel::FilterKernel;

Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter,
std::shared_ptr<Array>* out) override {
out->reset(new NullArray(OutputSize(checked_cast<const BooleanArray&>(filter))));
int64_t length, std::shared_ptr<Array>* out) override {
out->reset(new NullArray(length));
return Status::OK();
}
};
Expand All @@ -121,7 +119,7 @@ class FilterImpl : public FilterKernel {
using FilterKernel::FilterKernel;

Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter,
std::shared_ptr<Array>* out) override {
int64_t length, std::shared_ptr<Array>* out) override {
std::unique_ptr<OutBuilder> builder;
RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), type_, &builder));
RETURN_NOT_OK(builder->Resize(OutputSize(filter)));
Expand Down Expand Up @@ -177,17 +175,16 @@ class FilterImpl<StructType> : public FilterKernel {
: FilterKernel(type), child_kernels_(std::move(child_kernels)) {}

Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter,
std::shared_ptr<Array>* out) override {
int64_t length, std::shared_ptr<Array>* out) override {
const auto& struct_array = checked_cast<const StructArray&>(values);

auto length = OutputSize(filter);
TypedBufferBuilder<bool> null_bitmap_builder(ctx->memory_pool());
RETURN_NOT_OK(null_bitmap_builder.Resize(length));

ArrayVector fields(type_->num_children());
for (int i = 0; i < type_->num_children(); ++i) {
RETURN_NOT_OK(
child_kernels_[i]->Filter(ctx, *struct_array.field(i), filter, &fields[i]));
RETURN_NOT_OK(child_kernels_[i]->Filter(ctx, *struct_array.field(i), filter, length,
&fields[i]));
}

for (int64_t i = 0; i < filter.length(); ++i) {
Expand Down Expand Up @@ -223,43 +220,48 @@ class FilterImpl<FixedSizeListType> : public FilterKernel {
using FilterKernel::FilterKernel;

Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter,
std::shared_ptr<Array>* out) override {
int64_t length, std::shared_ptr<Array>* out) override {
const auto& list_array = checked_cast<const FixedSizeListArray&>(values);

TypedBufferBuilder<bool> null_bitmap_builder(ctx->memory_pool());
auto length = OutputSize(filter);
RETURN_NOT_OK(null_bitmap_builder.Resize(length));

std::shared_ptr<Array> null_slice;
RETURN_NOT_OK(MakeArrayOfNull(list_array.value_type(),
list_array.list_type()->list_size(), &null_slice));
ArrayVector value_slices(length, null_slice);
BooleanBuilder value_filter_builder(ctx->memory_pool());
auto list_size = list_array.list_type()->list_size();
RETURN_NOT_OK(value_filter_builder.Resize(list_size * length));

for (int64_t filtered_i = 0, i = 0; i < filter.length(); ++i) {
for (int64_t i = 0; i < filter.length(); ++i) {
if (filter.IsNull(i)) {
null_bitmap_builder.UnsafeAppend(false);
++filtered_i;
for (int64_t j = 0; j < list_size; ++j) {
value_filter_builder.UnsafeAppendNull();
}
continue;
}
if (!filter.Value(i)) {
for (int64_t j = 0; j < list_size; ++j) {
value_filter_builder.UnsafeAppend(false);
}
continue;
}
if (values.IsNull(i)) {
null_bitmap_builder.UnsafeAppend(false);
++filtered_i;
for (int64_t j = 0; j < list_size; ++j) {
value_filter_builder.UnsafeAppendNull();
}
continue;
}
for (int64_t j = 0; j < list_size; ++j) {
value_filter_builder.UnsafeAppend(true);
}
null_bitmap_builder.UnsafeAppend(true);
value_slices[filtered_i] = list_array.value_slice(i);
++filtered_i;
}

std::shared_ptr<BooleanArray> value_filter;
RETURN_NOT_OK(value_filter_builder.Finish(&value_filter));
std::shared_ptr<Array> out_values;
if (length != 0) {
RETURN_NOT_OK(Concatenate(value_slices, ctx->memory_pool(), &out_values));
} else {
out_values = list_array.values()->Slice(0, 0);
}
RETURN_NOT_OK(
arrow::compute::Filter(ctx, *list_array.values(), *value_filter, &out_values));

auto null_count = null_bitmap_builder.false_count();
std::shared_ptr<Buffer> null_bitmap;
Expand All @@ -277,48 +279,50 @@ class FilterImpl<ListType> : public FilterKernel {
using FilterKernel::FilterKernel;

Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter,
std::shared_ptr<Array>* out) override {
int64_t length, std::shared_ptr<Array>* out) override {
const auto& list_array = checked_cast<const ListArray&>(values);

TypedBufferBuilder<bool> null_bitmap_builder(ctx->memory_pool());
auto length = OutputSize(filter);
RETURN_NOT_OK(null_bitmap_builder.Resize(length));

BooleanBuilder value_filter_builder(ctx->memory_pool());

TypedBufferBuilder<int32_t> offset_builder(ctx->memory_pool());
RETURN_NOT_OK(offset_builder.Resize(length + 1));
int32_t offset = 0;
offset_builder.UnsafeAppend(offset);

ArrayVector value_slices(length, list_array.values()->Slice(0, 0));
for (int64_t filtered_i = 0, i = 0; i < filter.length(); ++i) {
for (int64_t i = 0; i < filter.length(); ++i) {
if (filter.IsNull(i)) {
null_bitmap_builder.UnsafeAppend(false);
offset_builder.UnsafeAppend(offset);
++filtered_i;
RETURN_NOT_OK(
value_filter_builder.AppendValues(list_array.value_length(i), false));
continue;
}
if (!filter.Value(i)) {
RETURN_NOT_OK(
value_filter_builder.AppendValues(list_array.value_length(i), false));
continue;
}
if (values.IsNull(i)) {
null_bitmap_builder.UnsafeAppend(false);
offset_builder.UnsafeAppend(offset);
++filtered_i;
RETURN_NOT_OK(
value_filter_builder.AppendValues(list_array.value_length(i), false));
continue;
}
null_bitmap_builder.UnsafeAppend(true);
value_slices[filtered_i] = list_array.value_slice(i);
++filtered_i;
offset += list_array.value_length(i);
offset_builder.UnsafeAppend(offset);
RETURN_NOT_OK(value_filter_builder.AppendValues(list_array.value_length(i), true));
}

std::shared_ptr<BooleanArray> value_filter;
RETURN_NOT_OK(value_filter_builder.Finish(&value_filter));
std::shared_ptr<Array> out_values;
if (length != 0) {
RETURN_NOT_OK(Concatenate(value_slices, ctx->memory_pool(), &out_values));
} else {
out_values = list_array.values()->Slice(0, 0);
}
RETURN_NOT_OK(
arrow::compute::Filter(ctx, *list_array.values(), *value_filter, &out_values));

auto null_count = null_bitmap_builder.false_count();
std::shared_ptr<Buffer> offsets, null_bitmap;
Expand All @@ -343,11 +347,12 @@ class FilterImpl<DictionaryType> : public FilterKernel {
: FilterKernel(type), impl_(std::move(impl)) {}

Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter,
std::shared_ptr<Array>* out) override {
int64_t length, std::shared_ptr<Array>* out) override {
auto dict_array = checked_cast<const DictionaryArray*>(&values);
// To filter a dictionary, apply the current kernel to the dictionary's indices.
std::shared_ptr<Array> taken_indices;
RETURN_NOT_OK(impl_->Filter(ctx, *dict_array->indices(), filter, &taken_indices));
RETURN_NOT_OK(
impl_->Filter(ctx, *dict_array->indices(), filter, length, &taken_indices));
return DictionaryArray::FromArrays(values.type(), taken_indices,
dict_array->dictionary(), out);
}
Expand All @@ -363,11 +368,12 @@ class FilterImpl<ExtensionType> : public FilterKernel {
: FilterKernel(type), impl_(std::move(impl)) {}

Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter,
std::shared_ptr<Array>* out) override {
int64_t length, std::shared_ptr<Array>* out) override {
auto ext_array = checked_cast<const ExtensionArray*>(&values);
// To take from an extension array, apply the current kernel to storage.
std::shared_ptr<Array> taken_storage;
RETURN_NOT_OK(impl_->Filter(ctx, *ext_array->storage(), filter, &taken_storage));
RETURN_NOT_OK(
impl_->Filter(ctx, *ext_array->storage(), filter, length, &taken_storage));
*out = ext_array->extension_type()->MakeArray(taken_storage->data());
return Status::OK();
}
Expand Down Expand Up @@ -448,10 +454,11 @@ Status FilterKernel::Call(FunctionContext* ctx, const Datum& values, const Datum
if (!values.is_array() || !filter.is_array()) {
return Status::Invalid("FilterKernel expects array values and filter");
}
std::shared_ptr<Array> out_array;
auto filter_array = checked_pointer_cast<BooleanArray>(filter.make_array());
auto values_array = values.make_array();
RETURN_NOT_OK(this->Filter(ctx, *values_array, *filter_array, &out_array));
auto filter_array = checked_pointer_cast<BooleanArray>(filter.make_array());
const auto length = OutputSize(*filter_array);
std::shared_ptr<Array> out_array;
RETURN_NOT_OK(this->Filter(ctx, *values_array, *filter_array, length, &out_array));
*out = out_array;
return Status::OK();
}
Expand Down

0 comments on commit b21736d

Please sign in to comment.