diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt index 153413b33cab5..b77d52a23eedb 100644 --- a/cpp/src/arrow/acero/CMakeLists.txt +++ b/cpp/src/arrow/acero/CMakeLists.txt @@ -49,9 +49,11 @@ set(ARROW_ACERO_SRCS project_node.cc query_context.cc sink_node.cc + sorted_merge_node.cc source_node.cc swiss_join.cc task_util.cc + time_series_util.cc tpch_node.cc union_node.cc util.cc) @@ -173,11 +175,13 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc test_nodes.cc) -# asof_join_node uses std::thread internally +# asof_join_node and sorted_merge_node use std::thread internally # and doesn't use ThreadPool so it will # be broken if threading is turned off if(ARROW_ENABLE_THREADING) add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc) + add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc + test_nodes.cc) endif() add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index d19d2db299cba..4a3b6b199c4c0 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -16,6 +16,8 @@ // under the License. #include "arrow/acero/asof_join_node.h" +#include "arrow/acero/backpressure_handler.h" +#include "arrow/acero/concurrent_queue_internal.h" #include #include @@ -30,6 +32,7 @@ #include "arrow/acero/exec_plan.h" #include "arrow/acero/options.h" +#include "arrow/acero/unmaterialized_table.h" #ifndef NDEBUG #include "arrow/acero/options_internal.h" #endif @@ -41,6 +44,7 @@ #ifndef NDEBUG #include "arrow/compute/function_internal.h" #endif +#include "arrow/acero/time_series_util.h" #include "arrow/compute/key_hash.h" #include "arrow/compute/light_array.h" #include "arrow/record_batch.h" @@ -122,92 +126,12 @@ struct TolType { typedef uint64_t row_index_t; typedef int col_index_t; -// normalize the value to 64-bits while preserving ordering of values -template ::value, bool> = true> -static inline uint64_t time_value(T t) { - uint64_t bias = std::is_signed::value ? (uint64_t)1 << (8 * sizeof(T) - 1) : 0; - return t < 0 ? 
static_cast(t + bias) : static_cast(t); -} - // indicates normalization of a key value template ::value, bool> = true> static inline uint64_t key_value(T t) { return static_cast(t); } -/** - * Simple implementation for an unbound concurrent queue - */ -template -class ConcurrentQueue { - public: - T Pop() { - std::unique_lock lock(mutex_); - cond_.wait(lock, [&] { return !queue_.empty(); }); - return PopUnlocked(); - } - - T PopUnlocked() { - auto item = queue_.front(); - queue_.pop(); - return item; - } - - void Push(const T& item) { - std::unique_lock lock(mutex_); - return PushUnlocked(item); - } - - void PushUnlocked(const T& item) { - queue_.push(item); - cond_.notify_one(); - } - - void Clear() { - std::unique_lock lock(mutex_); - ClearUnlocked(); - } - - void ClearUnlocked() { queue_ = std::queue(); } - - std::optional TryPop() { - std::unique_lock lock(mutex_); - return TryPopUnlocked(); - } - - std::optional TryPopUnlocked() { - // Try to pop the oldest value from the queue (or return nullopt if none) - if (queue_.empty()) { - return std::nullopt; - } else { - auto item = queue_.front(); - queue_.pop(); - return item; - } - } - - bool Empty() const { - std::unique_lock lock(mutex_); - return queue_.empty(); - } - - // Un-synchronized access to front - // For this to be "safe": - // 1) the caller logically guarantees that queue is not empty - // 2) pop/try_pop cannot be called concurrently with this - const T& UnsyncFront() const { return queue_.front(); } - - size_t UnsyncSize() const { return queue_.size(); } - - protected: - std::mutex& GetMutex() { return mutex_; } - - private: - std::queue queue_; - mutable std::mutex mutex_; - std::condition_variable cond_; -}; - class AsofJoinNode; #ifndef NDEBUG @@ -547,104 +471,6 @@ class BackpressureController : public BackpressureControl { std::atomic& backpressure_counter_; }; -class BackpressureHandler { - private: - BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold, - std::unique_ptr backpressure_control) - : input_(input), - low_threshold_(low_threshold), - high_threshold_(high_threshold), - backpressure_control_(std::move(backpressure_control)) {} - - public: - static Result Make( - ExecNode* input, size_t low_threshold, size_t high_threshold, - std::unique_ptr backpressure_control) { - if (low_threshold >= high_threshold) { - return Status::Invalid("low threshold (", low_threshold, - ") must be less than high threshold (", high_threshold, ")"); - } - if (backpressure_control == NULLPTR) { - return Status::Invalid("null backpressure control parameter"); - } - BackpressureHandler backpressure_handler(input, low_threshold, high_threshold, - std::move(backpressure_control)); - return std::move(backpressure_handler); - } - - void Handle(size_t start_level, size_t end_level) { - if (start_level < high_threshold_ && end_level >= high_threshold_) { - backpressure_control_->Pause(); - } else if (start_level > low_threshold_ && end_level <= low_threshold_) { - backpressure_control_->Resume(); - } - } - - Status ForceShutdown() { - // It may be unintuitive to call Resume() here, but this is to avoid a deadlock. - // Since acero's executor won't terminate if any one node is paused, we need to - // force resume the node before stopping production. 
- backpressure_control_->Resume(); - return input_->StopProducing(); - } - - private: - ExecNode* input_; - size_t low_threshold_; - size_t high_threshold_; - std::unique_ptr backpressure_control_; -}; - -template -class BackpressureConcurrentQueue : public ConcurrentQueue { - private: - struct DoHandle { - explicit DoHandle(BackpressureConcurrentQueue& queue) - : queue_(queue), start_size_(queue_.UnsyncSize()) {} - - ~DoHandle() { - size_t end_size = queue_.UnsyncSize(); - queue_.handler_.Handle(start_size_, end_size); - } - - BackpressureConcurrentQueue& queue_; - size_t start_size_; - }; - - public: - explicit BackpressureConcurrentQueue(BackpressureHandler handler) - : handler_(std::move(handler)) {} - - T Pop() { - std::unique_lock lock(ConcurrentQueue::GetMutex()); - DoHandle do_handle(*this); - return ConcurrentQueue::PopUnlocked(); - } - - void Push(const T& item) { - std::unique_lock lock(ConcurrentQueue::GetMutex()); - DoHandle do_handle(*this); - ConcurrentQueue::PushUnlocked(item); - } - - void Clear() { - std::unique_lock lock(ConcurrentQueue::GetMutex()); - DoHandle do_handle(*this); - ConcurrentQueue::ClearUnlocked(); - } - - std::optional TryPop() { - std::unique_lock lock(ConcurrentQueue::GetMutex()); - DoHandle do_handle(*this); - return ConcurrentQueue::TryPopUnlocked(); - } - - Status ForceShutdown() { return handler_.ForceShutdown(); } - - private: - BackpressureHandler handler_; -}; - class InputState { // InputState correponds to an input // Input record batches are queued up in InputState until processed and @@ -783,29 +609,8 @@ class InputState { } inline OnType GetLatestTime() const { - return GetTime(GetLatestBatch().get(), latest_ref_row_); - } - - inline ByType GetTime(const RecordBatch* batch, row_index_t row) const { - auto data = batch->column_data(time_col_index_); - switch (time_type_id_) { - LATEST_VAL_CASE(INT8, time_value) - LATEST_VAL_CASE(INT16, time_value) - LATEST_VAL_CASE(INT32, time_value) - LATEST_VAL_CASE(INT64, time_value) - LATEST_VAL_CASE(UINT8, time_value) - LATEST_VAL_CASE(UINT16, time_value) - LATEST_VAL_CASE(UINT32, time_value) - LATEST_VAL_CASE(UINT64, time_value) - LATEST_VAL_CASE(DATE32, time_value) - LATEST_VAL_CASE(DATE64, time_value) - LATEST_VAL_CASE(TIME32, time_value) - LATEST_VAL_CASE(TIME64, time_value) - LATEST_VAL_CASE(TIMESTAMP, time_value) - default: - DCHECK(false); - return 0; // cannot happen - } + return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_, + latest_ref_row_); } #undef LATEST_VAL_CASE @@ -832,7 +637,9 @@ class InputState { have_active_batch &= !queue_.TryPop(); if (have_active_batch) { DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed - memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed + memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), time_type_id_, + time_col_index_, + 0)); // time changed } } } @@ -988,35 +795,25 @@ class InputState { std::vector> src_to_dst_; }; +/// Wrapper around UnmaterializedCompositeTable that knows how to emplace +/// the join row-by-row template -struct CompositeReferenceRow { - struct Entry { - arrow::RecordBatch* batch; // can be NULL if there's no value - row_index_t row; - }; - Entry refs[MAX_TABLES]; -}; +class CompositeTableBuilder { + using SliceBuilder = UnmaterializedSliceBuilder; + using CompositeTable = UnmaterializedCompositeTable; -// A table of composite reference rows. 
Rows maintain pointers to the -// constituent record batches, but the overall table retains shared_ptr -// references to ensure memory remains resident while the table is live. -// -// The main reason for this is that, especially for wide tables, joins -// are effectively row-oriented, rather than column-oriented. Separating -// the join part from the columnar materialization part simplifies the -// logic around data types and increases efficiency. -// -// We don't put the shared_ptr's into the rows for efficiency reasons. -template -class CompositeReferenceTable { public: - NDEBUG_EXPLICIT CompositeReferenceTable(DEBUG_ADD(size_t n_tables, AsofJoinNode* node)) - : DEBUG_ADD(n_tables_(n_tables), node_(node)) { + NDEBUG_EXPLICIT CompositeTableBuilder( + const std::vector>& inputs, + const std::shared_ptr& schema, arrow::MemoryPool* pool, + DEBUG_ADD(size_t n_tables, AsofJoinNode* node)) + : unmaterialized_table(InitUnmaterializedTable(schema, inputs, pool)), + DEBUG_ADD(n_tables_(n_tables), node_(node)) { DCHECK_GE(n_tables_, 1); DCHECK_LE(n_tables_, MAX_TABLES); } - size_t n_rows() const { return rows_.size(); } + size_t n_rows() const { return unmaterialized_table.Size(); } // Adds the latest row from the input state as a new composite reference row // - LHS must have a valid key,timestep,and latest rows @@ -1037,14 +834,16 @@ class CompositeReferenceTable { // On the first row of the batch, we resize the destination. // The destination size is dictated by the size of the LHS batch. row_index_t new_batch_size = lhs_latest_batch->num_rows(); - row_index_t new_capacity = rows_.size() + new_batch_size; - if (rows_.capacity() < new_capacity) rows_.reserve(new_capacity); + row_index_t new_capacity = unmaterialized_table.Size() + new_batch_size; + if (unmaterialized_table.capacity() < new_capacity) { + unmaterialized_table.reserve(new_capacity); + } } - rows_.resize(rows_.size() + 1); - auto& row = rows_.back(); - row.refs[0].batch = lhs_latest_batch.get(); - row.refs[0].row = lhs_latest_row; - AddRecordBatchRef(lhs_latest_batch); + + SliceBuilder new_row{&unmaterialized_table}; + + // Each item represents a portion of the columns of the output table + new_row.AddEntry(lhs_latest_batch, lhs_latest_row, lhs_latest_row + 1); DEBUG_SYNC(node_, "Emplace: key=", key, " lhs_latest_row=", lhs_latest_row, " lhs_latest_time=", lhs_latest_time, DEBUG_MANIP(std::endl)); @@ -1068,100 +867,25 @@ class CompositeReferenceTable { if (tolerance.Accepts(lhs_latest_time, (*opt_entry)->time)) { // Have a valid entry const MemoStore::Entry* entry = *opt_entry; - row.refs[i].batch = entry->batch.get(); - row.refs[i].row = entry->row; - AddRecordBatchRef(entry->batch); + new_row.AddEntry(entry->batch, entry->row, entry->row + 1); continue; } } - row.refs[i].batch = NULL; - row.refs[i].row = 0; + new_row.AddEntry(nullptr, 0, 1); } + new_row.Finalize(); } // Materializes the current reference table into a target record batch - Result> Materialize( - MemoryPool* memory_pool, const std::shared_ptr& output_schema, - const std::vector>& state) { - DCHECK_EQ(state.size(), n_tables_); - - // Don't build empty batches - size_t n_rows = rows_.size(); - if (!n_rows) return NULLPTR; - - // Build the arrays column-by-column from the rows - std::vector> arrays(output_schema->num_fields()); - for (size_t i_table = 0; i_table < n_tables_; ++i_table) { - int n_src_cols = state.at(i_table)->get_schema()->num_fields(); - { - for (col_index_t i_src_col = 0; i_src_col < n_src_cols; ++i_src_col) { - std::optional i_dst_col_opt = - 
state[i_table]->MapSrcToDst(i_src_col); - if (!i_dst_col_opt) continue; - col_index_t i_dst_col = *i_dst_col_opt; - const auto& src_field = state[i_table]->get_schema()->field(i_src_col); - const auto& dst_field = output_schema->field(i_dst_col); - DCHECK(src_field->type()->Equals(dst_field->type())); - DCHECK_EQ(src_field->name(), dst_field->name()); - const auto& field_type = src_field->type(); - -#define ASOFJOIN_MATERIALIZE_CASE(id) \ - case Type::id: { \ - using T = typename TypeIdTraits::Type; \ - ARROW_ASSIGN_OR_RAISE( \ - arrays.at(i_dst_col), \ - MaterializeColumn(memory_pool, field_type, i_table, i_src_col)); \ - break; \ - } - - switch (field_type->id()) { - ASOFJOIN_MATERIALIZE_CASE(BOOL) - ASOFJOIN_MATERIALIZE_CASE(INT8) - ASOFJOIN_MATERIALIZE_CASE(INT16) - ASOFJOIN_MATERIALIZE_CASE(INT32) - ASOFJOIN_MATERIALIZE_CASE(INT64) - ASOFJOIN_MATERIALIZE_CASE(UINT8) - ASOFJOIN_MATERIALIZE_CASE(UINT16) - ASOFJOIN_MATERIALIZE_CASE(UINT32) - ASOFJOIN_MATERIALIZE_CASE(UINT64) - ASOFJOIN_MATERIALIZE_CASE(FLOAT) - ASOFJOIN_MATERIALIZE_CASE(DOUBLE) - ASOFJOIN_MATERIALIZE_CASE(DATE32) - ASOFJOIN_MATERIALIZE_CASE(DATE64) - ASOFJOIN_MATERIALIZE_CASE(TIME32) - ASOFJOIN_MATERIALIZE_CASE(TIME64) - ASOFJOIN_MATERIALIZE_CASE(TIMESTAMP) - ASOFJOIN_MATERIALIZE_CASE(STRING) - ASOFJOIN_MATERIALIZE_CASE(LARGE_STRING) - ASOFJOIN_MATERIALIZE_CASE(BINARY) - ASOFJOIN_MATERIALIZE_CASE(LARGE_BINARY) - default: - return Status::Invalid("Unsupported data type ", - src_field->type()->ToString(), " for field ", - src_field->name()); - } - -#undef ASOFJOIN_MATERIALIZE_CASE - } - } - } - - // Build the result - DCHECK_LE(n_rows, (uint64_t)std::numeric_limits::max()); - std::shared_ptr r = - arrow::RecordBatch::Make(output_schema, (int64_t)n_rows, arrays); - return r; + Result>> Materialize() { + return unmaterialized_table.Materialize(); } // Returns true if there are no rows - bool empty() const { return rows_.empty(); } + bool empty() const { return unmaterialized_table.Empty(); } private: - // Contains shared_ptr refs for all RecordBatches referred to by the contents of rows_ - std::unordered_map> _ptr2ref; - - // Row table references - std::vector> rows_; + CompositeTable unmaterialized_table; // Total number of tables in the composite table size_t n_tables_; @@ -1171,70 +895,20 @@ class CompositeReferenceTable { AsofJoinNode* node_; #endif - // Adds a RecordBatch ref to the mapping, if needed - void AddRecordBatchRef(const std::shared_ptr& ref) { - if (!_ptr2ref.count((uintptr_t)ref.get())) _ptr2ref[(uintptr_t)ref.get()] = ref; - } - - template ::BuilderType> - enable_if_boolean static BuilderAppend( - Builder& builder, const std::shared_ptr& source, row_index_t row) { - if (source->IsNull(row)) { - builder.UnsafeAppendNull(); - return Status::OK(); - } - builder.UnsafeAppend(bit_util::GetBit(source->template GetValues(1), row)); - return Status::OK(); - } - - template ::BuilderType> - enable_if_t::value && !is_boolean_type::value, - Status> static BuilderAppend(Builder& builder, - const std::shared_ptr& source, - row_index_t row) { - if (source->IsNull(row)) { - builder.UnsafeAppendNull(); - return Status::OK(); - } - using CType = typename TypeTraits::CType; - builder.UnsafeAppend(source->template GetValues(1)[row]); - return Status::OK(); - } - - template ::BuilderType> - enable_if_base_binary static BuilderAppend( - Builder& builder, const std::shared_ptr& source, row_index_t row) { - if (source->IsNull(row)) { - return builder.AppendNull(); - } - using offset_type = typename Type::offset_type; - const 
uint8_t* data = source->buffers[2]->data(); - const offset_type* offsets = source->GetValues(1); - const offset_type offset0 = offsets[row]; - const offset_type offset1 = offsets[row + 1]; - return builder.Append(data + offset0, offset1 - offset0); - } - - template ::BuilderType> - Result> MaterializeColumn(MemoryPool* memory_pool, - const std::shared_ptr& type, - size_t i_table, col_index_t i_col) { - ARROW_ASSIGN_OR_RAISE(auto a_builder, MakeBuilder(type, memory_pool)); - Builder& builder = *checked_cast(a_builder.get()); - ARROW_RETURN_NOT_OK(builder.Reserve(rows_.size())); - for (row_index_t i_row = 0; i_row < rows_.size(); ++i_row) { - const auto& ref = rows_[i_row].refs[i_table]; - if (ref.batch) { - Status st = - BuilderAppend(builder, ref.batch->column_data(i_col), ref.row); - ARROW_RETURN_NOT_OK(st); - } else { - builder.UnsafeAppendNull(); + static CompositeTable InitUnmaterializedTable( + const std::shared_ptr& schema, + const std::vector>& inputs, arrow::MemoryPool* pool) { + std::unordered_map> dst_to_src; + for (size_t i = 0; i < inputs.size(); i++) { + auto& input = inputs[i]; + for (int src = 0; src < input->get_schema()->num_fields(); src++) { + auto dst = input->MapSrcToDst(src); + if (dst.has_value()) { + dst_to_src[dst.value()] = std::make_pair(static_cast(i), src); + } } } - std::shared_ptr result; - ARROW_RETURN_NOT_OK(builder.Finish(&result)); - return result; + return CompositeTable{schema, inputs.size(), dst_to_src, pool}; } }; @@ -1279,7 +953,9 @@ class AsofJoinNode : public ExecNode { auto& lhs = *state_.at(0); // Construct new target table if needed - CompositeReferenceTable dst(DEBUG_ADD(state_.size(), this)); + CompositeTableBuilder dst(state_, output_schema_, + plan()->query_context()->memory_pool(), + DEBUG_ADD(state_.size(), this)); // Generate rows into the dst table until we either run out of data or hit the row // limit, or run out of input @@ -1318,8 +994,8 @@ class AsofJoinNode : public ExecNode { if (dst.empty()) { return NULLPTR; } else { - return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(), - state_); + ARROW_ASSIGN_OR_RAISE(auto out, dst.Materialize()); + return out.has_value() ? out.value() : NULLPTR; } } diff --git a/cpp/src/arrow/acero/backpressure_handler.h b/cpp/src/arrow/acero/backpressure_handler.h new file mode 100644 index 0000000000000..178272315d7fb --- /dev/null +++ b/cpp/src/arrow/acero/backpressure_handler.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" + +#include + +namespace arrow::acero { + +class BackpressureHandler { + private: + BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold, + std::unique_ptr backpressure_control) + : input_(input), + low_threshold_(low_threshold), + high_threshold_(high_threshold), + backpressure_control_(std::move(backpressure_control)) {} + + public: + static Result Make( + ExecNode* input, size_t low_threshold, size_t high_threshold, + std::unique_ptr backpressure_control) { + if (low_threshold >= high_threshold) { + return Status::Invalid("low threshold (", low_threshold, + ") must be less than high threshold (", high_threshold, ")"); + } + if (backpressure_control == NULLPTR) { + return Status::Invalid("null backpressure control parameter"); + } + BackpressureHandler backpressure_handler(input, low_threshold, high_threshold, + std::move(backpressure_control)); + return std::move(backpressure_handler); + } + + void Handle(size_t start_level, size_t end_level) { + if (start_level < high_threshold_ && end_level >= high_threshold_) { + backpressure_control_->Pause(); + } else if (start_level > low_threshold_ && end_level <= low_threshold_) { + backpressure_control_->Resume(); + } + } + + Status ForceShutdown() { + // It may be unintuitive to call Resume() here, but this is to avoid a deadlock. + // Since acero's executor won't terminate if any one node is paused, we need to + // force resume the node before stopping production. + backpressure_control_->Resume(); + return input_->StopProducing(); + } + + private: + ExecNode* input_; + size_t low_threshold_; + size_t high_threshold_; + std::unique_ptr backpressure_control_; +}; + +} // namespace arrow::acero diff --git a/cpp/src/arrow/acero/concurrent_queue_internal.h b/cpp/src/arrow/acero/concurrent_queue_internal.h new file mode 100644 index 0000000000000..f530394187299 --- /dev/null +++ b/cpp/src/arrow/acero/concurrent_queue_internal.h @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include "arrow/acero/backpressure_handler.h" + +namespace arrow::acero { + +/** + * Simple implementation for a thread safe blocking unbound multi-consumer / + * multi-producer concurrent queue + */ +template +class ConcurrentQueue { + public: + // Pops the last item from the queue. 
Must be called on a non-empty queue + // + T Pop() { + std::unique_lock lock(mutex_); + cond_.wait(lock, [&] { return !queue_.empty(); }); + return PopUnlocked(); + } + + // Pops the last item from the queue, or returns a nullopt if empty + // + std::optional TryPop() { + std::unique_lock lock(mutex_); + return TryPopUnlocked(); + } + + // Pushes an item to the queue + // + void Push(const T& item) { + std::unique_lock lock(mutex_); + return PushUnlocked(item); + } + + // Clears the queue + // + void Clear() { + std::unique_lock lock(mutex_); + ClearUnlocked(); + } + + bool Empty() const { + std::unique_lock lock(mutex_); + return queue_.empty(); + } + + // Un-synchronized access to front + // For this to be "safe": + // 1) the caller logically guarantees that queue is not empty + // 2) pop/try_pop cannot be called concurrently with this + const T& UnsyncFront() const { return queue_.front(); } + + size_t UnsyncSize() const { return queue_.size(); } + + protected: + std::mutex& GetMutex() { return mutex_; } + + T PopUnlocked() { + auto item = queue_.front(); + queue_.pop(); + return item; + } + + void PushUnlocked(const T& item) { + queue_.push(item); + cond_.notify_one(); + } + + void ClearUnlocked() { queue_ = std::queue(); } + + std::optional TryPopUnlocked() { + // Try to pop the oldest value from the queue (or return nullopt if none) + if (queue_.empty()) { + return std::nullopt; + } else { + auto item = queue_.front(); + queue_.pop(); + return item; + } + } + std::queue queue_; + + private: + mutable std::mutex mutex_; + std::condition_variable cond_; +}; + +template +class BackpressureConcurrentQueue : public ConcurrentQueue { + private: + struct DoHandle { + explicit DoHandle(BackpressureConcurrentQueue& queue) + : queue_(queue), start_size_(queue_.UnsyncSize()) {} + + ~DoHandle() { + // unsynced access is safe since DoHandle is internally only used when the + // lock is held + size_t end_size = queue_.UnsyncSize(); + queue_.handler_.Handle(start_size_, end_size); + } + + BackpressureConcurrentQueue& queue_; + size_t start_size_; + }; + + public: + explicit BackpressureConcurrentQueue(BackpressureHandler handler) + : handler_(std::move(handler)) {} + + T Pop() { + std::unique_lock lock(ConcurrentQueue::GetMutex()); + DoHandle do_handle(*this); + return ConcurrentQueue::PopUnlocked(); + } + + void Push(const T& item) { + std::unique_lock lock(ConcurrentQueue::GetMutex()); + DoHandle do_handle(*this); + ConcurrentQueue::PushUnlocked(item); + } + + void Clear() { + std::unique_lock lock(ConcurrentQueue::GetMutex()); + DoHandle do_handle(*this); + ConcurrentQueue::ClearUnlocked(); + } + + std::optional TryPop() { + std::unique_lock lock(ConcurrentQueue::GetMutex()); + DoHandle do_handle(*this); + return ConcurrentQueue::TryPopUnlocked(); + } + + Status ForceShutdown() { return handler_.ForceShutdown(); } + + private: + BackpressureHandler handler_; +}; + +} // namespace arrow::acero diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc index 541e5fed6206b..97119726d4b17 100644 --- a/cpp/src/arrow/acero/exec_plan.cc +++ b/cpp/src/arrow/acero/exec_plan.cc @@ -1114,6 +1114,7 @@ void RegisterAggregateNode(ExecFactoryRegistry*); void RegisterSinkNode(ExecFactoryRegistry*); void RegisterHashJoinNode(ExecFactoryRegistry*); void RegisterAsofJoinNode(ExecFactoryRegistry*); +void RegisterSortedMergeNode(ExecFactoryRegistry*); } // namespace internal @@ -1132,6 +1133,7 @@ ExecFactoryRegistry* default_exec_factory_registry() { internal::RegisterSinkNode(this); 
internal::RegisterHashJoinNode(this); internal::RegisterAsofJoinNode(this); + internal::RegisterSortedMergeNode(this); } Result GetFactory(const std::string& factory_name) override { diff --git a/cpp/src/arrow/acero/sorted_merge_node.cc b/cpp/src/arrow/acero/sorted_merge_node.cc new file mode 100644 index 0000000000000..f3b934eda186b --- /dev/null +++ b/cpp/src/arrow/acero/sorted_merge_node.cc @@ -0,0 +1,609 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include "arrow/acero/concurrent_queue_internal.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template +inline typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, val); +} + +} // namespace + +namespace arrow::acero { + +namespace { + +// Each slice is associated with a single input source, so we only need 1 record +// batch per slice +using SingleRecordBatchSliceBuilder = arrow::acero::UnmaterializedSliceBuilder<1>; +using SingleRecordBatchCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>; + +using row_index_t = uint64_t; +using time_unit_t = uint64_t; +using col_index_t = int; + +constexpr bool kNewTask = true; +constexpr bool kPoisonPill = false; + +class BackpressureController : public BackpressureControl { + public: + BackpressureController(ExecNode* node, ExecNode* output, + std::atomic& backpressure_counter) + : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} + + void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); } + void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); } + + private: + ExecNode* node_; + ExecNode* output_; + std::atomic& backpressure_counter_; +}; + +/// InputState correponds to an input. 
Input record batches are queued up in InputState +/// until processed and turned into output record batches. +class InputState { + public: + InputState(size_t index, BackpressureHandler handler, + const std::shared_ptr& schema, const int time_col_index) + : index_(index), + queue_(std::move(handler)), + schema_(schema), + time_col_index_(time_col_index), + time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {} + + template + static arrow::Result Make(size_t index, arrow::acero::ExecNode* input, + arrow::acero::ExecNode* output, + std::atomic& backpressure_counter, + const std::shared_ptr& schema, + const col_index_t time_col_index) { + constexpr size_t low_threshold = 4, high_threshold = 8; + std::unique_ptr backpressure_control = + std::make_unique(input, output, backpressure_counter); + ARROW_ASSIGN_OR_RAISE(auto handler, + BackpressureHandler::Make(input, low_threshold, high_threshold, + std::move(backpressure_control))); + return PtrType(new InputState(index, std::move(handler), schema, time_col_index)); + } + + bool IsTimeColumn(col_index_t i) const { + DCHECK_LT(i, schema_->num_fields()); + return (i == time_col_index_); + } + + // Gets the latest row index, assuming the queue isn't empty + row_index_t GetLatestRow() const { return latest_ref_row_; } + + bool Empty() const { + // cannot be empty if ref row is >0 -- can avoid slow queue lock + // below + if (latest_ref_row_ > 0) { + return false; + } + return queue_.Empty(); + } + + size_t index() const { return index_; } + + int total_batches() const { return total_batches_; } + + // Gets latest batch (precondition: must not be empty) + const std::shared_ptr& GetLatestBatch() const { + return queue_.UnsyncFront(); + } + +#define LATEST_VAL_CASE(id, val) \ + case arrow::Type::id: { \ + using T = typename arrow::TypeIdTraits::Type; \ + using CType = typename arrow::TypeTraits::CType; \ + return val(data->GetValues(1)[row]); \ + } + + inline time_unit_t GetLatestTime() const { + return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_, + latest_ref_row_); + } + +#undef LATEST_VAL_CASE + + bool Finished() const { return batches_processed_ == total_batches_; } + + void Advance(SingleRecordBatchSliceBuilder& builder) { + // Advance the row until a new time is encountered or the record batch + // ends. This will return a range of {-1, -1} and a nullptr if there is + // no input + bool active = + (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty(); + + if (!active) { + return; + } + + row_index_t start = latest_ref_row_; + row_index_t end = latest_ref_row_; + time_unit_t startTime = GetLatestTime(); + std::shared_ptr batch = queue_.UnsyncFront(); + auto rows_in_batch = (row_index_t)batch->num_rows(); + + while (GetLatestTime() == startTime) { + end = ++latest_ref_row_; + if (latest_ref_row_ >= rows_in_batch) { + // hit the end of the batch, need to get the next batch if + // possible. 
+ ++batches_processed_; + latest_ref_row_ = 0; + active &= !queue_.TryPop(); + if (active) { + DCHECK_GT(queue_.UnsyncFront()->num_rows(), + 0); // empty batches disallowed, sanity check + } + break; + } + } + builder.AddEntry(batch, start, end); + } + + arrow::Status Push(const std::shared_ptr& rb) { + if (rb->num_rows() > 0) { + queue_.Push(rb); + } else { + ++batches_processed_; // don't enqueue empty batches, just record + // as processed + } + return arrow::Status::OK(); + } + + const std::shared_ptr& get_schema() const { return schema_; } + + void set_total_batches(int n) { total_batches_ = n; } + + private: + size_t index_; + // Pending record batches. The latest is the front. Batches cannot be empty. + BackpressureConcurrentQueue> queue_; + // Schema associated with the input + std::shared_ptr schema_; + // Total number of batches (only int because InputFinished uses int) + std::atomic total_batches_{-1}; + // Number of batches processed so far (only int because InputFinished uses + // int) + std::atomic batches_processed_{0}; + // Index of the time col + col_index_t time_col_index_; + // Type id of the time column + arrow::Type::type time_type_id_; + // Index of the latest row reference within; if >0 then queue_ cannot be + // empty Must be < queue_.front()->num_rows() if queue_ is non-empty + row_index_t latest_ref_row_ = 0; + // Time of latest row + time_unit_t latest_time_ = std::numeric_limits::lowest(); +}; + +struct InputStateComparator { + bool operator()(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const { + // True if lhs is ahead of time of rhs + if (lhs->Finished()) { + return false; + } + if (rhs->Finished()) { + return false; + } + time_unit_t lFirst = lhs->GetLatestTime(); + time_unit_t rFirst = rhs->GetLatestTime(); + return lFirst > rFirst; + } +}; + +class SortedMergeNode : public ExecNode { + static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024; + + public: + SortedMergeNode(arrow::acero::ExecPlan* plan, + std::vector inputs, + std::shared_ptr output_schema, + arrow::Ordering new_ordering) + : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)), + ordering_(std::move(new_ordering)), + input_counter(inputs_.size()), + output_counter(inputs_.size()), + process_thread() { + SetLabel("sorted_merge"); + } + + ~SortedMergeNode() override { + process_queue.Push( + kPoisonPill); // poison pill + // We might create a temporary (such as to inspect the output + // schema), in which case there isn't anything to join + if (process_thread.joinable()) { + process_thread.join(); + } + } + + static arrow::Result Make( + arrow::acero::ExecPlan* plan, std::vector inputs, + const arrow::acero::ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast(inputs.size()), + "SortedMergeNode")); + + if (inputs.size() < 1) { + return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs"); + } + + const auto schema = inputs.at(0)->output_schema(); + for (const auto& input : inputs) { + if (!input->output_schema()->Equals(schema)) { + return Status::Invalid( + "SortedMergeNode input schemas must all " + "match, first schema " + "was: ", + schema->ToString(), " got schema: ", input->output_schema()->ToString()); + } + } + + const auto& order_options = + arrow::internal::checked_cast(options); + + if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { + return Status::Invalid("`ordering` must be an explicit non-empty ordering"); + } + + std::shared_ptr output_schema = 
inputs[0]->output_schema(); + return plan->EmplaceNode( + plan, std::move(inputs), std::move(output_schema), order_options.ordering); + } + + const char* kind_name() const override { return "SortedMergeNode"; } + + const arrow::Ordering& ordering() const override { return ordering_; } + + arrow::Status Init() override { + ARROW_CHECK(ordering_.sort_keys().size() == 1) << "Only one sort key supported"; + + auto inputs = this->inputs(); + for (size_t i = 0; i < inputs.size(); i++) { + ExecNode* input = inputs[i]; + const auto& schema = input->output_schema(); + + const auto& sort_key = ordering_.sort_keys()[0]; + if (sort_key.order != arrow::compute::SortOrder::Ascending) { + return Status::NotImplemented("Only ascending sort order is supported"); + } + + const FieldRef& ref = sort_key.target; + auto match_res = ref.FindOne(*schema); + if (!match_res.ok()) { + return Status::Invalid("Bad sort key : ", match_res.status().message()); + } + ARROW_ASSIGN_OR_RAISE(auto match, match_res); + ARROW_DCHECK(match.indices().size() == 1); + + ARROW_ASSIGN_OR_RAISE(auto input_state, + InputState::Make>( + i, input, this, backpressure_counter, schema, + std::move(match.indices()[0]))); + state.push_back(std::move(input_state)); + } + return Status::OK(); + } + + arrow::Status InputReceived(arrow::acero::ExecNode* input, + arrow::ExecBatch batch) override { + ARROW_DCHECK(std_has(inputs_, input)); + const size_t index = std_find(inputs_, input) - inputs_.begin(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr rb, + batch.ToRecordBatch(output_schema_)); + + // Push into the queue. Note that we don't need to lock since + // InputState's ConcurrentQueue manages locking + input_counter[index] += rb->num_rows(); + ARROW_RETURN_NOT_OK(state[index]->Push(rb)); + process_queue.Push(kNewTask); + return Status::OK(); + } + + arrow::Status InputFinished(arrow::acero::ExecNode* input, int total_batches) override { + ARROW_DCHECK(std_has(inputs_, input)); + { + std::lock_guard guard(gate); + ARROW_DCHECK(std_has(inputs_, input)); + size_t k = std_find(inputs_, input) - inputs_.begin(); + state.at(k)->set_total_batches(total_batches); + } + // Trigger a final process call for stragglers + process_queue.Push(kNewTask); + return Status::OK(); + } + + arrow::Status StartProducing() override { + ARROW_ASSIGN_OR_RAISE(process_task, plan_->query_context()->BeginExternalTask( + "SortedMergeNode::ProcessThread")); + if (!process_task.is_valid()) { + // Plan has already aborted. 
Do not start process thread + return Status::OK(); + } + process_thread = std::thread(&SortedMergeNode::StartPoller, this); + return Status::OK(); + } + + arrow::Status StopProducingImpl() override { + process_queue.Clear(); + process_queue.Push(kPoisonPill); + return Status::OK(); + } + + // handled by the backpressure controller + void PauseProducing(arrow::acero::ExecNode* output, int32_t counter) override {} + void ResumeProducing(arrow::acero::ExecNode* output, int32_t counter) override {} + + protected: + std::string ToStringExtra(int indent) const override { + std::stringstream ss; + ss << "ordering=" << ordering_.ToString(); + return ss.str(); + } + + private: + void EndFromProcessThread(arrow::Status st = arrow::Status::OK()) { + ARROW_CHECK(!cleanup_started); + for (size_t i = 0; i < input_counter.size(); ++i) { + ARROW_CHECK(input_counter[i] == output_counter[i]) + << input_counter[i] << " != " << output_counter[i]; + } + + ARROW_UNUSED( + plan_->query_context()->executor()->Spawn([this, st = std::move(st)]() mutable { + Defer cleanup([this, &st]() { process_task.MarkFinished(st); }); + if (st.ok()) { + st = output_->InputFinished(this, batches_produced); + } + })); + } + + bool CheckEnded() { + bool all_finished = true; + for (const auto& s : state) { + all_finished &= s->Finished(); + } + if (all_finished) { + EndFromProcessThread(); + return false; + } + return true; + } + + /// Streams the input states in sorted order until we run out of input + arrow::Result> getNextBatch() { + DCHECK(!state.empty()); + for (const auto& s : state) { + if (s->Empty() && !s->Finished()) { + return nullptr; // not enough data, wait + } + } + + std::vector> heap = state; + // filter out finished states + heap.erase(std::remove_if( + heap.begin(), heap.end(), + [](const std::shared_ptr& s) { return s->Finished(); }), + heap.end()); + + // If any are Empty(), then return early since we don't have enough data + if (std::any_of(heap.begin(), heap.end(), + [](const std::shared_ptr& s) { return s->Empty(); })) { + return nullptr; + } + + // Currently we only support one sort key + const auto sort_col = *ordering_.sort_keys().at(0).target.name(); + const auto comp = InputStateComparator(); + std::make_heap(heap.begin(), heap.end(), comp); + + // Each slice only has one record batch with the same schema as the output + std::unordered_map> output_col_to_src; + for (int i = 0; i < output_schema_->num_fields(); i++) { + output_col_to_src[i] = std::make_pair(0, i); + } + SingleRecordBatchCompositeTable output(output_schema(), 1, + std::move(output_col_to_src), + plan()->query_context()->memory_pool()); + + // Generate rows until we run out of data or we exceed the target output + // size + bool waiting_for_more_data = false; + while (!waiting_for_more_data && !heap.empty() && + output.Size() < kTargetOutputBatchSize) { + std::pop_heap(heap.begin(), heap.end(), comp); + + auto& next_item = heap.back(); + time_unit_t latest_time = std::numeric_limits::min(); + time_unit_t new_time = next_item->GetLatestTime(); + ARROW_CHECK(new_time >= latest_time) + << "Input state " << next_item->index() + << " has out of order data. 
newTime=" << new_time + << " latestTime=" << latest_time; + + latest_time = new_time; + SingleRecordBatchSliceBuilder builder{&output}; + next_item->Advance(builder); + + if (builder.Size() > 0) { + output_counter[next_item->index()] += builder.Size(); + builder.Finalize(); + } + if (next_item->Finished()) { + heap.pop_back(); + } else if (next_item->Empty()) { + // We've run out of data on one of the inputs + waiting_for_more_data = true; + continue; // skip the unnecessary make_heap + } + std::make_heap(heap.begin(), heap.end(), comp); + } + + // Emit the batch + if (output.Size() == 0) { + return nullptr; + } + + ARROW_ASSIGN_OR_RAISE(auto maybe_rb, output.Materialize()); + return maybe_rb.value_or(nullptr); + } + /// Gets a batch. Returns true if there is more data to process, false if we + /// are done or an error occurred + bool PollOnce() { + std::lock_guard guard(gate); + if (!CheckEnded()) { + return false; + } + + // Process batches while we have data + for (;;) { + Result> result = getNextBatch(); + + if (result.ok()) { + auto out_rb = *result; + if (!out_rb) { + break; + } + ExecBatch out_b(*out_rb); + out_b.index = batches_produced++; + Status st = output_->InputReceived(this, std::move(out_b)); + if (!st.ok()) { + ARROW_LOG(FATAL) << "Error in output_::InputReceived: " << st.ToString(); + EndFromProcessThread(std::move(st)); + } + } else { + EndFromProcessThread(result.status()); + return false; + } + } + + // Report to the output the total batch count, if we've already + // finished everything (there are two places where this can happen: + // here and InputFinished) + // + // It may happen here in cases where InputFinished was called before + // we were finished producing results (so we didn't know the output + // size at that time) + if (!CheckEnded()) { + return false; + } + + // There is no more we can do now but there is still work remaining + // for later when more data arrives. + return true; + } + + void EmitBatches() { + while (true) { + // Implementation note: If the queue is empty, we will block here + if (process_queue.Pop() == kPoisonPill) { + EndFromProcessThread(); + } + // Either we're out of data or something went wrong + if (!PollOnce()) { + return; + } + } + } + + /// The entry point for processThread + static void StartPoller(SortedMergeNode* node) { node->EmitBatches(); } + + arrow::Ordering ordering_; + + // Each input state corresponds to an input (e.g. a parquet data file) + std::vector> state; + std::vector input_counter; + std::vector output_counter; + std::mutex gate; + + std::atomic cleanup_started{false}; + + // Backpressure counter common to all input states + std::atomic backpressure_counter; + + std::atomic batches_produced{0}; + + // Queue to trigger processing of a given input. 
False acts as a poison pill + ConcurrentQueue process_queue; + // Once StartProducing is called, we initialize this thread to poll the + // input states and emit batches + std::thread process_thread; + arrow::Future<> process_task; + + // Map arg index --> completion counter + std::vector counter_; + // Map arg index --> data + std::vector accumulation_queue_; + std::mutex mutex_; + std::atomic total_batches_{0}; +}; + +} // namespace + +namespace internal { +void RegisterSortedMergeNode(ExecFactoryRegistry* registry) { + DCHECK_OK(registry->AddFactory("sorted_merge", SortedMergeNode::Make)); +} +} // namespace internal + +} // namespace arrow::acero diff --git a/cpp/src/arrow/acero/sorted_merge_node_test.cc b/cpp/src/arrow/acero/sorted_merge_node_test.cc new file mode 100644 index 0000000000000..55446d631d90c --- /dev/null +++ b/cpp/src/arrow/acero/sorted_merge_node_test.cc @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/map_node.h" +#include "arrow/acero/options.h" +#include "arrow/acero/test_nodes.h" +#include "arrow/array/builder_base.h" +#include "arrow/array/concatenate.h" +#include "arrow/compute/ordering.h" +#include "arrow/result.h" +#include "arrow/scalar.h" +#include "arrow/table.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" + +namespace arrow::acero { + +std::shared_ptr TestTable(int start, int step, int rows_per_batch, + int num_batches) { + return gen::Gen({{"timestamp", gen::Step(start, step, /*signed_int=*/true)}, + {"str", gen::Random(utf8())}}) + ->FailOnError() + ->Table(rows_per_batch, num_batches); +} + +TEST(SortedMergeNode, Basic) { + auto table1 = TestTable( + /*start=*/0, + /*step=*/2, + /*rows_per_batch=*/2, + /*num_batches=*/3); + auto table2 = TestTable( + /*start=*/1, + /*step=*/2, + /*rows_per_batch=*/3, + /*num_batches=*/2); + auto table3 = TestTable( + /*start=*/3, + /*step=*/3, + /*rows_per_batch=*/6, + /*num_batches=*/1); + std::vector src_decls; + src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table1))); + src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table2))); + src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table3))); + + auto ops = OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")})); + + Declaration sorted_merge{"sorted_merge", src_decls, ops}; + // We can't use threads for sorted merging since it relies on + // ascending deterministic order of timestamps + ASSERT_OK_AND_ASSIGN(auto output, + DeclarationToTable(sorted_merge, /*use_threads=*/false)); + ASSERT_EQ(output->num_rows(), 18); + + ASSERT_OK_AND_ASSIGN(auto 
expected_ts_builder, + MakeBuilder(int32(), default_memory_pool())); + for (auto i : {0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 9, 9, 10, 11, 12, 15, 18}) { + ASSERT_OK(expected_ts_builder->AppendScalar(*MakeScalar(i))); + } + ASSERT_OK_AND_ASSIGN(auto expected_ts, expected_ts_builder->Finish()); + auto output_col = output->column(0); + ASSERT_OK_AND_ASSIGN(auto output_ts, Concatenate(output_col->chunks())); + + AssertArraysEqual(*expected_ts, *output_ts); +} + +} // namespace arrow::acero diff --git a/cpp/src/arrow/acero/time_series_util.cc b/cpp/src/arrow/acero/time_series_util.cc new file mode 100644 index 0000000000000..71133fef47306 --- /dev/null +++ b/cpp/src/arrow/acero/time_series_util.cc @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/array/data.h" + +#include "arrow/acero/time_series_util.h" +#include "arrow/util/logging.h" + +namespace arrow::acero { + +template ::value, bool>> +inline uint64_t NormalizeTime(T t) { + uint64_t bias = + std::is_signed::value ? static_cast(1) << (8 * sizeof(T) - 1) : 0; + return t < 0 ? static_cast(t + bias) : static_cast(t); +} + +uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row) { +#define LATEST_VAL_CASE(id, val) \ + case Type::id: { \ + using T = typename TypeIdTraits::Type; \ + using CType = typename TypeTraits::CType; \ + return val(data->GetValues(1)[row]); \ + } + + auto data = batch->column_data(col); + switch (time_type) { + LATEST_VAL_CASE(INT8, NormalizeTime) + LATEST_VAL_CASE(INT16, NormalizeTime) + LATEST_VAL_CASE(INT32, NormalizeTime) + LATEST_VAL_CASE(INT64, NormalizeTime) + LATEST_VAL_CASE(UINT8, NormalizeTime) + LATEST_VAL_CASE(UINT16, NormalizeTime) + LATEST_VAL_CASE(UINT32, NormalizeTime) + LATEST_VAL_CASE(UINT64, NormalizeTime) + LATEST_VAL_CASE(DATE32, NormalizeTime) + LATEST_VAL_CASE(DATE64, NormalizeTime) + LATEST_VAL_CASE(TIME32, NormalizeTime) + LATEST_VAL_CASE(TIME64, NormalizeTime) + LATEST_VAL_CASE(TIMESTAMP, NormalizeTime) + default: + DCHECK(false); + return 0; // cannot happen + } + +#undef LATEST_VAL_CASE +} + +} // namespace arrow::acero diff --git a/cpp/src/arrow/acero/time_series_util.h b/cpp/src/arrow/acero/time_series_util.h new file mode 100644 index 0000000000000..97707f43bf20b --- /dev/null +++ b/cpp/src/arrow/acero/time_series_util.h @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" + +namespace arrow::acero { + +// normalize the value to unsigned 64-bits while preserving ordering of values +template ::value, bool> = true> +uint64_t NormalizeTime(T t); + +uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row); + +} // namespace arrow::acero diff --git a/cpp/src/arrow/acero/unmaterialized_table.h b/cpp/src/arrow/acero/unmaterialized_table.h new file mode 100644 index 0000000000000..05d6c866936e0 --- /dev/null +++ b/cpp/src/arrow/acero/unmaterialized_table.h @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include "arrow/array/builder_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/memory_pool.h" +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" +#include "arrow/util/logging.h" + +namespace arrow::acero { + +/// Lightweight representation of a cell of an unmaterialized table. +/// +struct CompositeEntry { + RecordBatch* batch; + uint64_t start; + uint64_t end; +}; + +// Forward declare the builder +template +class UnmaterializedSliceBuilder; + +/// A table of composite reference rows. Rows maintain pointers to the +/// constituent record batches, but the overall table retains shared_ptr +/// references to ensure memory remains resident while the table is live. +/// +/// The main reason for this is that, especially for wide tables, some operations +/// such as sorted_merge or asof_join are effectively row-oriented, rather than +/// column-oriented. Separating the join part from the columnar materialization +/// part simplifies the logic around data types and increases efficiency. +/// +/// We don't put the shared_ptr's into the rows for efficiency reasons. 
Use +/// UnmaterializedSliceBuilder to add ranges of record batches to this table +template +class UnmaterializedCompositeTable { + public: + UnmaterializedCompositeTable( + const std::shared_ptr& output_schema, size_t num_composite_tables, + std::unordered_map> output_col_to_src_, + arrow::MemoryPool* pool_ = arrow::default_memory_pool()) + : schema(output_schema), + num_composite_tables(num_composite_tables), + output_col_to_src(std::move(output_col_to_src_)), + pool{pool_} {} + + // Shallow wrappers around std::vector for performance + inline size_t capacity() { return slices.capacity(); } + inline void reserve(size_t num_slices) { slices.reserve(num_slices); } + + inline size_t Size() const { return num_rows; } + inline size_t Empty() const { return num_rows == 0; } + + Result>> Materialize() { + // Don't build empty batches + if (Empty()) { + return std::nullopt; + } + DCHECK_LE(Size(), (uint64_t)std::numeric_limits::max()); + std::vector> arrays(schema->num_fields()); + +#define MATERIALIZE_CASE(id) \ + case arrow::Type::id: { \ + using T = typename arrow::TypeIdTraits::Type; \ + ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn(field_type, i_col)); \ + break; \ + } + + // Build the arrays column-by-column from the rows + for (int i_col = 0; i_col < schema->num_fields(); ++i_col) { + const std::shared_ptr& field = schema->field(i_col); + const auto& field_type = field->type(); + + switch (field_type->id()) { + MATERIALIZE_CASE(BOOL) + MATERIALIZE_CASE(INT8) + MATERIALIZE_CASE(INT16) + MATERIALIZE_CASE(INT32) + MATERIALIZE_CASE(INT64) + MATERIALIZE_CASE(UINT8) + MATERIALIZE_CASE(UINT16) + MATERIALIZE_CASE(UINT32) + MATERIALIZE_CASE(UINT64) + MATERIALIZE_CASE(FLOAT) + MATERIALIZE_CASE(DOUBLE) + MATERIALIZE_CASE(DATE32) + MATERIALIZE_CASE(DATE64) + MATERIALIZE_CASE(TIME32) + MATERIALIZE_CASE(TIME64) + MATERIALIZE_CASE(TIMESTAMP) + MATERIALIZE_CASE(STRING) + MATERIALIZE_CASE(LARGE_STRING) + MATERIALIZE_CASE(BINARY) + MATERIALIZE_CASE(LARGE_BINARY) + default: + return arrow::Status::Invalid("Unsupported data type ", + field->type()->ToString(), " for field ", + field->name()); + } + } + +#undef MATERIALIZE_CASE + + std::shared_ptr r = + arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays); + return r; + } + + private: + struct UnmaterializedSlice { + CompositeEntry components[MAX_COMPOSITE_TABLES]; + size_t num_components; + + inline int64_t Size() const { + if (num_components == 0) { + return 0; + } + return components[0].end - components[0].start; + } + }; + + // Mapping from an output column ID to a source table ID and column ID + std::shared_ptr schema; + size_t num_composite_tables; + std::unordered_map> output_col_to_src; + + arrow::MemoryPool* pool; + + /// A map from address of a record batch to the record batch. 
Used to + /// maintain the lifetime of the record batch in case it goes out of scope + /// by the main exec node thread + std::unordered_map> ptr2Ref = {}; + std::vector slices; + + size_t num_rows = 0; + + // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice + friend class UnmaterializedSliceBuilder; + + void AddRecordBatchRef(const std::shared_ptr& ref) { + ptr2Ref[(uintptr_t)ref.get()] = ref; + } + void AddSlice(const UnmaterializedSlice& slice) { + slices.push_back(slice); + num_rows += slice.Size(); + } + + template ::BuilderType> + enable_if_boolean static BuilderAppend( + Builder& builder, const std::shared_ptr& source, uint64_t row) { + if (source->IsNull(row)) { + builder.UnsafeAppendNull(); + return Status::OK(); + } + builder.UnsafeAppend(bit_util::GetBit(source->template GetValues(1), row)); + return Status::OK(); + } + + template ::BuilderType> + enable_if_t::value && !is_boolean_type::value, + Status> static BuilderAppend(Builder& builder, + const std::shared_ptr& source, + uint64_t row) { + if (source->IsNull(row)) { + builder.UnsafeAppendNull(); + return Status::OK(); + } + using CType = typename TypeTraits::CType; + builder.UnsafeAppend(source->template GetValues(1)[row]); + return Status::OK(); + } + + template ::BuilderType> + enable_if_base_binary static BuilderAppend( + Builder& builder, const std::shared_ptr& source, uint64_t row) { + if (source->IsNull(row)) { + return builder.AppendNull(); + } + using offset_type = typename Type::offset_type; + const uint8_t* data = source->buffers[2]->data(); + const offset_type* offsets = source->GetValues(1); + const offset_type offset0 = offsets[row]; + const offset_type offset1 = offsets[row + 1]; + return builder.Append(data + offset0, offset1 - offset0); + } + + template ::BuilderType> + arrow::Result> materializeColumn( + const std::shared_ptr& type, int i_col) { + ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool)); + Builder& builder = *arrow::internal::checked_cast(builderPtr.get()); + ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); + + const auto& [table_index, column_index] = output_col_to_src[i_col]; + + for (const auto& unmaterialized_slice : slices) { + const auto& [batch, start, end] = unmaterialized_slice.components[table_index]; + if (batch) { + for (uint64_t rowNum = start; rowNum < end; ++rowNum) { + arrow::Status st = BuilderAppend( + builder, batch->column_data(column_index), rowNum); + ARROW_RETURN_NOT_OK(st); + } + } else { + for (uint64_t rowNum = start; rowNum < end; ++rowNum) { + ARROW_RETURN_NOT_OK(builder.AppendNull()); + } + } + } + std::shared_ptr result; + ARROW_RETURN_NOT_OK(builder.Finish(&result)); + return Result{std::move(result)}; + } +}; + +/// A builder class that can append blocks of data to a row. A "slice" +/// is built by horizontally concatenating record batches. +template +class UnmaterializedSliceBuilder { + public: + explicit UnmaterializedSliceBuilder( + UnmaterializedCompositeTable* table_) + : table(table_) {} + + void AddEntry(std::shared_ptr rb, uint64_t start, uint64_t end) { + if (rb) { + table->AddRecordBatchRef(rb); + } + if (slice.num_components) { + size_t last_index = slice.num_components - 1; + DCHECK_EQ(slice.components[last_index].end - slice.components[last_index].start, + end - start) + << "Slices should be the same length. 
"; + } + slice.components[slice.num_components++] = CompositeEntry{rb.get(), start, end}; + } + + void Finalize() { table->AddSlice(slice); } + int64_t Size() { return slice.Size(); } + + private: + using TUnmaterializedCompositeTable = + UnmaterializedCompositeTable; + using TUnmaterializedSlice = + typename TUnmaterializedCompositeTable::UnmaterializedSlice; + + TUnmaterializedCompositeTable* table; + TUnmaterializedSlice slice{}; +}; + +} // namespace arrow::acero