Skip to content

Commit

Permalink
add filter impls for nested types
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Jun 13, 2019
1 parent 19f8109 commit 32726fa
Show file tree
Hide file tree
Showing 6 changed files with 643 additions and 168 deletions.
169 changes: 169 additions & 0 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,175 @@ std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {

namespace internal {

/// \brief Builds the ArrayData of an array whose elements are all null.
///
/// The factory computes the largest buffer length required by the type (and
/// by the children it creates), allocates a single zero-filled buffer of that
/// length, and then uses that one buffer anywhere a buffer is required.
class NullArrayFactory {
 public:
  /// \brief Type visitor computing the byte length of the shared zeroed buffer.
  ///
  /// The result starts at the size of a validity bitmap for `length` slots and
  /// is raised to the largest requirement reported by any visited type.
  struct GetBufferLength {
    GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
        : type_(*type), length_(length), buffer_length_(BitUtil::BytesForBits(length)) {}

    /// \brief Visit the type and store the required buffer length in `out`.
    /// \param[out] out the number of bytes the shared buffer must hold
    /// \return the visitor's error (e.g. NotImplemented) if the type or one of
    /// its children cannot be sized, otherwise OK
    Status Finish(int64_t* out) {
      RETURN_NOT_OK(VisitTypeInline(type_, this));
      *out = buffer_length_;
      return Status::OK();
    }

    /// Conversion kept for existing uses. It cannot return a Status, so a
    /// visitor failure is only surfaced through DCHECK_OK; prefer Finish().
    operator int64_t() && {
      DCHECK_OK(VisitTypeInline(type_, this));
      return buffer_length_;
    }

    /// Types whose TypeTraits declare bytes_required().
    template <typename T, typename = decltype(TypeTraits<T>::bytes_required(0))>
    Status Visit(const T&) {
      return MaxOf(TypeTraits<T>::bytes_required(length_));
    }

    Status Visit(const BinaryType&) {
      // The factory hands the shared buffer to binary arrays as their offsets
      // buffer, which holds one more entry than there are slots.
      // NOTE(review): assumes 32-bit offsets, as used by ListArray -- confirm.
      return MaxOf(static_cast<int64_t>(sizeof(int32_t)) * (length_ + 1));
    }

    Status Visit(const FixedSizeBinaryType& type) {
      // The factory treats this as a fixed width type and uses the shared buffer
      // as the values buffer: byte_width() bytes per slot.
      return MaxOf(type.byte_width() * length_);
    }

    Status Visit(const ListType& type) {
      // ListArray::value_length(i) reads offsets[i + 1], so `length_` slots need
      // `length_ + 1` offsets (a single offset is only enough for length 0).
      RETURN_NOT_OK(MaxOf(static_cast<int64_t>(sizeof(int32_t)) * (length_ + 1)));
      // The factory creates the values child with `length_` elements backed by
      // the same shared buffer, so the child's requirement must be covered too.
      return MaxOfChild(type.value_type(), length_);
    }

    Status Visit(const FixedSizeListType& type) {
      return MaxOfChild(type.value_type(), type.list_size() * length_);
    }

    Status Visit(const StructType& type) {
      for (const auto& child : type.children()) {
        RETURN_NOT_OK(MaxOfChild(child->type(), length_));
      }
      return Status::OK();
    }

    Status Visit(const UnionType& type) {
      // type codes: one byte per slot
      RETURN_NOT_OK(MaxOf(length_));
      if (type.mode() == UnionMode::DENSE) {
        // offsets: one int32_t per slot
        RETURN_NOT_OK(MaxOf(static_cast<int64_t>(sizeof(int32_t)) * length_));
      }
      for (const auto& child : type.children()) {
        RETURN_NOT_OK(MaxOfChild(child->type(), length_));
      }
      return Status::OK();
    }

    Status Visit(const DictionaryType& type) {
      RETURN_NOT_OK(MaxOfChild(type.value_type(), length_));
      return MaxOfChild(type.index_type(), length_);
    }

    Status Visit(const ExtensionType& type) {
      // XXX is an extension array's length always == storage length
      return MaxOfChild(type.storage_type(), length_);
    }

    Status Visit(const DataType& type) {
      return Status::NotImplemented("construction of all-null ", type);
    }

   private:
    /// Raise the running maximum to `buffer_length` if it is larger.
    Status MaxOf(int64_t buffer_length) {
      if (buffer_length > buffer_length_) {
        buffer_length_ = buffer_length;
      }
      return Status::OK();
    }

    /// Size a nested type of the given length and fold it into the maximum,
    /// propagating any error instead of dropping it.
    Status MaxOfChild(const std::shared_ptr<DataType>& type, int64_t length) {
      int64_t child_buffer_length = 0;
      RETURN_NOT_OK(GetBufferLength(type, length).Finish(&child_buffer_length));
      return MaxOf(child_buffer_length);
    }

    const DataType& type_;
    int64_t length_, buffer_length_;
  };

  /// \param[in] type the type of the array to build
  /// \param[in] length the number of (null) slots
  /// \param[out] out receives the resulting ArrayData when Create() is called
  NullArrayFactory(const std::shared_ptr<DataType>& type, int64_t length,
                   std::shared_ptr<ArrayData>* out)
      : type_(type), length_(length), out_(out) {}

  /// \brief Allocate the shared buffer and fill it with zeros.
  /// \return an error if the required length cannot be computed for the type
  /// or if the allocation fails
  Status CreateBuffer() {
    int64_t buffer_length = 0;
    RETURN_NOT_OK(GetBufferLength(type_, length_).Finish(&buffer_length));
    RETURN_NOT_OK(AllocateBuffer(buffer_length, &buffer_));
    if (buffer_->size() > 0) {
      std::memset(buffer_->mutable_data(), 0, buffer_->size());
    }
    return Status::OK();
  }

  /// \brief Build the all-null ArrayData into `*out_`.
  ///
  /// Allocates the shared buffer unless one was already supplied (child
  /// factories receive their parent's buffer), creates the ArrayData with the
  /// shared buffer as its first buffer and null_count == length, then lets the
  /// type-specific Visit() add the remaining buffers and children.
  Status Create() {
    if (buffer_ == nullptr) {
      RETURN_NOT_OK(CreateBuffer());
    }
    std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_children());
    *out_ = ArrayData::Make(type_, length_, {buffer_}, child_data, length_, 0);
    return VisitTypeInline(*type_, this);
  }

  Status Visit(const NullType&) { return Status::OK(); }

  Status Visit(const FixedWidthType&) {
    // two buffers in total, both the shared zeroed buffer
    (*out_)->buffers.resize(2, buffer_);
    return Status::OK();
  }

  Status Visit(const BinaryType&) {
    // three buffers in total, all the shared zeroed buffer
    (*out_)->buffers.resize(3, buffer_);
    return Status::OK();
  }

  Status Visit(const ListType&) {
    (*out_)->buffers.resize(2, buffer_);
    return CreateChild(0, length_, &(*out_)->child_data[0]);
  }

  Status Visit(const FixedSizeListType& type) {
    return CreateChild(0, length_ * type.list_size(), &(*out_)->child_data[0]);
  }

  Status Visit(const StructType&) {
    for (int i = 0; i < type_->num_children(); ++i) {
      // propagate child failures rather than dropping them
      RETURN_NOT_OK(CreateChild(i, length_, &(*out_)->child_data[i]));
    }
    return Status::OK();
  }

  Status Visit(const UnionType& type) {
    if (type.mode() == UnionMode::DENSE) {
      (*out_)->buffers.resize(3, buffer_);
    } else {
      (*out_)->buffers.resize(2, buffer_);
    }

    for (int i = 0; i < type_->num_children(); ++i) {
      // propagate child failures rather than dropping them
      RETURN_NOT_OK(CreateChild(i, length_, &(*out_)->child_data[i]));
    }
    return Status::OK();
  }

  Status Visit(const DictionaryType& type) {
    (*out_)->buffers.resize(2, buffer_);
    // the dictionary itself is an empty (length 0) array of the value type
    return MakeArrayOfNull(type.value_type(), 0, &(*out_)->dictionary);
  }

  Status Visit(const DataType& type) {
    return Status::NotImplemented("construction of all-null ", type);
  }

  /// \brief Build the all-null ArrayData of child `i` into `*out`.
  ///
  /// The child factory reuses this factory's buffer, so no further allocation
  /// takes place for children.
  /// \param[in] i index of the child field in type_
  /// \param[in] length number of slots in the child
  /// \param[out] out receives the child's ArrayData
  Status CreateChild(int i, int64_t length, std::shared_ptr<ArrayData>* out) {
    NullArrayFactory child_factory(type_->child(i)->type(), length, out);
    child_factory.buffer_ = buffer_;
    return child_factory.Create();
  }

  std::shared_ptr<DataType> type_;
  int64_t length_;
  std::shared_ptr<ArrayData>* out_;
  std::shared_ptr<Buffer> buffer_;
};

} // namespace internal

