diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index f525ec23c58af..cf5b16c994ec7 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "arrow/buffer.h" #include "arrow/status.h" @@ -135,6 +136,7 @@ class ARROW_EXPORT BufferBuilder { void UnsafeAppend(const void* data, const int64_t length) { memcpy(data_ + size_, data, static_cast(length)); size_ += length; + _mm_prefetch(data_ + size_ + 64, _MM_HINT_T0); } void UnsafeAppend(const int64_t num_copies, uint8_t value) { @@ -315,9 +317,9 @@ class TypedBufferBuilder { } void UnsafeAppend(bool value) { - BitUtil::SetBitTo(mutable_data(), bit_length_, value); if (!value) { ++false_count_; + BitUtil::SetBitTo(mutable_data(), bit_length_, value); } ++bit_length_; } @@ -334,7 +336,8 @@ class TypedBufferBuilder { } void UnsafeAppend(const int64_t num_copies, bool value) { - BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value); + if (!value) + BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value); false_count_ += num_copies * !value; bit_length_ += num_copies; } @@ -364,9 +367,9 @@ class TypedBufferBuilder { // so ask it again before calling memset(). const int64_t new_byte_capacity = bytes_builder_.capacity(); if (new_byte_capacity > old_byte_capacity) { - // The additional buffer space is 0-initialized for convenience, - // so that other methods can simply bump the length. 
- memset(mutable_data() + old_byte_capacity, 0, + // The additional buffer space is initialized to all-ones (true), + // so appending a true value only needs to bump the length. + memset(mutable_data() + old_byte_capacity, 0xFF, static_cast(new_byte_capacity - old_byte_capacity)); } return Status::OK(); @@ -381,6 +384,7 @@ class TypedBufferBuilder { Status Advance(const int64_t length) { ARROW_RETURN_NOT_OK(Reserve(length)); bit_length_ += length; + BitUtil::SetBitsTo(mutable_data(), bit_length_ - length, length, false); false_count_ += length; return Status::OK(); } @@ -416,4 +420,92 @@ class TypedBufferBuilder { int64_t false_count_ = 0; }; +/// \brief A BufferBuilder for building a buffer containing a int32_t for offset +template <> +class TypedBufferBuilder { + public: + explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool()) + : bytes_builder_(pool) {} + + explicit TypedBufferBuilder(BufferBuilder builder) + : bytes_builder_(std::move(builder)) {} + + BufferBuilder* bytes_builder() { return &bytes_builder_; } + + Status Append(int32_t value) { + ARROW_RETURN_NOT_OK(Reserve(1)); + UnsafeAppend(value); + return Status::OK(); + } + + Status Append(const int32_t* values, int64_t num_elements) { + ARROW_RETURN_NOT_OK(Reserve(num_elements)); + UnsafeAppend(values, num_elements); + return Status::OK(); + } + + Status Append(const int64_t num_copies, int32_t value) { + ARROW_RETURN_NOT_OK(Reserve(num_copies)); + UnsafeAppend(num_copies, value); + return Status::OK(); + } + + void UnsafeAppend(int32_t value) { + mutable_data()[length()] = value; + _mm_prefetch(mutable_data()+length() + 1, _MM_HINT_T0); + bytes_builder_.UnsafeAdvance(sizeof(int32_t)); + } + + void UnsafeAppend(const int32_t* values, int64_t num_elements) { + for (int64_t i = 0; i < num_elements; i++) { + UnsafeAppend(values[i]); + } + } + + void UnsafeAppend(const int64_t num_copies, int32_t value) { + for (int64_t i = 0; i < num_copies; i++) { + UnsafeAppend(value); + } + } + + template + void UnsafeAppend(Iter values_begin, 
Iter values_end) { + int64_t num_elements = static_cast(std::distance(values_begin, values_end)); + auto data = mutable_data() + length(); + bytes_builder_.UnsafeAdvance(num_elements * sizeof(int32_t)); + std::copy(values_begin, values_end, data); + } + + Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) { + return bytes_builder_.Resize(new_capacity * sizeof(int32_t), shrink_to_fit); + } + + Status Reserve(const int64_t additional_elements) { + return bytes_builder_.Reserve(additional_elements * sizeof(int32_t)); + } + + Status Advance(const int64_t length) { + return bytes_builder_.Advance(length * sizeof(int32_t)); + } + + Status Finish(std::shared_ptr* out, bool shrink_to_fit = true) { + return bytes_builder_.Finish(out, shrink_to_fit); + } + + Result> Finish(bool shrink_to_fit = true) { + std::shared_ptr out; + ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit)); + return out; + } + + void Reset() { bytes_builder_.Reset(); } + + int64_t length() const { return bytes_builder_.length() / sizeof(int32_t); } + int64_t capacity() const { return bytes_builder_.capacity() / sizeof(int32_t); } + const int32_t* data() const { return reinterpret_cast(bytes_builder_.data()); } + int32_t* mutable_data() { return reinterpret_cast(bytes_builder_.mutable_data()); } + + private: + BufferBuilder bytes_builder_; +}; } // namespace arrow diff --git a/cpp/src/gandiva/projector_filter_exec.h b/cpp/src/gandiva/projector_filter_exec.h new file mode 100644 index 0000000000000..130509d9984af --- /dev/null +++ b/cpp/src/gandiva/projector_filter_exec.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "arrow/status.h" +#include "arrow/util/iterator.h" + +#include "gandiva/projector.h" +#include "gandiva/filter.h" + +#include "gandiva/arrow.h" +#include "gandiva/configuration.h" +#include "gandiva/expression.h" +#include "gandiva/selection_vector.h" +#include "gandiva/visibility.h" + +namespace gandiva { + +class GANDIVA_EXPORT Projector_Filter_Exec { + public: + Projector_Filter_Exec(std::shared_ptr proj, std::shared_ptr filter, + arrow::MemoryPool* pool,SelectionVector::Mode selection_mode=SelectionVector::MODE_UINT32); + + ~Projector_Filter_Exec(); + + Status Evaluate(const std::shared_ptr& in, std::shared_ptr& out); + + arrow::RecordBatchIterator Process(arrow::RecordBatchIterator in); + + private: + std::shared_ptr proj_; + std::shared_ptr filter_; + arrow::MemoryPool* pool_; + SelectionVector::Mode selection_mode_; + +}; + +} // namespace gandiva diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 37b1d8520ee7b..d16fdbf8b9c4c 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -940,6 +940,11 @@ cdef class Fragment(_Weakrefable): def _scanner(self, **kwargs): return Scanner.from_fragment(self, **kwargs) + def scan_(self, Schema schema=None, **kwargs): + cdef ScanOptions options=self._scanner(schema=schema, **kwargs).options() + for maybe_task in GetResultValue(self.fragment.Scan(options.unwrap())): + yield ScanTask.wrap(GetResultValue(move(maybe_task))) + def scan(self, Schema schema=None, **kwargs): """Builds a scan 
operation against the dataset. @@ -1147,6 +1152,32 @@ class RowGroupInfo: return False return self.id == other.id +cdef class ScanOptions(_Weakrefable): + """Scan options specific to scan operation.""" + + cdef: + shared_ptr[CScanOptions] wrapped + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef init(self, const shared_ptr[CScanOptions]& sp): + self.wrapped = sp + + @staticmethod + cdef wrap(const shared_ptr[CScanOptions]& sp): + cdef ScanOptions self = ScanOptions.__new__(ScanOptions) + self.init(sp) + return self + + cdef shared_ptr[CScanOptions] unwrap(self): + return self.wrapped + + def __eq__(self, other): + if not isinstance(other, ScanOptions): + return False + cdef ScanOptions rhs = other + return self.wrapped.get() == rhs.wrapped.get() cdef class FragmentScanOptions(_Weakrefable): """Scan options specific to a particular fragment and scan operation.""" @@ -2933,6 +2964,8 @@ cdef class Scanner(_Weakrefable): result = self.scanner.Head(num_rows) return pyarrow_wrap_table(GetResultValue(result)) + def options(self): + return ScanOptions.wrap(self.scanner.options()) def _get_partition_keys(Expression partition_expression): """ diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 9ae7c2908bbaf..d1caee1ea31cf 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -56,6 +56,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::Deserialize"(shared_ptr[CBuffer]) cdef cppclass CScanOptions "arrow::dataset::ScanOptions": + CExpression filter + CExpression projection + shared_ptr[CSchema] dataset_schema + int64_t batch_size + c_bool use_threads @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)