diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index dc321148ef..cd4d81f94e 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -372,7 +372,9 @@ class threshold_regulator { // Do not work with the passed pointer here as it may not be fully initialized yet } - bool try_reserve( output_type &v ) { +private: + bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) { bool successful_reserve = false; do { @@ -192,7 +193,14 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > { } // Try to get from this sender - successful_reserve = pred->try_reserve( v ); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (metainfo) { + successful_reserve = pred->try_reserve( v, *metainfo ); + } else +#endif + { + successful_reserve = pred->try_reserve( v ); + } if (successful_reserve == false) { typename mutex_type::scoped_lock lock(this->my_mutex); @@ -207,6 +215,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > { return successful_reserve; } +public: + bool try_reserve( output_type& v ) { + return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr)); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool try_reserve( output_type& v, message_metainfo& metainfo ) { + return try_reserve_impl(v, &metainfo); + } +#endif bool try_release() { reserved_src.load(std::memory_order_relaxed)->try_release(); diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index 4ec5507c0f..616738672f 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -139,7 +139,6 @@ class item_buffer { } #endif - // move an existing item from one slot to another. The moved-to slot must be unoccupied, // the moved-from slot must exist and not be reserved. The after, from will be empty, // to will be occupied but not reserved @@ -401,6 +400,18 @@ class reservable_item_buffer : public item_buffer { return true; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool reserve_front(T& v, message_metainfo& metainfo) { + if (my_reserved || !my_item_valid(this->my_head)) return false; + my_reserved = true; + // reserving the head + v = this->front(); + metainfo = this->front_metainfo(); + this->reserve_item(this->my_head); + return true; + } +#endif + void consume_front() { __TBB_ASSERT(my_reserved, "Attempt to consume a non-reserved item"); this->destroy_front(); diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 0342ee213a..ea5730cd9e 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1465,7 +1465,13 @@ class buffer_node virtual void internal_reserve(buffer_operation *op) { __TBB_ASSERT(op->elem, nullptr); - if(this->reserve_front(*(op->elem))) { +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool reserve_result = op->metainfo ? this->reserve_front(*(op->elem), *(op->metainfo)) + : this->reserve_front(*(op->elem)); +#else + bool reserve_result = this->reserve_front(*(op->elem)); +#endif + if (reserve_result) { op->status.store(SUCCEEDED, std::memory_order_release); } else { @@ -1567,12 +1573,13 @@ class buffer_node } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT -private: - // TODO: add real implementation - bool try_reserve(output_type& v, message_metainfo&) override { - return try_reserve(v); + bool try_reserve( output_type& v, message_metainfo& metainfo ) override { + buffer_operation op_data(res_item, metainfo); + op_data.elem = &v; + my_aggregator.execute(&op_data); + (void)enqueue_forwarding_task(op_data); + return op_data.status==SUCCEEDED; } -public: #endif //! Release a reserved item. @@ -1698,7 +1705,15 @@ class queue_node : public buffer_node { op->status.store(FAILED, std::memory_order_release); } else { - this->reserve_front(*(op->elem)); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (op->metainfo) { + this->reserve_front(*(op->elem), *(op->metainfo)); + } + else +#endif + { + this->reserve_front(*(op->elem)); + } op->status.store(SUCCEEDED, std::memory_order_release); } } @@ -1913,6 +1928,12 @@ class priority_queue_node : public buffer_node { } this->my_reserved = true; *(op->elem) = prio(); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (op->metainfo) { + *(op->metainfo) = std::move(prio_metainfo()); + reserved_metainfo = *(op->metainfo); + } +#endif reserved_item = *(op->elem); op->status.store(SUCCEEDED, std::memory_order_release); prio_pop(); @@ -1922,13 +1943,27 @@ class priority_queue_node : public buffer_node { op->status.store(SUCCEEDED, std::memory_order_release); this->my_reserved = false; reserved_item = input_type(); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + for (auto waiter : reserved_metainfo.waiters()) { + waiter->release(1); + } + + reserved_metainfo = message_metainfo{}; +#endif } void internal_release(prio_operation *op) override { op->status.store(SUCCEEDED, std::memory_order_release); - prio_push(reserved_item); + prio_push(reserved_item __TBB_FLOW_GRAPH_METAINFO_ARG(reserved_metainfo)); this->my_reserved = false; reserved_item = input_type(); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + for (auto waiter : reserved_metainfo.waiters()) { + waiter->release(1); + } + + reserved_metainfo = message_metainfo{}; +#endif } private: @@ -1959,6 +1994,9 @@ class priority_queue_node : public buffer_node { size_type mark; input_type reserved_item; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo reserved_metainfo; +#endif // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item bool prio_use_tail() { @@ -2137,9 +2175,12 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > //SUCCESS // if we can reserve and can put, we consume the reservation // we increment the count and decrement the tries - if ( (my_predecessors.try_reserve(v)) == true ) { +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo metainfo; +#endif + if ( (my_predecessors.try_reserve(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) == true ) { reserved = true; - if ( (rval = my_successors.try_put_task(v)) != nullptr ) { + if ( (rval = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) != nullptr ) { { spin_mutex::scoped_lock lock(my_mutex); ++my_count; @@ -2268,8 +2309,10 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > template< typename R, typename B > friend class run_and_put_task; template friend class broadcast_cache; template friend class round_robin_cache; + +private: //! Puts an item to this receiver - graph_task* try_put_task( const T &t ) override { + graph_task* try_put_task_impl( const T &t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) { { spin_mutex::scoped_lock lock(my_mutex); if ( my_count + my_tries >= my_threshold ) @@ -2278,7 +2321,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > ++my_tries; } - graph_task* rtask = my_successors.try_put_task(t); + graph_task* rtask = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); if ( !rtask ) { // try_put_task failed. spin_mutex::scoped_lock lock(my_mutex); --my_tries; @@ -2306,10 +2349,13 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > return rtask; } +protected: + graph_task* try_put_task(const T& t) override { + return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - // TODO: add support for limiter_node - graph_task* try_put_task(const T& t, const message_metainfo&) override { - return try_put_task(t); + graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override { + return try_put_task_impl(t, metainfo); } #endif diff --git a/test/tbb/test_buffer_node.cpp b/test/tbb/test_buffer_node.cpp index 54e6e88a79..527005aecb 100644 --- a/test/tbb/test_buffer_node.cpp +++ b/test/tbb/test_buffer_node.cpp @@ -564,7 +564,49 @@ void test_buffer_node_try_put_and_wait() { CHECK(check_index == processed_items.size()); } - // TODO: add try_reserve tests after implementing limiter_node + // Test reserve + { + int thresholds[] = { 1, 2 }; + + for (int threshold : thresholds) { + std::vector processed_items; + + // test_buffer_reserve tests the following graph + // buffer -> limiter -> function + // function is a rejecting serial function_node that puts an item to the decrementer port + // of the limiter inside of the body + + std::size_t after_start = test_buffer_reserve>(threshold, + start_work_items, wait_message, new_work_items, processed_items); + + // Expected effect: + // 1. start_work_items would be pushed to the buffer + // 2. wait_message_would be pushed to the buffer + // 3. forward task of the buffer would push wait_message to the limiter node. + // Since the limiter threshold is not reached, it would be directly passed to the function + // 4. function would spawn the task for wait_message processing + // 5. wait_message would be processed that would add new_work_items to the buffer + // 6. decrementer.try_put() would be called and the limiter node would + // process all of the items from the buffer using the try_reserve/try_consume/try_release semantics + // Since the reservation always accepts the front element of the buffer + // it is expected that the items would be taken from the buffer in FIFO order + // instead of LIFO on try_get for buffer_node + + std::size_t check_index = 0; + + CHECK_MESSAGE(after_start == 1, "try_put_and_wait should process only wait_message"); + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected wait_message processing"); + + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing"); + } + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing"); + } + + } + } } #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT diff --git a/test/tbb/test_buffering_try_put_and_wait.h b/test/tbb/test_buffering_try_put_and_wait.h index d9e4578178..300521233f 100644 --- a/test/tbb/test_buffering_try_put_and_wait.h +++ b/test/tbb/test_buffering_try_put_and_wait.h @@ -133,6 +133,56 @@ std::size_t test_buffer_pull(const std::vector& start_work_items, return after_try_put_and_wait_start_index; } +template +std::size_t test_buffer_reserve(std::size_t limiter_threshold, + const std::vector& start_work_items, + int wait_message, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + tbb::task_arena arena(1); + std::size_t after_try_put_and_wait_start_index = 0; + + arena.execute([&] { + tbb::flow::graph g; + + BufferingNode buffer(g, args...); + + tbb::flow::limiter_node limiter(g, limiter_threshold); + tbb::flow::function_node function(g, tbb::flow::serial, + [&](int input) { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer.try_put(item); + } + } + // Explicitly put to the decrementer instead of making edge + // to guarantee that the next task would be spawned and not returned + // to the current thread as the next task + // Otherwise, all elements would be processed during the try_put_and_wait + limiter.decrementer().try_put(1); + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(buffer, limiter); + tbb::flow::make_edge(limiter, function); + + for (auto item : start_work_items) { + buffer.try_put(item); + } + + buffer.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + } // test_try_put_and_wait #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT diff --git a/test/tbb/test_limiter_node.cpp b/test/tbb/test_limiter_node.cpp index 66611ce19e..0bf4912f8a 100644 --- a/test/tbb/test_limiter_node.cpp +++ b/test/tbb/test_limiter_node.cpp @@ -546,6 +546,67 @@ void test_decrement_while_try_put_task() { CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work"); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +void test_try_put_and_wait() { + tbb::task_arena arena(1); + + arena.execute([] { + tbb::flow::graph g; + + std::vector start_work_items; + std::vector processed_items; + std::vector new_work_items; + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + new_work_items.emplace_back(i + 1 + wait_message); + } + + std::size_t threshold = start_work_items.size() + 1; + CHECK_MESSAGE(new_work_items.size() < threshold, "Incorrect test setup"); + + tbb::flow::limiter_node limiter(g, threshold); + tbb::flow::function_node function(g, tbb::flow::serial, + [&](int input) { + if (input == wait_message) { + for (auto item : new_work_items) { + limiter.try_put(item); + } + } + processed_items.emplace_back(input); + }); + + tbb::flow::make_edge(limiter, function); + tbb::flow::make_edge(function, limiter.decrementer()); + + for (auto item : start_work_items) { + limiter.try_put(item); + } + + limiter.try_put_and_wait(wait_message); + + // Since function is a serial queueing function_node, all start_work_items would be added to the queue + // and processed in FIFO order. wait_message would be added and processed last. Each item in start_work_items + // should put an item to a decrementer edge and hence new_work_items should not be missed as well + + std::size_t check_index = 0; + + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing"); + } + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected wait_message processing"); + CHECK_MESSAGE(check_index == processed_items.size(), "Unexpected number of messages"); + + g.wait_for_all(); + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected new_work_items processing"); + } + CHECK(check_index == processed_items.size()); + }); +} +#endif //! Test puts on limiter_node with decrements and varying parallelism levels //! \brief \ref error_guessing @@ -635,3 +696,10 @@ TEST_CASE("Test correct node deallocation while using small_object_pool") { tbb::task_scheduler_handle handle{ tbb::attach{} }; tbb::finalize( handle, std::nothrow ); } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("test limiter_node try_put_and_wait") { + test_try_put_and_wait(); +} +#endif diff --git a/test/tbb/test_priority_queue_node.cpp b/test/tbb/test_priority_queue_node.cpp index 4d9ffe368f..18a60eb935 100644 --- a/test/tbb/test_priority_queue_node.cpp +++ b/test/tbb/test_priority_queue_node.cpp @@ -492,7 +492,50 @@ void test_pqueue_node_try_put_and_wait() { CHECK(check_index == processed_items.size()); } - // TODO: add try_reserve tests after implementing limiter_node + // Test reserve + { + int thresholds[] = { 1, 2 }; + + for (int threshold : thresholds) { + std::vector processed_items; + + // test_buffer_reserve tests the following graph + // buffer -> limiter -> function + // function is a rejecting serial function_node that puts an item to the decrementer port + // of the limiter inside of the body + + std::size_t after_start = test_buffer_reserve>(threshold, + start_work_items, wait_message, new_work_items, processed_items); + + // Expected effect: + // 1. start_work_items would be pushed to the buffer + // 2. wait_message_would be pushed to the buffer + // 3. forward task of the buffer would push the first message to the limiter node. + // Since the limiter threshold is not reached, it would be directly passed to the function + // 4. function would spawn the task for the first message processing + // 5. the first would be processed + // 6. decrementer.try_put() would be called and the limiter node would + // process all of the items from the buffer using the try_reserve/try_consume/try_release semantics + // in the priority (greatest first) order + // 7. When the wait_message would be taken from the queue, the try_put_and_wait would exit + + std::size_t check_index = 0; + + CHECK_MESSAGE(after_start == start_work_items.size() + 1, + "try_put_and_wait should start_work_items and wait_message"); + for (std::size_t index = start_work_items.size(); index != 0; --index) { + CHECK_MESSAGE(processed_items[check_index++] == start_work_items[index - 1], + "Unexpected start_work_items processing"); + } + + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected wait_message processing"); + + for (std::size_t index = new_work_items.size(); index != 0; --index) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[index - 1], + "Unexpected new_work_items processing"); + } + } + } } #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT diff --git a/test/tbb/test_queue_node.cpp b/test/tbb/test_queue_node.cpp index 893738745d..546b47edae 100644 --- a/test/tbb/test_queue_node.cpp +++ b/test/tbb/test_queue_node.cpp @@ -608,7 +608,46 @@ void test_queue_node_try_put_and_wait() { CHECK(check_index == processed_items.size()); } - // TODO: add try_reserve tests after implementing limiter_node + // Test reserve + { + int thresholds[] = { 1, 2 }; + + for (int threshold : thresholds) { + std::vector processed_items; + + // test_buffer_reserve tests the following graph + // buffer -> limiter -> function + // function is a rejecting serial function_node that puts an item to the decrementer port + // of the limiter inside of the body + + std::size_t after_start = test_buffer_reserve>(threshold, + start_work_items, wait_message, new_work_items, processed_items); + + // Expected effect: + // 1. start_work_items would be pushed to the buffer + // 2. wait_message_would be pushed to the buffer + // 3. forward task of the buffer would push the first message to the limiter node. + // Since the limiter threshold is not reached, it would be directly passed to the function + // 4. function would spawn the task for the first message processing + // 5. the first would be processed + // 6. decrementer.try_put() would be called and the limiter node would + // process all of the items from the buffer using the try_reserve/try_consume/try_release semantics + // 7. When the wait_message would be taken from the queue, the try_put_and_wait would exit + + std::size_t check_index = 0; + + CHECK_MESSAGE(after_start == start_work_items.size() + 1, + "try_put_and_wait should start_work_items and wait_message"); + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing"); + } + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected wait_message processing"); + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing"); + } + } + } } #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT diff --git a/test/tbb/test_sequencer_node.cpp b/test/tbb/test_sequencer_node.cpp index 1714087d66..1e6494d69b 100644 --- a/test/tbb/test_sequencer_node.cpp +++ b/test/tbb/test_sequencer_node.cpp @@ -555,7 +555,49 @@ void test_seq_node_try_put_and_wait() { CHECK(check_index == processed_items.size()); } - // TODO: add try_reserve tests after implementing limiter_node + // Test reserve + { + int thresholds[] = { 1, 2 }; + + for (int threshold : thresholds) { + std::vector processed_items; + + // test_buffer_reserve tests the following graph + // buffer -> limiter -> function + // function is a rejecting serial function_node that puts an item to the decrementer port + // of the limiter inside of the body + + std::size_t after_start = test_buffer_reserve>(threshold, + start_work_items, wait_message, new_work_items, processed_items, simple_sequencer); + + // Expected effect: + // 1. start_work_items would be pushed to the buffer + // 2. wait_message_would be pushed to the buffer + // 3. forward task of the buffer would push the first message to the limiter node. + // Since the limiter threshold is not reached, it would be directly passed to the function + // 4. function would spawn the task for the first message processing + // 5. the first would be processed + // 6. decrementer.try_put() would be called and the limiter node would + // process all of the items from the buffer using the try_reserve/try_consume/try_release semantics + // 7. When the wait_message would be taken from the buffer, the try_put_and_wait would exit + + std::size_t check_index = 0; + + CHECK_MESSAGE(after_start == start_work_items.size() + 1, + "start_work_items, occupier and wait_message should be processed by try_put_and_wait"); + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process start_work_items FIFO"); + } + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing by try_put_and_wait"); + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process new_work_items FIFO"); + } + CHECK(check_index == processed_items.size()); + } + } } #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT