Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[try_put_and_wait] Part 6: Add implementation for limiter_node + reservation support for buffers #1425

Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
19a86e2
Add implementation for function_node
kboyarinov Jun 11, 2024
dbe50f7
Fix non-CPF build
kboyarinov Jun 11, 2024
660646e
Add try_put_and_wait_production branch to the CI trigger list
kboyarinov Jun 11, 2024
531da5b
Fix test issues
kboyarinov Jun 13, 2024
f97c794
Fix CI issues
kboyarinov Jun 13, 2024
7afdd9d
+ more use of override keyword
kboyarinov Jun 13, 2024
da15916
vertexes->vertices, remove unnecessary iostream include
kboyarinov Jun 21, 2024
92bf552
Remove unnecessary variadics
kboyarinov Jun 25, 2024
2bb5948
Simplify cache impl
kboyarinov Jun 25, 2024
73eb513
Fix unused variable
kboyarinov Jun 25, 2024
f92503a
Add stub for continue_input
kboyarinov Jun 25, 2024
6d82b64
Fix whitespace
kboyarinov Jun 25, 2024
bc01522
Fix issues
kboyarinov Jun 25, 2024
6d6c145
Save progress
kboyarinov Jun 18, 2024
4e05f31
Add implementation for buffering nodes
kboyarinov Jun 20, 2024
842381c
Add to CI
kboyarinov Jun 20, 2024
2c011ed
Fix copyrights
kboyarinov Jun 21, 2024
67226f3
Add missed file
kboyarinov Jun 21, 2024
708d655
Add test definition for try_get with meta
kboyarinov Jun 21, 2024
ed5d05b
Fix metainfo namespace in test
kboyarinov Jun 21, 2024
811b9b9
join_node::try_get stub
kboyarinov Jun 21, 2024
703104a
Allign with base branch
kboyarinov Jun 25, 2024
b697c09
Add implementation for limiter_node + reservation support for buffers
kboyarinov Jun 26, 2024
a342444
Fix CI
kboyarinov Jun 27, 2024
f6f7673
Address review comments
kboyarinov Jul 8, 2024
e08d81f
Rearrange base task constructors
kboyarinov Jul 8, 2024
4029091
Introduce separate macro for try_put_and_wait feature
kboyarinov Jul 17, 2024
5d66123
Simplify code
kboyarinov Jul 17, 2024
2033045
Address review comments
kboyarinov Jul 22, 2024
967aa53
Add missed comment
kboyarinov Jul 22, 2024
c77e9c5
Update test/tbb/test_function_node.cpp
kboyarinov Jul 22, 2024
5aa2e34
Rename template helpers
kboyarinov Jul 22, 2024
0fd4b26
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Jul 23, 2024
fc1239f
Fix incorrect merging
kboyarinov Jul 23, 2024
0321596
Fix previous review comments
kboyarinov Jul 23, 2024
34ea303
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Jul 23, 2024
ba5d26e
Apply part1 review comments
kboyarinov Jul 23, 2024
3691466
Apply PR review comments
kboyarinov Jul 25, 2024
c7c43e1
Fix review comment
kboyarinov Jul 25, 2024
eeffec0
Rename boolean flags
kboyarinov Jul 26, 2024
0b2b098
Rename metainfo-friendly graph_task class
kboyarinov Jul 26, 2024
03c9391
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
ac27bb1
Add this_thread_in_graph_arena usage
kboyarinov Aug 1, 2024
a23dc02
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
a82c649
Fix merging
kboyarinov Aug 1, 2024
fbf5fbc
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
6787ae5
Remove unnecessary TODO
kboyarinov Aug 1, 2024
078f3ad
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
061e2e7
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
7106396
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 15, 2024
082ea7d
Update test/tbb/test_limiter_node.cpp
kboyarinov Aug 15, 2024
56989b7
Merge branch 'dev/kboyarinov/try_put_and_wait_limiter_node' of https:…
kboyarinov Aug 15, 2024
b5c674d
Fix incorrect merging
kboyarinov Aug 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ on:
branches: [master]

pull_request:
branches: [master]
branches: [master, dev/kboyarinov/try_put_and_wait_production, dev/kboyarinov/try_put_and_wait_function_node, dev/kboyarinov/try_put_and_wait_buffering_nodes]
types:
- opened
- synchronize
Expand Down
7 changes: 6 additions & 1 deletion include/oneapi/tbb/detail/_config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -521,6 +521,11 @@
#define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES)
#endif

#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES \
|| TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT)
#endif

