Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[try_put_and_wait] Part 6: Add implementation for limiter_node + reservation support for buffers #1425

Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
19a86e2
Add implementation for function_node
kboyarinov Jun 11, 2024
dbe50f7
Fix non-CPF build
kboyarinov Jun 11, 2024
660646e
Add try_put_and_wait_production branch to the CI trigger list
kboyarinov Jun 11, 2024
531da5b
Fix test issues
kboyarinov Jun 13, 2024
f97c794
Fix CI issues
kboyarinov Jun 13, 2024
7afdd9d
+ more use of override keyword
kboyarinov Jun 13, 2024
da15916
vertexes->vertices, remove unnecessary iostream include
kboyarinov Jun 21, 2024
92bf552
Remove unnecessary variadics
kboyarinov Jun 25, 2024
2bb5948
Simplify cache impl
kboyarinov Jun 25, 2024
73eb513
Fix unused variable
kboyarinov Jun 25, 2024
f92503a
Add stub for continue_input
kboyarinov Jun 25, 2024
6d82b64
Fix whitespace
kboyarinov Jun 25, 2024
bc01522
Fix issues
kboyarinov Jun 25, 2024
6d6c145
Save progress
kboyarinov Jun 18, 2024
4e05f31
Add implementation for buffering nodes
kboyarinov Jun 20, 2024
842381c
Add to CI
kboyarinov Jun 20, 2024
2c011ed
Fix copyrights
kboyarinov Jun 21, 2024
67226f3
Add missed file
kboyarinov Jun 21, 2024
708d655
Add test definition for try_get with meta
kboyarinov Jun 21, 2024
ed5d05b
Fix metainfo namespace in test
kboyarinov Jun 21, 2024
811b9b9
join_node::try_get stub
kboyarinov Jun 21, 2024
703104a
Allign with base branch
kboyarinov Jun 25, 2024
b697c09
Add implementation for limiter_node + reservation support for buffers
kboyarinov Jun 26, 2024
a342444
Fix CI
kboyarinov Jun 27, 2024
f6f7673
Address review comments
kboyarinov Jul 8, 2024
e08d81f
Rearrange base task constructors
kboyarinov Jul 8, 2024
4029091
Introduce separate macro for try_put_and_wait feature
kboyarinov Jul 17, 2024
5d66123
Simplify code
kboyarinov Jul 17, 2024
2033045
Address review comments
kboyarinov Jul 22, 2024
967aa53
Add missed comment
kboyarinov Jul 22, 2024
c77e9c5
Update test/tbb/test_function_node.cpp
kboyarinov Jul 22, 2024
5aa2e34
Rename template helpers
kboyarinov Jul 22, 2024
0fd4b26
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Jul 23, 2024
fc1239f
Fix incorrect merging
kboyarinov Jul 23, 2024
0321596
Fix previous review comments
kboyarinov Jul 23, 2024
34ea303
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Jul 23, 2024
ba5d26e
Apply part1 review comments
kboyarinov Jul 23, 2024
3691466
Apply PR review comments
kboyarinov Jul 25, 2024
c7c43e1
Fix review comment
kboyarinov Jul 25, 2024
eeffec0
Rename boolean flags
kboyarinov Jul 26, 2024
0b2b098
Rename metainfo-friendly graph_task class
kboyarinov Jul 26, 2024
03c9391
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
ac27bb1
Add this_thread_in_graph_arena usage
kboyarinov Aug 1, 2024
a23dc02
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
a82c649
Fix merging
kboyarinov Aug 1, 2024
fbf5fbc
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
6787ae5
Remove unnecessary TODO
kboyarinov Aug 1, 2024
078f3ad
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
061e2e7
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
7106396
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 15, 2024
082ea7d
Update test/tbb/test_limiter_node.cpp
kboyarinov Aug 15, 2024
56989b7
Merge branch 'dev/kboyarinov/try_put_and_wait_limiter_node' of https:…
kboyarinov Aug 15, 2024
b5c674d
Fix incorrect merging
kboyarinov Aug 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ on:
branches: [master]