/// Build an array of `length` null elements of the given type and store it in
/// `out`. Any error reported by the factory is returned and `out` is left
/// untouched.
Status MakeArrayOfNull(const std::shared_ptr<DataType>& type, int64_t length,
                       std::shared_ptr<Array>* out) {
  std::shared_ptr<ArrayData> null_data;
  RETURN_NOT_OK(internal::NullArrayFactory(type, length, &null_data).Create());

  // Wrap the untyped ArrayData in the matching strongly-typed Array subclass.
  std::shared_ptr<Array> null_array = MakeArray(null_data);
  *out = null_array;
  return Status::OK();
}

namespace internal {

std::vector<ArrayVector> RechunkArraysConsistently(
const std::vector<ArrayVector>& groups) {
if (groups.size() <= 1) {
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ struct ARROW_EXPORT ArrayData {
ARROW_EXPORT
std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data);

/// \brief Create a strongly-typed Array instance with all elements null
/// \param[in] type the array type
/// \param[in] length the array length
/// \param[out] out resulting Array instance
/// \return Status; the implementation reports NotImplemented
/// ("construction of all-null <type>") for types it cannot build
ARROW_EXPORT
Status MakeArrayOfNull(const std::shared_ptr<DataType>& type, int64_t length,
std::shared_ptr<Array>* out);

// ----------------------------------------------------------------------
// User array accessor types

Expand Down Expand Up @@ -513,12 +521,15 @@ class ARROW_EXPORT ListArray : public Array {
/// Pointer to this array's first value offset: the raw offsets pointer
/// advanced by the array's slice offset.
const int32_t* raw_value_offsets() const {
  const auto slice_offset = data_->offset;
  return raw_value_offsets_ + slice_offset;
}

// The following functions do not perform bounds checking
// Offset into the values array at which list `i` starts. `i` is relative to
// this array's slice offset and is not range-checked.
int32_t value_offset(int64_t i) const {
  const auto physical_index = i + data_->offset;
  return raw_value_offsets_[physical_index];
}
// Number of values in list `i`: the difference between the offset stored for
// slot `i` and the one stored for the following slot. Not range-checked.
int32_t value_length(int64_t i) const {
  const int32_t* slot = raw_value_offsets_ + (i + data_->offset);
  return slot[1] - slot[0];
}
// Slice of the values array holding the elements of list `i`.
std::shared_ptr<Array> value_slice(int64_t i) const {
  const auto start = value_offset(i);
  const auto count = value_length(i);
  return values_->Slice(start, count);
}

protected:
void SetData(const std::shared_ptr<ArrayData>& data);
Expand Down Expand Up @@ -550,12 +561,15 @@ class ARROW_EXPORT FixedSizeListArray : public Array {

std::shared_ptr<DataType> value_type() const;

// The following functions do not perform bounds checking
// Offset into the values array at which list `i` starts: the slot index
// (shifted by this array's slice offset) times the fixed list size, narrowed
// to int32_t. Not range-checked.
int32_t value_offset(int64_t i) const {
  const int64_t physical_index = i + data_->offset;
  return static_cast<int32_t>(list_size_ * physical_index);
}
// Number of values in a list slot. Always list_size_, so `i` does not affect
// the result.
int32_t value_length(int64_t i = 0) const {
  return list_size_;
}
// Slice of the values array holding the elements of list `i`.
std::shared_ptr<Array> value_slice(int64_t i) const {
  const auto start = value_offset(i);
  const auto count = value_length(i);
  return values_->Slice(start, count);
}

protected:
void SetData(const std::shared_ptr<ArrayData>& data);
Expand Down
Loading

0 comments on commit 32726fa

Please sign in to comment.