#if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS
#define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1
#endif
Expand Down
57 changes: 47 additions & 10 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -272,29 +272,57 @@ class forward_task_bypass : public graph_task {

//! A task that calls a node's apply_body_bypass function, passing in an input of type Input
// return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
template< typename NodeType, typename Input >
class apply_body_task_bypass : public graph_task {
template< typename NodeType, typename Input, typename BaseTaskType = graph_task>
class apply_body_task_bypass
: public BaseTaskType
{
NodeType &my_node;
Input my_input;

using check_metainfo = std::is_same<BaseTaskType, graph_task>;
using without_metainfo = std::true_type;
using with_metainfo = std::false_type;

graph_task* call_apply_body_bypass_impl(without_metainfo) {
return my_node.apply_body_bypass(my_input
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* call_apply_body_bypass_impl(with_metainfo) {
return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()});
}
#endif

graph_task* call_apply_body_bypass() {
return call_apply_body_bypass_impl(check_metainfo{});
}

public:
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Metainfo>
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i,
node_priority_t node_priority, Metainfo&& metainfo )
: BaseTaskType(g, allocator, node_priority, std::forward<Metainfo>(metainfo).waiters())
, my_node(n), my_input(i) {}
#endif

apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n), my_input(i) {}
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i,
node_priority_t node_priority = no_priority )
: BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {}

d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( my_input );
graph_task* next_task = call_apply_body_bypass();
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
else if (next_task)
next_task = prioritize_task(my_node.graph_reference(), *next_task);
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return next_task;
}

d1::task* cancel(d1::execution_data& ed) override {
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return nullptr;
}
};
Expand Down Expand Up @@ -343,6 +371,15 @@ class threshold_regulator<T, DecrementType,
return result;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Intentionally ignore the metainformation
// If there are more items associated with passed metainfo to be processed
// They should be stored in the buffer before the limiter_node
vossmjp marked this conversation as resolved.
Show resolved Hide resolved
graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
return try_put_task(value);
}
#endif

graph& graph_reference() const override {
return my_node->my_graph;
}
Expand Down
100 changes: 84 additions & 16 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,7 +98,10 @@ class predecessor_cache : public node_cache< sender<T>, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool get_item( output_type& v ) {
private:
bool get_item_impl( output_type& v
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
{

bool msg = false;

Expand All @@ -113,7 +116,15 @@ class predecessor_cache : public node_cache< sender<T>, M > {
}

// Try to get from this sender
msg = src->try_get( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo_ptr)
{
msg = src->try_get( v, *metainfo_ptr );
} else
#endif
{
msg = src->try_get( v );
}

if (msg == false) {
// Relinquish ownership of the edge
Expand All @@ -125,6 +136,16 @@ class predecessor_cache : public node_cache< sender<T>, M > {
} while ( msg == false );
return msg;
}
public:
bool get_item( output_type& v ) {
return get_item_impl(v);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool get_item( output_type& v, message_metainfo& metainfo ) {
return get_item_impl(v, &metainfo);
}
#endif

// If we are removing arcs (rf_clear_edges), call clear() rather than reset().
void reset() {
Expand Down Expand Up @@ -157,7 +178,8 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool try_reserve( output_type &v ) {
private:
bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
bool msg = false;

do {
Expand All @@ -172,7 +194,14 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

// Try to get from this sender
msg = pred->try_reserve( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo) {
msg = pred->try_reserve( v, *metainfo );
} else
#endif
{
msg = pred->try_reserve(v);
}

if (msg == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
Expand All @@ -187,6 +216,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {

return msg;
}
public:
bool try_reserve( output_type& v ) {
return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool try_reserve( output_type& v, message_metainfo& metainfo ) {
return try_reserve_impl(v, &metainfo);
}
#endif

bool try_release() {
reserved_src.load(std::memory_order_relaxed)->try_release();
Expand Down Expand Up @@ -268,6 +307,9 @@ class successor_cache : no_copy {
}

virtual graph_task* try_put_task( const T& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache<T>

//! An abstract cache of successors, specialized to continue_msg
Expand Down Expand Up @@ -327,6 +369,9 @@ class successor_cache< continue_msg, M > : no_copy {
}

virtual graph_task* try_put_task( const continue_msg& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache< continue_msg >

//! A cache of successors that are broadcast to
Expand All @@ -336,19 +381,12 @@ class broadcast_cache : public successor_cache<T, M> {
typedef M mutex_type;
typedef typename successor_cache<T,M>::successors_type successors_type;

public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// as above, but call try_put_task instead, and return the last task we received (if any)
graph_task* try_put_task( const T &t ) override {
graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
graph_task * last_task = nullptr;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task *new_task = (*i)->try_put_task(t);
graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
// workaround for icc bug
graph& graph_ref = (*i)->graph_reference();
last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
Expand All @@ -365,6 +403,21 @@ class broadcast_cache : public successor_cache<T, M> {
}
return last_task;
}
public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

graph_task* try_put_task( const T &t ) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif

// call try_put_task and return list of received tasks
bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
Expand Down Expand Up @@ -411,11 +464,15 @@ class round_robin_cache : public successor_cache<T, M> {
return this->my_successors.size();
}

graph_task* try_put_task( const T &t ) override {
private:

graph_task* try_put_task_impl( const T &t
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
{
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task* new_task = (*i)->try_put_task(t);
graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( new_task ) {
return new_task;
} else {
Expand All @@ -429,6 +486,17 @@ class round_robin_cache : public successor_cache<T, M> {
}
return nullptr;
}

public:
graph_task* try_put_task(const T& t) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif
};

#endif // __TBB__flow_graph_cache_impl_H
Loading
Loading