Skip to content

Commit

Permalink
[dataflow] Add a flag to indicate that some nodes cannot be put on se…
Browse files Browse the repository at this point in the history
…parate task threads
  • Loading branch information
jcelerier committed Jul 14, 2024
1 parent a481c5a commit b4458be
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
24 changes: 22 additions & 2 deletions src/ossia/dataflow/graph/graph_parallel_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ class executor

void set_task_executor(task_function f) { m_func = std::move(f); }

void enqueue_task(task& task)
{
if(task.m_node->not_threadable())
{
[[unlikely]];
m_not_threadsafe_tasks.enqueue(&task);
}
else
{
[[likely]];
m_tasks.enqueue(&task);
}
}

void run(taskflow& tf)
{
m_tf = &tf;
Expand Down Expand Up @@ -199,7 +213,7 @@ class executor
assert(task.m_dependencies == 0);
#endif
std::atomic_thread_fence(std::memory_order_release);
m_tasks.enqueue(&task);
enqueue_task(task);
}
#if defined(DISABLE_DONE_TASKS)
else
Expand All @@ -221,6 +235,11 @@ class executor
while(m_doneTasks.load(std::memory_order_relaxed) != m_toDoTasks)
{
task* t{};
if(m_not_threadsafe_tasks.wait_dequeue_timed(t, 1))
{
execute(*t);
}

if(m_tasks.wait_dequeue_timed(t, 1))
{
execute(*t);
Expand Down Expand Up @@ -266,7 +285,7 @@ class executor
}
#endif
std::atomic_thread_fence(std::memory_order_release);
m_tasks.enqueue(&nextTask);
enqueue_task(nextTask);
}
#if defined(DISABLE_DONE_TASKS)
else
Expand Down Expand Up @@ -336,6 +355,7 @@ class executor
std::size_t m_toDoTasks = 0;

moodycamel::BlockingConcurrentQueue<task*> m_tasks;
moodycamel::BlockingConcurrentQueue<task*> m_not_threadsafe_tasks;

#if defined(CHECK_EXEC_COUNTS)
std::array<std::atomic_int, 5000> m_checkVec;
Expand Down
11 changes: 11 additions & 0 deletions src/ossia/dataflow/graph_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ class OSSIA_EXPORT graph_node
void set_mute(bool b) noexcept { m_muted = b; }
[[nodiscard]] bool muted() const noexcept { return m_muted; }

/**
* Indicates that the node implementation must always be scheduled on the same thread.
* Main use case: QJSEngine which is not thread-safe.
*/
[[nodiscard]]
bool not_threadable() const noexcept
{
return m_not_threadable;
}

virtual void all_notes_off() noexcept;
token_request_vec requested_tokens;

Expand All @@ -162,6 +172,7 @@ class OSSIA_EXPORT graph_node
outlets m_outlets;

bool m_executed{};
bool m_not_threadable{};

private:
bool m_start_discontinuous{};
Expand Down

0 comments on commit b4458be

Please sign in to comment.