Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[try_put_and_wait] Part 6: Add implementation for limiter_node + reservation support for buffers #1425

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
19a86e2
Add implementation for function_node
kboyarinov Jun 11, 2024
dbe50f7
Fix non-CPF build
kboyarinov Jun 11, 2024
660646e
Add try_put_and_wait_production branch to the CI trigger list
kboyarinov Jun 11, 2024
531da5b
Fix test issues
kboyarinov Jun 13, 2024
f97c794
Fix CI issues
kboyarinov Jun 13, 2024
7afdd9d
+ more use of override keyword
kboyarinov Jun 13, 2024
da15916
vertexes->vertices, remove unnecessary iostream include
kboyarinov Jun 21, 2024
92bf552
Remove unnecessary variadics
kboyarinov Jun 25, 2024
2bb5948
Simplify cache impl
kboyarinov Jun 25, 2024
73eb513
Fix unused variable
kboyarinov Jun 25, 2024
f92503a
Add stub for continue_input
kboyarinov Jun 25, 2024
6d82b64
Fix whitespace
kboyarinov Jun 25, 2024
bc01522
Fix issues
kboyarinov Jun 25, 2024
6d6c145
Save progress
kboyarinov Jun 18, 2024
4e05f31
Add implementation for buffering nodes
kboyarinov Jun 20, 2024
842381c
Add to CI
kboyarinov Jun 20, 2024
2c011ed
Fix copyrights
kboyarinov Jun 21, 2024
67226f3
Add missed file
kboyarinov Jun 21, 2024
708d655
Add test definition for try_get with meta
kboyarinov Jun 21, 2024
ed5d05b
Fix metainfo namespace in test
kboyarinov Jun 21, 2024
811b9b9
join_node::try_get stub
kboyarinov Jun 21, 2024
703104a
Allign with base branch
kboyarinov Jun 25, 2024
b697c09
Add implementation for limiter_node + reservation support for buffers
kboyarinov Jun 26, 2024
a342444
Fix CI
kboyarinov Jun 27, 2024
f6f7673
Address review comments
kboyarinov Jul 8, 2024
e08d81f
Rearrange base task constructors
kboyarinov Jul 8, 2024
4029091
Introduce separate macro for try_put_and_wait feature
kboyarinov Jul 17, 2024
5d66123
Simplify code
kboyarinov Jul 17, 2024
2033045
Address review comments
kboyarinov Jul 22, 2024
967aa53
Add missed comment
kboyarinov Jul 22, 2024
c77e9c5
Update test/tbb/test_function_node.cpp
kboyarinov Jul 22, 2024
5aa2e34
Rename template helpers
kboyarinov Jul 22, 2024
0fd4b26
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Jul 23, 2024
fc1239f
Fix incorrect merging
kboyarinov Jul 23, 2024
0321596
Fix previous review comments
kboyarinov Jul 23, 2024
34ea303
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Jul 23, 2024
ba5d26e
Apply part1 review comments
kboyarinov Jul 23, 2024
3691466
Apply PR review comments
kboyarinov Jul 25, 2024
c7c43e1
Fix review comment
kboyarinov Jul 25, 2024
eeffec0
Rename boolean flags
kboyarinov Jul 26, 2024
0b2b098
Rename metainfo-friendly graph_task class
kboyarinov Jul 26, 2024
03c9391
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
ac27bb1
Add this_thread_in_graph_arena usage
kboyarinov Aug 1, 2024
a23dc02
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
a82c649
Fix merging
kboyarinov Aug 1, 2024
fbf5fbc
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
6787ae5
Remove unnecessary TODO
kboyarinov Aug 1, 2024
078f3ad
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
061e2e7
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
7106396
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 15, 2024
082ea7d
Update test/tbb/test_limiter_node.cpp
kboyarinov Aug 15, 2024
56989b7
Merge branch 'dev/kboyarinov/try_put_and_wait_limiter_node' of https:…
kboyarinov Aug 15, 2024
b5c674d
Fix incorrect merging
kboyarinov Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@ class threshold_regulator<T, DecrementType,
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for limiter_node
// Intentionally ignore the metainformation
// If there are more items associated with passed metainfo to be processed
// They should be stored in the buffer before the limiter_node
vossmjp marked this conversation as resolved.
Show resolved Hide resolved
graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
return try_put_task(value);
}
Expand Down
22 changes: 20 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool try_reserve( output_type &v ) {
private:
bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
bool successful_reserve = false;

do {
Expand All @@ -192,7 +193,14 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

// Try to get from this sender
successful_reserve = pred->try_reserve( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo) {
successful_reserve = pred->try_reserve( v, *metainfo );
} else
#endif
{
successful_reserve = pred->try_reserve( v );
}

if (successful_reserve == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
Expand All @@ -207,6 +215,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {

return successful_reserve;
}
public:
bool try_reserve( output_type& v ) {
return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool try_reserve( output_type& v, message_metainfo& metainfo ) {
return try_reserve_impl(v, &metainfo);
}
#endif

bool try_release() {
reserved_src.load(std::memory_order_relaxed)->try_release();
Expand Down
13 changes: 12 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class item_buffer {
}
#endif


// move an existing item from one slot to another. The moved-to slot must be unoccupied,
// the moved-from slot must exist and not be reserved. The after, from will be empty,
// to will be occupied but not reserved
Expand Down Expand Up @@ -401,6 +400,18 @@ class reservable_item_buffer : public item_buffer<T, A> {
return true;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool reserve_front(T& v, message_metainfo& metainfo) {
if (my_reserved || !my_item_valid(this->my_head)) return false;
my_reserved = true;
// reserving the head
v = this->front();
metainfo = this->front_metainfo();
this->reserve_item(this->my_head);
return true;
}
#endif

void consume_front() {
__TBB_ASSERT(my_reserved, "Attempt to consume a non-reserved item");
this->destroy_front();
Expand Down
76 changes: 61 additions & 15 deletions include/oneapi/tbb/flow_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,13 @@ class buffer_node

virtual void internal_reserve(buffer_operation *op) {
__TBB_ASSERT(op->elem, nullptr);
if(this->reserve_front(*(op->elem))) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool reserve_result = op->metainfo ? this->reserve_front(*(op->elem), *(op->metainfo))
: this->reserve_front(*(op->elem));
#else
bool reserve_result = this->reserve_front(*(op->elem));
#endif
if (reserve_result) {
op->status.store(SUCCEEDED, std::memory_order_release);
}
else {
Expand Down Expand Up @@ -1567,12 +1573,13 @@ class buffer_node
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
private:
// TODO: add real implementation
bool try_reserve(output_type& v, message_metainfo&) override {
return try_reserve(v);
bool try_reserve( output_type& v, message_metainfo& metainfo ) override {
buffer_operation op_data(res_item, metainfo);
op_data.elem = &v;
my_aggregator.execute(&op_data);
(void)enqueue_forwarding_task(op_data);
return op_data.status==SUCCEEDED;
}
public:
#endif

//! Release a reserved item.
Expand Down Expand Up @@ -1698,7 +1705,15 @@ class queue_node : public buffer_node<T> {
op->status.store(FAILED, std::memory_order_release);
}
else {
this->reserve_front(*(op->elem));
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (op->metainfo) {
this->reserve_front(*(op->elem), *(op->metainfo));
}
else
#endif
{
this->reserve_front(*(op->elem));
}
op->status.store(SUCCEEDED, std::memory_order_release);
}
}
Expand Down Expand Up @@ -1913,6 +1928,12 @@ class priority_queue_node : public buffer_node<T> {
}
this->my_reserved = true;
*(op->elem) = prio();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (op->metainfo) {
*(op->metainfo) = std::move(prio_metainfo());
reserved_metainfo = *(op->metainfo);
}
#endif
reserved_item = *(op->elem);
op->status.store(SUCCEEDED, std::memory_order_release);
prio_pop();
Expand All @@ -1922,13 +1943,27 @@ class priority_queue_node : public buffer_node<T> {
op->status.store(SUCCEEDED, std::memory_order_release);
this->my_reserved = false;
reserved_item = input_type();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
for (auto waiter : reserved_metainfo.waiters()) {
waiter->release(1);
}

reserved_metainfo = message_metainfo{};
#endif
}

void internal_release(prio_operation *op) override {
op->status.store(SUCCEEDED, std::memory_order_release);
prio_push(reserved_item);
prio_push(reserved_item __TBB_FLOW_GRAPH_METAINFO_ARG(reserved_metainfo));
this->my_reserved = false;
reserved_item = input_type();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
for (auto waiter : reserved_metainfo.waiters()) {
waiter->release(1);
}

reserved_metainfo = message_metainfo{};
#endif
}

private:
Expand Down Expand Up @@ -1959,6 +1994,9 @@ class priority_queue_node : public buffer_node<T> {
size_type mark;

input_type reserved_item;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo reserved_metainfo;
#endif

// in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
bool prio_use_tail() {
Expand Down Expand Up @@ -2137,9 +2175,12 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
//SUCCESS
// if we can reserve and can put, we consume the reservation
// we increment the count and decrement the tries
if ( (my_predecessors.try_reserve(v)) == true ) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo metainfo;
#endif
if ( (my_predecessors.try_reserve(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) == true ) {
reserved = true;
if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
if ( (rval = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) != nullptr ) {
{
spin_mutex::scoped_lock lock(my_mutex);
++my_count;
Expand Down Expand Up @@ -2268,8 +2309,10 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class broadcast_cache;
template<typename X, typename Y> friend class round_robin_cache;

private:
//! Puts an item to this receiver
graph_task* try_put_task( const T &t ) override {
graph_task* try_put_task_impl( const T &t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
{
spin_mutex::scoped_lock lock(my_mutex);
if ( my_count + my_tries >= my_threshold )
Expand All @@ -2278,7 +2321,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
++my_tries;
}

graph_task* rtask = my_successors.try_put_task(t);
graph_task* rtask = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( !rtask ) { // try_put_task failed.
spin_mutex::scoped_lock lock(my_mutex);
--my_tries;
Expand Down Expand Up @@ -2306,10 +2349,13 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T >
return rtask;
}

protected:
graph_task* try_put_task(const T& t) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for limiter_node
graph_task* try_put_task(const T& t, const message_metainfo&) override {
return try_put_task(t);
graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
return try_put_task_impl(t, metainfo);
}
#endif

Expand Down
44 changes: 43 additions & 1 deletion test/tbb/test_buffer_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,49 @@ void test_buffer_node_try_put_and_wait() {
CHECK(check_index == processed_items.size());
}

// TODO: add try_reserve tests after implementing limiter_node
// Test reserve
{
int thresholds[] = { 1, 2 };

for (int threshold : thresholds) {
std::vector<int> processed_items;

// test_buffer_reserve tests the following graph
// buffer -> limiter -> function
// function is a rejecting serial function_node that puts an item to the decrementer port
// of the limiter inside of the body

std::size_t after_start = test_buffer_reserve<tbb::flow::buffer_node<int>>(threshold,
start_work_items, wait_message, new_work_items, processed_items);

// Expected effect:
// 1. start_work_items would be pushed to the buffer
// 2. wait_message_would be pushed to the buffer
// 3. forward task of the buffer would push wait_message to the limiter node.
// Since the limiter threshold is not reached, it would be directly passed to the function
// 4. function would spawn the task for wait_message processing
// 5. wait_message would be processed that would add new_work_items to the buffer
// 6. decrementer.try_put() would be called and the limiter node would
// process all of the items from the buffer using the try_reserve/try_consume/try_release semantics
// Since the reservation always accepts the front element of the buffer
// it is expected that the items would be taken from the buffer in FIFO order
// instead of LIFO on try_get for buffer_node

std::size_t check_index = 0;

CHECK_MESSAGE(after_start == 1, "try_put_and_wait should process only wait_message");
CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected wait_message processing");

for (auto item : start_work_items) {
CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing");
}

for (auto item : new_work_items) {
CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected start_work_items processing");
}

}
}
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT

Expand Down
50 changes: 50 additions & 0 deletions test/tbb/test_buffering_try_put_and_wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,56 @@ std::size_t test_buffer_pull(const std::vector<int>& start_work_items,
return after_try_put_and_wait_start_index;
}

template <typename BufferingNode, typename... Args>
std::size_t test_buffer_reserve(std::size_t limiter_threshold,
const std::vector<int>& start_work_items,
int wait_message,
const std::vector<int>& new_work_items,
std::vector<int>& processed_items,
Args... args)
{
tbb::task_arena arena(1);
std::size_t after_try_put_and_wait_start_index = 0;

arena.execute([&] {
tbb::flow::graph g;

BufferingNode buffer(g, args...);

tbb::flow::limiter_node<int, int> limiter(g, limiter_threshold);
tbb::flow::function_node<int, int, tbb::flow::rejecting> function(g, tbb::flow::serial,
[&](int input) {
if (input == wait_message) {
for (auto item : new_work_items) {
buffer.try_put(item);
}
}
// Explicitly put to the decrementer instead of making edge
// to guarantee that the next task would be spawned and not returned
// to the current thread as the next task
// Otherwise, all elements would be processed during the try_put_and_wait
limiter.decrementer().try_put(1);
processed_items.emplace_back(input);
return 0;
});

tbb::flow::make_edge(buffer, limiter);
tbb::flow::make_edge(limiter, function);

for (auto item : start_work_items) {
buffer.try_put(item);
}

buffer.try_put_and_wait(wait_message);

after_try_put_and_wait_start_index = processed_items.size();

g.wait_for_all();
});

return after_try_put_and_wait_start_index;
}

} // test_try_put_and_wait

#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
Expand Down
Loading
Loading