From deccd9d6403f27a6cacad5c92aa56dedd8ebb438 Mon Sep 17 00:00:00 2001 From: binwei Date: Thu, 30 Sep 2021 14:49:17 +0800 Subject: [PATCH 1/7] expose fragment.scan function --- python/pyarrow/_dataset.pyx | 33 ++++++++++++++++++++ python/pyarrow/includes/libarrow_dataset.pxd | 5 +++ 2 files changed, 38 insertions(+) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 619942840c472..655d46aad9760 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -921,6 +921,11 @@ cdef class Fragment(_Weakrefable): def _scanner(self, **kwargs): return Scanner.from_fragment(self, **kwargs) + def scan_(self, Schema schema=None, **kwargs): + cdef ScanOptions options=self._scanner(schema=schema, **kwargs).options() + for maybe_task in GetResultValue(self.fragment.Scan(options.unwrap())): + yield ScanTask.wrap(GetResultValue(move(maybe_task))) + def scan(self, Schema schema=None, **kwargs): """Builds a scan operation against the dataset. @@ -1128,6 +1133,32 @@ class RowGroupInfo: return False return self.id == other.id +cdef class ScanOptions(_Weakrefable): + """Scan options specific to scan operation.""" + + cdef: + shared_ptr[CScanOptions] wrapped + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef init(self, const shared_ptr[CScanOptions]& sp): + self.wrapped = sp + + @staticmethod + cdef wrap(const shared_ptr[CScanOptions]& sp): + cdef ScanOptions self = ScanOptions.__new__(ScanOptions) + self.init(sp) + return self + + cdef shared_ptr[CScanOptions] unwrap(self): + return self.wrapped + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False cdef class FragmentScanOptions(_Weakrefable): """Scan options specific to a particular fragment and scan operation.""" @@ -2917,6 +2948,8 @@ cdef class Scanner(_Weakrefable): result = self.scanner.Head(num_rows) return pyarrow_wrap_table(GetResultValue(result)) + def options(self): + return ScanOptions.wrap(self.scanner.options()) def _get_partition_keys(Expression partition_expression): """ diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 16f6c5c018392..ab835d4c46dcc 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -56,6 +56,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::Deserialize"(shared_ptr[CBuffer]) cdef cppclass CScanOptions "arrow::dataset::ScanOptions": + CExpression filter + CExpression projection + shared_ptr[CSchema] dataset_schema + int64_t batch_size + c_bool use_threads @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) From e7ca07d9bb69a66977e58501bcca958e782b0365 Mon Sep 17 00:00:00 2001 From: binwei Date: Tue, 11 Jan 2022 19:34:59 +0800 Subject: [PATCH 2/7] specify int32_t for buffer_builder --- cpp/src/arrow/buffer_builder.h | 87 ++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index f525ec23c58af..69721939010b4 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -416,4 +416,91 @@ class TypedBufferBuilder { int64_t false_count_ = 0; }; +/// \brief A BufferBuilder for building a buffer containing a int32_t for offset +template <> +class TypedBufferBuilder { + public: + explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool()) + : bytes_builder_(pool) {} + + explicit TypedBufferBuilder(BufferBuilder builder) + : bytes_builder_(std::move(builder)) {} + + BufferBuilder* bytes_builder() { return &bytes_builder_; } + + Status Append(int32_t value) { + ARROW_RETURN_NOT_OK(Reserve(1)); + UnsafeAppend(value); + return Status::OK(); + } + + Status Append(const int32_t* values, int64_t num_elements) { + ARROW_RETURN_NOT_OK(Reserve(num_elements)); + UnsafeAppend(values, num_elements); + return Status::OK(); + } + + Status Append(const int64_t num_copies, int32_t value) { + ARROW_RETURN_NOT_OK(Reserve(num_copies)); + UnsafeAppend(num_copies, value); + return Status::OK(); + } + + void UnsafeAppend(int32_t value) { + mutable_data()[length()] = value; + bytes_builder_.UnsafeAdvance(sizeof(uint32_t)); + } + + void UnsafeAppend(const int32_t* values, int64_t num_elements) { + for (int64_t i = 0; i < num_elements; i++) { + UnsafeAppend(values[i]); + } + } + + void UnsafeAppend(const int64_t num_copies, int32_t value) { + for (int64_t i = 0; i < num_copies; i++) { + UnsafeAppend(value); + } + } + + template + void UnsafeAppend(Iter values_begin, Iter values_end) { + int64_t num_elements = static_cast(std::distance(values_begin, values_end)); + auto data = mutable_data() + length(); + bytes_builder_.UnsafeAdvance(num_elements * sizeof(int32_t)); + std::copy(values_begin, values_end, data); + } + + Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) { + return bytes_builder_.Resize(new_capacity * sizeof(int32_t), shrink_to_fit); + } + + Status Reserve(const int64_t additional_elements) { + return bytes_builder_.Reserve(additional_elements * sizeof(int32_t)); + } + + Status Advance(const int64_t length) { + return bytes_builder_.Advance(length * sizeof(int32_t)); + } + + Status Finish(std::shared_ptr* out, bool shrink_to_fit = true) { + return bytes_builder_.Finish(out, shrink_to_fit); + } + + Result> Finish(bool shrink_to_fit = true) { + std::shared_ptr out; + ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit)); + return out; + } + + void Reset() { bytes_builder_.Reset(); } + + int64_t length() const { return bytes_builder_.length() / sizeof(int32_t); } + int64_t capacity() const { return bytes_builder_.capacity() / sizeof(int32_t); } + const int32_t* data() const { return reinterpret_cast(bytes_builder_.data()); } + int32_t* mutable_data() { return reinterpret_cast(bytes_builder_.mutable_data()); } + + private: + BufferBuilder bytes_builder_; +}; } // namespace arrow From ba59412b700a88eccbec5c5590c1c0a70b25a2ad Mon Sep 17 00:00:00 2001 From: binwei Date: Wed, 12 Jan 2022 14:32:02 +0800 Subject: [PATCH 3/7] 1. add prefetch to build buffer 2. reset null bitmap to all true after allocation, skip bit update if it's true --- cpp/src/arrow/buffer_builder.h | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index 69721939010b4..c2c1741ffc1be 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "arrow/buffer.h" #include "arrow/status.h" @@ -135,6 +136,7 @@ class ARROW_EXPORT BufferBuilder { void UnsafeAppend(const void* data, const int64_t length) { memcpy(data_ + size_, data, static_cast(length)); size_ += length; + _mm_prefetch(data_ + size_ + 64, _MM_HINT_T0); } void UnsafeAppend(const int64_t num_copies, uint8_t value) { @@ -315,9 +317,9 @@ class TypedBufferBuilder { } void UnsafeAppend(bool value) { - BitUtil::SetBitTo(mutable_data(), bit_length_, value); if (!value) { ++false_count_; + BitUtil::SetBitTo(mutable_data(), bit_length_, value); } ++bit_length_; } @@ -334,7 +336,8 @@ class TypedBufferBuilder { } void UnsafeAppend(const int64_t num_copies, bool value) { - BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value); + if (!value) + BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value); false_count_ += num_copies * !value; bit_length_ += num_copies; } @@ -364,9 +367,9 @@ class TypedBufferBuilder { // so ask it again before calling memset(). const int64_t new_byte_capacity = bytes_builder_.capacity(); if (new_byte_capacity > old_byte_capacity) { - // The additional buffer space is 0-initialized for convenience, - // so that other methods can simply bump the length. - memset(mutable_data() + old_byte_capacity, 0, + // The additional buffer space is initialized as true, + // so we can ignore the valid set + memset(mutable_data() + old_byte_capacity, 0xffffffff, static_cast(new_byte_capacity - old_byte_capacity)); } return Status::OK(); @@ -381,6 +384,7 @@ class TypedBufferBuilder { Status Advance(const int64_t length) { ARROW_RETURN_NOT_OK(Reserve(length)); bit_length_ += length; + BitUtil::SetBitsTo(mutable_data(), bit_length_, length, false); false_count_ += length; return Status::OK(); } @@ -448,7 +452,8 @@ class TypedBufferBuilder { void UnsafeAppend(int32_t value) { mutable_data()[length()] = value; - bytes_builder_.UnsafeAdvance(sizeof(uint32_t)); + _mm_prefetch(mutable_data()+length()+64/sizeof(int32_t), _MM_HINT_T0); + bytes_builder_.UnsafeAdvance(sizeof(int32_t)); } void UnsafeAppend(const int32_t* values, int64_t num_elements) { From 04f2b945ba69bf3b8e7e7321b5b5592b21ba5e50 Mon Sep 17 00:00:00 2001 From: binwei Date: Thu, 30 Sep 2021 14:49:17 +0800 Subject: [PATCH 4/7] expose fragment.scan function --- python/pyarrow/_dataset.pyx | 33 ++++++++++++++++++++ python/pyarrow/includes/libarrow_dataset.pxd | 5 +++ 2 files changed, 38 insertions(+) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 37b1d8520ee7b..d16fdbf8b9c4c 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -940,6 +940,11 @@ cdef class Fragment(_Weakrefable): def _scanner(self, **kwargs): return Scanner.from_fragment(self, **kwargs) + def scan_(self, Schema schema=None, **kwargs): + cdef ScanOptions options=self._scanner(schema=schema, **kwargs).options() + for maybe_task in GetResultValue(self.fragment.Scan(options.unwrap())): + yield ScanTask.wrap(GetResultValue(move(maybe_task))) + def scan(self, Schema schema=None, **kwargs): """Builds a scan operation against the dataset. @@ -1147,6 +1152,32 @@ class RowGroupInfo: return False return self.id == other.id +cdef class ScanOptions(_Weakrefable): + """Scan options specific to scan operation.""" + + cdef: + shared_ptr[CScanOptions] wrapped + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef init(self, const shared_ptr[CScanOptions]& sp): + self.wrapped = sp + + @staticmethod + cdef wrap(const shared_ptr[CScanOptions]& sp): + cdef ScanOptions self = ScanOptions.__new__(ScanOptions) + self.init(sp) + return self + + cdef shared_ptr[CScanOptions] unwrap(self): + return self.wrapped + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False cdef class FragmentScanOptions(_Weakrefable): """Scan options specific to a particular fragment and scan operation.""" @@ -2933,6 +2964,8 @@ cdef class Scanner(_Weakrefable): result = self.scanner.Head(num_rows) return pyarrow_wrap_table(GetResultValue(result)) + def options(self): + return ScanOptions.wrap(self.scanner.options()) def _get_partition_keys(Expression partition_expression): """ diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 9ae7c2908bbaf..d1caee1ea31cf 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -56,6 +56,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::Deserialize"(shared_ptr[CBuffer]) cdef cppclass CScanOptions "arrow::dataset::ScanOptions": + CExpression filter + CExpression projection + shared_ptr[CSchema] dataset_schema + int64_t batch_size + c_bool use_threads @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) From 5e0d80dabf543e67070e43c6d998b9a9aa279ea0 Mon Sep 17 00:00:00 2001 From: binwei Date: Tue, 11 Jan 2022 19:34:59 +0800 Subject: [PATCH 5/7] specify int32_t for buffer_builder --- cpp/src/arrow/buffer_builder.h | 87 ++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index f525ec23c58af..69721939010b4 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -416,4 +416,91 @@ class TypedBufferBuilder { int64_t false_count_ = 0; }; +/// \brief A BufferBuilder for building a buffer containing a int32_t for offset +template <> +class TypedBufferBuilder { + public: + explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool()) + : bytes_builder_(pool) {} + + explicit TypedBufferBuilder(BufferBuilder builder) + : bytes_builder_(std::move(builder)) {} + + BufferBuilder* bytes_builder() { return &bytes_builder_; } + + Status Append(int32_t value) { + ARROW_RETURN_NOT_OK(Reserve(1)); + UnsafeAppend(value); + return Status::OK(); + } + + Status Append(const int32_t* values, int64_t num_elements) { + ARROW_RETURN_NOT_OK(Reserve(num_elements)); + UnsafeAppend(values, num_elements); + return Status::OK(); + } + + Status Append(const int64_t num_copies, int32_t value) { + ARROW_RETURN_NOT_OK(Reserve(num_copies)); + UnsafeAppend(num_copies, value); + return Status::OK(); + } + + void UnsafeAppend(int32_t value) { + mutable_data()[length()] = value; + bytes_builder_.UnsafeAdvance(sizeof(uint32_t)); + } + + void UnsafeAppend(const int32_t* values, int64_t num_elements) { + for (int64_t i = 0; i < num_elements; i++) { + UnsafeAppend(values[i]); + } + } + + void UnsafeAppend(const int64_t num_copies, int32_t value) { + for (int64_t i = 0; i < num_copies; i++) { + UnsafeAppend(value); + } + } + + template + void UnsafeAppend(Iter values_begin, Iter values_end) { + int64_t num_elements = static_cast(std::distance(values_begin, values_end)); + auto data = mutable_data() + length(); + bytes_builder_.UnsafeAdvance(num_elements * sizeof(int32_t)); + std::copy(values_begin, values_end, data); + } + + Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) { + return bytes_builder_.Resize(new_capacity * sizeof(int32_t), shrink_to_fit); + } + + Status Reserve(const int64_t additional_elements) { + return bytes_builder_.Reserve(additional_elements * sizeof(int32_t)); + } + + Status Advance(const int64_t length) { + return bytes_builder_.Advance(length * sizeof(int32_t)); + } + + Status Finish(std::shared_ptr* out, bool shrink_to_fit = true) { + return bytes_builder_.Finish(out, shrink_to_fit); + } + + Result> Finish(bool shrink_to_fit = true) { + std::shared_ptr out; + ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit)); + return out; + } + + void Reset() { bytes_builder_.Reset(); } + + int64_t length() const { return bytes_builder_.length() / sizeof(int32_t); } + int64_t capacity() const { return bytes_builder_.capacity() / sizeof(int32_t); } + const int32_t* data() const { return reinterpret_cast(bytes_builder_.data()); } + int32_t* mutable_data() { return reinterpret_cast(bytes_builder_.mutable_data()); } + + private: + BufferBuilder bytes_builder_; +}; } // namespace arrow From 898ce00ed49f7f53f470dd9148e0de16a6660474 Mon Sep 17 00:00:00 2001 From: binwei Date: Wed, 12 Jan 2022 14:32:02 +0800 Subject: [PATCH 6/7] 1. add prefetch to build buffer 2. reset null bitmap to all true after allocation, skip bit update if it's true --- cpp/src/arrow/buffer_builder.h | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index 69721939010b4..c2c1741ffc1be 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "arrow/buffer.h" #include "arrow/status.h" @@ -135,6 +136,7 @@ class ARROW_EXPORT BufferBuilder { void UnsafeAppend(const void* data, const int64_t length) { memcpy(data_ + size_, data, static_cast(length)); size_ += length; + _mm_prefetch(data_ + size_ + 64, _MM_HINT_T0); } void UnsafeAppend(const int64_t num_copies, uint8_t value) { @@ -315,9 +317,9 @@ class TypedBufferBuilder { } void UnsafeAppend(bool value) { - BitUtil::SetBitTo(mutable_data(), bit_length_, value); if (!value) { ++false_count_; + BitUtil::SetBitTo(mutable_data(), bit_length_, value); } ++bit_length_; } @@ -334,7 +336,8 @@ class TypedBufferBuilder { } void UnsafeAppend(const int64_t num_copies, bool value) { - BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value); + if (!value) + BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value); false_count_ += num_copies * !value; bit_length_ += num_copies; } @@ -364,9 +367,9 @@ class TypedBufferBuilder { // so ask it again before calling memset(). const int64_t new_byte_capacity = bytes_builder_.capacity(); if (new_byte_capacity > old_byte_capacity) { - // The additional buffer space is 0-initialized for convenience, - // so that other methods can simply bump the length. - memset(mutable_data() + old_byte_capacity, 0, + // The additional buffer space is initialized as true, + // so we can ignore the valid set + memset(mutable_data() + old_byte_capacity, 0xffffffff, static_cast(new_byte_capacity - old_byte_capacity)); } return Status::OK(); @@ -381,6 +384,7 @@ class TypedBufferBuilder { Status Advance(const int64_t length) { ARROW_RETURN_NOT_OK(Reserve(length)); bit_length_ += length; + BitUtil::SetBitsTo(mutable_data(), bit_length_, length, false); false_count_ += length; return Status::OK(); } @@ -448,7 +452,8 @@ class TypedBufferBuilder { void UnsafeAppend(int32_t value) { mutable_data()[length()] = value; - bytes_builder_.UnsafeAdvance(sizeof(uint32_t)); + _mm_prefetch(mutable_data()+length()+64/sizeof(int32_t), _MM_HINT_T0); + bytes_builder_.UnsafeAdvance(sizeof(int32_t)); } void UnsafeAppend(const int32_t* values, int64_t num_elements) { From 8df4092ff3d5d217d2b1e71be1e2b7fd04cb5b99 Mon Sep 17 00:00:00 2001 From: binwei Date: Wed, 12 Jan 2022 16:14:00 +0800 Subject: [PATCH 7/7] set int32_t buffer prefetch bytes to 1*4 --- cpp/src/arrow/buffer_builder.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index c2c1741ffc1be..cf5b16c994ec7 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -452,7 +452,7 @@ class TypedBufferBuilder { void UnsafeAppend(int32_t value) { mutable_data()[length()] = value; - _mm_prefetch(mutable_data()+length()+64/sizeof(int32_t), _MM_HINT_T0); + _mm_prefetch(mutable_data()+length() + 1, _MM_HINT_T0); bytes_builder_.UnsafeAdvance(sizeof(int32_t)); }