[try_put_and_wait] Part 6: Add implementation for limiter_node + reservation support for buffers (#1425)

Co-authored-by: Aleksei Fedotov <[email protected]>
Co-authored-by: Mike Voss <[email protected]>
3 people authored Aug 16, 2024
1 parent 5cd159a commit d67ee73
Showing 10 changed files with 384 additions and 23 deletions.
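
For context, a minimal usage sketch of what this change enables, modeled on the test_buffer_reserve helper added in test/tbb/test_buffering_try_put_and_wait.h below. The graph shape, threshold, and message values are illustrative only, and the preview macro __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT is assumed to be defined (normally via the build system) so that try_put_and_wait is available:

// Sketch only: assumes the preview feature gate is enabled; typically set by the build system.
#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT 1
#include <oneapi/tbb/flow_graph.h>
#include <iostream>

int main() {
    tbb::flow::graph g;

    // buffer -> limiter -> function, the same shape exercised by the new test
    tbb::flow::buffer_node<int> buffer(g);
    tbb::flow::limiter_node<int, int> limiter(g, /*threshold*/ 1);
    tbb::flow::function_node<int, int, tbb::flow::rejecting> worker(g, tbb::flow::serial,
        [&](int input) {
            std::cout << "processing " << input << "\n";
            // Releasing a slot makes the limiter pull the next item from the buffer
            // through the try_reserve/try_consume/try_release path this commit implements.
            limiter.decrementer().try_put(1);
            return 0;
        });

    tbb::flow::make_edge(buffer, limiter);
    tbb::flow::make_edge(limiter, worker);

    buffer.try_put(1);
    buffer.try_put(2);

    // Returns once the waited-for message (and the work it spawned) has been processed;
    // the message metainfo is now propagated through the limiter's reservation path.
    buffer.try_put_and_wait(42);

    g.wait_for_all();  // drain any remaining items
    return 0;
}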
4 changes: 3 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_body_impl.h
@@ -372,7 +372,9 @@ class threshold_regulator<T, DecrementType,
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for limiter_node
// Intentionally ignore the metainformation
// If there are more items associated with the passed metainfo to be processed,
// they should be stored in the buffer that precedes the limiter_node
graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
return try_put_task(value);
}
22 changes: 20 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
@@ -177,7 +177,8 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool try_reserve( output_type &v ) {
private:
bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
bool successful_reserve = false;

do {
@@ -192,7 +193,14 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

// Try to get from this sender
successful_reserve = pred->try_reserve( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo) {
successful_reserve = pred->try_reserve( v, *metainfo );
} else
#endif
{
successful_reserve = pred->try_reserve( v );
}

if (successful_reserve == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
@@ -207,6 +215,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {

return successful_reserve;
}
public:
bool try_reserve( output_type& v ) {
return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool try_reserve( output_type& v, message_metainfo& metainfo ) {
return try_reserve_impl(v, &metainfo);
}
#endif

bool try_release() {
reserved_src.load(std::memory_order_relaxed)->try_release();
13 changes: 12 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h
@@ -139,7 +139,6 @@ class item_buffer {
}
#endif


// move an existing item from one slot to another. The moved-to slot must be unoccupied,
// the moved-from slot must exist and not be reserved. After the move, 'from' will be empty and
// 'to' will be occupied but not reserved
@@ -401,6 +400,18 @@ class reservable_item_buffer : public item_buffer<T, A> {
return true;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool reserve_front(T& v, message_metainfo& metainfo) {
if (my_reserved || !my_item_valid(this->my_head)) return false;
my_reserved = true;
// reserving the head
v = this->front();
metainfo = this->front_metainfo();
this->reserve_item(this->my_head);
return true;
}
#endif

void consume_front() {
__TBB_ASSERT(my_reserved, "Attempt to consume a non-reserved item");
this->destroy_front();
76 changes: 61 additions & 15 deletions include/oneapi/tbb/flow_graph.h
@@ -1465,7 +1465,13 @@ class buffer_node

virtual void internal_reserve(buffer_operation *op) {
__TBB_ASSERT(op->elem, nullptr);
if(this->reserve_front(*(op->elem))) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool reserve_result = op->metainfo ? this->reserve_front(*(op->elem), *(op->metainfo))
: this->reserve_front(*(op->elem));
#else
bool reserve_result = this->reserve_front(*(op->elem));
#endif
if (reserve_result) {
op->status.store(SUCCEEDED, std::memory_order_release);
}
else {
@@ -1567,12 +1573,13 @@ class buffer_node
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
private:
// TODO: add real implementation
bool try_reserve(output_type& v, message_metainfo&) override {
return try_reserve(v);
bool try_reserve( output_type& v, message_metainfo& metainfo ) override {
buffer_operation op_data(res_item, metainfo);
op_data.elem = &v;
my_aggregator.execute(&op_data);
(void)enqueue_forwarding_task(op_data);
return op_data.status==SUCCEEDED;
}
public:
#endif

//! Release a reserved item.
@@ -1698,7 +1705,15 @@ class queue_node : public buffer_node<T> {
op->status.store(FAILED, std::memory_order_release);
}
else {
this->reserve_front(*(op->elem));
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (op->metainfo) {
this->reserve_front(*(op->elem), *(op->metainfo));
}
else
#endif
{
this->reserve_front(*(op->elem));
}
op->status.store(SUCCEEDED, std::memory_order_release);
}
}
Expand Down Expand Up @@ -1913,6 +1928,12 @@ class priority_queue_node : public buffer_node<T> {
}
this->my_reserved = true;
*(op->elem) = prio();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (op->metainfo) {
*(op->metainfo) = std::move(prio_metainfo());
reserved_metainfo = *(op->metainfo);
}
#endif
reserved_item = *(op->elem);
op->status.store(SUCCEEDED, std::memory_order_release);
prio_pop();
@@ -1922,13 +1943,27 @@
op->status.store(SUCCEEDED, std::memory_order_release);
this->my_reserved = false;
reserved_item = input_type();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
for (auto waiter : reserved_metainfo.waiters()) {
waiter->release(1);
}

reserved_metainfo = message_metainfo{};
#endif
}

void internal_release(prio_operation *op) override {
op->status.store(SUCCEEDED, std::memory_order_release);
prio_push(reserved_item);
prio_push(reserved_item __TBB_FLOW_GRAPH_METAINFO_ARG(reserved_metainfo));
this->my_reserved = false;
reserved_item = input_type();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
for (auto waiter : reserved_metainfo.waiters()) {
waiter->release(1);
}

reserved_metainfo = message_metainfo{};
#endif
}

private:
@@ -1959,6 +1994,9 @@ class priority_queue_node : public buffer_node<T> {
size_type mark;

input_type reserved_item;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo reserved_metainfo;
#endif

// in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
bool prio_use_tail() {
@@ -2137,9 +2175,12 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
//SUCCESS
// if we can reserve and can put, we consume the reservation
// we increment the count and decrement the tries
if ( (my_predecessors.try_reserve(v)) == true ) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo metainfo;
#endif
if ( (my_predecessors.try_reserve(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) == true ) {
reserved = true;
if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
if ( (rval = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) != nullptr ) {
{
spin_mutex::scoped_lock lock(my_mutex);
++my_count;
@@ -2268,8 +2309,10 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class broadcast_cache;
template<typename X, typename Y> friend class round_robin_cache;

private:
//! Puts an item to this receiver
graph_task* try_put_task( const T &t ) override {
graph_task* try_put_task_impl( const T &t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
{
spin_mutex::scoped_lock lock(my_mutex);
if ( my_count + my_tries >= my_threshold )
Expand All @@ -2278,7 +2321,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
++my_tries;
}

graph_task* rtask = my_successors.try_put_task(t);
graph_task* rtask = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( !rtask ) { // try_put_task failed.
spin_mutex::scoped_lock lock(my_mutex);
--my_tries;
@@ -2306,10 +2349,13 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
return rtask;
}

protected:
graph_task* try_put_task(const T& t) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for limiter_node
graph_task* try_put_task(const T& t, const message_metainfo&) override {
return try_put_task(t);
graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
return try_put_task_impl(t, metainfo);
}
#endif

44 changes: 43 additions & 1 deletion test/tbb/test_buffer_node.cpp
@@ -564,7 +564,49 @@ void test_buffer_node_try_put_and_wait() {
CHECK(check_index == processed_items.size());
}

// TODO: add try_reserve tests after implementing limiter_node
// Test reserve
{
int thresholds[] = { 1, 2 };

for (int threshold : thresholds) {
std::vector<int> processed_items;

// test_buffer_reserve tests the following graph
// buffer -> limiter -> function
// function is a rejecting serial function_node that puts an item to the decrementer port
// of the limiter inside of the body

std::size_t after_start = test_buffer_reserve<tbb::flow::buffer_node<int>>(threshold,
start_work_items, wait_message, new_work_items, processed_items);

// Expected effect:
// 1. start_work_items would be pushed to the buffer
// 2. wait_message would be pushed to the buffer
// 3. forward task of the buffer would push wait_message to the limiter node.
// Since the limiter threshold is not reached, it would be directly passed to the function
// 4. function would spawn the task for wait_message processing
// 5. wait_message would be processed, which would add new_work_items to the buffer
// 6. decrementer.try_put() would be called and the limiter node would
// process all of the items from the buffer using the try_reserve/try_consume/try_release semantics
// Since the reservation always accepts the front element of the buffer
// it is expected that the items would be taken from the buffer in FIFO order
// instead of LIFO on try_get for buffer_node

std::size_t check_index = 0;

CHECK_MESSAGE(after_start == 1, "try_put_and_wait should process only wait_message");
CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected wait_message processing");

for (auto item : start_work_items) {
CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing");
}

for (auto item : new_work_items) {
CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing");
}

}
}
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT

50 changes: 50 additions & 0 deletions test/tbb/test_buffering_try_put_and_wait.h
@@ -133,6 +133,56 @@ std::size_t test_buffer_pull(const std::vector<int>& start_work_items,
return after_try_put_and_wait_start_index;
}

template <typename BufferingNode, typename... Args>
std::size_t test_buffer_reserve(std::size_t limiter_threshold,
const std::vector<int>& start_work_items,
int wait_message,
const std::vector<int>& new_work_items,
std::vector<int>& processed_items,
Args... args)
{
tbb::task_arena arena(1);
std::size_t after_try_put_and_wait_start_index = 0;

arena.execute([&] {
tbb::flow::graph g;

BufferingNode buffer(g, args...);

tbb::flow::limiter_node<int, int> limiter(g, limiter_threshold);
tbb::flow::function_node<int, int, tbb::flow::rejecting> function(g, tbb::flow::serial,
[&](int input) {
if (input == wait_message) {
for (auto item : new_work_items) {
buffer.try_put(item);
}
}
// Explicitly put to the decrementer instead of making an edge
// to guarantee that the next task would be spawned and not returned
// to the current thread as the next task
// Otherwise, all elements would be processed during the try_put_and_wait
limiter.decrementer().try_put(1);
processed_items.emplace_back(input);
return 0;
});

tbb::flow::make_edge(buffer, limiter);
tbb::flow::make_edge(limiter, function);

for (auto item : start_work_items) {
buffer.try_put(item);
}

buffer.try_put_and_wait(wait_message);

after_try_put_and_wait_start_index = processed_items.size();

g.wait_for_all();
});

return after_try_put_and_wait_start_index;
}

} // test_try_put_and_wait

#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT