Merge remote-tracking branch 'upstream/main' into audio_resampling_gpu_op

Signed-off-by: Joaquin Anton <[email protected]>
jantonguirao committed May 18, 2022
2 parents 934c17e + 6c844cc commit 4e08539
Showing 16 changed files with 831 additions and 235 deletions.
6 changes: 3 additions & 3 deletions dali/operators/audio/resample.cc
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "dali/operators/audio/resample.h"
#include <map>
#include <vector>
#include "dali/operators/audio/resample.h"
#include "dali/operators/audio/resampling_params.h"
#include "dali/core/convert.h"
#include "dali/kernels/kernel_params.h"
#include "dali/kernels/signal/resampling_cpu.h"
#include "dali/core/convert.h"
#include "dali/operators/audio/resampling_params.h"

namespace dali {

202 changes: 114 additions & 88 deletions dali/pipeline/operator/eager_operator.h
@@ -17,13 +17,14 @@

#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>


#include "dali/core/cuda_stream_pool.h"
#include "dali/core/nvtx.h"
#include "dali/pipeline/data/backend.h"
#include "dali/pipeline/data/tensor_list.h"
#include "dali/pipeline/operator/op_spec.h"
#include "dali/pipeline/operator/operator.h"
@@ -35,71 +36,114 @@
namespace dali {

template <typename Backend>
std::shared_ptr<TensorList<Backend>> AsTensorList(const std::shared_ptr<TensorList<Backend>> &in) {
std::shared_ptr<TensorList<Backend>> AsTensorList(std::shared_ptr<TensorList<Backend>> in) {
return in;
}

template <typename Backend>
std::shared_ptr<TensorList<Backend>> AsTensorList(
const std::shared_ptr<TensorVector<Backend>> &in) {
std::shared_ptr<TensorList<Backend>> AsTensorList(std::shared_ptr<TensorVector<Backend>> in) {
if (in->IsContiguous()) {
// Filled contiguous TensorVector, we can return TensorList directly.
return in->AsTensorList(false);
auto tl = in->AsTensorList(false);
// Explicitly set layout (it could be empty in case of per-sample operators).
tl->SetLayout(in->GetLayout());
return tl;
}

auto tl = std::make_shared<TensorList<Backend>>();
tl->Copy(*in);
return tl;
}

template <typename StorageType>
void MakeContiguous(std::shared_ptr<StorageType> storage) {}

template <>
void MakeContiguous(std::shared_ptr<TensorVector<CPUBackend>> storage) {
storage->SetContiguous(true);
}

template <typename Backend>
struct Backend2Types {};

template <>
struct Backend2Types<CPUBackend> {
using InBackend = CPUBackend;
using OutBackend = CPUBackend;
using WSInputType = TensorVector<CPUBackend>;
using WSOutputType = TensorVector<CPUBackend>;
static const char name[8];
};

template <>
struct Backend2Types<GPUBackend> {
using InBackend = GPUBackend;
using OutBackend = GPUBackend;
using WSInputType = TensorList<GPUBackend>;
using WSOutputType = TensorList<GPUBackend>;
static const char name[8];
};

template <>
struct Backend2Types<MixedBackend> {
using InBackend = CPUBackend;
using OutBackend = GPUBackend;
using WSInputType = TensorVector<CPUBackend>;
using WSOutputType = TensorList<GPUBackend>;
static const char name[8];
};

const char Backend2Types<CPUBackend>::name[] = "CPU";
const char Backend2Types<GPUBackend>::name[] = "GPU";
const char Backend2Types<MixedBackend>::name[] = "Mixed";

/**
* @brief Direct operator providing eager execution of an operator in Run.
*/
template <typename Backend>
class DLL_PUBLIC EagerOperator {
using InBackend = typename Backend2Types<Backend>::InBackend;
using OutBackend = typename Backend2Types<Backend>::OutBackend;
using WSInputType = typename Backend2Types<Backend>::WSInputType;
using WSOutputType = typename Backend2Types<Backend>::WSOutputType;

public:
DLL_PUBLIC inline EagerOperator(const OpSpec &spec) : EagerOperator(spec, spec.name()) {}

DLL_PUBLIC inline EagerOperator(const OpSpec &spec, std::string name)
: EagerOperator(spec, std::move(name), shared_thread_pool->NumThreads()) {}

DLL_PUBLIC inline EagerOperator(const OpSpec &spec, std::string name, int num_threads)
: max_batch_size_(spec.GetArgument<int>("max_batch_size")),
op_spec_(spec),
name_(std::move(name)),
op_(InstantiateOperator(spec)) {
name_(std::move(name)) {
op_spec_.AddArg("num_threads", num_threads);
op_ = InstantiateOperator(op_spec_);
num_outputs_ = op_spec_.GetSchema().CalculateOutputs(op_spec_) +
op_spec_.GetSchema().CalculateAdditionalOutputs(op_spec_);
}

// Runs operator using shared thread pool and shared CUDA stream.
template <typename InBackend, typename OutBackend>
DLL_PUBLIC std::vector<std::shared_ptr<TensorList<OutBackend>>> Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
int batch_size = -1) {
DALI_FAIL("Unsupported backends in EagerOperator.Run().");
}
int batch_size = -1);

// Runs operator using specified thread pool.
template <typename InBackend, typename OutBackend>
DLL_PUBLIC std::vector<std::shared_ptr<TensorList<OutBackend>>> Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
ThreadPool *tp, int batch_size = -1) {
DALI_FAIL("Unsupported backends in EagerOperator.Run() with thread pool.");
}
ThreadPool *tp, int batch_size = -1);

// Runs operator using specified CUDA stream.
template <typename InBackend, typename OutBackend>
DLL_PUBLIC std::vector<std::shared_ptr<TensorList<OutBackend>>> Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
CUDAStreamLease &cuda_stream, int batch_size = -1) {
DALI_FAIL("Unsupported backends in EagerOperator.Run() with CUDA stream");
}
CUDAStreamLease &cuda_stream, int batch_size = -1);

// Update shared thread pool used for all direct operators.
DLL_PUBLIC inline static void UpdateThreadPool(int num_threads, int device_id,
bool set_affinity) {
shared_thread_pool = std::make_unique<ThreadPool>(num_threads, device_id, set_affinity);
DLL_PUBLIC inline static void UpdateThreadPool(int num_threads) {
shared_thread_pool = std::make_unique<ThreadPool>(num_threads, CPU_ONLY_DEVICE_ID, false);
}

// Update shared CUDA stream used for all direct operators.
@@ -111,7 +155,6 @@ class DLL_PUBLIC EagerOperator {
}

private:
template <typename InBackend, typename OutBackend, typename WSInputType, typename WSOutputType>
std::vector<std::shared_ptr<TensorList<OutBackend>>> RunImpl(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
@@ -122,6 +165,11 @@ class DLL_PUBLIC EagerOperator {
", instance name: \"", name_, "\", encountered:\n", what);
}

static inline int GetDefaultNumThreads() {
int num_cores = std::thread::hardware_concurrency();
return num_cores < 6 ? num_cores : 6;
}

int max_batch_size_;
size_t num_outputs_;
workspace_t<Backend> ws_;
@@ -133,86 +181,63 @@ class DLL_PUBLIC EagerOperator {
static std::unique_ptr<ThreadPool> shared_thread_pool;
};

template <>
template <typename Backend>
std::vector<std::shared_ptr<TensorList<typename EagerOperator<Backend>::OutBackend>>>
EagerOperator<Backend>::Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
int batch_size) {
return Run(inputs, kwargs, shared_cuda_stream, batch_size);
}

template <>
std::vector<std::shared_ptr<TensorList<CPUBackend>>> EagerOperator<CPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
ThreadPool *thread_pool, int batch_size) {
try {
DomainTimeRange tr("[DALI][CPU op] " + name_, DomainTimeRange::kBlue1);
ws_.Clear();
ws_.SetThreadPool(thread_pool);

return RunImpl<CPUBackend, CPUBackend, TensorVector<CPUBackend>, TensorVector<CPUBackend>>(
inputs, kwargs, batch_size);
} catch (std::exception &e) { throw std::runtime_error(ExtendErrorMsg("CPU", e.what())); }
int batch_size) {
return Run(inputs, kwargs, shared_thread_pool.get(), batch_size);
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<GPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<GPUBackend>>> &inputs,
template <typename Backend>
std::vector<std::shared_ptr<TensorList<typename EagerOperator<Backend>::OutBackend>>>
EagerOperator<Backend>::Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
CUDAStreamLease &cuda_stream, int batch_size) {
ThreadPool *thread_pool, int batch_size) {
try {
DomainTimeRange tr("[DALI][GPU op] " + name_, DomainTimeRange::knvGreen);
DomainTimeRange tr("[DALI][" + std::string(Backend2Types<Backend>::name) + " op] " + name_,
DomainTimeRange::kBlue1);
ws_.Clear();
ws_.set_stream(cuda_stream);
auto output = RunImpl<GPUBackend, GPUBackend, TensorList<GPUBackend>, TensorList<GPUBackend>>(
inputs, kwargs, batch_size);
CUDA_CALL(cudaStreamSynchronize(cuda_stream));
return output;
} catch (std::exception &e) { throw std::runtime_error(ExtendErrorMsg("GPU", e.what())); }
ws_.SetThreadPool(thread_pool);

return RunImpl(inputs, kwargs, batch_size);
} catch (std::exception &e) {
throw std::runtime_error(ExtendErrorMsg(Backend2Types<Backend>::name, e.what()));
}
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<MixedBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
template <typename Backend>
std::vector<std::shared_ptr<TensorList<typename EagerOperator<Backend>::OutBackend>>>
EagerOperator<Backend>::Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
CUDAStreamLease &cuda_stream, int batch_size) {
try {
DomainTimeRange tr("[DALI][Mixed op] " + name_, DomainTimeRange::kOrange);
DomainTimeRange tr("[DALI][" + std::string(Backend2Types<Backend>::name) + " op] " + name_,
DomainTimeRange::knvGreen);
ws_.Clear();
ws_.set_stream(cuda_stream);
auto output = RunImpl<CPUBackend, GPUBackend, TensorVector<CPUBackend>, TensorList<GPUBackend>>(
inputs, kwargs, batch_size);
auto output = RunImpl(inputs, kwargs, batch_size);
CUDA_CALL(cudaStreamSynchronize(cuda_stream));
return output;
} catch (std::exception &e) { throw std::runtime_error(ExtendErrorMsg("Mixed", e.what())); }
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<CPUBackend>>> EagerOperator<CPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
int batch_size) {
return Run<CPUBackend, CPUBackend>(inputs, kwargs, shared_thread_pool.get(), batch_size);
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<GPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<GPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
int batch_size) {
return Run<GPUBackend, GPUBackend>(inputs, kwargs, shared_cuda_stream, batch_size);
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<MixedBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
int batch_size) {
return Run<CPUBackend, GPUBackend>(inputs, kwargs, shared_cuda_stream, batch_size);
} catch (std::exception &e) {
throw std::runtime_error(ExtendErrorMsg(Backend2Types<Backend>::name, e.what()));
}
}

template <typename Backend>
template <typename InBackend, typename OutBackend, typename WSInputType, typename WSOutputType>
std::vector<std::shared_ptr<TensorList<OutBackend>>> EagerOperator<Backend>::RunImpl(
std::vector<std::shared_ptr<TensorList<typename EagerOperator<Backend>::OutBackend>>>
EagerOperator<Backend>::RunImpl(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
int batch_size) {
@@ -250,12 +275,12 @@ std::vector<std::shared_ptr<TensorList<OutBackend>>> EagerOperator<Backend>::Run
}

std::vector<OutputDesc> output_desc{};
std::vector<std::shared_ptr<TensorList<OutBackend>>> outputs{};

outputs.reserve(num_outputs_);
std::vector<std::shared_ptr<TensorList<OutBackend>>> outputs(num_outputs_);

for (size_t i = 0; i < num_outputs_; ++i) {
ws_.AddOutput(std::make_shared<WSOutputType>(max_batch_size_));
auto tensor_out = std::make_shared<WSOutputType>(batch_size);
MakeContiguous(tensor_out);
ws_.AddOutput(tensor_out);
}

ws_.SetBatchSizes(batch_size);
@@ -270,7 +295,7 @@ std::vector<std::shared_ptr<TensorList<OutBackend>>> EagerOperator<Backend>::Run
op_->Run(ws_);

for (size_t i = 0; i < num_outputs_; ++i) {
outputs.push_back(AsTensorList<OutBackend>(ws_.template OutputPtr<OutBackend>(i)));
outputs[i] = AsTensorList<OutBackend>(ws_.template OutputPtr<OutBackend>(i));
}

for (size_t i = 0; i < outputs.size(); ++i) {
@@ -285,7 +310,8 @@ std::vector<std::shared_ptr<TensorList<OutBackend>>> EagerOperator<Backend>::Run

template <typename Backend>
std::unique_ptr<ThreadPool> EagerOperator<Backend>::shared_thread_pool =
std::make_unique<ThreadPool>(1, CPU_ONLY_DEVICE_ID, false);
std::make_unique<ThreadPool>(EagerOperator<Backend>::GetDefaultNumThreads(), CPU_ONLY_DEVICE_ID,
false);

template <typename Backend>
CUDAStreamLease EagerOperator<Backend>::shared_cuda_stream{};
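
A minimal usage sketch of the refactored interface may help readers who are not familiar with it. It is not part of this commit: the operator name "Copy", the "device" and "max_batch_size" arguments, and the helper function below are illustrative assumptions; only the Run overloads, the constructor behavior (which injects "num_threads" itself), and the shared thread pool come from the header shown above.

// Illustrative only: running a registered CPU operator eagerly through
// EagerOperator<CPUBackend>. Names described as assumptions in the text above
// are not taken from this commit.
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "dali/pipeline/operator/eager_operator.h"

std::vector<std::shared_ptr<dali::TensorList<dali::CPUBackend>>> RunCopyEagerly(
    const std::shared_ptr<dali::TensorList<dali::CPUBackend>> &input, int batch_size) {
  dali::OpSpec spec("Copy");                   // assumed operator name
  spec.AddArg("max_batch_size", batch_size);   // read by the EagerOperator constructor
  spec.AddArg("device", "cpu");                // assumed to be expected by the schema

  // The single-argument constructor adds "num_threads" (sized from the shared
  // thread pool) before instantiating the operator.
  dali::EagerOperator<dali::CPUBackend> op(spec);

  // No tensor keyword arguments in this example.
  std::unordered_map<std::string, std::shared_ptr<dali::TensorList<dali::CPUBackend>>> kwargs;

  // Uses the shared thread pool; returns one TensorList per operator output.
  return op.Run({input}, kwargs, batch_size);
}

Because Backend2Types supplies InBackend/OutBackend and the workspace input/output types, the same Run definition serves the CPU, GPU and Mixed backends; an EagerOperator<GPUBackend> version of this sketch would differ only in the backend types and in synchronizing on the shared CUDA stream, which Run already does internally.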