diff --git a/ark/api/executor.cpp b/ark/api/executor.cpp index d9fc9217b..c8e2e7df6 100644 --- a/ark/api/executor.cpp +++ b/ark/api/executor.cpp @@ -14,11 +14,11 @@ #include "codegen.hpp" #include "env.h" #include "file_io.h" -#include "gpu/gpu.h" -#include "gpu/gpu_event.h" -#include "gpu/gpu_kernel.h" -#include "gpu/gpu_logging.h" -#include "gpu/gpu_manager.h" +#include "gpu/gpu.hpp" +#include "gpu/gpu_event.hpp" +#include "gpu/gpu_kernel.hpp" +#include "gpu/gpu_logging.hpp" +#include "gpu/gpu_manager.hpp" #include "logging.hpp" #include "model/model_buffer.hpp" #include "model/model_data_type.hpp" @@ -140,20 +140,30 @@ static size_t tensor_stride_bytes(const Json &tensor) { class Executor::Impl { public: - Impl(int rank, int world_size, int gpu_id, const std::string &name, - const std::string &plan); - ~Impl() = default; + Impl(int device_id, Stream stream, const std::string &name, bool loop_mode); + ~Impl(); + + void init(const PlanJson& plan); + + int device_id() const { return device_id_; } + + Stream stream() const { return reinterpret_cast(stream_raw_); } + + std::string plan() const { return plan_json_.dump_pretty(); } void compile(); - void launch(int64_t max_spin_count); + void launch(); void run(int iter); void wait(int64_t max_spin_count); float stop(int64_t max_spin_count); void barrier(); - void tensor_read(const Tensor tensor, void *data, size_t bytes) const; - void tensor_write(const Tensor tensor, const void *data, - size_t bytes) const; + uintptr_t tensor_address(const Tensor &tensor) const; + + void tensor_read(const Tensor &tensor, void *data, size_t bytes, + Stream stream, bool is_d2d) const; + void tensor_write(const Tensor &tensor, const void *data, size_t bytes, + Stream stream, bool is_d2d) const; private: void init_communicator(); @@ -162,14 +172,20 @@ class Executor::Impl { void init_channels(const std::set &remote_ranks); protected: - const int rank_; - const int world_size_; - int gpu_id_; + int device_id_; + std::string name_; + bool loop_mode_; + + gpuStream stream_raw_; + + int rank_; + int world_size_; bool is_launched_ = false; bool is_recording_ = false; float elapsed_msec_ = -1; + PlanJson plan_json_; std::map buffer_id_to_offset_; size_t total_bytes_; std::shared_ptr codegen_; @@ -177,8 +193,7 @@ class Executor::Impl { std::shared_ptr timer_end_; std::shared_ptr buffer_; std::shared_ptr flag_; - std::shared_ptr main_stream_; - std::shared_ptr copy_stream_; + std::shared_ptr stream_; std::shared_ptr kernel_; // For communication @@ -190,30 +205,38 @@ class Executor::Impl { rank_to_sm_channels_; }; -Executor::Impl::Impl(int rank, int world_size, int gpu_id, - const std::string &name, const std::string &plan) - : rank_(rank), world_size_(world_size), gpu_id_(gpu_id) { - if (rank < 0 || rank >= world_size) { - ERR(InvalidUsageError, "Invalid rank ", rank, " with world size ", - world_size); +Executor::Impl::Impl(int device_id, Stream stream, const std::string &name, + bool loop_mode) + : device_id_(device_id), name_(name), loop_mode_(loop_mode) { + if (device_id < 0) { + ERR(InvalidUsageError, "Invalid device ID ", device_id); + } + if (stream) { + stream_raw_ = reinterpret_cast(stream); + } else { + stream_ = GpuManager::get_instance(device_id_)->create_stream(); + stream_raw_ = stream_->get(); } - if (gpu_id < 0) { - ERR(InvalidUsageError, "Invalid GPU ID ", gpu_id); +} + +Executor::Impl::~Impl() { + if (is_launched_) stop(-1); +} + +void Executor::Impl::init(const PlanJson &plan_json) { + plan_json_ = plan_json; + rank_ = plan_json_["Rank"].get(); + 
world_size_ = plan_json_["WorldSize"].get<int>();
+
+    if (rank_ < 0 || rank_ >= world_size_) {
+        ERR(InvalidUsageError, "Invalid rank ", rank_, " with world size ",
+            world_size_);
     }
     if (world_size_ > 1) {
         init_communicator();
     }
 
