From 0de4b3742c2702d36b5c399ed90b17678de5f035 Mon Sep 17 00:00:00 2001 From: Janusz Lisiecki <39967756+JanuszL@users.noreply.github.com> Date: Wed, 18 May 2022 12:30:51 +0200 Subject: [PATCH 1/3] Add CUDA 11.7 support (#3906) * Add CUDA 11.7 support Signed-off-by: Janusz Lisiecki --- docker/Dockerfile.cuda117.aarch64.deps | 12 ++++++++++++ docker/Dockerfile.cuda117.x86_64.deps | 24 ++++++++++++++++++++++++ docker/build.sh | 4 ++-- docs/compilation.rst | 6 +++--- 4 files changed, 41 insertions(+), 5 deletions(-) create mode 100644 docker/Dockerfile.cuda117.aarch64.deps create mode 100644 docker/Dockerfile.cuda117.x86_64.deps diff --git a/docker/Dockerfile.cuda117.aarch64.deps b/docker/Dockerfile.cuda117.aarch64.deps new file mode 100644 index 00000000000..34b0b5e413c --- /dev/null +++ b/docker/Dockerfile.cuda117.aarch64.deps @@ -0,0 +1,12 @@ +ARG TOOLKIT_BASE_IMAGE=ubuntu:20.04 +FROM ${TOOLKIT_BASE_IMAGE} as cuda + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt update && apt install -y libxml2 curl perl gcc && \ + rm -rf /var/lib/apt/lists/* + +RUN curl -LO https://developer.download.nvidia.com/compute/cuda/11.7.0/local_installers/cuda_11.7.0_515.43.04_linux_sbsa.run && \ + chmod +x cuda_*.run && \ + ./cuda_*.run --silent --no-opengl-libs --toolkit && \ + rm -f cuda_*.run; diff --git a/docker/Dockerfile.cuda117.x86_64.deps b/docker/Dockerfile.cuda117.x86_64.deps new file mode 100644 index 00000000000..1886e83a770 --- /dev/null +++ b/docker/Dockerfile.cuda117.x86_64.deps @@ -0,0 +1,24 @@ +ARG TOOLKIT_BASE_IMAGE=ubuntu:20.04 +FROM ${TOOLKIT_BASE_IMAGE} as cuda + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt update && apt install -y libxml2 curl perl gcc && \ + rm -rf /var/lib/apt/lists/* + +RUN curl -LO https://developer.download.nvidia.com/compute/cuda/11.7.0/local_installers/cuda_11.7.0_515.43.04_linux.run && \ + chmod +x cuda_*.run && \ + ./cuda_*.run --silent --no-opengl-libs --toolkit && \ + rm -f cuda_*.run; + +RUN NVJPEG2K_VERSION=0.5.0.25-1 && \ + CUFILE_VERSION=1.3.0.44-1 && \ + apt-get update && \ + apt-get install wget software-properties-common -y && \ + apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/3bf863cc.pub && \ + add-apt-repository "deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /" && \ + apt-get update && \ + apt-get install libnvjpeg2k0=${NVJPEG2K_VERSION} libnvjpeg2k-dev=${NVJPEG2K_VERSION} libcufile-dev-11-7=${CUFILE_VERSION} -y && \ + cp /usr/include/nvjpeg2k* /usr/local/cuda/include/ && \ + cp /usr/lib/x86_64-linux-gnu/libnvjpeg2k* /usr/local/cuda/lib64/ && \ + rm -rf /var/lib/apt/lists/* diff --git a/docker/build.sh b/docker/build.sh index eb0db6d266e..e9577f998cc 100755 --- a/docker/build.sh +++ b/docker/build.sh @@ -5,7 +5,7 @@ a build environment To change build configuration please export appropriate env variables (for exact meaning please check the README): PYVER=[default 3.6, required only by Run image] -CUDA_VERSION=[default 11.6, accepts also 10.2, 11.0 and 11.1, 11.2, 11.3, 11.4, 11.5] +CUDA_VERSION=[default 11.7, accepts also 10.2, 11.0 and 11.1, 11.2, 11.3, 11.4, 11.5, 11.6] NVIDIA_BUILD_ID=[default 12345] CREATE_WHL=[default YES] CREATE_RUNNER=[default NO] @@ -40,7 +40,7 @@ shift $((OPTIND - 1)) export ARCH=${ARCH:-x86_64} export PYVER=${PYVER:-3.6} export PYV=${PYVER/./} -export CUDA_VERSION=${CUDA_VERSION:-11.6} +export CUDA_VERSION=${CUDA_VERSION:-11.7} export CUDA_VER=${CUDA_VERSION//./} if [ "${CUDA_VERSION%%\.*}" ] diff --git a/docs/compilation.rst 
b/docs/compilation.rst index 09d0377ea51..7d8dbc27ee6 100644 --- a/docs/compilation.rst +++ b/docs/compilation.rst @@ -38,10 +38,10 @@ Building Python Wheel Change directory (``cd``) into ``docker`` directory and run ``./build.sh``. If needed, set the following environment variables: -* | CUDA_VERSION - CUDA toolkit version (10.2 and 11.5 are offiically supported, 10.0, 11.0, 11.1, +* | CUDA_VERSION - CUDA toolkit version (10.2 and 11.7 are officially supported, 10.0, 11.0, 11.1, 11.2, 11.4, 11.5 and 11.6 are deprecated and may not work). - | The default is ``11.6``. Thanks to CUDA extended compatibility mode, CUDA 11.1, 11.2, 11.3, 11.4 - 11.5 and 11.6 wheels are named as CUDA 11.0 because it can work with the CUDA 11.0 R450.x driver + | The default is ``11.7``. Thanks to CUDA extended compatibility mode, CUDA 11.1, 11.2, 11.3, 11.4 + 11.5, 11.6 and 11.7 wheels are named as CUDA 11.0 because it can work with the CUDA 11.0 R450.x driver family. Please update to the latest recommended driver version in that family. | If the value of the CUDA_VERSION is prefixed with `.` then any value ``.XX.Y`` can be passed, the supported version check is suppressed, and the user needs to make sure that From c26d2cd63f56451567a5a06f8cb3902dc6fc245e Mon Sep 17 00:00:00 2001 From: Krystian Sztenderski <56883920+ksztenderski@users.noreply.github.com> Date: Wed, 18 May 2022 14:50:57 +0200 Subject: [PATCH 2/3] Add stateless CPU eager operators (#3887) * Adds Python exposure of stateless CPU eager operators to the nvidia.dali.eager.experimental module. * Fixes constant classification for eager (and debug) operators. * Simplifies templating in backend eager operators. * Adds eager version of cpu_only tests (for now subset of operators). Signed-off-by: ksztenderski --- dali/pipeline/operator/eager_operator.h | 202 ++++++++------ dali/pipeline/pipeline_debug.h | 32 ++- dali/python/backend_impl.cc | 12 +- dali/python/nvidia/dali/_debug_mode.py | 106 ++------ dali/python/nvidia/dali/_utils/eager_util.py | 135 ++++++++++ dali/python/nvidia/dali/eager.py | 265 +++++++++++++++++++ dali/python/nvidia/dali/fn.py | 9 +- dali/python/nvidia/dali/ops.py | 30 +-- dali/python/nvidia/dali/types.py | 17 +- dali/test/python/test_eager_operators.py | 195 ++++++++++++++ dali/test/python/test_pipeline_debug.py | 11 +- 11 files changed, 787 insertions(+), 227 deletions(-) create mode 100644 dali/python/nvidia/dali/_utils/eager_util.py create mode 100644 dali/python/nvidia/dali/eager.py create mode 100644 dali/test/python/test_eager_operators.py diff --git a/dali/pipeline/operator/eager_operator.h b/dali/pipeline/operator/eager_operator.h index 6f16fbfdbd8..1dbfe09c8a3 100644 --- a/dali/pipeline/operator/eager_operator.h +++ b/dali/pipeline/operator/eager_operator.h @@ -17,13 +17,14 @@ #include #include +#include #include #include #include - #include "dali/core/cuda_stream_pool.h" #include "dali/core/nvtx.h" +#include "dali/pipeline/data/backend.h" #include "dali/pipeline/data/tensor_list.h" #include "dali/pipeline/operator/op_spec.h" #include "dali/pipeline/operator/operator.h" @@ -35,16 +36,18 @@ namespace dali { template -std::shared_ptr> AsTensorList(const std::shared_ptr> &in) { +std::shared_ptr> AsTensorList(std::shared_ptr> in) { return in; } template -std::shared_ptr> AsTensorList( - const std::shared_ptr> &in) { +std::shared_ptr> AsTensorList(std::shared_ptr> in) { if (in->IsContiguous()) { // Filled contiguous TensorVector, we can return TensorList directly. 
- return in->AsTensorList(false); + auto tl = in->AsTensorList(false); + // Explicitly set layout (it could be empty in case of per-sample operators). + tl->SetLayout(in->GetLayout()); + return tl; } auto tl = std::make_shared>(); @@ -52,54 +55,95 @@ std::shared_ptr> AsTensorList( return tl; } +template +void MakeContiguous(std::shared_ptr storage) {} + +template <> +void MakeContiguous(std::shared_ptr> storage) { + storage->SetContiguous(true); +} + +template +struct Backend2Types {}; + +template <> +struct Backend2Types { + using InBackend = CPUBackend; + using OutBackend = CPUBackend; + using WSInputType = TensorVector; + using WSOutputType = TensorVector; + static const char name[8]; +}; + +template <> +struct Backend2Types { + using InBackend = GPUBackend; + using OutBackend = GPUBackend; + using WSInputType = TensorList; + using WSOutputType = TensorList; + static const char name[8]; +}; + +template <> +struct Backend2Types { + using InBackend = CPUBackend; + using OutBackend = GPUBackend; + using WSInputType = TensorVector; + using WSOutputType = TensorList; + static const char name[8]; +}; + +const char Backend2Types::name[] = "CPU"; +const char Backend2Types::name[] = "GPU"; +const char Backend2Types::name[] = "Mixed"; + /** * @brief Direct operator providing eager execution of an operator in Run. */ template class DLL_PUBLIC EagerOperator { + using InBackend = typename Backend2Types::InBackend; + using OutBackend = typename Backend2Types::OutBackend; + using WSInputType = typename Backend2Types::WSInputType; + using WSOutputType = typename Backend2Types::WSOutputType; + public: DLL_PUBLIC inline EagerOperator(const OpSpec &spec) : EagerOperator(spec, spec.name()) {} DLL_PUBLIC inline EagerOperator(const OpSpec &spec, std::string name) + : EagerOperator(spec, std::move(name), shared_thread_pool->NumThreads()) {} + + DLL_PUBLIC inline EagerOperator(const OpSpec &spec, std::string name, int num_threads) : max_batch_size_(spec.GetArgument("max_batch_size")), op_spec_(spec), - name_(std::move(name)), - op_(InstantiateOperator(spec)) { + name_(std::move(name)) { + op_spec_.AddArg("num_threads", num_threads); + op_ = InstantiateOperator(op_spec_); num_outputs_ = op_spec_.GetSchema().CalculateOutputs(op_spec_) + op_spec_.GetSchema().CalculateAdditionalOutputs(op_spec_); } // Runs operator using shared thread pool and shared CUDA stream. - template DLL_PUBLIC std::vector>> Run( const std::vector>> &inputs, const std::unordered_map>> &kwargs, - int batch_size = -1) { - DALI_FAIL("Unsupported backends in EagerOperator.Run()."); - } + int batch_size = -1); // Runs operator using specified thread pool. - template DLL_PUBLIC std::vector>> Run( const std::vector>> &inputs, const std::unordered_map>> &kwargs, - ThreadPool *tp, int batch_size = -1) { - DALI_FAIL("Unsupported backends in EagerOperator.Run() with thread pool."); - } + ThreadPool *tp, int batch_size = -1); // Runs operator using specified CUDA stream. - template DLL_PUBLIC std::vector>> Run( const std::vector>> &inputs, const std::unordered_map>> &kwargs, - CUDAStreamLease &cuda_stream, int batch_size = -1) { - DALI_FAIL("Unsupported backends in EagerOperator.Run() with CUDA stream"); - } + CUDAStreamLease &cuda_stream, int batch_size = -1); // Update shared thread pool used for all direct operators. 
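  // The shared pool is host-only: it is always created with CPU_ONLY_DEVICE_ID and with
  // thread affinity disabled, so only the thread count can be adjusted here.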
- DLL_PUBLIC inline static void UpdateThreadPool(int num_threads, int device_id, - bool set_affinity) { - shared_thread_pool = std::make_unique(num_threads, device_id, set_affinity); + DLL_PUBLIC inline static void UpdateThreadPool(int num_threads) { + shared_thread_pool = std::make_unique(num_threads, CPU_ONLY_DEVICE_ID, false); } // Update shared CUDA stream used for all direct operators. @@ -111,7 +155,6 @@ class DLL_PUBLIC EagerOperator { } private: - template std::vector>> RunImpl( const std::vector>> &inputs, const std::unordered_map>> &kwargs, @@ -122,6 +165,11 @@ class DLL_PUBLIC EagerOperator { ", instance name: \"", name_, "\", encountered:\n", what); } + static inline int GetDefaultNumThreads() { + int num_cores = std::thread::hardware_concurrency(); + return num_cores < 6 ? num_cores : 6; + } + int max_batch_size_; size_t num_outputs_; workspace_t ws_; @@ -133,86 +181,63 @@ class DLL_PUBLIC EagerOperator { static std::unique_ptr shared_thread_pool; }; -template <> +template +std::vector::OutBackend>>> +EagerOperator::Run( + const std::vector>> &inputs, + const std::unordered_map>> &kwargs, + int batch_size) { + return Run(inputs, kwargs, shared_cuda_stream, batch_size); +} + template <> std::vector>> EagerOperator::Run( const std::vector>> &inputs, const std::unordered_map>> &kwargs, - ThreadPool *thread_pool, int batch_size) { - try { - DomainTimeRange tr("[DALI][CPU op] " + name_, DomainTimeRange::kBlue1); - ws_.Clear(); - ws_.SetThreadPool(thread_pool); - - return RunImpl, TensorVector>( - inputs, kwargs, batch_size); - } catch (std::exception &e) { throw std::runtime_error(ExtendErrorMsg("CPU", e.what())); } + int batch_size) { + return Run(inputs, kwargs, shared_thread_pool.get(), batch_size); } -template <> -template <> -std::vector>> EagerOperator::Run( - const std::vector>> &inputs, +template +std::vector::OutBackend>>> +EagerOperator::Run( + const std::vector>> &inputs, const std::unordered_map>> &kwargs, - CUDAStreamLease &cuda_stream, int batch_size) { + ThreadPool *thread_pool, int batch_size) { try { - DomainTimeRange tr("[DALI][GPU op] " + name_, DomainTimeRange::knvGreen); + DomainTimeRange tr("[DALI][" + std::string(Backend2Types::name) + " op] " + name_, + DomainTimeRange::kBlue1); ws_.Clear(); - ws_.set_stream(cuda_stream); - auto output = RunImpl, TensorList>( - inputs, kwargs, batch_size); - CUDA_CALL(cudaStreamSynchronize(cuda_stream)); - return output; - } catch (std::exception &e) { throw std::runtime_error(ExtendErrorMsg("GPU", e.what())); } + ws_.SetThreadPool(thread_pool); + + return RunImpl(inputs, kwargs, batch_size); + } catch (std::exception &e) { + throw std::runtime_error(ExtendErrorMsg(Backend2Types::name, e.what())); + } } -template <> -template <> -std::vector>> EagerOperator::Run( - const std::vector>> &inputs, +template +std::vector::OutBackend>>> +EagerOperator::Run( + const std::vector>> &inputs, const std::unordered_map>> &kwargs, CUDAStreamLease &cuda_stream, int batch_size) { try { - DomainTimeRange tr("[DALI][Mixed op] " + name_, DomainTimeRange::kOrange); + DomainTimeRange tr("[DALI][" + std::string(Backend2Types::name) + " op] " + name_, + DomainTimeRange::knvGreen); ws_.Clear(); ws_.set_stream(cuda_stream); - auto output = RunImpl, TensorList>( - inputs, kwargs, batch_size); + auto output = RunImpl(inputs, kwargs, batch_size); CUDA_CALL(cudaStreamSynchronize(cuda_stream)); return output; - } catch (std::exception &e) { throw std::runtime_error(ExtendErrorMsg("Mixed", e.what())); } -} - -template <> -template <> -std::vector>> 
EagerOperator::Run( - const std::vector>> &inputs, - const std::unordered_map>> &kwargs, - int batch_size) { - return Run(inputs, kwargs, shared_thread_pool.get(), batch_size); -} - -template <> -template <> -std::vector>> EagerOperator::Run( - const std::vector>> &inputs, - const std::unordered_map>> &kwargs, - int batch_size) { - return Run(inputs, kwargs, shared_cuda_stream, batch_size); -} - -template <> -template <> -std::vector>> EagerOperator::Run( - const std::vector>> &inputs, - const std::unordered_map>> &kwargs, - int batch_size) { - return Run(inputs, kwargs, shared_cuda_stream, batch_size); + } catch (std::exception &e) { + throw std::runtime_error(ExtendErrorMsg(Backend2Types::name, e.what())); + } } template -template -std::vector>> EagerOperator::RunImpl( +std::vector::OutBackend>>> +EagerOperator::RunImpl( const std::vector>> &inputs, const std::unordered_map>> &kwargs, int batch_size) { @@ -250,12 +275,12 @@ std::vector>> EagerOperator::Run } std::vector output_desc{}; - std::vector>> outputs{}; - - outputs.reserve(num_outputs_); + std::vector>> outputs(num_outputs_); for (size_t i = 0; i < num_outputs_; ++i) { - ws_.AddOutput(std::make_shared(max_batch_size_)); + auto tensor_out = std::make_shared(batch_size); + MakeContiguous(tensor_out); + ws_.AddOutput(tensor_out); } ws_.SetBatchSizes(batch_size); @@ -270,7 +295,7 @@ std::vector>> EagerOperator::Run op_->Run(ws_); for (size_t i = 0; i < num_outputs_; ++i) { - outputs.push_back(AsTensorList(ws_.template OutputPtr(i))); + outputs[i] = AsTensorList(ws_.template OutputPtr(i)); } for (size_t i = 0; i < outputs.size(); ++i) { @@ -285,7 +310,8 @@ std::vector>> EagerOperator::Run template std::unique_ptr EagerOperator::shared_thread_pool = - std::make_unique(1, CPU_ONLY_DEVICE_ID, false); + std::make_unique(EagerOperator::GetDefaultNumThreads(), CPU_ONLY_DEVICE_ID, + false); template CUDAStreamLease EagerOperator::shared_cuda_stream{}; diff --git a/dali/pipeline/pipeline_debug.h b/dali/pipeline/pipeline_debug.h index 192752450ed..9773240d07d 100644 --- a/dali/pipeline/pipeline_debug.h +++ b/dali/pipeline/pipeline_debug.h @@ -19,6 +19,7 @@ #include #include #include + #include "dali/core/cuda_stream_pool.h" #include "dali/pipeline/operator/eager_operator.h" #include "dali/pipeline/util/thread_pool.h" @@ -29,6 +30,12 @@ namespace dali { * @brief Debug mode pipeline keeping operators, thread pool and CUDA stream. 
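 *
 * Operators are registered up front with AddOperator / AddMultipleOperators and are then
 * executed one at a time through the RunOperator specializations (exposed to Python as
 * RunOperatorCPU, RunOperatorGPU and RunOperatorMixed), all of which share a single
 * thread pool and CUDA stream.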
*/ class DLL_PUBLIC PipelineDebug { + template + using input_t = std::shared_ptr::InBackend>>; + + template + using output_t = std::shared_ptr::OutBackend>>; + public: DLL_PUBLIC inline PipelineDebug(int max_batch_size, int num_threads, int device_id, bool set_affinity = false) @@ -57,9 +64,9 @@ class DLL_PUBLIC PipelineDebug { } } - template - DLL_PUBLIC std::vector>> RunOperator( - int logical_id, const std::vector>> &inputs, + template + DLL_PUBLIC std::vector> RunOperator( + int logical_id, const std::vector> &inputs, const std::unordered_map>> &kwargs, int batch_size = -1) { DALI_FAIL("Unsupported backends in PipelineDebug.RunOperator()."); @@ -69,7 +76,6 @@ class DLL_PUBLIC PipelineDebug { void FillOpSpec(OpSpec &spec) { spec.AddArg("max_batch_size", max_batch_size_); spec.AddArg("device_id", device_id_); - spec.AddArg("num_threads", num_threads_); } void AddOperatorImpl(OpSpec &spec, int logical_id) { @@ -77,11 +83,11 @@ class DLL_PUBLIC PipelineDebug { std::string device = spec.GetArgument("device"); if (device == "gpu") { - gpu_operators_.insert({logical_id, EagerOperator(spec, name)}); + gpu_operators_.insert({logical_id, EagerOperator(spec, name, num_threads_)}); } else if (device == "cpu") { - cpu_operators_.insert({logical_id, EagerOperator(spec, name)}); + cpu_operators_.insert({logical_id, EagerOperator(spec, name, num_threads_)}); } else if (device == "mixed") { - mixed_operators_.insert({logical_id, EagerOperator(spec, name)}); + mixed_operators_.insert({logical_id, EagerOperator(spec, name, num_threads_)}); } } @@ -96,33 +102,33 @@ class DLL_PUBLIC PipelineDebug { }; template <> -std::vector>> PipelineDebug::RunOperator( +std::vector>> PipelineDebug::RunOperator( int logical_id, const std::vector>> &inputs, const std::unordered_map>> &kwargs, int batch_size) { auto op = cpu_operators_.find(logical_id); DALI_ENFORCE(op != cpu_operators_.end(), "Failed to acquire CPU Operator in PipelineDebug."); - return op->second.template Run(inputs, kwargs, &thread_pool_, batch_size); + return op->second.Run(inputs, kwargs, &thread_pool_, batch_size); } template <> -std::vector>> PipelineDebug::RunOperator( +std::vector>> PipelineDebug::RunOperator( int logical_id, const std::vector>> &inputs, const std::unordered_map>> &kwargs, int batch_size) { auto op = gpu_operators_.find(logical_id); DALI_ENFORCE(op != gpu_operators_.end(), "Failed to acquire GPU Operator in PipelineDebug."); - return op->second.template Run(inputs, kwargs, cuda_stream_, batch_size); + return op->second.Run(inputs, kwargs, cuda_stream_, batch_size); } template <> -std::vector>> PipelineDebug::RunOperator( +std::vector>> PipelineDebug::RunOperator( int logical_id, const std::vector>> &inputs, const std::unordered_map>> &kwargs, int batch_size) { auto op = mixed_operators_.find(logical_id); DALI_ENFORCE(op != mixed_operators_.end(), "Failed to acquire Mixed Operator in PipelineDebug."); - return op->second.template Run(inputs, kwargs, cuda_stream_, batch_size); + return op->second.Run(inputs, kwargs, cuda_stream_, batch_size); } } // namespace dali diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index fec4c8d1ecf..6c87c205d35 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -601,7 +601,7 @@ void ExposeEagerOperator(py::module &m) { [](EagerOperator &op, const std::vector>> &inputs, const std::unordered_map>> - &kwargs) { return op.Run(inputs, kwargs); }); + &kwargs) { return op.Run(inputs, kwargs); }); py::class_>(m, "EagerOperatorGPU") .def(py::init([](const OpSpec 
&op_spec) { @@ -612,7 +612,7 @@ void ExposeEagerOperator(py::module &m) { [](EagerOperator &op, const std::vector>> &inputs, const std::unordered_map>> - &kwargs) { return op.Run(inputs, kwargs); }); + &kwargs) { return op.Run(inputs, kwargs); }); py::class_>(m, "EagerOperatorMixed") .def(py::init([](const OpSpec &op_spec) { @@ -623,7 +623,7 @@ void ExposeEagerOperator(py::module &m) { [](EagerOperator &op, const std::vector>> &inputs, const std::unordered_map>> - &kwargs) { return op.Run(inputs, kwargs); }); + &kwargs) { return op.Run(inputs, kwargs); }); } void ExposePipelineDebug(py::module &m) { @@ -633,9 +633,9 @@ void ExposePipelineDebug(py::module &m) { })) .def("AddOperator", &PipelineDebug::AddOperator) .def("AddMultipleOperators", &PipelineDebug::AddMultipleOperators) - .def("RunOperatorCPU", &PipelineDebug::RunOperator) - .def("RunOperatorGPU", &PipelineDebug::RunOperator) - .def("RunOperatorMixed", &PipelineDebug::RunOperator); + .def("RunOperatorCPU", &PipelineDebug::RunOperator) + .def("RunOperatorGPU", &PipelineDebug::RunOperator) + .def("RunOperatorMixed", &PipelineDebug::RunOperator); } template diff --git a/dali/python/nvidia/dali/_debug_mode.py b/dali/python/nvidia/dali/_debug_mode.py index a009194b15e..0ae46f72080 100644 --- a/dali/python/nvidia/dali/_debug_mode.py +++ b/dali/python/nvidia/dali/_debug_mode.py @@ -21,9 +21,9 @@ import nvidia.dali.pipeline as _pipeline import nvidia.dali.tensors as _tensors import nvidia.dali.types as _types +from nvidia.dali._utils.eager_util import _Classification, _transform_data_to_tensorlist from nvidia.dali.data_node import DataNode as _DataNode, _check from nvidia.dali.fn import _to_snake_case -from nvidia.dali.external_source import _prep_data_for_feed_input from nvidia.dali._utils.external_source_impl import \ get_callback_from_source as _get_callback_from_source, \ accepted_arg_count as _accepted_arg_count @@ -137,18 +137,6 @@ def __rxor__(self, other): _aritm_op_name = _to_snake_case(_ops.ArithmeticGenericOp.__name__) -def _transform_data_to_tensorlist(data, batch_size, layout=None, device_id=None): - data = _prep_data_for_feed_input(data, batch_size, layout, device_id) - - if isinstance(data, list): - if isinstance(data[0], _tensors.TensorGPU): - data = _tensors.TensorListGPU(data, layout or "") - else: - data = _tensors.TensorListCPU(data, layout or "") - - return data - - class _ExternalSourceDebug: """Debug mode version of ExternalSource operator.""" @@ -248,73 +236,6 @@ def to_data_node_debug(data): return to_data_node_debug(raw_data) -class _Classification: - """Classification of data's device and if it is a batch. - - Based on data type determines if data should be treated as a batch and with which device. - If the type can be recognized as a batch without being falsely categorized as such, it is. - This includes lists of supported tensor-like objects e.g. numpy arrays (the only list not - treated as a batch is a list of objects of primitive types), :class:`DataNodeDebug` and - TensorLists. - """ - - def __init__(self, data, type_name): - self.is_batch, self.device, self.data = self._classify_data(data, type_name) - - @staticmethod - def _classify_data(data, type_name): - """Returns tuple (is_batch, device, unpacked data). 
""" - - def is_primitive_type(x): - return isinstance(x, (int, float, bool, str)) - - if isinstance(data, list): - if len(data) == 0 or any([is_primitive_type(d) for d in data]): - return False, 'cpu', data - - is_batch_list = [] - device_list = [] - data_list = [] - - for d in data: - is_batch, device, val = _Classification._classify_data(d, type_name) - is_batch_list.append(is_batch) - device_list.append(device) - data_list.append(val) - - if any([device != device_list[0] for device in device_list]): - raise RuntimeError(f'{type_name} has batches of data on CPU and on GPU. ' - 'Which is not supported.') - - if all(is_batch_list): - # Input set. - return is_batch_list, device_list[0], data_list - if not any(is_batch_list): - # Converting to TensorList. - return True, device_list[0], _transform_data_to_tensorlist(data_list, len(data_list)) - else: - raise RuntimeError(f'{type_name} has inconsistent batch classification.') - - else: - if isinstance(data, DataNodeDebug): - return True, data.device, data.get() - if isinstance(data, _tensors.TensorListCPU): - return True, 'cpu', data - if isinstance(data, _tensors.TensorListGPU): - return True, 'gpu', data - if is_primitive_type(data) or _types._is_numpy_array(data) or \ - isinstance(data, _tensors.TensorCPU): - return False, 'cpu', data - if _types._is_torch_tensor(data): - return False, 'gpu' if data.is_cuda else 'cpu', data - if _types._is_mxnet_array(data): - return False, 'gpu' if 'gpu' in str(data.context) else 'cpu', data - if hasattr(data, '__cuda_array_interface__') or isinstance(data, _tensors.TensorGPU): - return False, 'gpu', data - - return False, 'cpu', data - - class _IterBatchInfo: def __init__(self, size, source_context): self._size = size @@ -366,6 +287,7 @@ class _OperatorManager: def __init__(self, op_class, op_name, pipe, source_context, next_logical_id, batch_size, seed, inputs, kwargs): """Creates direct operator.""" + self._batch_size = batch_size self._separate_kwargs(kwargs) if op_name == 'arithmetic_generic_op': @@ -393,7 +315,6 @@ def __init__(self, op_class, op_name, pipe, source_context, next_logical_id, bat if 'seed' not in self._init_args: self._init_args['seed'] = seed - self._batch_size = batch_size self._device = self._init_args.get('device', 'cpu') self._expected_inputs_size = len(inputs) self.op_helper = op_class(**self._init_args) @@ -416,7 +337,8 @@ def _separate_kwargs(self, kwargs): self._kwargs_classification = {} for key, value in kwargs.items(): - classification = _Classification(value, f'Argument {key}') + classification = _Classification( + value, f'Argument {key}', arg_constant_len=self._batch_size) if classification.is_batch: self._call_args[key] = classification.data else: @@ -473,10 +395,23 @@ def _check_batch_size(self, classification, input_idx): self._pipe._cur_iter_batch_info.check_input( len(classification.data), self._source_context, self._op_name, input_idx) + def _prep_input_sets(self, inputs): + inputs = list(inputs) + + for i, input in enumerate(inputs): + # Transforming any convertable datatype to TensorList (DataNodeDebugs are already unpacked). + # Additionally accepting input sets, but only as list of TensorList. 
+ if not isinstance(input, (_tensors.TensorListCPU, _tensors.TensorListGPU)) and \ + not (isinstance(input, list) and + all([isinstance(elem, (_tensors.TensorListCPU, _tensors.TensorListGPU)) for elem in input])): + inputs[i] = _transform_data_to_tensorlist(input, len(input)) + + return self.op_helper._build_input_sets(inputs) + def run(self, inputs, kwargs): """Checks correctness of inputs and kwargs and runs the backend operator.""" self._check_arg_len(self._expected_inputs_size, len(inputs), 'inputs') - self._check_arg_len(len(self._kwargs_classification), len(kwargs), 'keyward arguments') + self._check_arg_len(len(self._kwargs_classification), len(kwargs), 'keyword arguments') call_args = {} inputs = list(inputs) @@ -499,11 +434,12 @@ def run(self, inputs, kwargs): inputs[i] = classification.data - input_sets = _ops._prep_input_sets(self.op_helper, inputs) + input_sets = self._prep_input_sets(inputs) # Check kwargs classification as batches and setup call args. for key, value in kwargs.items(): - classification = _Classification(value, f'Argument {key}') + classification = _Classification( + value, f'Argument {key}', arg_constant_len=self._batch_size) self._check_batch_classification( self._kwargs_classification[key].is_batch, classification.is_batch, 'Argument', key) diff --git a/dali/python/nvidia/dali/_utils/eager_util.py b/dali/python/nvidia/dali/_utils/eager_util.py new file mode 100644 index 00000000000..9101b941722 --- /dev/null +++ b/dali/python/nvidia/dali/_utils/eager_util.py @@ -0,0 +1,135 @@ +# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import nvidia.dali.tensors as _tensors +import nvidia.dali.types as _types +from nvidia.dali.external_source import _prep_data_for_feed_input + + +def _transform_data_to_tensorlist(data, batch_size, layout=None, device_id=None): + data = _prep_data_for_feed_input(data, batch_size, layout, device_id) + + if isinstance(data, list): + if isinstance(data[0], _tensors.TensorGPU): + data = _tensors.TensorListGPU(data, layout or "") + else: + data = _tensors.TensorListCPU(data, layout or "") + + return data + + +class _Classification: + """Classification of data's device and whether it is a batch. + + Based on data type determines if data should be treated as a batch and with which device. + If the type can be recognized as a batch without being falsely categorized as such, it is. + This includes lists of supported tensor-like objects e.g. numpy arrays (the only list not + treated as a batch is a list of objects of primitive types), :class:`DataNodeDebug` and + TensorLists. + + Args: + data: Data to be classified. + type_name (str): Representation of argument type (input or keyword). + arg_constant_len (int): Only applicable for argument inputs that are of array type + (e.g. numpy array). If -1 does not modify the data. For positive value works like + `:class:ops.Constant`, repeats the data `arg_constant_len` times. 
+ """ + + def __init__(self, data, type_name, arg_constant_len=-1): + self.is_batch, self.device, self.data = self._classify_data( + data, type_name, arg_constant_len) + + @staticmethod + def _classify_data(data, type_name, arg_constant_len): + from nvidia.dali._debug_mode import DataNodeDebug + """Returns tuple (is_batch, device, unpacked data). """ + + def is_primitive_type(x): + return isinstance(x, (int, float, bool, str)) + + def classify_array_input(arr): + if _types._is_numpy_array(arr): + device = 'cpu' + elif _types._is_torch_tensor(arr): + device = 'gpu' if arr.is_cuda else 'cpu' + elif _types._is_mxnet_array(arr): + device = 'gpu' if 'gpu' in str(arr.context) else 'cpu' + else: + raise RuntimeError(f"Unsupported array type '{type(arr)}'.") + + return False, device, arr + + def classify_array_kwarg(arr): + if _types._is_torch_tensor(arr): + if arr.is_cuda: + arr = arr.cpu().numpy() + elif _types._is_mxnet_array(arr): + import mxnet as mx + + if 'gpu' in str(arr.context): + arr = arr.copyto(mx.cpu()) + elif not _types._is_numpy_array(arr): + raise RuntimeError(f"Unsupported array type '{type(arr)}'.") + + arr = _types._preprocess_constant_array_type(arr) + arr = _tensors.TensorListCPU([_tensors.TensorCPU(arr)] * arg_constant_len) + return True, 'cpu', arr + + if isinstance(data, list): + if len(data) == 0 or any([is_primitive_type(d) for d in data]): + return False, 'cpu', data + + is_batch_list = [] + device_list = [] + data_list = [] + + for d in data: + is_batch, device, val = _Classification._classify_data(d, type_name, -1) + is_batch_list.append(is_batch) + device_list.append(device) + data_list.append(val) + + if any([device != device_list[0] for device in device_list]): + raise RuntimeError(f'{type_name} has batches of data on CPU and on GPU, ' + 'which is not supported.') + + if all(is_batch_list): + # Input set. + return is_batch_list, device_list[0], data_list + if not any(is_batch_list): + # Converting to TensorList. + return True, device_list[0], _transform_data_to_tensorlist(data_list, len(data_list)) + else: + raise RuntimeError(f'{type_name} has inconsistent batch classification.') + + else: + if isinstance(data, DataNodeDebug): + return True, data.device, data.get() + if isinstance(data, _tensors.TensorListCPU): + return True, 'cpu', data + if isinstance(data, _tensors.TensorListGPU): + return True, 'gpu', data + if is_primitive_type(data) or isinstance(data, _tensors.TensorCPU): + return False, 'cpu', data + if _types._is_compatible_array_type(data): + if arg_constant_len > 0: + # For call argument input repeats data `arg_constant_len` times to match + # the ContantOp behavior. + return classify_array_kwarg(data) + else: + return classify_array_input(data) + if hasattr(data, '__cuda_array_interface__') or isinstance(data, _tensors.TensorGPU): + return False, 'gpu', data + + return False, 'cpu', data diff --git a/dali/python/nvidia/dali/eager.py b/dali/python/nvidia/dali/eager.py new file mode 100644 index 00000000000..5220340413c --- /dev/null +++ b/dali/python/nvidia/dali/eager.py @@ -0,0 +1,265 @@ +# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import nvidia.dali.backend as _b +import nvidia.dali.internal as _internal +import nvidia.dali.ops as _ops +import nvidia.dali.tensors as _tensors +from nvidia.dali._utils.eager_util import _Classification, _transform_data_to_tensorlist + + +_stateful_operators = { + 'decoders__ImageRandomCrop', + 'noise__Gaussian', + 'noise__SaltAndPepper', + 'noise__Shot', + 'segmentation__RandomMaskPixel', + 'segmentation__RandomObjectBBox', + 'FastResizeCropMirror', + 'Jitter', + 'ROIRandomCrop', + 'RandomBBoxCrop', + 'RandomResizedCrop', + 'ResizeCropMirror', +} + + +_generator_operators = { + 'readers__COCO', + 'readers__Caffe', + 'readers__Caffe2', + 'readers__File', + 'readers__MXNet', + 'readers__NemoAsr', + 'readers__Numpy', + 'readers__Sequence', + 'readers__TFRecord', + 'readers__Video', + 'readers__VideoResize', + 'readers__Webdataset', + 'random__CoinFlip', + 'random__Normal', + 'random__Uniform', + 'BatchPermutation', +} + + +_stateless_operators_cache = {} + + +def _eager_op_base_factory(op_class, op_name, num_inputs, call_args_names): + class EagerOperatorBase(op_class): + def __init__(self, *, max_batch_size, device_id, **kwargs): + super().__init__(**kwargs) + + self._spec.AddArg('device_id', device_id) + self._spec.AddArg('max_batch_size', max_batch_size) + + for i in range(num_inputs): + self._spec.AddInput(op_name+f'[{i}]', self._device) + + for arg_name in call_args_names: + self._spec.AddArgumentInput(arg_name, '') + + if self._device == 'cpu': + self._backend_op = _b.EagerOperatorCPU(self._spec) + elif self._device == 'gpu': + self._backend_op = _b.EagerOperatorGPU(self._spec) + elif self._device == 'mixed': + self._backend_op = _b.EagerOperatorMixed(self._spec) + else: + raise ValueError( + f"Incorrect device type '{self._device}' in eager operator '{op_name}'.") + + return EagerOperatorBase + + +def _stateless_op_factory(op_class, op_name, num_inputs, call_args_names): + class EagerOperator(_eager_op_base_factory(op_class, op_name, num_inputs, call_args_names)): + def __call__(self, inputs, kwargs): + # Here all kwargs are supposed to be TensorLists. + output = self._backend_op(inputs, kwargs) + + if len(output) == 1: + return output[0] + + return output + + return EagerOperator + + +def _choose_device(op_name, wrapper_name, inputs, device_param): + """Returns device type and device_id based on inputs and device_param.""" + + input_device = '' + + if len(inputs) > 0: + if any(isinstance(input, _tensors.TensorListGPU) for input in inputs): + input_device = 'gpu:0' + else: + input_device = 'cpu' + + if device_param is None: + # Select device type based on inputs. + device_param = input_device if input_device else 'cpu' + + sep_pos = device_param.find(':') + + # Separate device and device_id. 
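+    # e.g. 'gpu:0' -> device='gpu', device_id=0; a bare 'cpu' or 'gpu' gets device_id 0.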
+ if sep_pos != -1: + device = device_param[:sep_pos] + device_id = int(device_param[sep_pos + 1:]) + else: + device = device_param + device_id = 0 + + if device == 'cpu' and input_device == 'gpu': + raise ValueError("An operator with device='cpu' cannot accept GPU inputs.") + + if device != 'cpu' and device != 'gpu': + raise ValueError(f"Incorrect device type '{device}'.") + + if input_device == 'cpu' and device == 'gpu': + if op_name in _ops._mixed_ops: + device = 'mixed' + else: + raise ValueError(f"Operator '{wrapper_name}' not registered for mixed.") + + return device, device_id + + +def _disqualify_arguments(op_name, kwargs, disqualified_args): + for key in disqualified_args: + if key in kwargs: + raise RuntimeError(f"Argument '{key}' is not supported by eager operator '{op_name}'.") + + +def _choose_batch_size(inputs, batch_size): + """Returns batch size based on inputs and batch_size parameter.""" + + if len(inputs) > 0: + input_batch_size = len(inputs[0]) + + if batch_size == -1: + batch_size = input_batch_size + + if input_batch_size != batch_size: + raise ValueError( + f"Requested batch_size={batch_size}, but input 0 has batch_size={input_batch_size}") + + if batch_size == -1: + raise RuntimeError( + "Operators with no inputs need to have 'batch_size' parameter specified.") + + return batch_size + + +def _prep_inputs(inputs, batch_size): + inputs = list(inputs) + + for i, input in enumerate(inputs): + if not isinstance(input, (_tensors.TensorListCPU, _tensors.TensorListGPU)): + inputs[i] = _transform_data_to_tensorlist(input, batch_size) + + return inputs + + +def _prep_kwargs(kwargs, batch_size): + for key, value in kwargs.items(): + kwargs[key] = _Classification(value, f'Argument {key}', arg_constant_len=batch_size).data + + return kwargs + + +def _desc_call_args(inputs, args): + """Returns string description of call arguments (inputs and input arguments) to use as part of + the caching key.""" + return str([(inp.dtype, inp.layout(), len(inp[0].shape())) for inp in inputs]) + str(sorted( + [(key, value.dtype, value.layout(), len(value[0].shape())) for key, value in args.items()])) + + +def _wrap_stateless(op_class, op_name, wrapper_name): + """Wraps stateless Eager Operator in a function. Callable the same way as functions in fn API, + but directly with TensorLists. + """ + def wrapper(*inputs, **kwargs): + _disqualify_arguments(wrapper_name, kwargs, _wrap_stateless.disqualified_arguments) + + # Preprocess kwargs to get batch_size. + batch_size = _choose_batch_size(inputs, kwargs.pop('batch_size', -1)) + kwargs = _prep_kwargs(kwargs, batch_size) + init_args, call_args = _ops._separate_kwargs(kwargs, _tensors.TensorListCPU) + + # Preprocess inputs, try to convert each input to TensorList. + inputs = _prep_inputs(inputs, batch_size) + + init_args['max_batch_size'] = batch_size + init_args['device'], init_args['device_id'] = _choose_device( + op_name, wrapper_name, inputs, kwargs.get('device')) + + # Creating cache key consisting of operator name, description of inputs, input arguments + # and init args. Each call arg is described by dtype, layout and dim. 
+ key = op_name + _desc_call_args(inputs, call_args) + str(sorted(init_args.items())) + + if key not in _stateless_operators_cache: + _stateless_operators_cache[key] = _stateless_op_factory( + op_class, wrapper_name, len(inputs), call_args.keys())(**init_args) + + return _stateless_operators_cache[key](inputs, call_args) + + return wrapper + + +_wrap_stateless.disqualified_arguments = { + 'bytes_per_sample_hint', + 'preserve', + 'seed' +} + + +def _wrap_eager_op(op_class, submodule, wrapper_name, wrapper_doc): + """Exposes eager operator to the appropriate module (similar to :func:`nvidia.dali.fn._wrap_op`). + Uses ``op_class`` for preprocessing inputs and keyword arguments and filling OpSpec for backend + eager operators. + + Args: + op_class: Op class to wrap. + submodule: Additional submodule (scope). + wrapper_name: Wrapper name (the same as in fn API). + wrapper_doc (str): Documentation of the wrapper function. + """ + op_name = op_class.schema_name + op_schema = _b.TryGetSchema(op_name) + if op_schema.IsDeprecated() or op_name in _stateful_operators or op_name in _generator_operators: + # TODO(ksztenderski): For now only exposing stateless operators. + return + else: + # If operator is not stateful or a generator expose it as stateless. + wrapper = _wrap_stateless(op_class, op_name, wrapper_name) + + # Exposing to eager.experimental module. + eager_module = _internal.get_submodule(sys.modules[__name__], 'experimental') + op_module = _internal.get_submodule(eager_module, submodule) + + if not hasattr(op_module, wrapper_name): + wrapper.__name__ = wrapper_name + wrapper.__qualname__ = wrapper_name + wrapper.__doc__ = wrapper_doc + + if submodule: + wrapper.__module__ = op_module.__name__ + + setattr(op_module, wrapper_name, wrapper) diff --git a/dali/python/nvidia/dali/fn.py b/dali/python/nvidia/dali/fn.py index 4b105622d38..46c87b9d771 100644 --- a/dali/python/nvidia/dali/fn.py +++ b/dali/python/nvidia/dali/fn.py @@ -83,9 +83,6 @@ def fn_wrapper(*inputs, **kwargs): else: return op_wrapper(*inputs, **kwargs) - op_wrapper.__name__ = wrapper_name - op_wrapper.__qualname__ = wrapper_name - op_wrapper.__doc__ = wrapper_doc fn_wrapper.__name__ = wrapper_name fn_wrapper.__qualname__ = wrapper_name fn_wrapper.__doc__ = wrapper_doc @@ -102,9 +99,15 @@ def _wrap_op(op_class, submodule, parent_module, wrapper_doc): otherwise in a specified parent module. wrapper_doc (str): Documentation of the wrapper function """ + from nvidia.dali.eager import _wrap_eager_op + schema = _b.TryGetSchema(op_class.__name__) make_hidden = schema.IsDocHidden() if schema else False wrapper_name = _to_snake_case(op_class.__name__) + + # Add operator to eager API. + _wrap_eager_op(op_class, submodule, wrapper_name, wrapper_doc) + if parent_module is None: fn_module = sys.modules[__name__] else: diff --git a/dali/python/nvidia/dali/ops.py b/dali/python/nvidia/dali/ops.py index ce4ce17d762..db0b050b307 100644 --- a/dali/python/nvidia/dali/ops.py +++ b/dali/python/nvidia/dali/ops.py @@ -324,19 +324,24 @@ def _instantiate_constant_node(device, constant): return _Constant(device=device, value=constant.value, dtype=constant.dtype, shape=constant.shape) -def _separate_kwargs(kwargs): +def _separate_kwargs(kwargs, arg_input_type=_DataNode): """Separates arguments into ones that should go to operator's __init__ and to __call__. Returns a pair of dictionaries of kwargs - the first for __init__, the second for __call__. + + Args: + kwargs: Keyword arguments. 
+ arg_input_type: operator's argument input type, DataNode for pipeline mode, TensorListCPU + for eager mode. """ - def is_data_node(x): - return isinstance(x, _DataNode) + def is_arg_input_type(x): + return isinstance(x, arg_input_type) def is_call_arg(name, value): if name == "device": return False if name == "ndim": return False - if name == "name" or is_data_node(value): + if name == "name" or is_arg_input_type(value): return True if isinstance(value, (str, list, tuple, nvidia.dali.types.ScalarConstant)): return False @@ -717,23 +722,6 @@ def _build_input_sets(self, inputs): return Operator -def _prep_input_sets(op, inputs): - from nvidia.dali._debug_mode import _transform_data_to_tensorlist - import nvidia.dali.tensors as tensors - - inputs = list(inputs) - - for i, input in enumerate(inputs): - # Transforming any convertable datatype to TensorList (DataNodeDebugs are already unpacked). - # Additionally accepting input sets, but only as list of TensorList. - if not isinstance(input, (tensors.TensorListCPU, tensors.TensorListGPU)) and \ - not (isinstance(input, list) and - all([isinstance(elem, (tensors.TensorListCPU, tensors.TensorListGPU)) for elem in input])): - inputs[i] = _transform_data_to_tensorlist(input, len(input)) - - return op._build_input_sets(inputs) - - def _process_op_name(op_schema_name, make_hidden=False): namespace_delim = "__" # Two underscores (reasoning: we might want to have single underscores in the namespace itself) op_full_name = op_schema_name.replace(namespace_delim, '.') diff --git a/dali/python/nvidia/dali/types.py b/dali/python/nvidia/dali/types.py index e69af09413c..bad0e0d8152 100644 --- a/dali/python/nvidia/dali/types.py +++ b/dali/python/nvidia/dali/types.py @@ -409,8 +409,8 @@ def to_dali_type(framework_type): def _is_compatible_array_type(value): return _is_numpy_array(value) or _is_mxnet_array(value) or _is_torch_tensor(value) -def ConstantNode(device, value, dtype, shape, layout, **kwargs): - data = value + +def _preprocess_constant_array_type(value): if _is_mxnet_array(value): # mxnet ndarray is not directly compatible with numpy.ndarray, but provides conversion value = value.asnumpy() @@ -425,8 +425,15 @@ def ConstantNode(device, value, dtype, shape, layout, **kwargs): if value.dtype == np.uint64: value = value.astype(np.uint32) - if _is_numpy_array(value) or _is_torch_tensor(value): - # torch tensor and numpy array have very similar API + return value + + +def ConstantNode(device, value, dtype, shape, layout, **kwargs): + data = value + if _is_compatible_array_type(value): + value = _preprocess_constant_array_type(value) + + # At this point value is a numpy array or a torch tensor. They have very similar API actual_type = to_dali_type(value.dtype) if dtype is None: dtype = actual_type @@ -527,7 +534,7 @@ def Constant(value, dtype = None, shape = None, layout = None, device = None, ** as to fill the requested shape. Otherwise, the number of elements in `value` must match the volume of the shape. layout: string, optional - A string descirbing the layout of the constant tensor, e.g. "HWC" + A string describing the layout of the constant tensor, e.g. "HWC" device: string, optional, "cpu" or "gpu" The device to place the constant tensor in. 
If specified, it forces the value to become a constant tensor node on given device, diff --git a/dali/test/python/test_eager_operators.py b/dali/test/python/test_eager_operators.py new file mode 100644 index 00000000000..05d5a40f5bc --- /dev/null +++ b/dali/test/python/test_eager_operators.py @@ -0,0 +1,195 @@ +# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +from functools import reduce + +import nvidia.dali.eager.experimental as eager +import nvidia.dali.fn as fn +import nvidia.dali.tensors as tensors +import nvidia.dali.types as types +from nvidia.dali import pipeline_def +from nose_utils import raises +from test_utils import check_batch + +rng = np.random.default_rng() + +batch_size = 2 +sample_shape = [20, 20, 3] +data = [[rng.integers(0, 255, size=sample_shape, dtype=np.uint8) + for _ in range(batch_size)] for _ in range(10)] + + +def get_data(i): + return data[i] + + +@pipeline_def(batch_size=batch_size, num_threads=3, device_id=0) +def single_op_pipe(op, kwargs): + data = fn.external_source(source=get_data, layout="HWC") + out = op(data, **kwargs) + return out + + +def reduce_getattr(x, y): + return getattr(x, y) + + +def compare_eager_with_pipeline(path, batch_size=batch_size, N_iterations=5, fn_op=None, eager_op=None, + **kwargs): + import_path = path.split('.') + if fn_op is None: + fn_op = reduce(reduce_getattr, [fn] + import_path) + if eager_op is None: + eager_op = reduce(reduce_getattr, [eager] + import_path) + + pipe = single_op_pipe(fn_op, kwargs) + pipe.build() + + for i in range(N_iterations): + input_tl = tensors.TensorListCPU(np.array(get_data(i)), layout="HWC") + out1, = pipe.run() + out2 = eager_op(input_tl, **kwargs) + + out1_data = out1.as_cpu() if isinstance(out1, tensors.TensorListGPU) else out1 + out2_data = out2.as_cpu() if isinstance(out2, tensors.TensorListGPU) else out2 + + check_batch(out1_data, out2_data, batch_size) + + +def test_rotate_cpu(): + compare_eager_with_pipeline('rotate', angle=25) + + +def test_brightness_contrast_cpu(): + compare_eager_with_pipeline('brightness_contrast') + + +def test_hue_cpu(): + compare_eager_with_pipeline('hue') + + +def test_brightness_cpu(): + compare_eager_with_pipeline('brightness') + + +def test_contrast_cpu(): + compare_eager_with_pipeline('contrast') + + +def test_hsv_cpu(): + compare_eager_with_pipeline('hsv') + + +def test_color_twist_cpu(): + compare_eager_with_pipeline('color_twist') + + +def test_saturation_cpu(): + compare_eager_with_pipeline('saturation') + + +def test_shapes_cpu(): + compare_eager_with_pipeline('shapes') + + +def test_crop_cpu(): + compare_eager_with_pipeline('crop', crop=(5, 5)) + + +def test_color_space_coversion_cpu(): + compare_eager_with_pipeline('color_space_conversion', + image_type=types.BGR, output_type=types.RGB) + + +def test_cast_cpu(): + compare_eager_with_pipeline('cast', dtype=types.INT32) + + +def test_resize_cpu(): + compare_eager_with_pipeline('resize', resize_x=50, resize_y=50) + 
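+# A minimal sketch of the call pattern exercised by these tests: each eager operator is
+# invoked directly on a TensorListCPU and returns a TensorListCPU, e.g.
+#   input_tl = tensors.TensorListCPU(np.array(get_data(0)), layout="HWC")
+#   out_tl = eager.rotate(input_tl, angle=25)
+# and compare_eager_with_pipeline checks that output against the matching fn operator run
+# inside a regular pipeline.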
+ +def test_gaussian_blur_cpu(): + compare_eager_with_pipeline('gaussian_blur', window_size=5) + + +def test_laplacian_cpu(): + compare_eager_with_pipeline('laplacian', window_size=5) + + +def test_crop_mirror_normalize_cpu(): + compare_eager_with_pipeline('crop_mirror_normalize') + + +def test_flip_cpu(): + compare_eager_with_pipeline('flip', horizontal=True) + + +def test_jpeg_compression_distortion_cpu(): + compare_eager_with_pipeline('jpeg_compression_distortion', quality=10) + + +def test_reshape_cpu(): + new_shape = sample_shape.copy() + new_shape[0] //= 2 + new_shape[1] *= 2 + compare_eager_with_pipeline("reshape", shape=new_shape) + + +def test_reinterpret_cpu(): + compare_eager_with_pipeline("reinterpret", rel_shape=[0.5, 1, -1]) + + +def test_water_cpu(): + compare_eager_with_pipeline("water") + + +def test_sphere_cpu(): + compare_eager_with_pipeline("sphere") + + +def test_erase_cpu(): + compare_eager_with_pipeline("erase", anchor=[0.3], axis_names="H", + normalized_anchor=True, shape=[0.1], normalized_shape=True) + + +def test_expand_dims_cpu(): + compare_eager_with_pipeline("expand_dims", axes=1, new_axis_names="Z") + + +def test_coord_transform_cpu(): + M = [0, 0, 1, + 0, 1, 0, + 1, 0, 0] + compare_eager_with_pipeline("coord_transform", M=M, dtype=types.UINT8) + + +def test_grid_mask_cpu(): + compare_eager_with_pipeline("grid_mask", tile=51, ratio=0.38158387, angle=2.6810782) + + +def test_multi_paste_cpu(): + compare_eager_with_pipeline("multi_paste", in_ids=np.array([0, 1]), output_size=sample_shape) + + +@raises(RuntimeError, glob=f"Argument '*' is not supported by eager operator 'crop'.") +def _test_disqualified_argument(key): + tl = tensors.TensorListCPU(np.zeros((8, 256, 256, 3))) + eager.crop(tl, crop=[64, 64], **{key: 0}) + + +def test_disqualified_arguments(): + for arg in ['bytes_per_sample_hint', 'preserve', 'seed']: + yield _test_disqualified_argument, arg diff --git a/dali/test/python/test_pipeline_debug.py b/dali/test/python/test_pipeline_debug.py index bd76d5aad34..36922676f45 100644 --- a/dali/test/python/test_pipeline_debug.py +++ b/dali/test/python/test_pipeline_debug.py @@ -12,16 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import numpy as np +import os +from nose.plugins.attrib import attr import nvidia.dali.fn as fn import nvidia.dali.types as types from nvidia.dali.pipeline.experimental import pipeline_def -from test_utils import compare_pipelines, get_dali_extra_path - -import numpy as np -import os from nose_utils import raises -from nose.plugins.attrib import attr +from test_utils import compare_pipelines, get_dali_extra_path file_root = os.path.join(get_dali_extra_path(), 'db/single/jpeg') @@ -311,7 +310,7 @@ def kwargs_len_change(): return fn.cat(*inputs, **kwargs) -@raises(RuntimeError, glob='Trying to use operator * with different number of keyward arguments than when it was built.') +@raises(RuntimeError, glob='Trying to use operator * with different number of keyword arguments than when it was built.') def test_kwargs_len_change(): kwargs_len_change.change = True pipe = kwargs_len_change() From 6c844cc4bcb23df21bc37e73a9dcce602b2cf4a8 Mon Sep 17 00:00:00 2001 From: Joaquin Anton Date: Wed, 18 May 2022 15:47:56 +0200 Subject: [PATCH 3/3] Move audio resampler CPU implementation to a single compilation unit (#3914) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joaquin Anton Co-authored-by: MichaƂ Zientkiewicz --- dali/kernels/signal/resampling.h | 253 +--------------- dali/kernels/signal/resampling_cpu.cc | 283 ++++++++++++++++++ dali/kernels/signal/resampling_cpu.h | 58 ++++ dali/kernels/signal/resampling_gpu.cuh | 1 + dali/kernels/signal/resampling_test.cc | 4 +- dali/operators/audio/resample.cc | 4 +- .../decoder/audio/audio_decoder_impl.cc | 6 +- .../decoder/audio/audio_decoder_impl.h | 14 +- .../decoder/audio/audio_decoder_op.h | 4 +- .../operators/reader/loader/nemo_asr_loader.h | 4 +- .../reader/loader/nemo_asr_loader_test.cc | 2 +- 11 files changed, 364 insertions(+), 269 deletions(-) create mode 100644 dali/kernels/signal/resampling_cpu.cc create mode 100644 dali/kernels/signal/resampling_cpu.h diff --git a/dali/kernels/signal/resampling.h b/dali/kernels/signal/resampling.h index dc47d601ea4..8a551bb182c 100644 --- a/dali/kernels/signal/resampling.h +++ b/dali/kernels/signal/resampling.h @@ -15,20 +15,12 @@ #ifndef DALI_KERNELS_SIGNAL_RESAMPLING_H_ #define DALI_KERNELS_SIGNAL_RESAMPLING_H_ -#ifdef __SSE2__ -#include -#endif -#ifdef __ARM_NEON -#include -#endif #include +#include #include #include #include #include "dali/core/math_util.h" -#include "dali/core/small_vector.h" -#include "dali/core/convert.h" -#include "dali/core/static_switch.h" namespace dali { namespace kernels { @@ -45,19 +37,6 @@ inline double Hann(double x) { return 0.5 * (1 + std::cos(x * M_PI)); } -#ifdef __ARM_NEON - -inline float32x4_t vsetq_f32(float x0, float x1, float x2, float x3) { - float32x4_t x; - x = vdupq_n_f32(x0); - x = vsetq_lane_f32(x1, x, 1); - x = vsetq_lane_f32(x2, x, 2); - x = vsetq_lane_f32(x3, x, 3); - return x; -} - -#endif - struct ResamplingWindow { struct InputRange { int i0, i1; @@ -74,7 +53,7 @@ struct ResamplingWindow { * @brief Calculates the window coefficient at an arbitrary floating point position * by interpolating between two samples. 
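   * The continuous position x is mapped onto the coefficient table as fi = x * scale + center,
   * and the two nearest `lookup` entries are blended linearly.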
*/ - inline DALI_HOST_DEV float operator()(float x) const { + DALI_HOST_DEV float operator()(float x) const { float fi = x * scale + center; float floori = floorf(fi); float di = fi - floori; @@ -83,46 +62,6 @@ struct ResamplingWindow { return lookup[i] + di * (lookup[i + 1] - lookup[i]); } -#ifdef __ARM_NEON - inline float32x4_t operator()(float32x4_t x) const { - float32x4_t fi = vfmaq_n_f32(vdupq_n_f32(center), x, scale); - int32x4_t i = vcvtq_s32_f32(fi); - float32x4_t fifloor = vcvtq_f32_s32(i); - float32x4_t di = vsubq_f32(fi, fifloor); - int idx[4] = { - vgetq_lane_s32(i, 0), - vgetq_lane_s32(i, 1), - vgetq_lane_s32(i, 2), - vgetq_lane_s32(i, 3) - }; - float32x2_t c0 = vld1_f32(&lookup[idx[0]]); - float32x2_t c1 = vld1_f32(&lookup[idx[1]]); - float32x2_t c2 = vld1_f32(&lookup[idx[2]]); - float32x2_t c3 = vld1_f32(&lookup[idx[3]]); - float32x4x2_t w = vuzpq_f32(vcombine_f32(c0, c1), vcombine_f32(c2, c3)); - float32x4_t curr = w.val[0]; - float32x4_t next = w.val[1]; - return vfmaq_f32(curr, di, vsubq_f32(next, curr)); - } -#endif - -#ifdef __SSE2__ - inline __m128 operator()(__m128 x) const { - __m128 fi = _mm_add_ps(x * _mm_set1_ps(scale), _mm_set1_ps(center)); - __m128i i = _mm_cvttps_epi32(fi); - __m128 fifloor = _mm_cvtepi32_ps(i); - __m128 di = _mm_sub_ps(fi, fifloor); - int idx[4]; - _mm_storeu_si128(reinterpret_cast<__m128i*>(idx), i); - __m128 curr = _mm_setr_ps(lookup[idx[0]], lookup[idx[1]], - lookup[idx[2]], lookup[idx[3]]); - __m128 next = _mm_setr_ps(lookup[idx[0]+1], lookup[idx[1]+1], - lookup[idx[2]+1], lookup[idx[3]+1]); - return _mm_add_ps(curr, _mm_mul_ps(di, _mm_sub_ps(next, curr))); - } -#endif - - float scale = 1, center = 1; int lobes = 0, coeffs = 0; int lookup_size = 0; @@ -155,198 +94,10 @@ inline void windowed_sinc(ResamplingWindowCPU &window, window.scale = 1 / scale; } - inline int64_t resampled_length(int64_t in_length, double in_rate, double out_rate) { return std::ceil(in_length * out_rate / in_rate); } -struct Resampler { - ResamplingWindowCPU window; - - void Initialize(int lobes = 16, int lookup_size = 2048) { - windowed_sinc(window, lookup_size, lobes); - } - -#if defined(__ARM_NEON) - inline float filter_vec(int &i_ref, float in_pos, int i1, const float *in) const { - const float32x4_t _0123 = vsetq_f32(0, 1, 2, 3); - float32x4_t f4 = vdupq_n_f32(0); - - int i = i_ref; - float32x4_t x4 = vaddq_f32(vdupq_n_f32(i - in_pos), _0123); - - for (; i + 3 < i1; i += 4) { - float32x4_t w4 = window(x4); - f4 = vfmaq_f32(f4, vld1q_f32(in + i), w4); - x4 = vaddq_f32(x4, vdupq_n_f32(4)); - } - // Sum elements in f4 - float32x2_t f2 = vpadd_f32(vget_low_f32(f4), vget_high_f32(f4)); - f2 = vpadd_f32(f2, f2); - i_ref = i; - return vget_lane_f32(f2, 0); - } -#elif defined(__SSE2__) - inline float filter_vec(int &i_ref, float in_pos, int i1, const float *in) const { - __m128 f4 = _mm_setzero_ps(); - int i = i_ref; - __m128 x4 = _mm_setr_ps(i - in_pos, i+1 - in_pos, i+2 - in_pos, i+3 - in_pos); - for (; i + 3 < i1; i += 4) { - __m128 w4 = window(x4); - - f4 = _mm_add_ps(f4, _mm_mul_ps(_mm_loadu_ps(in + i), w4)); - x4 = _mm_add_ps(x4, _mm_set1_ps(4)); - } - i_ref = i; - - // Sum elements in f4 - f4 = _mm_add_ps(f4, _mm_shuffle_ps(f4, f4, _MM_SHUFFLE(1, 0, 3, 2))); - f4 = _mm_add_ps(f4, _mm_shuffle_ps(f4, f4, _MM_SHUFFLE(0, 1, 0, 1))); - return _mm_cvtss_f32(f4); - } -#else - static float filter_vec(int &, float, int, const float *) { - return 0; - } -#endif - - /** - * @brief Resample single-channel signal and convert to Out - * - * Calculates a range of resampled 
signal. - * The function can seamlessly resample the input and produce the result in chunks. - * To reuse memory and still simulate chunk processing, adjust the in/out pointers. - */ - template - void Resample( - Out *__restrict__ out, int64_t out_begin, int64_t out_end, double out_rate, - const float *__restrict__ in, int64_t n_in, double in_rate) const { - assert(out_rate > 0 && in_rate > 0 && "Sampling rate must be positive"); - int64_t block = 1 << 8; // still leaves 15 significant bits for fractional part - double scale = in_rate / out_rate; - float fscale = scale; - - for (int64_t out_block = out_begin; out_block < out_end; out_block += block) { - int64_t block_end = std::min(out_block + block, out_end); - double in_block_f = out_block * scale; - int64_t in_block_i = std::floor(in_block_f); - float in_pos = in_block_f - in_block_i; - const float *__restrict__ in_block_ptr = in + in_block_i; - for (int64_t out_pos = out_block; out_pos < block_end; out_pos++, in_pos += fscale) { - auto irange = window.input_range(in_pos); - int i0 = irange.i0; - int i1 = irange.i1; - if (i0 + in_block_i < 0) - i0 = -in_block_i; - if (i1 + in_block_i > n_in) - i1 = n_in - in_block_i; - int i = i0; - - float f = filter_vec(i, in_pos, i1, in_block_ptr); - - float x = i - in_pos; - for (; i < i1; i++, x++) { - float w = window(x); - f += in_block_ptr[i] * w; - } - assert(out_pos >= out_begin && out_pos < out_end); - auto rel_pos = out_pos - out_begin; - out[rel_pos] = ConvertSatNorm(f); - } - } - } - - - - /** - * @brief Resample multi-channel signal and convert to Out - * - * Calculates a range of resampled signal. - * The function can seamlessly resample the input and produce the result in chunks. - * To reuse memory and still simulate chunk processing, adjust the in/out pointers. - * - * @tparam static_channels number of channels, if known at compile time, or -1 - */ - template - void Resample( - Out *__restrict__ out, int64_t out_begin, int64_t out_end, double out_rate, - const float *__restrict__ in, int64_t n_in, double in_rate, - int dynamic_num_channels) { - static_assert(static_channels != 0, "Static number of channels must be positive (use static) " - "or negative (use dynamic)."); - assert(out_rate > 0 && in_rate > 0 && "Sampling rate must be positive"); - if (dynamic_num_channels == 1) { - // fast path - Resample(out, out_begin, out_end, out_rate, in, n_in, in_rate); - return; - } - // the check below is compile time, so num_channels will be a compile-time constant - // or a run-time constant, depending on the value of static_channels - const int num_channels = static_channels < 0 ? 
dynamic_num_channels : static_channels; - assert(num_channels > 0); - - int64_t block = 1 << 8; // still leaves 15 significant bits for fractional part - double scale = in_rate / out_rate; - float fscale = scale; - SmallVector tmp; - tmp.resize(num_channels); - for (int64_t out_block = out_begin; out_block < out_end; out_block += block) { - int64_t block_end = std::min(out_block + block, out_end); - double in_block_f = out_block * scale; - int64_t in_block_i = std::floor(in_block_f); - float in_pos = in_block_f - in_block_i; - const float *__restrict__ in_block_ptr = in + in_block_i * num_channels; - for (int64_t out_pos = out_block; out_pos < block_end; out_pos++, in_pos += fscale) { - auto irange = window.input_range(in_pos); - int i0 = irange.i0; - int i1 = irange.i1; - if (i0 + in_block_i < 0) - i0 = -in_block_i; - if (i1 + in_block_i > n_in) - i1 = n_in - in_block_i; - - for (int c = 0; c < num_channels; c++) - tmp[c] = 0; - - float x = i0 - in_pos; - int ofs0 = i0 * num_channels; - int ofs1 = i1 * num_channels; - for (int in_ofs = ofs0; in_ofs < ofs1; in_ofs += num_channels, x++) { - float w = window(x); - for (int c = 0; c < num_channels; c++) { - assert(in_block_ptr + in_ofs + c >= in && - in_block_ptr + in_ofs + c < in + n_in * num_channels); - tmp[c] += in_block_ptr[in_ofs + c] * w; - } - } - assert(out_pos >= out_begin && out_pos < out_end); - auto rel_pos = out_pos - out_begin; - for (int c = 0; c < num_channels; c++) - out[rel_pos * num_channels + c] = ConvertSatNorm(tmp[c]); - } - } - } - - /** - * @brief Resample multi-channel signal and convert to Out - * - * Calculates a range of resampled signal. - * The function can seamlessly resample the input and produce the result in chunks. - * To reuse memory and still simulate chunk processing, adjust the in/out pointers. - */ - template - void Resample( - Out *__restrict__ out, int64_t out_begin, int64_t out_end, double out_rate, - const float *__restrict__ in, int64_t n_in, double in_rate, - int num_channels) { - VALUE_SWITCH(num_channels, static_channels, (1, 2, 3, 4, 5, 6, 7, 8), - (Resample(out, out_begin, out_end, out_rate, - in, n_in, in_rate, static_channels);), - (Resample<-1, Out>(out, out_begin, out_end, out_rate, - in, n_in, in_rate, num_channels))); - } -}; - } // namespace resampling } // namespace signal } // namespace kernels diff --git a/dali/kernels/signal/resampling_cpu.cc b/dali/kernels/signal/resampling_cpu.cc new file mode 100644 index 00000000000..a96d20ef715 --- /dev/null +++ b/dali/kernels/signal/resampling_cpu.cc @@ -0,0 +1,283 @@ +// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
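(Editor's aside, not part of the new file above.) The point of this patch is that the template body and the SIMD intrinsics now live only in this one translation unit; resampling_cpu.h merely declares ResampleCPUImpl, and the explicit instantiations emitted at the bottom of this .cc (the DALI_INSTANTIATE_RESAMPLER_CPU macros) give the linker concrete symbols. A minimal two-file sketch of that pattern, using made-up names (scale.h, scale.cc, ScaleImpl) rather than the real DALI sources:

// scale.h -- only a declaration is visible to users of the kernel;
// no SIMD headers are pulled in here.
#ifndef SCALE_H_
#define SCALE_H_
#include <cstdint>

template <typename Out>
void ScaleImpl(Out *out, const float *in, int64_t n, float factor);
#endif  // SCALE_H_

// scale.cc -- the single compilation unit that owns the (potentially
// SIMD-accelerated) template body.
#include "scale.h"

template <typename Out>
void ScaleImpl(Out *out, const float *in, int64_t n, float factor) {
  // A real implementation would use intrinsics and saturating conversion here.
  for (int64_t i = 0; i < n; i++)
    out[i] = static_cast<Out>(in[i] * factor);
}

// Explicit instantiations provide concrete symbols for every supported output
// type, so the template definition never needs to live in the header.
template void ScaleImpl<float>(float *, const float *, int64_t, float);
template void ScaleImpl<int16_t>(int16_t *, const float *, int64_t, float);

The payoff, as in this patch, is that emmintrin.h / arm_neon.h and the instruction-set #ifdefs no longer leak into every file that includes the resampling header.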
+ +#include "dali/kernels/signal/resampling_cpu.h" + +#ifdef __SSE2__ +#include +#endif +#ifdef __ARM_NEON +#include +#endif +#include +#include +#include +#include +#include "dali/core/convert.h" +#include "dali/core/math_util.h" +#include "dali/core/small_vector.h" +#include "dali/core/static_switch.h" + +namespace dali { +namespace kernels { +namespace signal { + +namespace resampling { + +#if defined(__ARM_NEON) + +inline float32x4_t evaluate(const ResamplingWindow &window, float32x4_t x) { + float32x4_t fi = vfmaq_n_f32(vdupq_n_f32(window.center), x, window.scale); + int32x4_t i = vcvtq_s32_f32(fi); + float32x4_t fifloor = vcvtq_f32_s32(i); + float32x4_t di = vsubq_f32(fi, fifloor); + int idx[4] = {vgetq_lane_s32(i, 0), vgetq_lane_s32(i, 1), vgetq_lane_s32(i, 2), + vgetq_lane_s32(i, 3)}; + float32x2_t c0 = vld1_f32(&window.lookup[idx[0]]); + float32x2_t c1 = vld1_f32(&window.lookup[idx[1]]); + float32x2_t c2 = vld1_f32(&window.lookup[idx[2]]); + float32x2_t c3 = vld1_f32(&window.lookup[idx[3]]); + float32x4x2_t w = vuzpq_f32(vcombine_f32(c0, c1), vcombine_f32(c2, c3)); + float32x4_t curr = w.val[0]; + float32x4_t next = w.val[1]; + return vfmaq_f32(curr, di, vsubq_f32(next, curr)); +} + +inline float32x4_t vsetq_f32(float x0, float x1, float x2, float x3) { + float32x4_t x; + x = vdupq_n_f32(x0); + x = vsetq_lane_f32(x1, x, 1); + x = vsetq_lane_f32(x2, x, 2); + x = vsetq_lane_f32(x3, x, 3); + return x; +} + +inline float filter_vec(const ResamplingWindow &window, int &i_ref, float in_pos, int i1, + const float *in) { + const float32x4_t _0123 = vsetq_f32(0, 1, 2, 3); + float32x4_t f4 = vdupq_n_f32(0); + + int i = i_ref; + float32x4_t x4 = vaddq_f32(vdupq_n_f32(i - in_pos), _0123); + + for (; i + 3 < i1; i += 4) { + float32x4_t w4 = evaluate(window, x4); + f4 = vfmaq_f32(f4, vld1q_f32(in + i), w4); + x4 = vaddq_f32(x4, vdupq_n_f32(4)); + } + // Sum elements in f4 + float32x2_t f2 = vpadd_f32(vget_low_f32(f4), vget_high_f32(f4)); + f2 = vpadd_f32(f2, f2); + i_ref = i; + return vget_lane_f32(f2, 0); +} + +#elif defined(__SSE2__) + +inline __m128 evaluate(const ResamplingWindow &window, __m128 x) { + __m128 fi = _mm_add_ps(x * _mm_set1_ps(window.scale), _mm_set1_ps(window.center)); + __m128i i = _mm_cvttps_epi32(fi); + __m128 fifloor = _mm_cvtepi32_ps(i); + __m128 di = _mm_sub_ps(fi, fifloor); + int idx[4]; + _mm_storeu_si128(reinterpret_cast<__m128i *>(idx), i); + __m128 curr = _mm_setr_ps(window.lookup[idx[0]], window.lookup[idx[1]], + window.lookup[idx[2]], window.lookup[idx[3]]); + __m128 next = _mm_setr_ps(window.lookup[idx[0] + 1], window.lookup[idx[1] + 1], + window.lookup[idx[2] + 1], window.lookup[idx[3] + 1]); + return _mm_add_ps(curr, _mm_mul_ps(di, _mm_sub_ps(next, curr))); +} + +inline float filter_vec(const ResamplingWindow &window, int &i_ref, float in_pos, int i1, + const float *in) { + __m128 f4 = _mm_setzero_ps(); + int i = i_ref; + __m128 x4 = _mm_setr_ps(i - in_pos, i + 1 - in_pos, i + 2 - in_pos, i + 3 - in_pos); + for (; i + 3 < i1; i += 4) { + __m128 w4 = evaluate(window, x4); + + f4 = _mm_add_ps(f4, _mm_mul_ps(_mm_loadu_ps(in + i), w4)); + x4 = _mm_add_ps(x4, _mm_set1_ps(4)); + } + i_ref = i; + + // Sum elements in f4 + f4 = _mm_add_ps(f4, _mm_shuffle_ps(f4, f4, _MM_SHUFFLE(1, 0, 3, 2))); + f4 = _mm_add_ps(f4, _mm_shuffle_ps(f4, f4, _MM_SHUFFLE(0, 1, 0, 1))); + return _mm_cvtss_f32(f4); +} + +#else + +inline float filter_vec(const ResamplingWindow &, int &, float, int, const float *) { + return 0; +} + +#endif + +/** + * @brief Resample single-channel signal and 
convert to Out + * + * Calculates a range of resampled signal. + * The function can seamlessly resample the input and produce the result in chunks. + * To reuse memory and still simulate chunk processing, adjust the in/out pointers. + */ +template +void ResampleCPUImpl(ResamplingWindow window, Out *__restrict__ out, int64_t out_begin, + int64_t out_end, double out_rate, const float *__restrict__ in, int64_t n_in, + double in_rate) { + assert(out_rate > 0 && in_rate > 0 && "Sampling rate must be positive"); + int64_t block = 1 << 8; // still leaves 15 significant bits for fractional part + double scale = in_rate / out_rate; + float fscale = scale; + + for (int64_t out_block = out_begin; out_block < out_end; out_block += block) { + int64_t block_end = std::min(out_block + block, out_end); + double in_block_f = out_block * scale; + int64_t in_block_i = std::floor(in_block_f); + float in_pos = in_block_f - in_block_i; + const float *__restrict__ in_block_ptr = in + in_block_i; + for (int64_t out_pos = out_block; out_pos < block_end; out_pos++, in_pos += fscale) { + auto irange = window.input_range(in_pos); + int i0 = irange.i0; + int i1 = irange.i1; + if (i0 + in_block_i < 0) + i0 = -in_block_i; + if (i1 + in_block_i > n_in) + i1 = n_in - in_block_i; + int i = i0; + + float f = filter_vec(window, i, in_pos, i1, in_block_ptr); + + float x = i - in_pos; + for (; i < i1; i++, x++) { + float w = window(x); + f += in_block_ptr[i] * w; + } + assert(out_pos >= out_begin && out_pos < out_end); + auto rel_pos = out_pos - out_begin; + out[rel_pos] = ConvertSatNorm(f); + } + } +} + +/** + * @brief Resample multi-channel signal and convert to Out + * + * Calculates a range of resampled signal. + * The function can seamlessly resample the input and produce the result in chunks. + * To reuse memory and still simulate chunk processing, adjust the in/out pointers. + * + * @tparam static_channels number of channels, if known at compile time, or -1 + */ +template +void ResampleCPUImpl(ResamplingWindow window, Out *__restrict__ out, int64_t out_begin, + int64_t out_end, double out_rate, const float *__restrict__ in, int64_t n_in, + double in_rate, int dynamic_num_channels) { + static_assert(static_channels != 0, + "Static number of channels must be positive (use static) " + "or negative (use dynamic)."); + assert(out_rate > 0 && in_rate > 0 && "Sampling rate must be positive"); + if (dynamic_num_channels == 1) { + // fast path + ResampleCPUImpl(window, out, out_begin, out_end, out_rate, in, n_in, in_rate); + return; + } + // the check below is compile time, so num_channels will be a compile-time constant + // or a run-time constant, depending on the value of static_channels + const int num_channels = static_channels < 0 ? 
dynamic_num_channels : static_channels; + assert(num_channels > 0); + + int64_t block = 1 << 8; // still leaves 15 significant bits for fractional part + double scale = in_rate / out_rate; + float fscale = scale; + SmallVector tmp; + tmp.resize(num_channels); + for (int64_t out_block = out_begin; out_block < out_end; out_block += block) { + int64_t block_end = std::min(out_block + block, out_end); + double in_block_f = out_block * scale; + int64_t in_block_i = std::floor(in_block_f); + float in_pos = in_block_f - in_block_i; + const float *__restrict__ in_block_ptr = in + in_block_i * num_channels; + for (int64_t out_pos = out_block; out_pos < block_end; out_pos++, in_pos += fscale) { + auto irange = window.input_range(in_pos); + int i0 = irange.i0; + int i1 = irange.i1; + if (i0 + in_block_i < 0) + i0 = -in_block_i; + if (i1 + in_block_i > n_in) + i1 = n_in - in_block_i; + + for (int c = 0; c < num_channels; c++) + tmp[c] = 0; + + float x = i0 - in_pos; + int ofs0 = i0 * num_channels; + int ofs1 = i1 * num_channels; + for (int in_ofs = ofs0; in_ofs < ofs1; in_ofs += num_channels, x++) { + float w = window(x); + for (int c = 0; c < num_channels; c++) { + assert(in_block_ptr + in_ofs + c >= in && + in_block_ptr + in_ofs + c < in + n_in * num_channels); + tmp[c] += in_block_ptr[in_ofs + c] * w; + } + } + assert(out_pos >= out_begin && out_pos < out_end); + auto rel_pos = out_pos - out_begin; + for (int c = 0; c < num_channels; c++) + out[rel_pos * num_channels + c] = ConvertSatNorm(tmp[c]); + } + } +} + +/** + * @brief Resample multi-channel (or single channel) signal and convert to Out + * + * Calculates a range of resampled signal. + * The function can resample a region-of-interest (ROI) of the output, specified by `out_begin` and + * `out_end`. In this case, the output pointer points to the beginning of the ROI. + */ +template +void ResampleCPUImpl(ResamplingWindow window, Out *__restrict__ out, int64_t out_begin, + int64_t out_end, double out_rate, const float *__restrict__ in, int64_t n_in, + double in_rate, int num_channels) { + VALUE_SWITCH(num_channels, static_channels, (1, 2, 3, 4, 5, 6, 7, 8), + (ResampleCPUImpl(window, out, out_begin, out_end, out_rate, + in, n_in, in_rate, static_channels);), + (ResampleCPUImpl<-1, Out>(window, out, out_begin, out_end, out_rate, + in, n_in, in_rate, num_channels))); +} + +#define DALI_INSTANTIATE_RESAMPLER_CPU_OUT(Out) \ + template void ResampleCPUImpl(ResamplingWindow window, Out *__restrict__ out, \ + int64_t out_begin, int64_t out_end, double out_rate, \ + const float *__restrict__ in, int64_t n_in, double in_rate, \ + int num_channels); + +#define DALI_INSTANTIATE_RESAMPLER_CPU() \ + DALI_INSTANTIATE_RESAMPLER_CPU_OUT(float); \ + DALI_INSTANTIATE_RESAMPLER_CPU_OUT(int8_t); \ + DALI_INSTANTIATE_RESAMPLER_CPU_OUT(uint8_t); \ + DALI_INSTANTIATE_RESAMPLER_CPU_OUT(int16_t); \ + DALI_INSTANTIATE_RESAMPLER_CPU_OUT(uint16_t); \ + DALI_INSTANTIATE_RESAMPLER_CPU_OUT(int32_t); \ + DALI_INSTANTIATE_RESAMPLER_CPU_OUT(uint32_t); + +DALI_INSTANTIATE_RESAMPLER_CPU(); + + +} // namespace resampling +} // namespace signal +} // namespace kernels +} // namespace dali diff --git a/dali/kernels/signal/resampling_cpu.h b/dali/kernels/signal/resampling_cpu.h new file mode 100644 index 00000000000..c5aabb1fde5 --- /dev/null +++ b/dali/kernels/signal/resampling_cpu.h @@ -0,0 +1,58 @@ +// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DALI_KERNELS_SIGNAL_RESAMPLING_CPU_H_ +#define DALI_KERNELS_SIGNAL_RESAMPLING_CPU_H_ + +#include "dali/kernels/signal/resampling.h" +#include "dali/core/api_helper.h" + +namespace dali { +namespace kernels { +namespace signal { + +namespace resampling { + +template +DLL_PUBLIC void ResampleCPUImpl(ResamplingWindow window, Out *__restrict__ out, int64_t out_begin, + int64_t out_end, double out_rate, const float *__restrict__ in, + int64_t n_in, double in_rate, int num_channels); + +struct DLL_PUBLIC ResamplerCPU { + ResamplingWindowCPU window; + + inline void Initialize(int lobes = 16, int lookup_size = 2048) { + windowed_sinc(window, lookup_size, lobes); + } + + /** + * @brief Resample multi-channel (or single channel) signal and convert to Out + * + * Calculates a range of resampled signal. + * The function can resample a region-of-interest (ROI) of the output, specified by `out_begin` and + * `out_end`. In this case, the output pointer points to the beginning of the ROI. + */ + template + void Resample(Out *__restrict__ out, int64_t out_begin, int64_t out_end, double out_rate, + const float *__restrict__ in, int64_t n_in, double in_rate, int num_channels) { + ResampleCPUImpl(window, out, out_begin, out_end, out_rate, in, n_in, in_rate, num_channels); + } +}; + +} // namespace resampling +} // namespace signal +} // namespace kernels +} // namespace dali + +#endif // DALI_KERNELS_SIGNAL_RESAMPLING_CPU_H_ diff --git a/dali/kernels/signal/resampling_gpu.cuh b/dali/kernels/signal/resampling_gpu.cuh index 452f7f1c72d..e93ce31b2dc 100644 --- a/dali/kernels/signal/resampling_gpu.cuh +++ b/dali/kernels/signal/resampling_gpu.cuh @@ -18,6 +18,7 @@ #include #include "dali/kernels/signal/resampling.h" #include "dali/core/util.h" +#include "dali/core/convert.h" #define SHM_NCHANNELS 16 diff --git a/dali/kernels/signal/resampling_test.cc b/dali/kernels/signal/resampling_test.cc index 723676f1cca..f76f98dd858 100644 --- a/dali/kernels/signal/resampling_test.cc +++ b/dali/kernels/signal/resampling_test.cc @@ -15,7 +15,7 @@ #include #include #include -#include "dali/kernels/signal/resampling.h" +#include "dali/kernels/signal/resampling_cpu.h" #include "dali/kernels/signal/resampling_test.h" namespace dali { @@ -121,7 +121,7 @@ void ResamplingTest::RunTest() { class ResamplingCPUTest : public ResamplingTest { public: void RunResampling(span args) override { - Resampler R; + ResamplerCPU R; R.Initialize(16); ASSERT_EQ(args.size(), nsamples_); diff --git a/dali/operators/audio/resample.cc b/dali/operators/audio/resample.cc index 0e49fe61e50..701bb3ca252 100644 --- a/dali/operators/audio/resample.cc +++ b/dali/operators/audio/resample.cc @@ -14,9 +14,11 @@ #include #include +#include "dali/kernels/signal/resampling_cpu.h" #include "dali/operators/audio/resample.h" #include "dali/operators/audio/resampling_params.h" #include "dali/kernels/kernel_params.h" +#include "dali/core/convert.h" namespace dali { @@ -176,7 +178,7 @@ class ResampleCPU : 
public ResampleBase { } private: - kernels::signal::resampling::Resampler R; + kernels::signal::resampling::ResamplerCPU R; std::vector> in_fp32; }; diff --git a/dali/operators/decoder/audio/audio_decoder_impl.cc b/dali/operators/decoder/audio/audio_decoder_impl.cc index 3001bb3a19e..9fe7545de29 100644 --- a/dali/operators/decoder/audio/audio_decoder_impl.cc +++ b/dali/operators/decoder/audio/audio_decoder_impl.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ TensorShape<> DecodedAudioShape(const AudioMetadata &meta, float target_sample_r template void DecodeAudio(TensorView audio, AudioDecoderBase &decoder, - const AudioMetadata &meta, kernels::signal::resampling::Resampler &resampler, + const AudioMetadata &meta, kernels::signal::resampling::ResamplerCPU &resampler, span decode_scratch_mem, span resample_scratch_mem, float target_sample_rate, bool downmix, @@ -107,7 +107,7 @@ void DecodeAudio(TensorView audio, AudioDecode #define DECLARE_IMPL(OutType) \ template void DecodeAudio( \ TensorView audio, AudioDecoderBase & decoder, \ - const AudioMetadata &meta, kernels::signal::resampling::Resampler &resampler, \ + const AudioMetadata &meta, kernels::signal::resampling::ResamplerCPU &resampler, \ span decode_scratch_mem, span resample_scratch_mem, \ float target_sample_rate, bool downmix, const char *audio_filepath); diff --git a/dali/operators/decoder/audio/audio_decoder_impl.h b/dali/operators/decoder/audio/audio_decoder_impl.h index 1ae0007f742..957bba96f53 100644 --- a/dali/operators/decoder/audio/audio_decoder_impl.h +++ b/dali/operators/decoder/audio/audio_decoder_impl.h @@ -1,4 +1,4 @@ -// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ #include "dali/operators/decoder/audio/audio_decoder.h" #include "dali/operators/decoder/audio/generic_decoder.h" #include "dali/pipeline/data/backend.h" -#include "dali/kernels/signal/resampling.h" +#include "dali/kernels/signal/resampling_cpu.h" #include "dali/core/tensor_view.h" namespace dali { @@ -42,7 +42,7 @@ DLL_PUBLIC std::pair ProcessOffsetAndLength(const AudioMetadat * @param target_sample_rate If a positive number is provided, it represent the target sampling rate * (the audio data is expected to be resampled if its original sampling rate differs) * @param downmix If set to true, the audio channels are expected to be downmixed, resulting in a shape with 1 - * dimension ({nsamples,}), instead of 2 ({nsamples, nchannels}) + * dimension ({nsamples,}), instead of 2 ({nsamples, nchannels}) */ DLL_PUBLIC TensorShape<> DecodedAudioShape(const AudioMetadata &meta, float target_sample_rate = -1, bool downmix = true); @@ -52,23 +52,23 @@ DLL_PUBLIC TensorShape<> DecodedAudioShape(const AudioMetadata &meta, float targ * @param audio Destination buffer. The function will decode as many audio samples as the shape of this argument * @param decoder Decoder object. * @param meta Audio metadata. 
- * @param resampler Resampler instance used if resampling is required + * @param resampler ResamplerCPU instance used if resampling is required * @param decode_scratch_mem Scratch memory used for decoding, when decoding can't be done directly to the output buffer. * If downmixing or resampling is required, this buffer should have a positive length, representing * decoded audio length at the original sampling rate: ``length * nchannels`` * @param resample_scratch_mem Scratch memory used for the input of resampling. - * If resampling is required, the buffer should have a positive length, representing the + * If resampling is required, the buffer should have a positive length, representing the * decoded audio length, ``length`` if downmixing is enabled, or the decoded audio length including * channels, ``length * nchannels``, otherwise. * @param target_sample_rate If a positive value is provided, the signal will be resampled except when its original sampling rate * is equal to the target. * @param downmix If true, the audio channes will be downmixed to a single one - * @param audio_filepath Path to the audio file being decoded, only used for debugging purposes + * @param audio_filepath Path to the audio file being decoded, only used for debugging purposes */ template DLL_PUBLIC void DecodeAudio(TensorView audio, AudioDecoderBase &decoder, const AudioMetadata &meta, - kernels::signal::resampling::Resampler &resampler, + kernels::signal::resampling::ResamplerCPU &resampler, span decode_scratch_mem, span resample_scratch_mem, float target_sample_rate, bool downmix, const char *audio_filepath); diff --git a/dali/operators/decoder/audio/audio_decoder_op.h b/dali/operators/decoder/audio/audio_decoder_op.h index 17d29ca98c6..d46bbb4a5a1 100644 --- a/dali/operators/decoder/audio/audio_decoder_op.h +++ b/dali/operators/decoder/audio/audio_decoder_op.h @@ -26,7 +26,7 @@ #include "dali/pipeline/workspace/workspace.h" #include "dali/pipeline/operator/operator.h" #include "dali/pipeline/workspace/host_workspace.h" -#include "dali/kernels/signal/resampling.h" +#include "dali/kernels/signal/resampling_cpu.h" #include "dali/kernels/signal/downmixing.h" #include "dali/core/tensor_view.h" @@ -83,7 +83,7 @@ class AudioDecoderCpu : public Operator { } std::vector target_sample_rates_; - kernels::signal::resampling::Resampler resampler_; + kernels::signal::resampling::ResamplerCPU resampler_; DALIDataType output_type_ = DALI_NO_TYPE, decode_type_ = DALI_NO_TYPE; const bool downmix_ = false, use_resampling_ = false; const float quality_ = 50.0f; diff --git a/dali/operators/reader/loader/nemo_asr_loader.h b/dali/operators/reader/loader/nemo_asr_loader.h index e1cf137e82b..f91c453e5f8 100644 --- a/dali/operators/reader/loader/nemo_asr_loader.h +++ b/dali/operators/reader/loader/nemo_asr_loader.h @@ -25,7 +25,7 @@ #include "dali/core/common.h" #include "dali/core/error_handling.h" -#include "dali/kernels/signal/resampling.h" +#include "dali/kernels/signal/resampling_cpu.h" #include "dali/operators/decoder/audio/audio_decoder.h" #include "dali/operators/decoder/audio/audio_decoder_impl.h" #include "dali/operators/reader/loader/file_label_loader.h" @@ -181,7 +181,7 @@ class DLL_PUBLIC NemoAsrLoader : public Loader { double max_duration_; bool read_text_; int num_threads_; - kernels::signal::resampling::Resampler resampler_; + kernels::signal::resampling::ResamplerCPU resampler_; std::vector> decode_scratch_; std::vector> resample_scratch_; }; diff --git a/dali/operators/reader/loader/nemo_asr_loader_test.cc 
b/dali/operators/reader/loader/nemo_asr_loader_test.cc
index 9adb476628b..c5726c5302c 100644
--- a/dali/operators/reader/loader/nemo_asr_loader_test.cc
+++ b/dali/operators/reader/loader/nemo_asr_loader_test.cc
@@ -283,7 +283,7 @@ TEST(NemoAsrLoaderTest, ReadSample) {
   std::vector<float> downsampled(downsampled_len, 0.0f);
   constexpr double q = 50.0;
   int lobes = std::round(0.007 * q * q - 0.09 * q + 3);
-  kernels::signal::resampling::Resampler resampler;
+  kernels::signal::resampling::ResamplerCPU resampler;
   resampler.Initialize(lobes, lobes * 64 + 1);
   resampler.Resample(downsampled.data(), 0, downsampled_len, sr_out,
                      downmixed.data(), downmixed.size(), sr_in, 1);
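
A note on usage (editor's sketch, not part of the patch): the test hunk above already shows the essential API of the relocated resampler. The following standalone program exercises the same calls -- ResamplerCPU::Initialize, ResamplerCPU::Resample and resampled_length, all declared in resampling_cpu.h / resampling.h from this series; the 440 Hz sine input and the 44.1 kHz to 16 kHz rates are invented for illustration only.

#include <cmath>
#include <vector>
#include "dali/kernels/signal/resampling_cpu.h"

int main() {
  using namespace dali::kernels::signal::resampling;

  // Synthetic mono input: 0.5 s of a 440 Hz tone sampled at 44.1 kHz.
  const double in_rate = 44100.0, out_rate = 16000.0;
  std::vector<float> in(22050);
  for (size_t i = 0; i < in.size(); i++)
    in[i] = std::sin(2.0 * M_PI * 440.0 * i / in_rate);

  // Same quality-to-lobes heuristic as the NemoAsrLoader test above.
  const double q = 50.0;
  int lobes = std::round(0.007 * q * q - 0.09 * q + 3);

  ResamplerCPU resampler;
  resampler.Initialize(lobes, lobes * 64 + 1);

  // Output length for the new rate; out_begin/out_end select the full range.
  int64_t out_len = resampled_length(in.size(), in_rate, out_rate);
  std::vector<float> out(out_len);
  resampler.Resample(out.data(), 0, out_len, out_rate,
                     in.data(), in.size(), in_rate, 1);
}

Passing 1 as the last argument takes the single-channel fast path; interleaved multi-channel data would instead be passed with the channel count, which the VALUE_SWITCH dispatcher turns into a statically-unrolled loop for up to 8 channels.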