Move completed-epoch handling out of task_manager #299

Merged · 2 commits · Nov 14, 2024
382 changes: 191 additions & 191 deletions ci/perf/gpuc2_bench.csv

Large diffs are not rendered by default.

384 changes: 192 additions & 192 deletions ci/perf/gpuc2_bench.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion include/distr_queue.h
@@ -57,7 +57,7 @@ class [[deprecated("Use celerity::queue instead")]] distr_queue {
void submit(CGF cgf) { // NOLINT(readability-convert-member-functions-to-static)
// (Note while this function could be made static, it must not be! Otherwise we can't be sure the runtime has been initialized.)
CELERITY_DETAIL_TRACY_ZONE_SCOPED("distr_queue::submit", Orange3);
[[maybe_unused]] const auto tid = detail::runtime::get_instance().get_task_manager().submit_command_group(std::move(cgf));
[[maybe_unused]] const auto tid = detail::runtime::get_instance().submit(std::move(cgf));
CELERITY_DETAIL_TRACY_ZONE_NAME("T{} submit", tid);
}

8 changes: 4 additions & 4 deletions include/fence.h
@@ -69,7 +69,7 @@ class buffer_snapshot {
namespace celerity::detail {

template <typename T>
class host_object_fence_promise final : public detail::fence_promise {
class host_object_fence_promise final : public detail::task_promise {
public:
explicit host_object_fence_promise(const T* instance) : m_instance(instance) {}

@@ -85,7 +85,7 @@ class host_object_fence_promise final : public detail::fence_promise {
};

template <typename DataT, int Dims>
class buffer_fence_promise final : public detail::fence_promise {
class buffer_fence_promise final : public detail::task_promise {
public:
explicit buffer_fence_promise(const subrange<Dims>& sr)
: m_subrange(sr), m_data(std::make_unique<DataT[]>(sr.range.size())), m_aid(runtime::get_instance().create_user_allocation(m_data.get())) {}
@@ -112,7 +112,7 @@ std::future<T> fence(const experimental::host_object<T>& obj) {
side_effects.add_side_effect(detail::get_host_object_id(obj), experimental::side_effect_order::sequential);
auto promise = std::make_unique<detail::host_object_fence_promise<T>>(detail::get_host_object_instance(obj));
auto future = promise->get_future();
[[maybe_unused]] const auto tid = detail::runtime::get_instance().get_task_manager().generate_fence_task({}, std::move(side_effects), std::move(promise));
[[maybe_unused]] const auto tid = detail::runtime::get_instance().fence({}, std::move(side_effects), std::move(promise));

CELERITY_DETAIL_TRACY_ZONE_NAME("T{} fence", tid);
return future;
@@ -127,7 +127,7 @@ std::future<buffer_snapshot<DataT, Dims>> fence(const buffer<DataT, Dims>& buf,
std::make_unique<detail::range_mapper<Dims, celerity::access::fixed<Dims>>>(celerity::access::fixed<Dims>(sr), access_mode::read, buf.get_range()));
auto promise = std::make_unique<detail::buffer_fence_promise<DataT, Dims>>(sr);
auto future = promise->get_future();
[[maybe_unused]] const auto tid = detail::runtime::get_instance().get_task_manager().generate_fence_task(std::move(access_map), {}, std::move(promise));
[[maybe_unused]] const auto tid = detail::runtime::get_instance().fence(std::move(access_map), {}, std::move(promise));

CELERITY_DETAIL_TRACY_ZONE_NAME("T{} fence", tid);
return future;
15 changes: 9 additions & 6 deletions include/instruction_graph.h
@@ -19,7 +19,7 @@

namespace celerity::detail {

class fence_promise;
class task_promise;

/// A node in the `instruction_graph`. This is not implemented as an `intrusive_graph` but references its predecessors by id to avoid thread-safety and lifetime
/// issues.
@@ -377,12 +377,12 @@ class reduce_instruction final : public matchbox::implement_acceptor<instruction, reduce_instruction> {
/// Fulfills a fence promise. Issued directly after a copy_instruction to user memory in case of a buffer fence.
class fence_instruction final : public matchbox::implement_acceptor<instruction, fence_instruction> {
public:
explicit fence_instruction(const instruction_id iid, const int priority, fence_promise* promise) : acceptor_base(iid, priority), m_promise(promise) {}
explicit fence_instruction(const instruction_id iid, const int priority, task_promise* promise) : acceptor_base(iid, priority), m_promise(promise) {}

fence_promise* get_promise() const { return m_promise; };
task_promise* get_promise() const { return m_promise; };

private:
fence_promise* m_promise;
task_promise* m_promise;
};

/// Host object instances are owned by the instruction executor, so once the last reference to a host_object goes out of scope in userland, this instruction
@@ -423,16 +423,19 @@ class horizon_instruction final : public matchbox::implement_acceptor<instruction, horizon_instruction> {
/// Instruction-graph equivalent of an epoch task or command.
class epoch_instruction final : public matchbox::implement_acceptor<instruction, epoch_instruction> {
public:
explicit epoch_instruction(const instruction_id iid, const int priority, const task_id epoch_tid, const epoch_action action, instruction_garbage garbage)
: acceptor_base(iid, priority), m_epoch_tid(epoch_tid), m_epoch_action(action), m_garbage(std::move(garbage)) {}
explicit epoch_instruction(const instruction_id iid, const int priority, const task_id epoch_tid, const epoch_action action, task_promise* const promise,
instruction_garbage garbage)
: acceptor_base(iid, priority), m_epoch_tid(epoch_tid), m_epoch_action(action), m_promise(promise), m_garbage(std::move(garbage)) {}

task_id get_epoch_task_id() const { return m_epoch_tid; }
task_promise* get_promise() const { return m_promise; };
epoch_action get_epoch_action() const { return m_epoch_action; }
const instruction_garbage& get_garbage() const { return m_garbage; }

private:
task_id m_epoch_tid;
epoch_action m_epoch_action;
task_promise* m_promise;
instruction_garbage m_garbage;
};
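
The added `task_promise*` parameter means the instruction-graph generator must now thread an epoch task's promise through to the node it emits. A hedged sketch of that hand-off, using only accessors visible in this PR (`get_id`, `get_epoch_action`, `get_task_promise`); the helper itself and its surroundings are assumptions, not generator code from this change:

```cpp
// Hypothetical generator-side helper -- illustration only. Only the
// epoch_instruction constructor signature and the task accessors are real.
std::unique_ptr<epoch_instruction> make_epoch_instruction_node(
    const instruction_id iid, const int priority, const task& tsk, instruction_garbage garbage) {
	return std::make_unique<epoch_instruction>(
	    iid, priority, tsk.get_id(), tsk.get_epoch_action(), tsk.get_task_promise(), std::move(garbage));
}
```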

4 changes: 2 additions & 2 deletions include/queue.h
@@ -27,10 +27,10 @@ class queue {

/// Submits a command group to the queue.
template <typename CGF>
void submit(CGF cgf) { // NOLINT(readability-convert-member-functions-to-static)
void submit(CGF&& cgf) { // NOLINT(readability-convert-member-functions-to-static)
// (Note while this function could be made static, it must not be! Otherwise we can't be sure the runtime has been initialized.)
CELERITY_DETAIL_TRACY_ZONE_SCOPED("queue::submit", Orange3);
[[maybe_unused]] const auto tid = detail::runtime::get_instance().get_task_manager().submit_command_group(std::move(cgf));
[[maybe_unused]] const auto tid = detail::runtime::get_instance().submit(std::forward<CGF>(cgf));
CELERITY_DETAIL_TRACY_ZONE_NAME("T{} submit", tid);
}
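
Aside: the parameter change from `CGF cgf` to `CGF&& cgf` swaps pass-by-value-plus-move for perfect forwarding, so an lvalue command-group function is no longer copied into the parameter at the call site. A toy illustration (the kernel body and sizes are placeholders, not from this PR):

```cpp
#include <utility>

#include <celerity.h>

int main() {
	celerity::queue q;
	auto cgf = [](celerity::handler& cgh) {
		cgh.parallel_for(celerity::range<1>(64), [](celerity::item<1>) { /* no-op kernel */ });
	};
	q.submit(cgf);            // lvalue: bound by CGF&&, not forced into a copy here
	q.submit(std::move(cgf)); // rvalue: moved through std::forward to the runtime
}
```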

18 changes: 16 additions & 2 deletions include/runtime.h
@@ -8,6 +8,7 @@
#include "executor.h"
#include "recorders.h"
#include "scheduler.h"
#include "task.h"
#include "task_manager.h"
#include "types.h"

@@ -40,9 +41,17 @@ namespace detail {

~runtime();

task_id sync(detail::epoch_action action);
template <typename CGF>
task_id submit(CGF&& cgf) {
// TODO move task-node generation out of handler to avoid task_manager header dependency here
require_call_from_application_thread();
maybe_prune_task_graph();
return m_task_mngr->submit_command_group(std::forward<CGF>(cgf));
}

task_manager& get_task_manager() const;
task_id fence(buffer_access_map access_map, side_effect_map side_effects, std::unique_ptr<task_promise> fence_promise);

task_id sync(detail::epoch_action action);

void create_queue();

@@ -95,11 +104,14 @@ namespace detail {
host_object_id m_next_host_object_id = 0;
reduction_id m_next_reduction_id = no_reduction_id + 1;

task_graph m_tdag;
std::unique_ptr<task_manager> m_task_mngr;
std::unique_ptr<scheduler> m_schdlr;
std::unique_ptr<executor> m_exec;

std::optional<task_id> m_latest_horizon_reached; // only accessed by executor thread
std::atomic<size_t> m_latest_epoch_reached; // task_id, but cast to size_t to work with std::atomic
task_id m_last_epoch_pruned_before = 0;

std::unique_ptr<detail::task_recorder> m_task_recorder; // accessed by task manager (application thread)
std::unique_ptr<detail::command_recorder> m_command_recorder; // accessed only by scheduler thread (until shutdown)
@@ -111,6 +123,8 @@ namespace detail {
/// member functions, we call this check at the beginning of all the non-safe ones.
void require_call_from_application_thread() const;

void maybe_prune_task_graph();

// task_manager::delegate
void task_created(const task* tsk) override;

31 changes: 15 additions & 16 deletions include/task.h
@@ -84,12 +84,14 @@ namespace detail {
void add_side_effect(host_object_id hoid, experimental::side_effect_order order);
};

class fence_promise {
class task_promise {
public:
fence_promise() = default;
fence_promise(const fence_promise&) = delete;
fence_promise& operator=(const fence_promise&) = delete;
virtual ~fence_promise() = default;
task_promise() = default;
task_promise(const task_promise&) = delete;
task_promise(task_promise&&) = delete;
task_promise& operator=(const task_promise&) = delete;
task_promise& operator=(task_promise&&) = delete;
virtual ~task_promise() = default;

virtual void fulfill() = 0;
virtual allocation_id get_user_allocation_id() = 0; // TODO move to struct task instead
@@ -146,7 +148,7 @@ namespace detail {

epoch_action get_epoch_action() const { return m_epoch_action; }

fence_promise* get_fence_promise() const { return m_fence_promise.get(); }
task_promise* get_task_promise() const { return m_promise.get(); }

template <typename Launcher>
Launcher get_launcher() const {
@@ -164,8 +166,8 @@
return nullptr;
}

static std::unique_ptr<task> make_epoch(task_id tid, detail::epoch_action action) {
return std::unique_ptr<task>(new task(tid, task_type::epoch, non_collective_group_id, task_geometry{}, {}, {}, {}, {}, action, nullptr));
static std::unique_ptr<task> make_epoch(task_id tid, detail::epoch_action action, std::unique_ptr<task_promise> promise) {
return std::unique_ptr<task>(new task(tid, task_type::epoch, non_collective_group_id, task_geometry{}, {}, {}, {}, {}, action, std::move(promise)));
}

static std::unique_ptr<task> make_host_compute(task_id tid, task_geometry geometry, host_task_launcher launcher, buffer_access_map access_map,
@@ -197,9 +199,9 @@
}

static std::unique_ptr<task> make_fence(
task_id tid, buffer_access_map access_map, side_effect_map side_effect_map, std::unique_ptr<fence_promise> fence_promise) {
task_id tid, buffer_access_map access_map, side_effect_map side_effect_map, std::unique_ptr<task_promise> promise) {
return std::unique_ptr<task>(new task(tid, task_type::fence, non_collective_group_id, task_geometry{}, {}, std::move(access_map),
std::move(side_effect_map), {}, {}, std::move(fence_promise)));
std::move(side_effect_map), {}, {}, std::move(promise)));
}

private:
@@ -213,16 +215,13 @@
reduction_set m_reductions;
std::string m_debug_name;
detail::epoch_action m_epoch_action;
// TODO I believe that `struct task` should not store command_group_launchers, fence_promise or other state that is related to execution instead of
// abstract DAG building. For user-initialized buffers we already notify the runtime -> executor of this state directly. Maybe also do that for these.
std::unique_ptr<fence_promise> m_fence_promise;
std::unique_ptr<task_promise> m_promise; // TODO keep user_allocation_id in struct task instead of inside task_promise
std::vector<std::unique_ptr<hint_base>> m_hints;

task(task_id tid, task_type type, collective_group_id cgid, task_geometry geometry, command_group_launcher launcher, buffer_access_map access_map,
detail::side_effect_map side_effects, reduction_set reductions, detail::epoch_action epoch_action, std::unique_ptr<fence_promise> fence_promise)
detail::side_effect_map side_effects, reduction_set reductions, detail::epoch_action epoch_action, std::unique_ptr<task_promise> promise)
: m_tid(tid), m_type(type), m_cgid(cgid), m_geometry(geometry), m_launcher(std::move(launcher)), m_access_map(std::move(access_map)),
m_side_effects(std::move(side_effects)), m_reductions(std::move(reductions)), m_epoch_action(epoch_action),
m_fence_promise(std::move(fence_promise)) {
m_side_effects(std::move(side_effects)), m_reductions(std::move(reductions)), m_epoch_action(epoch_action), m_promise(std::move(promise)) {
assert(type == task_type::host_compute || type == task_type::device_compute || get_granularity().size() == 1);
// Only host tasks can have side effects
assert(this->m_side_effects.empty() || type == task_type::host_compute || type == task_type::collective || type == task_type::master_node
85 changes: 15 additions & 70 deletions include/task_manager.h
@@ -1,10 +1,7 @@
#pragma once

#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

@@ -17,42 +14,11 @@ namespace detail {

class task_recorder;

// Allows other threads to await an epoch change in the task manager.
// This is worth a separate class to encapsulate the synchronization behavior.
class epoch_monitor {
public:
explicit epoch_monitor(const task_id epoch) : m_this_epoch(epoch) {}

task_id get() const {
std::lock_guard lock{m_mutex};
return m_this_epoch;
}

task_id await(const task_id min_tid_reached) const {
std::unique_lock lock{m_mutex};
m_epoch_changed.wait(lock, [&] { return m_this_epoch >= min_tid_reached; });
return m_this_epoch;
}

void set(const task_id epoch) {
{
std::lock_guard lock{m_mutex};
assert(epoch >= m_this_epoch);
m_this_epoch = epoch;
}
m_epoch_changed.notify_all();
}

private:
task_id m_this_epoch = 0;
mutable std::mutex m_mutex;
mutable std::condition_variable m_epoch_changed;
};
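
This deletion is the heart of the PR: `epoch_monitor`, and with it all await/notify state, leaves the task manager. Its replacement is visible across the other files: an epoch task can now carry a `task_promise` (task.h), which travels into the `epoch_instruction` (instruction_graph.h) and is fulfilled by the executor (dry_run_executor.cc). A hedged sketch of how `runtime::sync` could block on that mechanism; the `epoch_promise` type and the body of `sync` are assumptions for illustration, not code from this diff:

```cpp
#include <future>
#include <memory>

namespace celerity::detail {

// Hypothetical promise type -- the PR only shows that epochs carry a task_promise*.
class epoch_promise final : public task_promise {
  public:
	std::future<void> get_future() { return m_done.get_future(); }
	void fulfill() override { m_done.set_value(); }
	allocation_id get_user_allocation_id() override { return null_allocation_id; } // assumed sentinel

  private:
	std::promise<void> m_done;
};

// Sketch of runtime::sync built on the promise: generate the epoch task, then
// block until the executor fulfills the promise attached to it.
task_id runtime::sync(const epoch_action action) {
	require_call_from_application_thread();
	auto promise = std::make_unique<epoch_promise>();
	auto future = promise->get_future();
	const auto tid = m_task_mngr->generate_epoch_task(action, std::move(promise));
	future.wait();
	return tid;
}

} // namespace celerity::detail
```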

// definition is in handler.h to avoid circular dependency
template <typename CGF>
std::unique_ptr<task> invoke_command_group_function(const task_id tid, size_t num_collective_nodes, CGF&& cgf);

// TODO rename to task_graph_generator eventually
class task_manager {
friend struct task_manager_testspy;

@@ -76,8 +42,15 @@
error_policy uninitialized_read_error = error_policy::panic;
};

task_manager(
size_t num_collective_nodes, detail::task_recorder* recorder, task_manager::delegate* dlg, const policy_set& policy = default_policy_set());
task_manager(size_t num_collective_nodes, task_graph& tdag, detail::task_recorder* recorder, task_manager::delegate* dlg,
const policy_set& policy = default_policy_set());

// TODO pimpl this - ctors are explicitly deleted / defaulted to avoid lint on `m_tdag` reference member
task_manager(const task_manager&) = delete;
task_manager(task_manager&&) = delete;
task_manager& operator=(const task_manager&) = delete;
task_manager& operator=(task_manager&&) = delete;
~task_manager() = default;

template <typename CGF>
task_id submit_command_group(CGF&& cgf) {
@@ -95,9 +68,9 @@
* Inserts an epoch task that depends on the entire execution front and that immediately becomes the current epoch_for_new_tasks and the last writer
* for all buffers.
*/
task_id generate_epoch_task(epoch_action action);
task_id generate_epoch_task(epoch_action action, std::unique_ptr<task_promise> promise = nullptr);

task_id generate_fence_task(buffer_access_map access_map, side_effect_map side_effects, std::unique_ptr<fence_promise> fence_promise);
task_id generate_fence_task(buffer_access_map access_map, side_effect_map side_effects, std::unique_ptr<task_promise> fence_promise);

/**
* @brief Adds a new buffer for dependency tracking
@@ -113,11 +86,6 @@

void notify_host_object_destroyed(host_object_id hoid);

/**
* Blocks until an epoch task has executed on this node (or all nodes, if the epoch_for_new_tasks was created with `epoch_action::barrier`).
*/
void await_epoch(task_id epoch);

void set_horizon_step(const int step) {
assert(step >= 0);
m_task_horizon_step_size = step;
@@ -128,22 +96,6 @@
m_task_horizon_max_parallelism = para;
}

/**
* @brief Notifies the task manager that the given horizon has been executed (used for task deletion).
*
* notify_horizon_reached and notify_epoch_reached must only ever be called from a single thread, but that thread does not have to be the main
* thread.
*/
void notify_horizon_reached(task_id horizon_tid);

/**
* @brief Notifies the task manager that the given epoch has been executed on this node.
*
* notify_horizon_reached and notify_epoch_reached must only ever be called from a single thread, but that thread does not have to be the main
* thread.
*/
void notify_epoch_reached(task_id epoch_tid);

private:
// default-constructs a policy_set - this must be a function because we can't use the implicit default constructor of policy_set, which has member
// initializers, within its surrounding class (Clang)
@@ -164,10 +116,10 @@

task_manager::delegate* m_delegate;

const size_t m_num_collective_nodes;
size_t m_num_collective_nodes;
policy_set m_policy;

task_graph m_tdag;
task_graph& m_tdag;
task_id m_next_tid = initial_epoch_task;

// The active epoch is used as the last writer for host-initialized buffers.
@@ -201,13 +153,6 @@
// The latest horizon task created. Will be applied as the epoch for new tasks once the next horizon is created.
task* m_current_horizon = nullptr;

// The last horizon processed by the executor will become the latest_epoch_reached once the next horizon is completed as well.
// Only accessed in task_manager::notify_*, which are always called from the executor thread - no locking needed.
std::optional<task_id> m_latest_horizon_reached;

// The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread.
epoch_monitor m_latest_epoch_reached{initial_epoch_task};

// Track the number of user-generated task and epochs to heuristically detect programs that lose performance by frequently calling `queue::wait()`.
size_t m_num_user_command_groups_submitted = 0;
size_t m_num_user_epochs_generated = 0;
@@ -216,7 +161,7 @@
std::unordered_set<task*> m_execution_front;

// An optional task_recorder which records information about tasks for e.g. printing graphs.
mutable detail::task_recorder* m_task_recorder;
detail::task_recorder* m_task_recorder;

task& register_task_internal(std::unique_ptr<task> task);

1 change: 1 addition & 0 deletions src/dry_run_executor.cc
@@ -45,6 +45,7 @@ void dry_run_executor::thread_main(executor::delegate* const dlg) {
host_object_instances.erase(dhoinstr.get_host_object_id());
},
[&](const epoch_instruction& einstr) {
if(einstr.get_promise() != nullptr) { einstr.get_promise()->fulfill(); }
if(dlg != nullptr) { dlg->epoch_reached(einstr.get_epoch_task_id()); }
shutdown |= einstr.get_epoch_action() == epoch_action::shutdown;
},
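
Note the ordering in the dry-run executor above: the promise is fulfilled before the delegate is notified, so an application thread blocked in a sync-style call resumes as soon as the epoch retires. From the user's perspective this is the machinery behind `queue::wait()` (a minimal usage sketch; the kernel is a placeholder):

```cpp
#include <celerity.h>

int main() {
	celerity::queue q;
	q.submit([](celerity::handler& cgh) {
		cgh.parallel_for(celerity::range<1>(256), [](celerity::item<1>) { /* work */ });
	});
	// wait() generates an epoch task; the application thread blocks until the
	// executor reaches the matching epoch_instruction and fulfills its promise.
	q.wait();
}
```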