pull_request:
branches: [master, dev/kboyarinov/try_put_and_wait_production]
branches: [master, dev/kboyarinov/try_put_and_wait_production, dev/kboyarinov/try_put_and_wait_buffering_nodes]
types:
- opened
- synchronize
Expand Down
4 changes: 3 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@ class threshold_regulator<T, DecrementType,
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for limiter_node
// Intentionally ignore the metainformation
// If there are more items associated with passed metainfo to be processed
// They should be stored in the buffer before the limiter_node
vossmjp marked this conversation as resolved.
Show resolved Hide resolved
graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
return try_put_task(value);
}
Expand Down
82 changes: 64 additions & 18 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ class predecessor_cache : public node_cache< sender<T>, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool get_item( output_type& v ) {
private:
bool get_item_impl( output_type& v
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
{

bool msg = false;
bool successful_get = false;

do {
predecessor_type *src;
Expand All @@ -113,18 +116,35 @@ class predecessor_cache : public node_cache< sender<T>, M > {
}

// Try to get from this sender
msg = src->try_get( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo_ptr) {
successful_get = src->try_get( v, *metainfo_ptr );
} else
#endif
{
successful_get = src->try_get( v );
}

if (msg == false) {
if (successful_get == false) {
// Relinquish ownership of the edge
register_successor(*src, *my_owner);
} else {
// Retain ownership of the edge
this->add(*src);
}
} while ( msg == false );
return msg;
} while ( successful_get == false );
return successful_get;
}
public:
bool get_item( output_type& v ) {
return get_item_impl(v);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool get_item( output_type& v, message_metainfo& metainfo ) {
return get_item_impl(v, &metainfo);
}
#endif

// If we are removing arcs (rf_clear_edges), call clear() rather than reset().
void reset() {
Expand Down Expand Up @@ -157,8 +177,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool try_reserve( output_type &v ) {
bool msg = false;
private:
bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
bool successful_reserve = false;

do {
predecessor_type* pred = nullptr;
Expand All @@ -172,9 +193,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

// Try to get from this sender
msg = pred->try_reserve( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo) {
successful_reserve = pred->try_reserve( v, *metainfo );
} else
#endif
{
successful_reserve = pred->try_reserve(v);
}

if (msg == false) {
if (successful_reserve == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
// Relinquish ownership of the edge
register_successor( *pred, *this->my_owner );
Expand All @@ -183,11 +211,21 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Retain ownership of the edge
this->add( *pred);
}
} while ( msg == false );
} while ( successful_reserve == false );

return msg;
return successful_reserve;
}
public:
bool try_reserve( output_type& v ) {
return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool try_reserve( output_type& v, message_metainfo& metainfo ) {
return try_reserve_impl(v, &metainfo);
}
#endif

bool try_release() {
reserved_src.load(std::memory_order_relaxed)->try_release();
reserved_src.store(nullptr, std::memory_order_relaxed);
Expand Down Expand Up @@ -342,7 +380,7 @@ class broadcast_cache : public successor_cache<T, M> {
typedef M mutex_type;
typedef typename successor_cache<T,M>::successors_type successors_type;

graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
graph_task * last_task = nullptr;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
Expand Down Expand Up @@ -425,11 +463,15 @@ class round_robin_cache : public successor_cache<T, M> {
return this->my_successors.size();
}

graph_task* try_put_task( const T &t ) override {
private:

graph_task* try_put_task_impl( const T &t
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
{
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task* new_task = (*i)->try_put_task(t);
graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( new_task ) {
return new_task;
} else {
Expand All @@ -444,10 +486,14 @@ class round_robin_cache : public successor_cache<T, M> {
return nullptr;
}

public:
graph_task* try_put_task(const T& t) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for round robin cache
graph_task* try_put_task( const T& t, const message_metainfo& ) override {
return try_put_task(t);
graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif
};
Expand Down
26 changes: 24 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,44 @@ class trackable_messages_graph_task : public graph_task {
}
}

trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
node_priority_t node_priority,
std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
: graph_task(g, allocator, node_priority)
, my_msg_wait_context_vertices(std::move(msg_waiters))
{
}

trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
const std::forward_list<d1::wait_context_vertex*>& msg_waiters)
: trackable_messages_graph_task(g, allocator, no_priority, msg_waiters) {}

trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
: trackable_messages_graph_task(g, allocator, no_priority, std::move(msg_waiters)) {}

const std::forward_list<d1::wait_context_vertex*> get_msg_wait_context_vertices() const {
return my_msg_wait_context_vertices;
}

protected:
template <typename DerivedType>
void finalize(const d1::execution_data& ed) {
auto wait_context_vertices = std::move(my_msg_wait_context_vertices);
auto msg_reference_vertices = std::move(my_msg_reference_vertices);
graph_task::finalize<DerivedType>(ed);

for (auto& msg_waiter : msg_reference_vertices) {
msg_waiter->release(1);
// If there is no thread reference vertices associated with the task
// then this task was created by transferring the ownership from other metainfo
// instance (e.g. while taking from the buffer)
if (msg_reference_vertices.empty()) {
for (auto& msg_waiter : wait_context_vertices) {
msg_waiter->release(1);
}
} else {
for (auto& msg_waiter : msg_reference_vertices) {
msg_waiter->release(1);
}
}
}
private:
Expand Down
63 changes: 61 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class item_buffer {
return element(i).item;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo& get_my_metainfo(size_t i) {
__TBB_ASSERT(my_item_valid(i), "attempt to get invalid item");
return element(i).metainfo;
}
#endif

// may be called with an empty slot or a slot that has already been constructed into.
void set_my_item(size_t i, const item_type &o
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
Expand All @@ -102,22 +109,45 @@ class item_buffer {
#endif
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
void set_my_item(size_t i, const item_type& o, message_metainfo&& metainfo) {
if(element(i).state != no_item) {
destroy_item(i);
}

new(&(element(i).item)) item_type(o);
new(&element(i).metainfo) message_metainfo(std::move(metainfo));
// Skipping the reservation on metainfo.waiters since the ownership
// is moving from metainfo to the cache
element(i).state = has_item;
}
#endif

// destructively-fetch an object from the buffer
void fetch_item(size_t i, item_type &o) {
__TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot");
o = get_my_item(i); // could have std::move assign semantics
destroy_item(i);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
void fetch_item(size_t i, item_type& o, message_metainfo& metainfo) {
__TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot");
o = get_my_item(i); // could have std::move assign semantics
metainfo = std::move(get_my_metainfo(i));
destroy_item(i);
}
#endif

// move an existing item from one slot to another. The moved-to slot must be unoccupied,
// the moved-from slot must exist and not be reserved. The after, from will be empty,
// to will be occupied but not reserved
void move_item(size_t to, size_t from) {
__TBB_ASSERT(!my_item_valid(to), "Trying to move to a non-empty slot");
__TBB_ASSERT(my_item_valid(from), "Trying to move from an empty slot");
set_my_item(to, get_my_item(from)); // could have std::move semantics
// could have std::move semantics
set_my_item(to, get_my_item(from) __TBB_FLOW_GRAPH_METAINFO_ARG(get_my_metainfo(from)));
destroy_item(from);

}

// put an item in an empty slot. Return true if successful, else false
Expand All @@ -129,12 +159,29 @@ class item_buffer {
return true;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Metainfo>
bool place_item(size_t here, const item_type &me, Metainfo&& metainfo) {
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
if(my_item_valid(here)) return false;
#endif
set_my_item(here, me, std::forward<Metainfo>(metainfo));
return true;
}
#endif

// could be implemented with std::move semantics
void swap_items(size_t i, size_t j) {
__TBB_ASSERT(my_item_valid(i) && my_item_valid(j), "attempt to swap invalid item(s)");
item_type temp = get_my_item(i);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo temp_metainfo = get_my_metainfo(i);
set_my_item(i, get_my_item(j), get_my_metainfo(j));
set_my_item(j, temp, temp_metainfo);
#else
set_my_item(i, get_my_item(j));
set_my_item(j, temp);
#endif
}

void destroy_item(size_type i) {
Expand Down Expand Up @@ -353,6 +400,18 @@ class reservable_item_buffer : public item_buffer<T, A> {
return true;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool reserve_front(T& v, message_metainfo& metainfo) {
if (my_reserved || !my_item_valid(this->my_head)) return false;
my_reserved = true;
// reserving the head
v = this->front();
metainfo = this->front_metainfo();
this->reserve_item(this->my_head);
return true;
}
#endif

void consume_front() {
__TBB_ASSERT(my_reserved, "Attempt to consume a non-reserved item");
this->destroy_front();
Expand Down
7 changes: 7 additions & 0 deletions include/oneapi/tbb/detail/_flow_graph_join_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,13 @@
return op_data.status == SUCCEEDED;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: implement try_get with metainfo for join_node
bool try_get( output_type &v, message_metainfo& ) override {
return try_get(v);
}
#endif

protected:
void reset_node(reset_flags f) override {
input_ports_type::reset(f);
Expand Down
17 changes: 12 additions & 5 deletions include/oneapi/tbb/detail/_flow_graph_node_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,12 @@ class function_input_base : public receiver<Input>, no_assign {
}
else {
input_type i;
if(my_predecessors.get_item(i)) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo metainfo;
#endif
if(my_predecessors.get_item(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) {
++my_concurrency;
new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo)));
}
}
return new_task;
Expand Down Expand Up @@ -351,8 +354,12 @@ class function_input_base : public receiver<Input>, no_assign {
}

//! allocates a task to apply a body
graph_task* create_body_task( const input_type &input
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Metainfo>
graph_task* create_body_task( const input_type &input, Metainfo&& metainfo )
#else
graph_task* create_body_task( const input_type &input )
#endif
{
if (!is_graph_active(my_graph_ref)) {
return nullptr;
Expand All @@ -363,7 +370,7 @@ class function_input_base : public receiver<Input>, no_assign {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (!metainfo.empty()) {
using task_type = apply_body_task_bypass<class_type, input_type, trackable_messages_graph_task>;
t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority, metainfo);
t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority, std::forward<Metainfo>(metainfo));
} else
#endif
{
Expand Down
Loading
Loading