-    Json plan_json;
-    auto &plan_path = get_env().enforce_plan_path;
-    if (!plan_path.empty()) {
-        LOG(INFO, "Enforce executor plan path: ", plan_path);
-        plan_json = Json::parse(read_file(plan_path));
-    } else {
-        plan_json = Json::parse(plan);
-    }
-
-    auto gpu_manager = GpuManager::get_instance(gpu_id_);
+    auto gpu_manager = GpuManager::get_instance(device_id_);
     if (!gpu_manager->info().arch->belongs_to(
             Arch::from_name(plan_json.at("Architecture")))) {
         LOG(WARN, "Architecture name of the plan `",
@@ -222,7 +245,7 @@ Executor::Impl::Impl(int rank, int world_size, int gpu_id,
             gpu_manager->info().arch->name(), "`.");
     }
 
-    buffer_id_to_offset_ = init_buffers(plan_json);
+    buffer_id_to_offset_ = init_buffers(plan_json_);
 
     std::string buffer_id_to_offset_str;
     for (const auto &kv : buffer_id_to_offset_) {
@@ -230,34 +253,39 @@ Executor::Impl::Impl(int rank, int world_size, int gpu_id,
             std::to_string(kv.first) + ": " + std::to_string(kv.second) + ", ";
     }
 
-    codegen_ =
-        std::make_shared<CodeGenerator>(plan_json, buffer_id_to_offset_, name);
+    codegen_ = std::make_shared<CodeGenerator>(plan_json_, buffer_id_to_offset_,
+                                               name_);
     timer_begin_ = gpu_manager->create_event();
     timer_end_ = gpu_manager->create_event();
     buffer_ = gpu_manager->malloc(total_bytes_, 65536);
     flag_ = gpu_manager->malloc_host(
         sizeof(int), gpuHostAllocMapped | gpuHostAllocWriteCombined);
-    main_stream_ = gpu_manager->create_stream();
-    copy_stream_ = gpu_manager->create_stream();
 
     int threads_per_block = static_cast<int>(
         codegen_->num_warps_per_proc() * gpu_manager->info().threads_per_warp);
     int num_sm = static_cast<int>(codegen_->num_procs());
-    int *flag = flag_->ref();
     size_t smem_block_total =
         static_cast<size_t>(gpu_manager->info().smem_block_total);
 
     if (world_size_ > 1) {
-        auto remote_ranks = init_remote_ranks(plan_json);
+        auto remote_ranks = init_remote_ranks(plan_json_);
         init_channels(remote_ranks);
     }
 
+    std::string kernel_name;
+    if (loop_mode_) {
+        kernel_name = "ark_loop_kernel";
+    } else {
+        kernel_name = "ark_kernel";
+    }
+    if (!name_.empty()) {
+        kernel_name += "_" + name_;
+    }
+
     kernel_ = std::shared_ptr<GpuKernel>(new GpuKernel(
-        gpu_id_, codegen_->code(), {threads_per_block, 1, 1}, {num_sm, 1, 1},
-        std::max(smem_block_total, size_t(4)), name,
-        {std::pair{buffer_->ref(), sizeof(buffer_->ref())},
-         std::pair{flag, sizeof(flag)}}));
+        device_id_, codegen_->code(), {threads_per_block, 1, 1}, {num_sm, 1, 1},
+        std::max(smem_block_total, size_t(4)), kernel_name));
 }
 
 void Executor::Impl::init_communicator() {
@@ -517,7 +545,7 @@ void Executor::Impl::init_channels(const std::set<int> &remote_ranks) {
     mscclpp::TransportFlags all_transports =
         mscclpp::Transport::CudaIpc | mscclpp::Transport::Ethernet;
     if (!get_env().disable_ib) {
-        all_transports |= IBs[gpu_id_];
+        all_transports |= IBs[device_id_];
     }
     mscclpp::RegisteredMemory regmem =
         comm_->registerMemory(buffer_->ref(), buffer_->bytes(), all_transports);
@@ -538,12 +566,12 @@ void Executor::Impl::init_channels(const std::set<int> &remote_ranks) {
     if (remote_node == this_node) {
         add_connection(remote_rank, mscclpp::Transport::CudaIpc);
         if (!get_env().disable_ib) {
-            add_connection(remote_rank, IBs[gpu_id_]);
+            add_connection(remote_rank, IBs[device_id_]);
         }
     } else {
         add_connection(remote_rank, get_env().disable_ib ?
mscclpp::Transport::Ethernet - : IBs[gpu_id_]); + : IBs[device_id_]); } comm_->sendMemoryOnSetup(regmem, remote_rank, 0); rank_to_remote_regmem_future[remote_rank] = @@ -596,13 +624,12 @@ void Executor::Impl::init_channels(const std::set &remote_ranks) { void Executor::Impl::compile() { kernel_->compile(); } -void Executor::Impl::launch(int64_t max_spin_count) { +void Executor::Impl::launch() { if (!kernel_->is_compiled()) { ERR(InvalidUsageError, "Need to compile first before initialization."); } if (is_launched_) { - // Wait until previous works finish. - this->wait(max_spin_count); + LOG(WARN, "Ignore launching twice."); return; } auto get_global_rt = [&](const std::string &symbol) { @@ -631,83 +658,102 @@ void Executor::Impl::launch(int64_t max_spin_count) { sm_handles[i] = it2->second[0]->deviceHandle(); } } - GLOG(gpuSetDevice(gpu_id_)); + GLOG(gpuSetDevice(device_id_)); GLOG(gpuMemcpyAsync( proxy_chan_addr, proxy_handles.data(), proxy_handles.size() * sizeof(mscclpp::SimpleProxyChannel::DeviceHandle), - gpuMemcpyHostToDevice, copy_stream_->get())); + gpuMemcpyHostToDevice, stream_raw_)); GLOG(gpuMemcpyAsync( proxy_secondary_chan_addr, proxy_secondary_handles.data(), proxy_secondary_handles.size() * sizeof(mscclpp::SimpleProxyChannel::DeviceHandle), - gpuMemcpyHostToDevice, copy_stream_->get())); + gpuMemcpyHostToDevice, stream_raw_)); GLOG(gpuMemcpyAsync( sm_chan_addr, sm_handles.data(), sm_handles.size() * sizeof(mscclpp::SmChannel::DeviceHandle), - gpuMemcpyHostToDevice, copy_stream_->get())); - copy_stream_->sync(); + gpuMemcpyHostToDevice, stream_raw_)); + GLOG(gpuStreamSynchronize(stream_raw_)); } elapsed_msec_ = -1; - if (!kernel_->is_compiled()) { - ERR(InvalidUsageError, "Need to compile first before initialization."); - } else if (is_launched_) { - LOG(WARN, "Ignore launching twice."); - return; - } - timer_begin_->record(main_stream_); + timer_begin_->record(stream_raw_); if (world_size_ > 1) { proxy_service_->startProxy(); } - // Initialize loop flags. - atomicStoreRelaxed(flag_->ref(), 0); - kernel_->launch(main_stream_); - timer_end_->record(main_stream_); + if (loop_mode_) { + // Initialize loop flags. + atomicStoreRelaxed(flag_->ref(), 0); + void *buf_ptr = buffer_->ref(); + void *flag_ptr = flag_->ref(); + std::vector args = {&buf_ptr, &flag_ptr}; + kernel_->launch(stream_raw_, args); + } is_recording_ = true; is_launched_ = true; } void Executor::Impl::run(int iter) { - if (iter > 0) { + if (iter <= 0) return; + if (loop_mode_) { while (atomicLoadRelaxed(flag_->ref()) > 0) { } atomicStoreRelaxed(flag_->ref(), iter); + } else { + void *buf_ptr = buffer_->ref(); + int i = 0; + std::vector args = {&buf_ptr, reinterpret_cast(&i)}; + for (; i < iter; i++) { + kernel_->launch(stream_raw_, args); + } } } void Executor::Impl::wait(int64_t max_spin_count) { int64_t cnt = max_spin_count; - while (atomicLoadRelaxed(flag_->ref()) > 0) { - if (cnt-- > 0) { - continue; - } - // Check if the kernel encountered an error. - gpuError res = main_stream_->query(); - if (res == gpuSuccess) { - if (atomicLoadRelaxed(flag_->ref()) > 0) { - LOG(WARN, "Stream is finished but the loop flag is still set."); - break; + if (loop_mode_) { + while (atomicLoadRelaxed(flag_->ref()) > 0) { + if (cnt-- > 0) { + continue; + } + // Check if the kernel encountered an error. 
+            gpuError res = gpuStreamQuery(stream_raw_);
+            if (res == gpuSuccess) {
+                if (atomicLoadRelaxed(flag_->ref()) > 0) {
+                    LOG(WARN,
+                        "Stream is finished but the loop flag is still set.");
+                    break;
+                } else {
+                    LOG(WARN,
+                        "wait() is delayed by a stream query. Regarding "
+                        "timing measurements may be inaccurate.");
+                    break;
+                }
+            } else if (res == gpuErrorNotReady) {
+                cnt = max_spin_count;
             } else {
-                LOG(WARN,
-                    "wait() is delayed by a stream query. Regarding "
-                    "timing measurements may be inaccurate.");
-                break;
+                GLOG(res);
             }
-        } else if (res == gpuErrorNotReady) {
-            cnt = max_spin_count;
-        } else {
-            GLOG(res);
         }
+    } else {
+        if (max_spin_count >= 0) {
+            LOG(WARN, "max_spin_count is ignored in non-loop mode.");
+        }
+        GLOG(gpuStreamSynchronize(stream_raw_));
+    }
 }
 
 float Executor::Impl::stop(int64_t max_spin_count) {
     this->wait(max_spin_count);
-    atomicStoreRelaxed(flag_->ref(), -1);
-    main_stream_->sync();
+    if (is_recording_) {
+        timer_end_->record(stream_raw_);
+    }
+    if (loop_mode_) {
+        atomicStoreRelaxed(flag_->ref(), -1);
+    }
+    GLOG(gpuStreamSynchronize(stream_raw_));
     if (is_recording_) {
         elapsed_msec_ = timer_end_->elapsed_msec(*timer_begin_);
         is_recording_ = false;
@@ -725,74 +771,144 @@ void Executor::Impl::barrier() {
     }
 }
 
-void Executor::Impl::tensor_read(const Tensor tensor, void *data,
-                                 size_t bytes) const {
-    GLOG(gpuSetDevice(gpu_id_));
+uintptr_t Executor::Impl::tensor_address(const Tensor &tensor) const {
+    size_t buffer_id = tensor.ref()->buffer()->id();
+    if (buffer_id_to_offset_.find(buffer_id) == buffer_id_to_offset_.end()) {
+        ERR(InternalError, "Invalid buffer ID: ", buffer_id);
+    }
+    size_t offset = buffer_id_to_offset_.at(buffer_id);
+    return reinterpret_cast<uintptr_t>(buffer_->ref(offset));
+}
+
+void Executor::Impl::tensor_read(const Tensor &tensor, void *data, size_t bytes,
+                                 Stream stream, bool is_d2d) const {
+    GLOG(gpuSetDevice(device_id_));
+    std::shared_ptr<GpuStream> copy_stream;
+    gpuStream copy_stream_raw;
+    if (stream) {
+        copy_stream_raw = reinterpret_cast<gpuStream>(stream);
+        if ((stream == stream_raw_) && is_launched_) {
+            LOG(WARN,
+                "Reading from a tensor in the same stream of the kernel "
+                "may cause a deadlock.");
+        }
+    } else {
+        copy_stream = GpuManager::get_instance(device_id_)->create_stream();
+        copy_stream_raw = copy_stream->get();
+    }
     size_t tensor_data_bytes =
         tensor.shape().nelems() * tensor.data_type().bytes();
-    if (bytes < tensor_data_bytes) {
-        ERR(InvalidUsageError, "Data buffer (", bytes,
-            ") is smaller than the tensor data (", tensor_data_bytes, ").");
+    if (bytes != tensor_data_bytes) {
+        ERR(InvalidUsageError, "Destination bytes (", bytes,
+            ") mismatches the tensor data bytes (", tensor_data_bytes, ").");
     }
-    size_t tensor_bytes =
-        tensor.strides().nelems() * tensor.data_type().bytes();
-    void *src =
-        buffer_->ref(buffer_id_to_offset_.at(tensor.ref()->buffer()->id()));
+    auto kind = (is_d2d) ?
gpuMemcpyDeviceToDevice : gpuMemcpyDeviceToHost;
+    void *src = reinterpret_cast<void *>(tensor_address(tensor));
     if (tensor.strides() == tensor.shape()) {
-        GLOG(gpuMemcpyAsync(data, src, bytes, gpuMemcpyDeviceToHost,
-                            copy_stream_->get()));
-        copy_stream_->sync();
+        GLOG(gpuMemcpyAsync(data, src, bytes, kind, copy_stream_raw));
     } else {
+        size_t tensor_bytes =
+            tensor.strides().nelems() * tensor.data_type().bytes();
         std::vector<char> tensor_host(tensor_bytes);
         GLOG(gpuMemcpyAsync(tensor_host.data(), src, tensor_bytes,
-                            gpuMemcpyDeviceToHost, copy_stream_->get()));
-        copy_stream_->sync();
-        tensor_to_data(tensor_host.data(), static_cast<char *>(data),
-                       tensor.shape(), tensor.strides(), tensor.offsets(),
+                            gpuMemcpyDeviceToHost, copy_stream_raw));
+        GLOG(gpuStreamSynchronize(copy_stream_raw));
+        if (!is_d2d) {
+            tensor_to_data(tensor_host.data(), static_cast<char *>(data),
+                           tensor.shape(), tensor.strides(), tensor.offsets(),
+                           tensor.data_type().bytes());
+            return;
+        }
+        // TODO: convert data layout on the device directly
+        std::vector<char> data_host(bytes);
+        tensor_to_data(tensor_host.data(), data_host.data(), tensor.shape(),
+                       tensor.strides(), tensor.offsets(),
                        tensor.data_type().bytes());
+        GLOG(gpuMemcpyAsync(data, data_host.data(), bytes,
+                            gpuMemcpyHostToDevice, copy_stream_raw));
     }
+    GLOG(gpuStreamSynchronize(copy_stream_raw));
 }
 
-void Executor::Impl::tensor_write(const Tensor tensor, const void *data,
-                                  size_t bytes) const {
-    GLOG(gpuSetDevice(gpu_id_));
+void Executor::Impl::tensor_write(const Tensor &tensor, const void *data,
                                   size_t bytes, Stream stream,
+                                  bool is_d2d) const {
+    GLOG(gpuSetDevice(device_id_));
+    std::shared_ptr<GpuStream> copy_stream;
+    gpuStream copy_stream_raw;
+    if (stream) {
+        copy_stream_raw = reinterpret_cast<gpuStream>(stream);
+        if ((stream == stream_raw_) && is_launched_) {
+            LOG(WARN,
+                "Writing to a tensor in the same stream of the kernel "
+                "may cause a deadlock.");
+        }
+    } else {
+        copy_stream = GpuManager::get_instance(device_id_)->create_stream();
+        copy_stream_raw = copy_stream->get();
+    }
     size_t tensor_data_bytes =
         tensor.shape().nelems() * tensor.data_type().bytes();
-    if (bytes < tensor_data_bytes) {
-        ERR(InvalidUsageError, "Data buffer (", bytes,
-            ") is smaller than the tensor data (", tensor_data_bytes, ").");
+    if (bytes != tensor_data_bytes) {
+        ERR(InvalidUsageError, "Source bytes (", bytes,
+            ") mismatches the tensor data bytes (", tensor_data_bytes, ").");
     }
     size_t tensor_bytes =
         tensor.strides().nelems() * tensor.data_type().bytes();
-    void *dst =
-        buffer_->ref(buffer_id_to_offset_.at(tensor.ref()->buffer()->id()));
+    auto kind = (is_d2d) ?
gpuMemcpyDeviceToDevice : gpuMemcpyHostToDevice;
+    void *dst = reinterpret_cast<void *>(tensor_address(tensor));
     if (tensor.strides() == tensor.shape()) {
-        GLOG(gpuMemcpyAsync(dst, data, tensor_bytes, gpuMemcpyHostToDevice,
-                            copy_stream_->get()));
+        GLOG(gpuMemcpyAsync(dst, data, tensor_bytes, kind, copy_stream_raw));
     } else {
         std::vector<char> tensor_host(tensor_bytes);
-        GLOG(gpuMemcpyAsync(tensor_host.data(), dst, tensor_bytes,
-                            gpuMemcpyDeviceToHost, copy_stream_->get()));
-        copy_stream_->sync();
-        data_to_tensor(tensor_host.data(), static_cast<const char *>(data),
-                       tensor.shape(), tensor.strides(), tensor.offsets(),
-                       tensor.data_type().bytes());
+        if (!is_d2d) {
+            GLOG(gpuMemcpyAsync(tensor_host.data(), dst, tensor_bytes,
+                                gpuMemcpyDeviceToHost, copy_stream_raw));
+            GLOG(gpuStreamSynchronize(copy_stream_raw));
+            data_to_tensor(tensor_host.data(),
+                           static_cast<const char *>(data), tensor.shape(),
+                           tensor.strides(), tensor.offsets(),
+                           tensor.data_type().bytes());
+        } else {
+            // TODO: convert data layout on the device directly
+            std::vector<char> tmp(bytes);
+            GLOG(gpuMemcpyAsync(tmp.data(), data, bytes, gpuMemcpyDeviceToHost,
+                                copy_stream_raw));
+            GLOG(gpuStreamSynchronize(copy_stream_raw));
+            data_to_tensor(tensor_host.data(), tmp.data(), tensor.shape(),
+                           tensor.strides(), tensor.offsets(),
+                           tensor.data_type().bytes());
+        }
         GLOG(gpuMemcpyAsync(dst, tensor_host.data(), tensor_bytes,
-                            gpuMemcpyHostToDevice, copy_stream_->get()));
+                            gpuMemcpyHostToDevice, copy_stream_raw));
     }
-    copy_stream_->sync();
+    GLOG(gpuStreamSynchronize(copy_stream_raw));
 }
 
-Executor::Executor(int rank, int world_size, int gpu_id,
-                   const std::string &name, const std::string &plan)
-    : impl_(std::make_unique<Impl>(rank, world_size, gpu_id, name,
-                                   plan)) {}
+Executor::Executor(int device_id, Stream stream, const std::string &name,
+                   const std::string &plan, bool loop_mode)
+    : impl_(std::make_unique<Impl>(device_id, stream, name,
+                                   loop_mode)) {
+    auto &plan_path = get_env().enforce_plan_path;
+    if (!plan_path.empty()) {
+        LOG(INFO, "Enforce executor plan path: ", plan_path);
+        impl_->init(Json::parse(read_file(plan_path)));
+    } else if (!plan.empty()) {
+        impl_->init(Json::parse(plan));
+    }
+}
 
 Executor::~Executor() = default;
 
+int Executor::device_id() const { return impl_->device_id(); }
+
+Stream Executor::stream() const { return impl_->stream(); }
+
+std::string Executor::plan() const { return impl_->plan(); }
+
 void Executor::compile() { impl_->compile(); }
 
-void Executor::launch(int64_t max_spin_count) { impl_->launch(max_spin_count); }
+void Executor::launch() { impl_->launch(); }
 
 void Executor::run(int iter) { impl_->run(iter); }
 
@@ -808,25 +924,32 @@ void Executor::destroy() { impl_.reset(nullptr); }
 
 bool Executor::destroyed() const { return impl_.get() == nullptr; }
 
-void Executor::tensor_read(const Tensor tensor, void *data,
-                           size_t bytes) const {
-    impl_->tensor_read(tensor, data, bytes);
+uintptr_t Executor::tensor_address(const Tensor &tensor) const {
+    return impl_->tensor_address(tensor);
+}
+
+void Executor::tensor_read(const Tensor &tensor, void *data, size_t bytes,
+                           Stream stream, bool is_d2d) const {
+    impl_->tensor_read(tensor, data, bytes, stream, is_d2d);
 }
 
-void Executor::tensor_write(const Tensor tensor, const void *data,
-                            size_t bytes) const {
-    impl_->tensor_write(tensor, data, bytes);
+void Executor::tensor_write(const Tensor &tensor, const void *data,
+                            size_t bytes, Stream stream, bool is_d2d) const {
+    impl_->tensor_write(tensor, data, bytes, stream, is_d2d);
 }
 
-DefaultExecutor::DefaultExecutor(const Model &model, int gpu_id,
- const std::string &name) - : Executor( - model.rank(), model.world_size(), - (gpu_id < 0) ? (model.rank() % get_env().num_ranks_per_host) : gpu_id, - name, - Planner(model, (gpu_id < 0) - ? (model.rank() % get_env().num_ranks_per_host) - : gpu_id) - .plan()) {} +DefaultExecutor::DefaultExecutor( + const Model &model, int device_id, Stream stream, + const std::vector &config_rules, + const std::string &name, bool loop_mode) + : Executor((device_id < 0) ? (model.rank() % get_env().num_ranks_per_host) + : device_id, + stream, name, "", loop_mode) { + Planner planner(model, impl_->device_id()); + for (const auto &rule : config_rules) { + planner.install_config_rule(rule); + } + impl_->init(Json::parse(planner.plan())); +} } // namespace ark diff --git a/ark/api/executor_test.cpp b/ark/api/executor_test.cpp new file mode 100644 index 000000000..dad0e9d83 --- /dev/null +++ b/ark/api/executor_test.cpp @@ -0,0 +1,192 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#include "ark/executor.hpp" + +#include "gpu/gpu.hpp" +#include "model/model_json.hpp" +#include "unittest/unittest_utils.h" + +template +ark::unittest::State test_executor() { + ark::gpuStream stream; + UNITTEST_EQ( + ark::gpuStreamCreateWithFlags(&stream, ark::gpuStreamNonBlocking), + ark::gpuSuccess); + + ark::Model empty; + { + ark::DefaultExecutor executor(empty, 0, stream, {}, "test", LoopMode); + UNITTEST_EQ(executor.device_id(), 0); + UNITTEST_EQ(executor.stream(), stream); + + executor.compile(); + executor.launch(); + executor.run(1); + executor.wait(); + executor.stop(); + executor.destroy(); + + UNITTEST_TRUE(executor.destroyed()); + } + { + ark::DefaultExecutor executor(empty, 0, stream, {}, "test", LoopMode); + executor.compile(); + executor.launch(); + executor.run(1); + executor.wait(); + executor.stop(); + + executor.launch(); + executor.run(1); + executor.wait(); + executor.stop(); + + executor.destroy(); + } + { + ark::DefaultExecutor executor(empty, 0, stream, {}, "test", LoopMode); + UNITTEST_THROW(executor.launch(), ark::InvalidUsageError); + + executor.compile(); + executor.launch(); + executor.launch(); // Will be ignored with a warning. + executor.run(1); + executor.wait(); + executor.wait(); // nothing to do + + // Stop & destroy automatically. 
+ } + + UNITTEST_EQ(ark::gpuStreamDestroy(stream), ark::gpuSuccess); + return ark::unittest::SUCCESS; +} + +ark::unittest::State test_executor_loop() { return test_executor(); } + +ark::unittest::State test_executor_no_loop() { return test_executor(); } + +ark::unittest::State test_executor_tensor_read_write(ark::Dims shape, + ark::Dims stride, + ark::Dims offset) { + // Alloc CPU array + std::vector host_data(shape.nelems()); + for (size_t i = 0; i < host_data.size(); ++i) { + host_data[i] = static_cast(i); + } + + // Alloc GPU array + void *dev_ptr; + UNITTEST_EQ(ark::gpuMalloc(&dev_ptr, shape.nelems() * sizeof(float)), + ark::gpuSuccess); + + // Create an ARK tensor + ark::Model m; + auto tensor = m.tensor(shape, ark::FP32, stride, offset); + m.noop(tensor); + + ark::DefaultExecutor executor(m, 0); + executor.compile(); + executor.launch(); + UNITTEST_GT(executor.tensor_address(tensor), 0); + + // Copy data from CPU array to ARK tensor + executor.tensor_write(tensor, host_data.data(), + shape.nelems() * sizeof(float)); + + // Copy data from ARK tensor to GPU array + executor.tensor_read(tensor, dev_ptr, shape.nelems() * sizeof(float), + nullptr, true); + + // Check the data + std::vector dev_data(shape.nelems()); + executor.tensor_read(tensor, dev_data.data(), + shape.nelems() * sizeof(float)); + for (size_t i = 0; i < dev_data.size(); ++i) { + UNITTEST_EQ(dev_data[i], static_cast(i)); + dev_data[i] = -1; + } + + UNITTEST_EQ( + ark::gpuMemcpy(dev_data.data(), dev_ptr, shape.nelems() * sizeof(float), + ark::gpuMemcpyDeviceToHost), + ark::gpuSuccess); + for (size_t i = 0; i < dev_data.size(); ++i) { + UNITTEST_EQ(dev_data[i], static_cast(i)); + dev_data[i] = -1; + } + + // Copy -1s back to GPU array + UNITTEST_EQ( + ark::gpuMemcpy(dev_ptr, dev_data.data(), shape.nelems() * sizeof(float), + ark::gpuMemcpyHostToDevice), + ark::gpuSuccess); + + // Copy data from GPU array to ARK tensor + executor.tensor_write(tensor, dev_ptr, shape.nelems() * sizeof(float), + nullptr, true); + + // Copy data from ARK tensor to CPU array + executor.tensor_read(tensor, host_data.data(), + shape.nelems() * sizeof(float)); + + // Check the data + for (size_t i = 0; i < host_data.size(); ++i) { + UNITTEST_EQ(host_data[i], -1); + } + + // Provide a stream + ark::gpuStream stream; + UNITTEST_EQ( + ark::gpuStreamCreateWithFlags(&stream, ark::gpuStreamNonBlocking), + ark::gpuSuccess); + executor.tensor_read(tensor, host_data.data(), + shape.nelems() * sizeof(float), stream); + executor.tensor_write(tensor, host_data.data(), + shape.nelems() * sizeof(float), stream); + UNITTEST_EQ(ark::gpuStreamDestroy(stream), ark::gpuSuccess); + + // Invalid copy size + UNITTEST_THROW(executor.tensor_read(tensor, host_data.data(), + shape.nelems() * sizeof(float) + 1), + ark::InvalidUsageError); + UNITTEST_THROW(executor.tensor_write(tensor, host_data.data(), + shape.nelems() * sizeof(float) + 1), + ark::InvalidUsageError); + + executor.stop(); + + UNITTEST_EQ(ark::gpuFree(dev_ptr), ark::gpuSuccess); + return ark::unittest::SUCCESS; +} + +ark::unittest::State test_executor_tensor_read_write_no_stride() { + return test_executor_tensor_read_write({1024}, {}, {}); +} + +ark::unittest::State test_executor_tensor_read_write_stride_offset() { + return test_executor_tensor_read_write({4, 512}, {4, 1024}, {0, 512}); +} + +ark::unittest::State test_executor_invalid() { + // Invalid device ID. + UNITTEST_THROW(ark::Executor(-1, nullptr, "test", ""), + ark::InvalidUsageError); + + // Invalid rank. 
+ ark::PlanJson plan; + plan["Rank"] = 1; + UNITTEST_THROW(ark::Executor(0, nullptr, "test", plan.dump(), true), + ark::InvalidUsageError); + + return ark::unittest::SUCCESS; +} + +int main() { + UNITTEST(test_executor_loop); + UNITTEST(test_executor_no_loop); + UNITTEST(test_executor_tensor_read_write_no_stride); + UNITTEST(test_executor_tensor_read_write_stride_offset); + UNITTEST(test_executor_invalid); + return 0; +} diff --git a/ark/api/planner.cpp b/ark/api/planner.cpp index 22b9b680e..d36f33cbe 100644 --- a/ark/api/planner.cpp +++ b/ark/api/planner.cpp @@ -7,7 +7,7 @@ #include "context_impl.hpp" #include "env.h" #include "file_io.h" -#include "gpu/gpu_manager.h" +#include "gpu/gpu_manager.hpp" #include "model/model_json.hpp" #include "model/model_node.hpp" #include "model/model_op.hpp" diff --git a/ark/codegen.cpp b/ark/codegen.cpp index 6d5d5fc84..54214277d 100644 --- a/ark/codegen.cpp +++ b/ark/codegen.cpp @@ -174,7 +174,7 @@ CodeGenerator::Impl::Impl(const PlanJson &plan, {"@NUM_WARPS_PER_BLOCK@", std::to_string(num_warps_per_proc_)}, {"@DEFINITIONS@", definitions_ss.str()}, {"@BODY@", body_ss.str()}, - {"@NAME@", name_}, + {"@NAME@", (name_.empty() ? "" : "_" + name_)}, }; code_ = replace(template_code, replacements); } diff --git a/ark/gpu/gpu.h b/ark/gpu/gpu.hpp similarity index 98% rename from ark/gpu/gpu.h rename to ark/gpu/gpu.hpp index 2f1eba3ba..531d6c7ee 100644 --- a/ark/gpu/gpu.h +++ b/ark/gpu/gpu.hpp @@ -1,8 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#ifndef ARK_GPU_H_ -#define ARK_GPU_H_ +#ifndef ARK_GPU_HPP_ +#define ARK_GPU_HPP_ #include @@ -125,6 +125,7 @@ ARK_GPU_DEFINE_CONSTANT_ALIAS(gpuPointerAttributeSyncMemops, // runtime API ARK_GPU_DEFINE_FUNC_ALIAS(gpuGetErrorString, cudaGetErrorString, hipGetErrorString); +ARK_GPU_DEFINE_FUNC_ALIAS(gpuGetLastError, cudaGetLastError, hipGetLastError); ARK_GPU_DEFINE_FUNC_ALIAS(gpuDeviceGetAttribute, cudaDeviceGetAttribute, hipDeviceGetAttribute); ARK_GPU_DEFINE_FUNC_ALIAS(gpuDeviceSynchronize, cudaDeviceSynchronize, @@ -183,4 +184,4 @@ ARK_GPU_DEFINE_FUNC_ALIAS(gpuPointerSetAttribute, cuPointerSetAttribute, } // namespace ark -#endif // ARK_GPU_H_ +#endif // ARK_GPU_HPP_ diff --git a/ark/gpu/gpu_compile.cpp b/ark/gpu/gpu_compile.cpp index 4571a9413..21908fa4d 100644 --- a/ark/gpu/gpu_compile.cpp +++ b/ark/gpu/gpu_compile.cpp @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include "gpu/gpu_compile.h" +#include "gpu/gpu_compile.hpp" #include #include @@ -22,7 +22,7 @@ #include "cpu_timer.h" #include "env.h" #include "file_io.h" -#include "gpu/gpu_logging.h" +#include "gpu/gpu_logging.hpp" #include "utils/utils_string.hpp" #define ARK_DEBUG_KERNEL 0 diff --git a/ark/gpu/gpu_compile.h b/ark/gpu/gpu_compile.hpp similarity index 78% rename from ark/gpu/gpu_compile.h rename to ark/gpu/gpu_compile.hpp index 58048e78c..8b9e1a9fd 100644 --- a/ark/gpu/gpu_compile.h +++ b/ark/gpu/gpu_compile.hpp @@ -1,8 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. 
-#ifndef ARK_GPU_COMPILE_H_ -#define ARK_GPU_COMPILE_H_ +#ifndef ARK_GPU_COMPILE_HPP_ +#define ARK_GPU_COMPILE_HPP_ #include #include @@ -16,4 +16,4 @@ const std::string gpu_compile(const std::vector &codes, } // namespace ark -#endif // ARK_GPU_COMPILE_H_ +#endif // ARK_GPU_COMPILE_HPP_ diff --git a/ark/gpu/gpu_event.cpp b/ark/gpu/gpu_event.cpp index 93ec3fd52..06779b91a 100644 --- a/ark/gpu/gpu_event.cpp +++ b/ark/gpu/gpu_event.cpp @@ -1,11 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include "gpu/gpu_event.h" +#include "gpu/gpu_event.hpp" -#include "gpu/gpu.h" -#include "gpu/gpu_logging.h" -#include "gpu/gpu_manager.h" +#include "gpu/gpu_logging.hpp" +#include "gpu/gpu_manager.hpp" namespace ark { class GpuEvent::Impl { @@ -15,7 +14,7 @@ class GpuEvent::Impl { Impl(const Impl&) = delete; Impl& operator=(const Impl&) = delete; - void record(std::shared_ptr stream); + void record(gpuStream stream); float elapsed_msec(const GpuEvent& other) const; private: @@ -32,8 +31,8 @@ GpuEvent::Impl::Impl(bool disable_timing) { GpuEvent::Impl::~Impl() { GLOG(gpuEventDestroy(event_)); } -void GpuEvent::Impl::record(std::shared_ptr stream) { - GLOG(gpuEventRecord(event_, stream->get())); +void GpuEvent::Impl::record(gpuStream stream) { + GLOG(gpuEventRecord(event_, stream)); } float GpuEvent::Impl::elapsed_msec(const GpuEvent& other) const { @@ -45,9 +44,7 @@ float GpuEvent::Impl::elapsed_msec(const GpuEvent& other) const { GpuEvent::GpuEvent(bool disable_timing) : pimpl_(std::make_shared(disable_timing)) {} -void GpuEvent::record(std::shared_ptr stream) { - pimpl_->record(stream); -} +void GpuEvent::record(gpuStream stream) { pimpl_->record(stream); } float GpuEvent::elapsed_msec(const GpuEvent& other) const { return pimpl_->elapsed_msec(other); diff --git a/ark/gpu/gpu_event.h b/ark/gpu/gpu_event.hpp similarity index 78% rename from ark/gpu/gpu_event.h rename to ark/gpu/gpu_event.hpp index 4599ecaa4..bd2a7c952 100644 --- a/ark/gpu/gpu_event.h +++ b/ark/gpu/gpu_event.hpp @@ -1,11 +1,13 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#ifndef ARK_GPU_EVENT_H_ -#define ARK_GPU_EVENT_H_ +#ifndef ARK_GPU_EVENT_HPP_ +#define ARK_GPU_EVENT_HPP_ #include +#include "gpu/gpu.hpp" + namespace ark { class GpuStream; @@ -17,7 +19,7 @@ class GpuEvent { GpuEvent(const GpuEvent &) = delete; GpuEvent &operator=(const GpuEvent &) = delete; - void record(std::shared_ptr stream); + void record(gpuStream stream); float elapsed_msec(const GpuEvent &other) const; protected: @@ -31,4 +33,4 @@ class GpuEvent { }; } // namespace ark -#endif // ARK_GPU_EVENT_H_ +#endif // ARK_GPU_EVENT_HPP_ diff --git a/ark/gpu/gpu_kernel.cpp b/ark/gpu/gpu_kernel.cpp index 44ff43a1d..d4412f80e 100644 --- a/ark/gpu/gpu_kernel.cpp +++ b/ark/gpu/gpu_kernel.cpp @@ -1,50 +1,38 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. 
-#include "gpu_kernel.h" +#include "gpu_kernel.hpp" #include #include -#include "gpu.h" -#include "gpu_compile.h" -#include "gpu_logging.h" -#include "gpu_manager.h" +#include "gpu.hpp" +#include "gpu_compile.hpp" +#include "gpu_logging.hpp" +#include "gpu_manager.hpp" namespace ark { GpuKernel::GpuKernel(int gpu_id, const std::string& code, const std::array& block_dim, const std::array& grid_dim, size_t smem_bytes, - const std::string& kernel_name, - std::initializer_list> args) { - this->init(gpu_id, code, block_dim, grid_dim, smem_bytes, kernel_name, - args); + const std::string& kernel_name) { + this->init(gpu_id, code, block_dim, grid_dim, smem_bytes, kernel_name); } void GpuKernel::init(int gpu_id, const std::string& code, const std::array& block_dim, const std::array& grid_dim, size_t smem_bytes, - const std::string& kernel_name, - std::initializer_list> args) { + const std::string& kernel_name) { gpu_manager_ = GpuManager::get_instance(gpu_id); code_ = code; block_dim_ = block_dim; grid_dim_ = grid_dim; smem_bytes_ = smem_bytes; kernel_name_ = kernel_name; - params_ptr_.resize(args.size()); - args_.resize(args.size()); if (kernel_name_.size() == 0) { ERR(InvalidUsageError, "Invalid kernel name: ", kernel_name_); } - size_t idx = 0; - for (auto& pair : args) { - args_[idx].reset(new uint8_t[pair.second]); - std::memcpy(args_[idx].get(), &(pair.first), pair.second); - params_ptr_[idx] = static_cast(args_[idx].get()); - idx++; - } } void GpuKernel::compile() { @@ -68,12 +56,13 @@ void GpuKernel::compile() { dynamic_smem_size_bytes)); } -void GpuKernel::launch(std::shared_ptr stream) { +void GpuKernel::launch(gpuStream stream, std::vector& args) { if (!this->is_compiled()) { ERR(InvalidUsageError, "Kernel is not compiled yet."); } gpu_manager_->launch(function_, grid_dim_, block_dim_, smem_bytes_, stream, - params_ptr_.data(), nullptr); + args.data(), nullptr); + GLOG(gpuGetLastError()); } gpuDeviceptr GpuKernel::get_global(const std::string& name, diff --git a/ark/gpu/gpu_kernel.h b/ark/gpu/gpu_kernel.hpp similarity index 68% rename from ark/gpu/gpu_kernel.h rename to ark/gpu/gpu_kernel.hpp index c3b60aec4..5308cfead 100644 --- a/ark/gpu/gpu_kernel.h +++ b/ark/gpu/gpu_kernel.hpp @@ -1,13 +1,14 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. 
-#ifndef ARK_GPU_KERNEL_H_ -#define ARK_GPU_KERNEL_H_ +#ifndef ARK_GPU_KERNEL_HPP_ +#define ARK_GPU_KERNEL_HPP_ #include #include +#include -#include "gpu_stream.h" +#include "gpu_stream.hpp" namespace ark { @@ -18,16 +19,14 @@ class GpuKernel { GpuKernel(int gpu_id, const std::string& codes, const std::array& block_dim, const std::array& grid_dim, size_t smem_bytes, - const std::string& kernel_name, - std::initializer_list> args = {}); + const std::string& kernel_name); void init(int gpu_id, const std::string& codes, const std::array& block_dim, const std::array& grid_dim, size_t smem_bytes, - const std::string& kernel_name, - std::initializer_list> args = {}); + const std::string& kernel_name); void compile(); - void launch(std::shared_ptr stream); + void launch(gpuStream stream, std::vector& args); gpuDeviceptr get_global(const std::string& name, bool ignore_not_found = false) const; @@ -43,10 +42,8 @@ class GpuKernel { std::string bin_; gpuModule module_; gpuFunction function_ = nullptr; - std::vector params_ptr_; - std::vector> args_; }; } // namespace ark -#endif // ARK_GPU_KERNEL_H_ +#endif // ARK_GPU_KERNEL_HPP_ diff --git a/ark/gpu/gpu_kernel_test.cpp b/ark/gpu/gpu_kernel_test.cpp index 870ad7ab9..342ef9656 100644 --- a/ark/gpu/gpu_kernel_test.cpp +++ b/ark/gpu/gpu_kernel_test.cpp @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include "gpu/gpu_kernel.h" +#include "gpu/gpu_kernel.hpp" #include "unittest/unittest_utils.h" @@ -9,7 +9,13 @@ const std::string void_kernel = "extern \"C\" __global__ void kernel() {}"; ark::unittest::State test_gpu_kernel() { ark::GpuKernel kernel(0, void_kernel, {1, 1, 1}, {1, 1, 1}, 0, "kernel"); + UNITTEST_TRUE(!kernel.is_compiled()); kernel.compile(); + UNITTEST_TRUE(kernel.is_compiled()); + std::vector args; + for (int i = 0; i < 10; i++) { + kernel.launch(nullptr, args); + } return ark::unittest::SUCCESS; } diff --git a/ark/gpu/gpu_logging.h b/ark/gpu/gpu_logging.hpp similarity index 92% rename from ark/gpu/gpu_logging.h rename to ark/gpu/gpu_logging.hpp index e67894b9e..9be8f41c4 100644 --- a/ark/gpu/gpu_logging.h +++ b/ark/gpu/gpu_logging.hpp @@ -1,10 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#ifndef ARK_GPU_LOGGING_H_ -#define ARK_GPU_LOGGING_H_ +#ifndef ARK_GPU_LOGGING_HPP_ +#define ARK_GPU_LOGGING_HPP_ -#include "gpu/gpu.h" +#include "gpu/gpu.hpp" #include "logging.hpp" #define GLOG(cmd) \ @@ -29,4 +29,4 @@ } \ } while (0) -#endif // ARK_GPU_LOGGING_H_ +#endif // ARK_GPU_LOGGING_HPP_ diff --git a/ark/gpu/gpu_manager.cpp b/ark/gpu/gpu_manager.cpp index 1aaa365cd..2b5be490b 100644 --- a/ark/gpu/gpu_manager.cpp +++ b/ark/gpu/gpu_manager.cpp @@ -1,11 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. 
-#include "gpu/gpu_manager.h" +#include "gpu/gpu_manager.hpp" #include -#include "gpu/gpu_logging.h" +#include "gpu/gpu_logging.hpp" #include "utils/utils_string.hpp" namespace ark { @@ -20,11 +20,10 @@ class GpuManager::Impl { int gpu_id_; GpuManager::Info info_; - std::shared_ptr main_stream_; void launch(gpuFunction kernel, const std::array &grid_dim, const std::array &block_dim, int smem_bytes, - std::shared_ptr stream, void **params, void **extra); + gpuStream stream, void **params, void **extra); }; GpuManager::Impl::Impl(int gpu_id) : gpu_id_(gpu_id) { @@ -78,11 +77,11 @@ GpuManager::Impl::Impl(int gpu_id) : gpu_id_(gpu_id) { void GpuManager::Impl::launch(gpuFunction kernel, const std::array &grid_dim, const std::array &block_dim, - int smem_bytes, std::shared_ptr stream, - void **params, void **extra) { + int smem_bytes, gpuStream stream, void **params, + void **extra) { GLOG_DRV(gpuModuleLaunchKernel( kernel, grid_dim[0], grid_dim[1], grid_dim[2], block_dim[0], - block_dim[1], block_dim[2], smem_bytes, stream->get(), params, extra)); + block_dim[1], block_dim[2], smem_bytes, stream, params, extra)); } std::shared_ptr GpuManager::get_instance(int gpu_id) { @@ -104,9 +103,7 @@ std::shared_ptr GpuManager::get_instance(int gpu_id) { } } -GpuManager::GpuManager(int gpu_id) : pimpl_(std::make_shared(gpu_id)) { - this->pimpl_->main_stream_ = std::shared_ptr(new GpuStream()); -} +GpuManager::GpuManager(int gpu_id) : pimpl_(std::make_shared(gpu_id)) {} std::shared_ptr GpuManager::malloc(size_t bytes, size_t align, bool expose) { @@ -128,8 +125,6 @@ std::shared_ptr GpuManager::create_stream() const { return std::shared_ptr(new GpuStream()); } -int GpuManager::get_gpu_id() const { return pimpl_->gpu_id_; } - const GpuManager::Info &GpuManager::info() const { return pimpl_->info_; } void GpuManager::set_current() const { GLOG(gpuSetDevice(pimpl_->gpu_id_)); } @@ -137,8 +132,7 @@ void GpuManager::set_current() const { GLOG(gpuSetDevice(pimpl_->gpu_id_)); } void GpuManager::launch(gpuFunction function, const std::array &grid_dim, const std::array &block_dim, int smem_bytes, - std::shared_ptr stream, void **params, - void **extra) const { + gpuStream stream, void **params, void **extra) const { this->set_current(); pimpl_->launch(function, grid_dim, block_dim, smem_bytes, stream, params, extra); diff --git a/ark/gpu/gpu_manager.h b/ark/gpu/gpu_manager.hpp similarity index 82% rename from ark/gpu/gpu_manager.h rename to ark/gpu/gpu_manager.hpp index 05014ac47..eeeda4d94 100644 --- a/ark/gpu/gpu_manager.h +++ b/ark/gpu/gpu_manager.hpp @@ -1,16 +1,16 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. 
-#ifndef ARK_GPU_MANAGER_H_ -#define ARK_GPU_MANAGER_H_ +#ifndef ARK_GPU_MANAGER_HPP_ +#define ARK_GPU_MANAGER_HPP_ #include #include "arch.hpp" -#include "gpu/gpu.h" -#include "gpu/gpu_event.h" -#include "gpu/gpu_memory.h" -#include "gpu/gpu_stream.h" +#include "gpu/gpu.hpp" +#include "gpu/gpu_event.hpp" +#include "gpu/gpu_memory.hpp" +#include "gpu/gpu_stream.hpp" namespace ark { @@ -30,11 +30,9 @@ class GpuManager { std::shared_ptr create_event(bool disable_timing = false) const; std::shared_ptr create_stream() const; - int get_gpu_id() const; void launch(gpuFunction function, const std::array &grid_dim, const std::array &block_dim, int smem_bytes, - std::shared_ptr stream, void **params, - void **extra) const; + gpuStream stream, void **params, void **extra) const; struct Info; const Info &info() const; @@ -64,4 +62,4 @@ class GpuManager { } // namespace ark -#endif // ARK_GPU_MANAGER_H_ +#endif // ARK_GPU_MANAGER_HPP_ diff --git a/ark/gpu/gpu_memory.cpp b/ark/gpu/gpu_memory.cpp index 446222e24..fac8d3672 100644 --- a/ark/gpu/gpu_memory.cpp +++ b/ark/gpu/gpu_memory.cpp @@ -1,11 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include "gpu/gpu_memory.h" +#include "gpu/gpu_memory.hpp" -#include "gpu/gpu.h" -#include "gpu/gpu_logging.h" -#include "gpu/gpu_manager.h" +#include "gpu/gpu.hpp" +#include "gpu/gpu_logging.hpp" +#include "gpu/gpu_manager.hpp" namespace ark { diff --git a/ark/gpu/gpu_memory.h b/ark/gpu/gpu_memory.hpp similarity index 87% rename from ark/gpu/gpu_memory.h rename to ark/gpu/gpu_memory.hpp index cd7a6f04f..6b277d40b 100644 --- a/ark/gpu/gpu_memory.h +++ b/ark/gpu/gpu_memory.hpp @@ -1,13 +1,13 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#ifndef ARK_GPU_MEMORY_H_ -#define ARK_GPU_MEMORY_H_ +#ifndef ARK_GPU_MEMORY_HPP_ +#define ARK_GPU_MEMORY_HPP_ #include #include -#include "gpu/gpu.h" +#include "gpu/gpu.hpp" namespace ark { @@ -40,7 +40,7 @@ class GpuHostMemory { GpuHostMemory(const GpuHostMemory&) = delete; GpuHostMemory& operator=(const GpuHostMemory&) = delete; - template + template T* ref() const { return reinterpret_cast(ptr_); } @@ -54,4 +54,4 @@ class GpuHostMemory { } // namespace ark -#endif // ARK_GPU_MEMORY_H_ +#endif // ARK_GPU_MEMORY_HPP_ diff --git a/ark/gpu/gpu_stream.cpp b/ark/gpu/gpu_stream.cpp index 52502365a..17d4e21f5 100644 --- a/ark/gpu/gpu_stream.cpp +++ b/ark/gpu/gpu_stream.cpp @@ -1,10 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include "gpu/gpu_stream.h" +#include "gpu/gpu_stream.hpp" -#include "gpu/gpu_logging.h" -#include "gpu/gpu_manager.h" +#include "gpu/gpu_logging.hpp" +#include "gpu/gpu_manager.hpp" namespace ark { class GpuStream::Impl { diff --git a/ark/gpu/gpu_stream.h b/ark/gpu/gpu_stream.hpp similarity index 79% rename from ark/gpu/gpu_stream.h rename to ark/gpu/gpu_stream.hpp index e76f01827..9d8775f95 100644 --- a/ark/gpu/gpu_stream.h +++ b/ark/gpu/gpu_stream.hpp @@ -1,12 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. 
-#ifndef ARK_GPU_STREAM_H_ -#define ARK_GPU_STREAM_H_ +#ifndef ARK_GPU_STREAM_HPP_ +#define ARK_GPU_STREAM_HPP_ #include -#include "gpu/gpu.h" +#include "gpu/gpu.hpp" namespace ark { @@ -30,4 +30,4 @@ class GpuStream { }; } // namespace ark -#endif // ARK_GPU_STREAM_H_ +#endif // ARK_GPU_STREAM_HPP_ diff --git a/ark/include/ark/executor.hpp b/ark/include/ark/executor.hpp index 4682af7d0..14ca87618 100644 --- a/ark/include/ark/executor.hpp +++ b/ark/include/ark/executor.hpp @@ -5,6 +5,7 @@ #define ARK_EXECUTOR_HPP #include +#include #include #include #include @@ -12,21 +13,33 @@ namespace ark { +using Stream = void *; + /// Convenience class for executing a model. class Executor { public: /// Constructor. - Executor(int rank, int world_size, int gpu_id, const std::string &name, - const std::string &plan); + Executor(int device_id, Stream stream, const std::string &name, + const std::string &plan, bool loop_mode = true); + /// Destructor. ~Executor(); + /// Return the device ID. + int device_id() const; + + /// Return the stream of the executor. + Stream stream() const; + + /// Return the plan string. + std::string plan() const; + /// Compile the model. This must be called before `launch()`. void compile(); /// Launch the model (not running yet). This must be called after /// `compile()`. - void launch(int64_t max_spin_count = -1); + void launch(); /// Run the model for `iter` iterations. void run(int iter); @@ -39,30 +52,39 @@ class Executor { /// again. float stop(int64_t max_spin_count = -1); + /// Barrier for all rank executors. void barrier(); + /// Destroy the executor. void destroy(); + /// Return whether the executor is destroyed. bool destroyed() const; + /// Return the raw virtual address of the tensor. + uintptr_t tensor_address(const Tensor &tensor) const; + template - void tensor_read(const Tensor tensor, std::vector &data) const { + void tensor_read(const Tensor &tensor, std::vector &data, + Stream stream = nullptr) const { tensor_read(tensor, reinterpret_cast(data.data()), - data.size() * sizeof(T)); + data.size() * sizeof(T), stream); } template - void tensor_write(const Tensor tensor, const std::vector &data) const { + void tensor_write(const Tensor &tensor, const std::vector &data, + Stream stream = nullptr) const { tensor_write(tensor, reinterpret_cast(data.data()), - data.size() * sizeof(T)); + data.size() * sizeof(T), stream); } - void tensor_read(const Tensor tensor, void *data, size_t bytes) const; + void tensor_read(const Tensor &tensor, void *data, size_t bytes, + Stream stream = nullptr, bool is_d2d = false) const; - void tensor_write(const Tensor tensor, const void *data, - size_t bytes) const; + void tensor_write(const Tensor &tensor, const void *data, size_t bytes, + Stream stream = nullptr, bool is_d2d = false) const; - private: + protected: class Impl; std::unique_ptr impl_; }; @@ -71,8 +93,10 @@ class Model; class DefaultExecutor : public Executor { public: - DefaultExecutor(const Model &model, int gpu_id = -1, - const std::string &name = "DefaultExecutor"); + DefaultExecutor( + const Model &model, int device_id = -1, Stream stream = nullptr, + const std::vector &config_rules = {}, + const std::string &name = "DefaultExecutor", bool loop_mode = true); }; } // namespace ark diff --git a/ark/include/kernels/kernel_template.in b/ark/include/kernels/kernel_template.in index ea1862920..a8a56f141 100644 --- a/ark/include/kernels/kernel_template.in +++ b/ark/include/kernels/kernel_template.in @@ -33,12 +33,12 @@ __device__ sync::State ARK_LOOP_SYNC_STATE; 
@DEFINITIONS@ -__device__ void ark_loop_body(char *_buf, int _iter) { +__device__ void ark_body(char *_buf, int _iter) { @BODY@ } extern "C" __global__ __launch_bounds__(ARK_WARPS_PER_BLOCK * Arch::ThreadsPerWarp, 1) -void @NAME@(char *_buf, int *_iter) { +void ark_loop_kernel@NAME@(char *_buf, int *_iter) { int *shared_mem = (int *)_ARK_SMEM; for (int i = threadIdx.x; i < ARK_SMEM_RESERVED_BYTES / sizeof(int); i += blockDim.x) { shared_mem[i] = 0; @@ -52,10 +52,10 @@ void @NAME@(char *_buf, int *_iter) { sync_gpu<@NUM_BLOCKS@>(ARK_LOOP_SYNC_STATE); if (ARK_ITER < 0) return; - ark_loop_body(_buf, 0); + ark_body(_buf, 0); for (int _i = 1; _i < ARK_ITER; ++_i) { sync_gpu<@NUM_BLOCKS@>(ARK_LOOP_SYNC_STATE); - ark_loop_body(_buf, _i); + ark_body(_buf, _i); } if (threadIdx.x == 0) { __threadfence_system(); @@ -67,3 +67,12 @@ void @NAME@(char *_buf, int *_iter) { sync_gpu<@NUM_BLOCKS@>(ARK_LOOP_SYNC_STATE); } } + +extern "C" __global__ __launch_bounds__(ARK_WARPS_PER_BLOCK * Arch::ThreadsPerWarp, 1) +void ark_kernel@NAME@(char *_buf, int _iter) { + int *shared_mem = (int *)_ARK_SMEM; + for (int i = threadIdx.x; i < ARK_SMEM_RESERVED_BYTES / sizeof(int); i += blockDim.x) { + shared_mem[i] = 0; + } + ark_body(_buf, _iter); +} diff --git a/ark/model/model_json.cpp b/ark/model/model_json.cpp index b82f9e484..c2099e2c9 100644 --- a/ark/model/model_json.cpp +++ b/ark/model/model_json.cpp @@ -287,6 +287,7 @@ PlanJson::PlanJson(const Json &json) : Json((json != nullptr) ? json : Json{{"Rank", 0}, {"WorldSize", 1}, + {"Architecture", "ANY"}, {"NumProcessors", 1}, {"NumWarpsPerProcessor", 1}, {"TaskInfos", Json::array()}, diff --git a/ark/ops/ops_all_reduce_test.cpp b/ark/ops/ops_all_reduce_test.cpp index a37d211f7..8cf68b085 100644 --- a/ark/ops/ops_all_reduce_test.cpp +++ b/ark/ops/ops_all_reduce_test.cpp @@ -35,10 +35,9 @@ void test_all_reduce_internal(ark::DimType nelem) { std::vector ones_vec(ones.shape().nelems(), ark::half_t(1.0f)); - auto result = - ark::op_test("all_reduce", m, {ones}, {output}, - baseline_all_reduce, - {ones_vec.data()}, false, gpu_id, NumGpus); + auto result = ark::op_test( + "all_reduce", m, {ones}, {output}, + baseline_all_reduce, {ones_vec.data()}); UNITTEST_LOG(result); UNITTEST_EQ(result.max_diff[0], 0.0f); return ark::unittest::SUCCESS; @@ -126,10 +125,9 @@ void test_all_reduce_packet_internal(ark::DimType nelem) { std::vector ones_vec(ones.shape().nelems(), ark::half_t(1.0f)); - auto result = - ark::op_test("all_reduce_packet", m, {ones}, {output}, - baseline_all_reduce, - {ones_vec.data()}, false, gpu_id, NumGpus); + auto result = ark::op_test( + "all_reduce_packet", m, {ones}, {output}, + baseline_all_reduce, {ones_vec.data()}); UNITTEST_LOG(result); UNITTEST_EQ(result.max_diff[0], 0.0f); return ark::unittest::SUCCESS; @@ -233,10 +231,10 @@ void test_all_reduce_sm_internal(ark::DimType nelem) { std::vector ones_vec(ones.shape().nelems(), ark::half_t(1.0f)); - auto result = ark::op_test( - "all_reduce_sm", m, {ones}, {output}, - baseline_all_reduce, {ones_vec.data()}, - false, gpu_id, NumGpus, config_rule); + auto result = + ark::op_test("all_reduce_sm", m, {ones}, {output}, + baseline_all_reduce, + {ones_vec.data()}, {config_rule}); UNITTEST_LOG(result); UNITTEST_EQ(result.max_diff[0], 0.0f); return ark::unittest::SUCCESS; diff --git a/ark/ops/ops_communication_test.cpp b/ark/ops/ops_communication_test.cpp index dec310331..8cdad41b2 100644 --- a/ark/ops/ops_communication_test.cpp +++ b/ark/ops/ops_communication_test.cpp @@ -231,9 +231,7 @@ ark::unittest::State 
test_communication_send_recv_bidir_sm() { ark::Tensor tns2 = model.identity(tns2_data, {tns}); tns2 = model.recv(tns2_data, remote_gpu_id, tag); - ark::Planner planner(model, gpu_id); - planner.install_config_rule(config_rule); - ark::Executor exe(gpu_id, 2, gpu_id, "Executor", planner.plan()); + ark::DefaultExecutor exe(model, gpu_id, nullptr, {config_rule}); exe.compile(); std::vector data(1024); @@ -277,9 +275,7 @@ ark::unittest::State test_communication_send_recv_bidir_sm() { ark::Tensor sum = model.add(tns2, tns_data); - ark::Planner planner(model, gpu_id); - planner.install_config_rule(config_rule); - ark::Executor exe(gpu_id, 2, gpu_id, "Executor", planner.plan()); + ark::DefaultExecutor exe(model, gpu_id, nullptr, {config_rule}); exe.compile(); std::vector data(1024); @@ -437,7 +433,7 @@ ark::unittest::State test_communication_send_recv_reduce() { ark::Planner planner(model, gpu_id); planner.install_config_rule(config_rule); - ark::Executor exe(gpu_id, 2, gpu_id, "Executor", planner.plan()); + ark::Executor exe(gpu_id, nullptr, "Executor", planner.plan()); exe.compile(); std::vector data(1024); diff --git a/ark/ops/ops_embedding_test.cpp b/ark/ops/ops_embedding_test.cpp index a458ae7b2..222605296 100644 --- a/ark/ops/ops_embedding_test.cpp +++ b/ark/ops/ops_embedding_test.cpp @@ -78,9 +78,9 @@ ark::unittest::State test_embedding() { } else if (std::is_same::value) { type_str = "bf16"; } - auto result = ark::op_test("embedding_" + type_str, m, {ti, tw}, {to}, - baseline_embedding, - {ti_data.data(), tw_data.data()}, true); + auto result = + ark::op_test("embedding_" + type_str, m, {ti, tw}, {to}, + baseline_embedding, {ti_data.data(), tw_data.data()}); UNITTEST_LOG(result); UNITTEST_EQ(result.max_diff[0], 0.0f); return ark::unittest::SUCCESS; diff --git a/ark/ops/ops_matmul_test.cpp b/ark/ops/ops_matmul_test.cpp index b86a4bc3e..11682ca49 100644 --- a/ark/ops/ops_matmul_test.cpp +++ b/ark/ops/ops_matmul_test.cpp @@ -3,7 +3,7 @@ #include -#include "gpu/gpu.h" +#include "gpu/gpu.hpp" #include "logging.hpp" #include "model/model_node.hpp" #include "model/model_op.hpp" diff --git a/ark/ops/ops_test_common.cpp b/ark/ops/ops_test_common.cpp index 0e8f215ae..4e94d06a7 100644 --- a/ark/ops/ops_test_common.cpp +++ b/ark/ops/ops_test_common.cpp @@ -10,7 +10,7 @@ #include "ark/planner.hpp" #include "ark/random.hpp" #include "env.h" -#include "gpu/gpu_logging.h" +#include "gpu/gpu_logging.hpp" #include "logging.hpp" #include "model/model_data_type.hpp" #include "model/model_tensor.hpp" @@ -31,16 +31,13 @@ std::ostream &operator<<(std::ostream &os, const OpsTestResult &result) { return os; } -OpsTestResult op_test(const std::string &test_name_prefix, const Model &model, - const std::vector &inputs, - const std::vector &outputs, - OpsTestBaseline baseline, - const std::vector &inputs_data, - bool print_on_error, int rank, int world_size, - Planner::ConfigRule config_rule) { - Planner planner(model, rank); - planner.install_config_rule(config_rule); - Executor exe(rank, world_size, rank, "Executor", planner.plan()); +OpsTestResult op_test( + const std::string &test_name_prefix, const Model &model, + const std::vector &inputs, const std::vector &outputs, + OpsTestBaseline baseline, const std::vector &inputs_data, + const std::vector &config_rules, + bool print_on_error) { + DefaultExecutor exe(model, -1, nullptr, config_rules); exe.compile(); std::vector>> inputs_data_storages; @@ -136,7 +133,8 @@ OpsTestResult op_test(const std::string &test_name_prefix, const Model &model, for (auto t : gt) { 
gt_ptrs.push_back(t->data()); } - baseline(gt_ptrs, output_shapes, inputs_data_refs, input_shapes, rank); + baseline(gt_ptrs, output_shapes, inputs_data_refs, input_shapes, + model.rank()); std::stringstream test_name; test_name << test_name_prefix; @@ -150,6 +148,7 @@ OpsTestResult op_test(const std::string &test_name_prefix, const Model &model, OpsTestResult result; result.test_name = test_name.str(); + result.plan = exe.plan(); // Compare results with the ground truth. for (size_t i = 0; i < outputs.size(); i++) { @@ -190,7 +189,7 @@ OpsTestResult op_test(const std::string &test_name_prefix, const Model &model, GLOG(gpuDeviceSynchronize()); // Throughput test. - if (world_size > 1) { + if (model.world_size() > 1) { // For multi-GPU, we need to make sure that all GPUs run the same // number of iterations. Rather than doing allgather, we just // use a magic number here. diff --git a/ark/ops/ops_test_common.hpp b/ark/ops/ops_test_common.hpp index e5b6c4f8e..3848773e6 100644 --- a/ark/ops/ops_test_common.hpp +++ b/ark/ops/ops_test_common.hpp @@ -134,6 +134,7 @@ TensorCompareResult tensor_compare(T *ground_truth, T *res, Dims shape, struct OpsTestResult { std::string test_name; + std::string plan; int iter; float msec_per_iter; std::vector mse; @@ -166,14 +167,12 @@ using OpsTestBaseline = std::function &inputs, - const std::vector &outputs, - OpsTestBaseline baseline, - const std::vector &inputs_data = {}, - bool print_on_error = false, int rank = 0, - int world_size = 1, - Planner::ConfigRule config_rule = nullptr); +OpsTestResult op_test( + const std::string &test_name_prefix, const Model &model, + const std::vector &inputs, const std::vector &outputs, + OpsTestBaseline baseline, const std::vector &inputs_data = {}, + const std::vector &config_rules = {}, + bool print_on_error = false); OpsTestGpuMem to_gpu(void *host_ptr, size_t size); diff --git a/python/ark/runtime.py b/python/ark/runtime.py index ab844708e..495fc1c24 100644 --- a/python/ark/runtime.py +++ b/python/ark/runtime.py @@ -77,6 +77,8 @@ def launch( self, plan: Plan = None, device_id: int = 0, + stream: int = 0, + loop_mode: bool = True, ): """ Create an executor and schedule the ARK model. The scheduler will generate @@ -96,11 +98,11 @@ def launch( _RuntimeState.executor.destroy() _RuntimeState.executor = Executor( - plan.rank, - plan.world_size, device_id, + stream, "ArkRuntime", str(plan), + loop_mode, ) self.executor = _RuntimeState.executor self.executor.compile() diff --git a/python/ark/tensor.py b/python/ark/tensor.py index 316d18566..d69f2aabc 100644 --- a/python/ark/tensor.py +++ b/python/ark/tensor.py @@ -48,7 +48,9 @@ def dtype(self) -> DataType: """ return DataType.from_ctype(self._tensor.data_type()) - def to_numpy(self, ndarray: np.ndarray = None) -> np.ndarray: + def to_numpy( + self, ndarray: np.ndarray = None, stream: int = 0 + ) -> np.ndarray: """ Copy a tensor from device to host. If `ndarray` is None, a new numpy array will be created. 
If the tensor is not allocated, @@ -68,10 +70,10 @@ def to_numpy(self, ndarray: np.ndarray = None) -> np.ndarray: raise ValueError("ndarray dtype does not match the tensor") elif ndarray.nbytes != self.nelems() * self.dtype().element_size(): raise ValueError("ndarray size does not match the tensor") - rt.executor.tensor_read(self._tensor, ndarray) + rt.executor.tensor_read(self._tensor, ndarray, stream) return ndarray - def from_numpy(self, ndarray: np.ndarray) -> "Tensor": + def from_numpy(self, ndarray: np.ndarray, stream: int = 0) -> "Tensor": """ Copies the tensor from a host numpy array to the device. """ @@ -86,7 +88,7 @@ def from_numpy(self, ndarray: np.ndarray) -> "Tensor": ndarray = np.ascontiguousarray(ndarray) if ndarray.nbytes != self.nelems() * self.dtype().element_size(): raise ValueError("ndarray size does not match the tensor") - rt.executor.tensor_write(self._tensor, ndarray) + rt.executor.tensor_write(self._tensor, ndarray, stream) return self diff --git a/python/executor_py.cpp b/python/executor_py.cpp index dc2840329..b1e468608 100644 --- a/python/executor_py.cpp +++ b/python/executor_py.cpp @@ -11,33 +11,76 @@ namespace py = pybind11; static void tensor_write(ark::Executor *exe, const ark::Tensor &tensor, - py::buffer host_buffer) { + py::buffer host_buffer, uintptr_t stream) { py::buffer_info info = host_buffer.request(); exe->tensor_write(tensor, reinterpret_cast(info.ptr), - info.size * info.itemsize); + info.size * info.itemsize, + reinterpret_cast(stream), false); +} + +static void tensor_write(ark::Executor *exe, const ark::Tensor &tensor, + size_t address, size_t bytes, uintptr_t stream, + bool is_d2d) { + exe->tensor_write(tensor, reinterpret_cast(address), bytes, + reinterpret_cast(stream), is_d2d); } static void tensor_read(ark::Executor *exe, const ark::Tensor &tensor, - py::buffer host_buffer) { + py::buffer host_buffer, uintptr_t stream) { py::buffer_info info = host_buffer.request(); exe->tensor_read(tensor, reinterpret_cast(info.ptr), - info.size * info.itemsize); + info.size * info.itemsize, + reinterpret_cast(stream), false); +} + +static void tensor_read(ark::Executor *exe, const ark::Tensor &tensor, + size_t address, size_t bytes, uintptr_t stream, + bool is_d2d) { + exe->tensor_read(tensor, reinterpret_cast(address), bytes, + reinterpret_cast(stream), is_d2d); } void register_executor(py::module &m) { py::class_(m, "_Executor") - .def( - py::init(), - py::arg("rank"), py::arg("world_size"), py::arg("gpu_id"), - py::arg("name"), py::arg("plan")) + .def(py::init([](int device_id, uintptr_t stream, + const std::string &name, const std::string &plan, + bool loop_mode) { + return new ark::Executor(device_id, + reinterpret_cast(stream), + name, plan, loop_mode); + })) + .def("device_id", &ark::Executor::device_id) + .def("stream", + [](ark::Executor *self) { + return reinterpret_cast(self->stream()); + }) + .def("plan", &ark::Executor::plan) .def("compile", &ark::Executor::compile) - .def("launch", &ark::Executor::launch, py::arg("max_spin_count") = -1) + .def("launch", &ark::Executor::launch) .def("run", &ark::Executor::run, py::arg("iter")) .def("wait", &ark::Executor::wait, py::arg("max_spin_count") = -1) .def("stop", &ark::Executor::stop, py::arg("max_spin_count") = -1) .def("barrier", &ark::Executor::barrier) .def("destroy", &ark::Executor::destroy) .def("destroyed", &ark::Executor::destroyed) - .def("tensor_read", &tensor_read, py::arg("tensor"), py::arg("data")) - .def("tensor_write", &tensor_write, py::arg("tensor"), py::arg("data")); + 
.def("tensor_address", &ark::Executor::tensor_address, + py::arg("tensor")) + .def("tensor_read", + py::overload_cast(&tensor_read), + py::arg("tensor"), py::arg("data"), py::arg("stream")) + .def("tensor_read", + py::overload_cast(&tensor_read), + py::arg("tensor"), py::arg("address"), py::arg("bytes"), + py::arg("stream"), py::arg("is_d2d")) + .def("tensor_write", + py::overload_cast(&tensor_write), + py::arg("tensor"), py::arg("data"), py::arg("stream")) + .def("tensor_write", + py::overload_cast(&tensor_write), + py::arg("tensor"), py::arg("address"), py::arg("bytes"), + py::arg("stream"), py::arg("is_d2d")); }