From 05a6f4fa232c79fa065979a7540a705d666fc8cb Mon Sep 17 00:00:00 2001 From: Maksim Kutakov Date: Mon, 16 Dec 2024 16:11:54 +0100 Subject: [PATCH] [CPU] Throw when release_memory is called during inference (#27520) ### Details: This PR changes the behavior of the `CompiledModel::release_memory()` implementation in the CPU plugin for the situation when the method is being called concurrently with the other graph state modifying methods (e.g. graph initialization, inference, properties request). This is necessary to ensure thread safety and provide a clear defined behavior when the method is called concurrently. Also, the PR contains some refactoring of the Infer request implementation, aimed at decoupling the InferRequest implementation from the compiled model internals and providing a safer interface that ensures thread safe access to the CPU graph structures. --- src/plugins/intel_cpu/src/compiled_model.cpp | 9 +- src/plugins/intel_cpu/src/compiled_model.h | 76 ++++++- src/plugins/intel_cpu/src/graph.cpp | 19 +- src/plugins/intel_cpu/src/graph.h | 38 +++- src/plugins/intel_cpu/src/infer_request.cpp | 161 ++++++-------- src/plugins/intel_cpu/src/infer_request.h | 15 +- src/plugins/intel_cpu/src/nodes/composite.cpp | 4 +- src/plugins/intel_cpu/src/nodes/if.cpp | 24 +-- src/plugins/intel_cpu/src/nodes/lora.cpp | 4 +- src/plugins/intel_cpu/src/nodes/memory.hpp | 2 +- .../intel_cpu/src/nodes/tensoriterator.cpp | 12 +- .../concurent_release_memory.cpp | 198 ++++++++++++++++++ 12 files changed, 406 insertions(+), 156 deletions(-) create mode 100644 src/plugins/intel_cpu/tests/functional/custom/behavior/ov_executable_network/concurent_release_memory.cpp diff --git a/src/plugins/intel_cpu/src/compiled_model.cpp b/src/plugins/intel_cpu/src/compiled_model.cpp index f0f9668ca3ae50..f81c7dbbced99d 100644 --- a/src/plugins/intel_cpu/src/compiled_model.cpp +++ b/src/plugins/intel_cpu/src/compiled_model.cpp @@ -184,7 +184,6 @@ CompiledModel::GraphGuard::Lock CompiledModel::get_graph() const { } std::shared_ptr CompiledModel::create_sync_infer_request() const { - m_numRequests++; return std::make_shared(std::static_pointer_cast(shared_from_this())); } @@ -344,8 +343,12 @@ void CompiledModel::export_model(std::ostream& modelStream) const { void CompiledModel::release_memory() { for (auto&& graph : m_graphs) { - GraphGuard::Lock graph_lock{graph}; - auto ctx = graph_lock._graph.getGraphContext(); + // try to lock mutex, since it may be already locked (e.g by an infer request) + std::unique_lock lock(graph._mutex, std::try_to_lock); + OPENVINO_ASSERT(lock.owns_lock(), + "Attempt to call release_memory() on a compiled model in a busy state. 
Please ensure that all " + "infer requests are completed before releasing memory."); + auto ctx = graph.getGraphContext(); ctx->getNetworkMemoryControl()->releaseMemory(); } } diff --git a/src/plugins/intel_cpu/src/compiled_model.h b/src/plugins/intel_cpu/src/compiled_model.h index dc3735b4f3b63e..f7d2903b0526cf 100644 --- a/src/plugins/intel_cpu/src/compiled_model.h +++ b/src/plugins/intel_cpu/src/compiled_model.h @@ -20,6 +20,15 @@ namespace ov { namespace intel_cpu { class CompiledModel : public ov::ICompiledModel { +public: + struct GraphGuard : public Graph { + std::mutex _mutex; + struct Lock : public std::unique_lock { + explicit Lock(GraphGuard& graph) : std::unique_lock(graph._mutex), _graph(graph) {} + GraphGuard& _graph; + }; + }; + public: typedef std::shared_ptr Ptr; @@ -51,9 +60,13 @@ class CompiledModel : public ov::ICompiledModel { void release_memory() override; + std::string name() const { + return m_name; + } + private: std::shared_ptr create_sync_infer_request() const override; - friend class SyncInferRequest; + friend class CompiledModelHolder; const std::shared_ptr m_model; const std::shared_ptr m_plugin; @@ -66,13 +79,6 @@ class CompiledModel : public ov::ICompiledModel { Config m_cfg; mutable std::atomic_int m_numRequests = {0}; std::string m_name; - struct GraphGuard : public Graph { - std::mutex _mutex; - struct Lock : public std::unique_lock { - explicit Lock(GraphGuard& graph) : std::unique_lock(graph._mutex), _graph(graph) {} - GraphGuard& _graph; - }; - }; const bool m_loaded_from_cache; // WARNING: Do not use m_graphs directly. @@ -94,5 +100,59 @@ class CompiledModel : public ov::ICompiledModel { bool m_has_sub_compiled_models = false; }; +// This class provides safe access to the internal CompiledModel structures and helps to decouple SyncInferRequest and +// the CompiledModel internal structures +class CompiledModelHolder { +public: + CompiledModelHolder(std::shared_ptr compiled_model) + : m_compiled_model(std::move(compiled_model)) { + OPENVINO_ASSERT(!m_compiled_model->m_graphs.empty(), + "No graph was found in the compiled model: ", + m_compiled_model->name()); + m_graph = &(m_compiled_model->get_graph()._graph); + m_id = (m_compiled_model->m_numRequests)++; + } + + ~CompiledModelHolder() { + if (m_compiled_model) { + --(m_compiled_model->m_numRequests); + } + } + + CompiledModelHolder(const CompiledModelHolder&) = delete; + CompiledModelHolder& operator=(const CompiledModelHolder&) = delete; + + CompiledModelHolder(CompiledModelHolder&&) = default; + CompiledModelHolder& operator=(CompiledModelHolder&&) = default; + + const Graph& graph() const { + return *m_graph; + } + + CompiledModel::GraphGuard::Lock lock() { + auto lock = m_compiled_model->get_graph(); + m_graph = &(lock._graph); + OPENVINO_ASSERT(m_graph, "Graph ptr null check failed"); + return lock; + } + + std::string name() const { + return m_compiled_model->name(); + } + + std::shared_ptr compiled_model() const { + return m_compiled_model; + } + + int id() const { + return m_id; + } + +private: + std::shared_ptr m_compiled_model; + const Graph* m_graph; + int m_id; +}; + } // namespace intel_cpu } // namespace ov diff --git a/src/plugins/intel_cpu/src/graph.cpp b/src/plugins/intel_cpu/src/graph.cpp index 7fb5f512227cf9..fd6721ce4c83ad 100644 --- a/src/plugins/intel_cpu/src/graph.cpp +++ b/src/plugins/intel_cpu/src/graph.cpp @@ -1940,8 +1940,23 @@ std::shared_ptr Graph::dump() const { return dump_graph_as_ie_ngraph_net(*this); } -const std::unordered_map& Graph::getInternalStateNodes() const { 
- return m_context->getMemoryStatesRegister()->getMemoryStates(); +std::vector Graph::memoryStates() const { + std::vector resultVector; + + for (auto&& item : m_context->getMemoryStatesRegister()->getMemoryStates()) { + resultVector.emplace_back(item.second->makeState()); + } + return resultVector; +} + +void Graph::assignStates(const std::vector& states) { + auto&& inputStateNodes = m_context->getMemoryStatesRegister()->getMemoryStates(); + for (const auto& state : states) { + auto itr = inputStateNodes.find(state->get_name()); + if (itr != inputStateNodes.end()) { + itr->second->assignState(state); + } + } } } // namespace intel_cpu diff --git a/src/plugins/intel_cpu/src/graph.h b/src/plugins/intel_cpu/src/graph.h index bdf3205d2baaaf..5d5d5b335a36f2 100644 --- a/src/plugins/intel_cpu/src/graph.h +++ b/src/plugins/intel_cpu/src/graph.h @@ -14,6 +14,7 @@ #include "edge.h" #include "graph_context.h" #include "memory_control.hpp" +#include "memory_state.h" #include "node.h" #include "nodes/input.h" #include "openvino/core/node_vector.hpp" @@ -87,28 +88,42 @@ class Graph { return _name; } - std::map& GetInputNodesMap() { - return inputNodesMap; + NodePtr getInputNodeByIndex(std::size_t index) { + auto input = inputNodesMap.find(index); + if (input == inputNodesMap.end()) + return nullptr; + return input->second; } - std::map& GetOutputNodesMap() { - return outputNodesMap; + NodePtr getOutputNodeByIndex(std::size_t index) { + auto output = outputNodesMap.find(index); + if (output == outputNodesMap.end()) + return nullptr; + return output->second; } - NodePtr getInputNodeByIndex(const std::size_t& index) { + NodeConstPtr getInputNodeByIndex(std::size_t index) const { auto input = inputNodesMap.find(index); if (input == inputNodesMap.end()) - OPENVINO_THROW("CPU execution graph doesn't contain input node with index: ", index); + return nullptr; return input->second; } - NodePtr getOutputNodeByIndex(const std::size_t& index) { + NodeConstPtr getOutputNodeByIndex(std::size_t index) const { auto output = outputNodesMap.find(index); if (output == outputNodesMap.end()) - OPENVINO_THROW("CPU execution graph doesn't contain output node with index: ", index); + return nullptr; return output->second; } + size_t inputsNumber() const { + return inputNodesMap.size(); + } + + size_t outputsNumber() const { + return outputNodesMap.size(); + } + dnnl::engine getEngine() const { return m_context->getEngine(); } @@ -117,6 +132,9 @@ class Graph { return m_context; } + std::vector memoryStates() const; + void assignStates(const std::vector& state); + void GetPerfData(std::vector& perfMap) const; void CreateEdge(const NodePtr& parent, const NodePtr& child, int parentPort = 0, int childPort = 0); @@ -202,8 +220,6 @@ class Graph { return graphHasDynamicInput; } - const std::unordered_map& getInternalStateNodes() const; - /** * Init graph using \p model, \p context, \p inputConfigs and \p outputConfigs */ @@ -218,7 +234,7 @@ class Graph { void Activate(const std::vector& externalInputMemory = {}, const std::vector& externalOutputMemory = {}); - const std::unordered_map& getOutputNodesMemBlocksMap() const { + const std::unordered_map& getOutputNodesMemBlocksMap() { return outputNodesMemBlocksMap; } diff --git a/src/plugins/intel_cpu/src/infer_request.cpp b/src/plugins/intel_cpu/src/infer_request.cpp index 3cfc34589623d2..44b9904bde202a 100644 --- a/src/plugins/intel_cpu/src/infer_request.cpp +++ b/src/plugins/intel_cpu/src/infer_request.cpp @@ -5,11 +5,9 @@ #include "infer_request.h" #include "async_infer_request.h" 
-#include "compiled_model.h" #include "dnnl_extension_utils.h" #include "itt.h" #include "memory_desc/cpu_memory_desc_utils.h" -#include "memory_state.h" #include "nodes/common/cpu_convert.h" #include "nodes/memory_state_base.h" #include "openvino/core/shape.hpp" @@ -24,9 +22,9 @@ using OvString = ov::element_type_traits::value_type; namespace ov { namespace intel_cpu { -SyncInferRequest::SyncInferRequest(std::shared_ptr compiled_model) - : ov::ISyncInferRequest(compiled_model), - m_compiled_model(compiled_model) { +SyncInferRequest::SyncInferRequest(CompiledModelHolder compiled_model) + : ov::ISyncInferRequest(compiled_model.compiled_model()), + m_compiled_model(std::move(compiled_model)) { const auto& inputs = get_inputs(); for (std::size_t input_index = 0; input_index < inputs.size(); input_index++) { m_input_ports_map[input_index] = inputs[input_index]; @@ -40,13 +38,8 @@ SyncInferRequest::SyncInferRequest(std::shared_ptr compiled } void SyncInferRequest::create_infer_request() { - auto id = (m_compiled_model->m_numRequests)++; - m_profiling_task = openvino::itt::handle("INTEL_CPU_INFER_" + m_compiled_model->m_name + "_" + std::to_string(id)); - - if (m_compiled_model->m_graphs.size() == 0) { - OPENVINO_THROW("No graph was found"); - } - m_graph = &(m_compiled_model->get_graph()._graph); + m_profiling_task = openvino::itt::handle("INTEL_CPU_INFER_" + m_compiled_model.name() + "_" + + std::to_string(m_compiled_model.id())); // Alocate memory for each tensor if static shape for (const auto& it : m_input_ports_map) { @@ -57,35 +50,16 @@ void SyncInferRequest::create_infer_request() { } // create states according to the list of the MemoryStateNodes - for (auto&& node : m_graph->getInternalStateNodes()) { - m_memory_states.emplace_back(node.second->makeState()); - } -} - -SyncInferRequest::~SyncInferRequest() { - --(m_compiled_model->m_numRequests); -} - -// state -> storage -void SyncInferRequest::assign_states() { - auto&& graph_internal_state_nodes = m_graph->getInternalStateNodes(); - for (const auto& state : m_memory_states) { - auto itr = graph_internal_state_nodes.find(state->get_name()); - if (itr != graph_internal_state_nodes.end()) { - itr->second->assignState(state); - } - } + m_memory_states = m_compiled_model.graph().memoryStates(); } -void SyncInferRequest::redefine_memory_for_input_nodes() { - const auto cpuInputNodes = m_graph->GetInputNodesMap(); +void SyncInferRequest::redefine_memory_for_input_nodes(Graph& graph) { for (const auto& input_port : m_input_ports_map) { - const auto inputNode = cpuInputNodes.find(input_port.first); - if (inputNode == cpuInputNodes.end()) - OPENVINO_THROW("CPU execution graph doesn't contain input node with index: ", input_port.first); - if (inputNode->second->isDynamicNode()) { + auto inputNode = graph.getInputNodeByIndex(input_port.first); + OPENVINO_ASSERT(inputNode, "CPU execution graph doesn't contain output node with index: ", input_port.first); + if (inputNode->isDynamicNode()) { auto tensor = get_tensor(input_port.second); - inputNode->second->redefineOutputMemory({tensor->get_shape()}); + inputNode->redefineOutputMemory({tensor->get_shape()}); } } } @@ -103,8 +77,8 @@ void SyncInferRequest::update_external_tensor_ptrs() { void SyncInferRequest::infer() { using namespace openvino::itt; OV_ITT_SCOPED_TASK(itt::domains::intel_cpu, m_profiling_task); - auto graphLock = m_compiled_model->get_graph(); - m_graph = &(graphLock._graph); + auto graphLock = m_compiled_model.lock(); + auto&& graph = graphLock._graph; auto message = 
ov::threading::message_manager(); throw_if_canceled(); @@ -120,40 +94,41 @@ void SyncInferRequest::infer() { update_external_tensor_ptrs(); } - if (m_graph->hasDynamicInput()) { - redefine_memory_for_input_nodes(); + if (graph.hasDynamicInput()) { + redefine_memory_for_input_nodes(graph); } - change_default_ptr(); + change_default_ptr(graph); throw_if_canceled(); // state -> node if (!m_memory_states.empty()) { - assign_states(); + graph.assignStates(m_memory_states); } - push_input_data(); + push_input_data(graph); - m_graph->Infer(this); + graph.Infer(this); throw_if_canceled(); // update output control blocks, if any, in order to refresh internal buffers - if (m_graph->IsDynamic()) { + if (graph.IsDynamic()) { for (auto&& item : m_outputControlBlocks) { item.second.update(); } } - m_graph->PullOutputData(m_outputs); + graph.PullOutputData(m_outputs); } std::vector SyncInferRequest::get_profiling_info() const { - if (!m_graph || !m_graph->IsReady()) + auto&& graph = m_compiled_model.graph(); + if (!graph.IsReady()) OPENVINO_THROW("Graph is not ready!"); std::vector perfMap; - m_graph->GetPerfData(perfMap); + graph.GetPerfData(perfMap); return perfMap; } @@ -172,13 +147,10 @@ static inline void change_edge_ptr(const EdgePtr& edge, ov::SoPtr& } } -void SyncInferRequest::change_default_ptr() { - const auto& inputNodesMap = m_graph->GetInputNodesMap(); - const auto& outputNodesMap = m_graph->GetOutputNodesMap(); - +void SyncInferRequest::change_default_ptr(Graph& graph) { std::unordered_set inputPtrs; std::function& tensor)> changeInpPtr; - if (m_graph->IsDynamic()) { + if (graph.IsDynamic()) { changeInpPtr = [&inputPtrs](const EdgePtr& edge, ov::SoPtr& tensor) { change_edge_ptr(edge, tensor); inputPtrs.insert(tensor->data()); @@ -190,9 +162,8 @@ void SyncInferRequest::change_default_ptr() { } for (auto& it : m_input_external_ptr) { - auto input = inputNodesMap.find(it.first); - OPENVINO_ASSERT(inputNodesMap.end() != input, "Cannot find input tensor with index: ", it.first); - NodePtr inputNodePtr = input->second; + auto inputNodePtr = graph.getInputNodeByIndex(it.first); + OPENVINO_ASSERT(inputNodePtr, "Cannot find input tensor with index: ", it.first); if (inputNodePtr->getDstDataAtPort(0) == static_cast(it.second->data())) continue; auto& childEdges = inputNodePtr->getChildEdges(); @@ -238,9 +209,9 @@ void SyncInferRequest::change_default_ptr() { } for (auto& it : m_output_external_ptr) { - auto output = outputNodesMap.find(it.first); - OPENVINO_ASSERT(outputNodesMap.end() != output, "Cannot find output tensor with index: ", it.first); - auto parentEdge = output->second->getParentEdgeAt(0); + auto output = graph.getOutputNodeByIndex(it.first); + OPENVINO_ASSERT(output, "Cannot find output tensor with index: ", it.first); + auto parentEdge = output->getParentEdgeAt(0); void* const outputRawPtr = parentEdge->getMemory().getData(); if (outputRawPtr == static_cast(it.second->data())) continue; @@ -278,24 +249,21 @@ void SyncInferRequest::change_default_ptr() { change_edge_ptr(parentEdge, it.second); } - if (m_graph->IsDynamic()) { - const auto& outMemBlocksMap = m_graph->getOutputNodesMemBlocksMap(); + if (graph.IsDynamic()) { + const auto& outMemBlocksMap = graph.getOutputNodesMemBlocksMap(); for (auto&& item : outMemBlocksMap) { - const auto& name = item.first; + const auto index = item.first; // share intel_cpu::Tensor to Graph by injecting to corresponding ProxyMemoryBlock instance. 
auto outputMemBlock = item.second; - OPENVINO_ASSERT(outputMemBlock, "proxy mem block for output ", name, " is empty."); + OPENVINO_ASSERT(outputMemBlock, "proxy mem block for output ", index, " is empty."); - auto controlBlockItr = m_outputControlBlocks.find(name); + auto controlBlockItr = m_outputControlBlocks.find(index); if (controlBlockItr != m_outputControlBlocks.end()) { - auto output = outputNodesMap.find(name); - OPENVINO_ASSERT(outputNodesMap.end() != output, - "Node with name: ", - name, - " is absent in the outputNodesMap"); - auto parentEdge = output->second->getParentEdgeAt(0); + auto output = graph.getOutputNodeByIndex(index); + OPENVINO_ASSERT(output, "Output with index: ", index, " is absent in the outputNodesMap"); + auto parentEdge = output->getParentEdgeAt(0); // avoid cyclic memory use auto&& controlBlock = controlBlockItr->second; @@ -311,10 +279,10 @@ void SyncInferRequest::change_default_ptr() { ", actual ", controlBlock.currentMemBlock(), " graph ", - m_graph, - " inferrequest ", + &graph, + " infer request ", this); - DEBUG_LOG(name, ", tensor ", controlBlock.tensor()); + DEBUG_LOG(index, ", tensor ", controlBlock.tensor()); } else { outputMemBlock->reset(); // switch to the internal memory since memory sharing is no longer possible } @@ -413,7 +381,12 @@ void SyncInferRequest::set_tensor(const ov::Output& in_port, con " are different."); } - MemoryDescPtr actualDesc = m_graph->getInputNodeByIndex(input_index)->getBaseMemDescAtOutputPort(0); + auto&& graph = m_compiled_model.graph(); + + auto inputNode = graph.getInputNodeByIndex(input_index); + OPENVINO_ASSERT(inputNode, "CPU execution graph doesn't contain input node with index: ", input_index); + + MemoryDescPtr actualDesc = inputNode->getBaseMemDescAtOutputPort(0); if (!actualDesc->isDefined()) { // we must define desc for dynamic case // otherwise we got incorrect check on shape compatibility inside isCompatible @@ -460,7 +433,11 @@ void SyncInferRequest::set_tensor(const ov::Output& in_port, con " are different."); } - const auto& desc = m_graph->getOutputNodeByIndex(output_index)->getParentEdgeAt(0)->getMemory().getDesc(); + auto&& graph = m_compiled_model.graph(); + + auto outputNode = graph.getOutputNodeByIndex(output_index); + OPENVINO_ASSERT(outputNode, "CPU execution graph doesn't contain output node with index: ", output_index); + const auto& desc = outputNode->getParentEdgeAt(0)->getMemory().getDesc(); if (!isDynamic && mem_desc_ptr->isCompatible(desc)) { m_output_external_ptr[output_index] = tensor; } else if (m_output_external_ptr.find(output_index) != m_output_external_ptr.end()) { @@ -484,15 +461,15 @@ void SyncInferRequest::set_tensors_impl(const ov::Output port, void SyncInferRequest::init_tensor(const std::size_t& port_index, const ov::ISyncInferRequest::FoundPort::Type& type) { OV_ITT_SCOPED_TASK(itt::domains::intel_cpu, "init_tensor"); - if (!m_graph || !m_graph->IsReady()) - OPENVINO_THROW("Graph is not ready!"); + auto&& graph = m_compiled_model.graph(); + OPENVINO_ASSERT(graph.IsReady(), "Graph is not ready!"); ov::SoPtr tensor; if (type == ov::ISyncInferRequest::FoundPort::Type::INPUT) { - OPENVINO_ASSERT(m_graph->GetInputNodesMap().find(port_index) != m_graph->GetInputNodesMap().end(), + OPENVINO_ASSERT(graph.getInputNodeByIndex(port_index), "Tensor with index: ", port_index, - " exists in CPU plugin graph, but absents in model inputs"); + " absent in the plugin's graph inputs"); const auto& port = m_input_ports_map[port_index]; tensor = ov::ISyncInferRequest::get_tensor(port); @@ 
-513,8 +490,9 @@ void SyncInferRequest::init_tensor(const std::size_t& port_index, const ov::ISyn if (!isDynamic) { auto mem_desc_ptr = MemoryDescUtils::generateCpuBlockedMemoryDesc(tensor); - if (mem_desc_ptr->isCompatible( - m_graph->getInputNodeByIndex(port_index)->getChildEdgeAt(0)->getMemory().getDesc())) { + auto inputNode = graph.getInputNodeByIndex(port_index); + OPENVINO_ASSERT(inputNode, "CPU execution graph doesn't contain input node with index: ", port_index); + if (mem_desc_ptr->isCompatible(inputNode->getChildEdgeAt(0)->getMemory().getDesc())) { m_input_external_ptr[port_index] = tensor; } } @@ -522,16 +500,12 @@ void SyncInferRequest::init_tensor(const std::size_t& port_index, const ov::ISyn } if (type == ov::ISyncInferRequest::FoundPort::Type::OUTPUT) { - const auto& outMap = m_graph->GetOutputNodesMap(); - auto output = outMap.find(port_index); - OPENVINO_ASSERT(output != outMap.end(), - "Tensor with index: ", - port_index, - " exists in CPU plugin graph, but absents in model outputs"); + auto output = graph.getOutputNodeByIndex(port_index); + OPENVINO_ASSERT(output, "Tensor with index: ", port_index, " absent in the plugin's graph outputs"); if (m_outputs.find(port_index) == m_outputs.end()) { const auto& port = m_output_ports_map[port_index]; const auto& port_shape = port.get_partial_shape(); - const auto& graph_shape = output->second->getInputShapeAtPort(0); + const auto& graph_shape = output->getInputShapeAtPort(0); // WA, due to the transformations and constant folding, shape inference of the resulting model may // have static shapes, while they are dynamic in the initial representation @@ -560,8 +534,7 @@ void SyncInferRequest::init_tensor(const std::size_t& port_index, const ov::ISyn tensor = std::make_shared(memory); } else { - const auto graph_prec = - output->second->getParentEdgeAt(0)->getMemory().getDesc().getPrecision(); + const auto graph_prec = output->getParentEdgeAt(0)->getMemory().getDesc().getPrecision(); OutputControlBlock control_block{model_prec, Shape{shape}}; DEBUG_LOG(port_index, @@ -585,7 +558,7 @@ void SyncInferRequest::init_tensor(const std::size_t& port_index, const ov::ISyn m_outputs[port_index] = tensor; if (!port_shape.is_dynamic() && !m_output_external_ptr.count(port_index)) { auto desc = MemoryDescUtils::generateCpuBlockedMemoryDesc(tensor); - if (desc->isCompatible(output->second->getParentEdgeAt(0)->getMemory().getDesc())) { + if (desc->isCompatible(output->getParentEdgeAt(0)->getMemory().getDesc())) { m_output_external_ptr[port_index] = tensor; } } @@ -603,10 +576,10 @@ void SyncInferRequest::init_tensor(const std::size_t& port_index, const ov::ISyn return; } -void SyncInferRequest::push_input_data() { +void SyncInferRequest::push_input_data(Graph& graph) { for (auto& input : m_input_ports_map) { auto tensor = get_tensor(input.second); - m_graph->PushInputData(input.first, tensor); + graph.PushInputData(input.first, tensor); } } diff --git a/src/plugins/intel_cpu/src/infer_request.h b/src/plugins/intel_cpu/src/infer_request.h index b66387ecc4d4d5..daae553dff2ea4 100644 --- a/src/plugins/intel_cpu/src/infer_request.h +++ b/src/plugins/intel_cpu/src/infer_request.h @@ -4,6 +4,7 @@ #pragma once +#include "compiled_model.h" #include "cpu_tensor.h" #include "graph.h" #include "memory_state.h" @@ -13,13 +14,11 @@ namespace ov { namespace intel_cpu { -class CompiledModel; class AsyncInferRequest; class SyncInferRequest : public ov::ISyncInferRequest { public: - SyncInferRequest(std::shared_ptr compiled_model); - virtual ~SyncInferRequest(); + 
SyncInferRequest(CompiledModelHolder compiled_model); void infer() override; @@ -97,11 +96,10 @@ class SyncInferRequest : public ov::ISyncInferRequest { void create_infer_request(); void init_tensor(const std::size_t& port_index, const ov::ISyncInferRequest::FoundPort::Type& type); - void push_input_data(); - void redefine_memory_for_input_nodes(); - void assign_states(); + void push_input_data(Graph& graph); + void redefine_memory_for_input_nodes(Graph& graph); void update_external_tensor_ptrs(); - void change_default_ptr(); + void change_default_ptr(Graph& graph); const ov::Output& get_internal_port(const ov::Output& port) const; @@ -110,14 +108,13 @@ class SyncInferRequest : public ov::ISyncInferRequest { private: std::unordered_map m_outputControlBlocks; - Graph* m_graph = nullptr; std::unordered_map> m_input_external_ptr; std::unordered_map> m_output_external_ptr; - std::shared_ptr m_compiled_model; openvino::itt::handle_t m_profiling_task; std::vector m_memory_states; AsyncInferRequest* m_asyncRequest = nullptr; + CompiledModelHolder m_compiled_model; std::unordered_map> m_input_ports_map; std::unordered_map> m_output_ports_map; diff --git a/src/plugins/intel_cpu/src/nodes/composite.cpp b/src/plugins/intel_cpu/src/nodes/composite.cpp index 616d3df6950e9a..0d8b33d90fbd9c 100644 --- a/src/plugins/intel_cpu/src/nodes/composite.cpp +++ b/src/plugins/intel_cpu/src/nodes/composite.cpp @@ -75,7 +75,7 @@ void Composite::selectOptimalPrimitiveDescriptor() { // @todo add ascii diagramm for memory mapping / reuse void Composite::createPrimitive() { - OPENVINO_ASSERT(getOriginalInputsNumber() == m_graph.GetInputNodesMap().size(), + OPENVINO_ASSERT(getOriginalInputsNumber() == m_graph.inputsNumber(), "Number of node inputs must be equal the number of inner graph's inputs"); std::vector inputMemory; @@ -83,7 +83,7 @@ void Composite::createPrimitive() { inputMemory.emplace_back(getSrcMemoryAtPort(i)); } - OPENVINO_ASSERT(getOriginalOutputsNumber() == m_graph.GetOutputNodesMap().size(), + OPENVINO_ASSERT(getOriginalOutputsNumber() == m_graph.outputsNumber(), "Number of node outputs must be equal the number of inner graph's outputs"); std::vector outputMemory; diff --git a/src/plugins/intel_cpu/src/nodes/if.cpp b/src/plugins/intel_cpu/src/nodes/if.cpp index 8de1cf14920d74..88e2c84970d874 100644 --- a/src/plugins/intel_cpu/src/nodes/if.cpp +++ b/src/plugins/intel_cpu/src/nodes/if.cpp @@ -86,11 +86,9 @@ void If::getSupportedDescriptors() { subGraphThen.CreateGraph(thenBody, context); subGraphElse.CreateGraph(elseBody, context); - const auto& inMapThen = subGraphThen.GetInputNodesMap(); for (const auto& param : ifOp->get_then_body()->get_parameters()) { - auto inNode = inMapThen.find(ifOp->get_then_body()->get_parameter_index(param)); - if (inNode != inMapThen.end()) { - inputMemThen.push_back(getToMemories(inNode->second.get(), 0)); + if (auto inNode = subGraphThen.getInputNodeByIndex(ifOp->get_then_body()->get_parameter_index(param))) { + inputMemThen.push_back(getToMemories(inNode.get(), 0)); } else { OPENVINO_THROW("Then body of node If with name ", getName(), @@ -99,11 +97,9 @@ void If::getSupportedDescriptors() { } } - const auto& inMapElse = subGraphElse.GetInputNodesMap(); for (const auto& param : ifOp->get_else_body()->get_parameters()) { - auto inNode = inMapElse.find(ifOp->get_else_body()->get_parameter_index(param)); - if (inNode != inMapElse.end()) { - inputMemElse.push_back(getToMemories(inNode->second.get(), 0)); + if (auto inNode = 
subGraphElse.getInputNodeByIndex(ifOp->get_else_body()->get_parameter_index(param))) { + inputMemElse.push_back(getToMemories(inNode.get(), 0)); } else { OPENVINO_THROW("Else body of node If with name ", getName(), @@ -112,11 +108,9 @@ void If::getSupportedDescriptors() { } } - const auto& outMapThen = subGraphThen.GetOutputNodesMap(); for (const auto& out : ifOp->get_then_body()->get_results()) { - auto outNode = outMapThen.find(ifOp->get_then_body()->get_result_index(out)); - if (outNode != outMapThen.end()) { - auto outMem = outNode->second->getSrcMemoryAtPort(0); + if (auto outNode = subGraphThen.getOutputNodeByIndex(ifOp->get_then_body()->get_result_index(out))) { + auto outMem = outNode->getSrcMemoryAtPort(0); outputMemThen.push_back(outMem); } else { OPENVINO_THROW("Then body of node If with name ", @@ -126,11 +120,9 @@ void If::getSupportedDescriptors() { } } - const auto& outMapElse = subGraphElse.GetOutputNodesMap(); for (const auto& out : ifOp->get_else_body()->get_results()) { - auto outNode = outMapElse.find(ifOp->get_else_body()->get_result_index(out)); - if (outNode != outMapElse.end()) { - auto outMem = outNode->second->getSrcMemoryAtPort(0); + if (auto outNode = subGraphElse.getOutputNodeByIndex(ifOp->get_else_body()->get_result_index(out))) { + auto outMem = outNode->getSrcMemoryAtPort(0); outputMemElse.push_back(outMem); } else { OPENVINO_THROW("Else body of node If with name ", diff --git a/src/plugins/intel_cpu/src/nodes/lora.cpp b/src/plugins/intel_cpu/src/nodes/lora.cpp index 0dcb2e9ef2b9e5..c59a3a7fa37578 100644 --- a/src/plugins/intel_cpu/src/nodes/lora.cpp +++ b/src/plugins/intel_cpu/src/nodes/lora.cpp @@ -88,7 +88,7 @@ void LoRA::selectOptimalPrimitiveDescriptor() { // @todo add ascii diagram for memory mapping / reuse void LoRA::createPrimitive() { - CPU_NODE_ASSERT(getOriginalInputsNumber() == m_graph.GetInputNodesMap().size(), + CPU_NODE_ASSERT(getOriginalInputsNumber() == m_graph.inputsNumber(), "Number of node inputs must be equal the number of inner graph's inputs"); std::vector inputMemory; @@ -99,7 +99,7 @@ void LoRA::createPrimitive() { inputMemory.emplace_back(std::move(mem)); } - CPU_NODE_ASSERT(getOriginalOutputsNumber() == m_graph.GetOutputNodesMap().size(), + CPU_NODE_ASSERT(getOriginalOutputsNumber() == m_graph.outputsNumber(), "Number of node outputs must be equal the number of inner graph's outputs"); std::vector outputMemory{getDstMemoryAtPort(0)}; diff --git a/src/plugins/intel_cpu/src/nodes/memory.hpp b/src/plugins/intel_cpu/src/nodes/memory.hpp index 1571e8fffa2231..9c0c9664ce8a27 100644 --- a/src/plugins/intel_cpu/src/nodes/memory.hpp +++ b/src/plugins/intel_cpu/src/nodes/memory.hpp @@ -29,7 +29,7 @@ class MemoryStatesRegister { void registerInput(MemoryInputBase* node); void remove(MemoryNode* node); - const InputNodesMap& getMemoryStates() const { + const InputNodesMap& getMemoryStates() { return memory_inputs; } diff --git a/src/plugins/intel_cpu/src/nodes/tensoriterator.cpp b/src/plugins/intel_cpu/src/nodes/tensoriterator.cpp index e2bd8ed6f25b5d..99281189b8196b 100644 --- a/src/plugins/intel_cpu/src/nodes/tensoriterator.cpp +++ b/src/plugins/intel_cpu/src/nodes/tensoriterator.cpp @@ -440,19 +440,15 @@ void TensorIterator::getSupportedDescriptors() { const std::shared_ptr body = tiOp->get_function(); sub_graph.CreateGraph(body, context); - const auto& inMap = sub_graph.GetInputNodesMap(); for (const auto& param : tiOp->get_function()->get_parameters()) { - auto inNode = inMap.find(tiOp->get_function()->get_parameter_index(param)); - if 
(inNode != inMap.end()) { - input_mems.push_back(getToMemories(inNode->second.get(), 0)); + if (auto inNode = sub_graph.getInputNodeByIndex(tiOp->get_function()->get_parameter_index(param))) { + input_mems.push_back(getToMemories(inNode.get(), 0)); } } - const auto& outMap = sub_graph.GetOutputNodesMap(); for (const auto& out : tiOp->get_function()->get_results()) { - auto outNode = outMap.find(tiOp->get_function()->get_result_index(out)); - if (outNode != outMap.end()) { - auto outMem = outNode->second->getSrcMemoryAtPort(0); + if (auto outNode = sub_graph.getOutputNodeByIndex(tiOp->get_function()->get_result_index(out))) { + auto outMem = outNode->getSrcMemoryAtPort(0); output_mem.push_back(outMem); } } diff --git a/src/plugins/intel_cpu/tests/functional/custom/behavior/ov_executable_network/concurent_release_memory.cpp b/src/plugins/intel_cpu/tests/functional/custom/behavior/ov_executable_network/concurent_release_memory.cpp new file mode 100644 index 00000000000000..4c8fb1945bb8fa --- /dev/null +++ b/src/plugins/intel_cpu/tests/functional/custom/behavior/ov_executable_network/concurent_release_memory.cpp @@ -0,0 +1,198 @@ +// Copyright (C) 2018-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +#include + +#include +#include +#include +#include +#include + +#include +#include + +namespace ov { +namespace test { +// Openvino extension operation that sleeps for X us in its evaluate method +namespace { +enum class TestSteps { INIT, ENTER_EVALUATE, RUN_EVALUATE }; +} // namespace + +class SleepCustomOp : public ov::op::Op { +public: + OPENVINO_OP("SleepCustomOp"); + SleepCustomOp() = default; + SleepCustomOp(const ov::OutputVector& args, + size_t sleep, + std::shared_ptr mutex, + std::shared_ptr cv, + std::shared_ptr> test_step) + : Op(args), + m_sleep(sleep), + m_mutex(mutex), + m_cv(cv), + m_test_step(test_step) { + constructor_validate_and_infer_types(); + } + + void validate_and_infer_types() override { + set_output_type(0, get_input_element_type(0), get_input_partial_shape(0)); + } + + std::shared_ptr clone_with_new_inputs(const ov::OutputVector& new_args) const override { + OPENVINO_ASSERT(new_args.size() == 1, "Incorrect number of new arguments"); + auto new_op = std::make_shared(new_args, m_sleep, m_mutex, m_cv, m_test_step); + return new_op; + } + + bool visit_attributes(ov::AttributeVisitor& visitor) override { + return true; + } + + void revalidate_and_infer_types() override { + validate_and_infer_types(); + } + + bool evaluate(ov::TensorVector& outputs, const ov::TensorVector& inputs) const override { + // signal entering the evaluate method + { + std::lock_guard lock(*m_mutex); + m_test_step->store(TestSteps::ENTER_EVALUATE); + } + m_cv->notify_all(); + { + // this is required to start all the evaluate calls at the same time + std::unique_lock lock(*m_mutex); + m_cv->wait(lock, [&] { + return m_test_step->load() == TestSteps::RUN_EVALUATE; + }); + } + std::this_thread::sleep_for(std::chrono::microseconds(m_sleep)); + return true; + } + + bool evaluate(ov::TensorVector& output_values, + const ov::TensorVector& input_values, + const ov::EvaluationContext& evaluationContext) const override { + return evaluate(output_values, input_values); + } + + bool has_evaluate() const override { + return true; + } + +private: + size_t m_sleep; // sleep time in us + std::shared_ptr m_mutex; + std::shared_ptr m_cv; + std::shared_ptr> m_test_step; +}; + +class ReleaseMemoryMultiThreadTest : public ::testing::Test { +protected: + void SetUp() override { + param = 
std::make_shared(ov::element::f32, ov::Shape{1}); + + constexpr size_t sleep_time = 5; // us + mutex = std::make_shared(); + cv = std::make_shared(); + test_step = std::make_shared>(TestSteps::INIT); + + auto sleep = std::make_shared(ov::OutputVector{param}, sleep_time, mutex, cv, test_step); + ov::ResultVector results{std::make_shared(sleep)}; + ov::ParameterVector params{param}; + + auto model = std::make_shared(results, params, "testModel"); + + compiled_model = core.compile_model(model, ov::test::utils::DEVICE_CPU, {{"NUM_STREAMS", num_streams}}); + } + +protected: + const size_t num_streams = 1; // use only one async stream to simplify invocation order syncronization + ov::Core core; + ov::CompiledModel compiled_model; + std::shared_ptr param; + + std::shared_ptr mutex; + std::shared_ptr cv; + std::shared_ptr> test_step; +}; +} // namespace test +} // namespace ov + +using namespace ov::test; + +TEST_F(ReleaseMemoryMultiThreadTest, smoke_throwInferenceIsRunning) { + // Create and infer a few infer requests concurrently + std::vector inferRequests; + for (size_t i = 0; i < num_streams; i++) { + auto inferRequest = compiled_model.create_infer_request(); + inferRequest.set_tensor(param, ov::Tensor(ov::element::f32, ov::Shape{1})); + inferRequests.push_back(std::move(inferRequest)); + } + // infer the infer requests + for (auto& inferRequest : inferRequests) { + inferRequest.start_async(); + } + + //wait till the infer request enters evaluate + { + std::unique_lock lock(*mutex); + cv->wait(lock, [&] { + return test_step->load() == TestSteps::ENTER_EVALUATE; + }); + } + + // While the infer requests are waiting on the cv, call release_memory. + // We expect that the method will throw an exception when it is called while infer requests are running. + EXPECT_THROW(compiled_model.release_memory(), ov::Exception); + + // lets unlock cv + { + std::lock_guard lock(*mutex); + test_step->store(TestSteps::RUN_EVALUATE); + } + cv->notify_all(); + + for (auto& inferRequest : inferRequests) { + inferRequest.wait(); + } +} + +TEST_F(ReleaseMemoryMultiThreadTest, smoke_noThrowInferenceIsNotRunning) { + // Create and infer a few infer requests concurrently + std::vector inferRequests; + for (size_t i = 0; i < num_streams; i++) { + auto inferRequest = compiled_model.create_infer_request(); + inferRequest.set_tensor(param, ov::Tensor(ov::element::f32, ov::Shape{1})); + inferRequests.push_back(std::move(inferRequest)); + } + // infer the infer requests + for (auto& inferRequest : inferRequests) { + inferRequest.start_async(); + } + + //wait till the infer request enters evaluate + { + std::unique_lock lock(*mutex); + cv->wait(lock, [&] { + return test_step->load() == TestSteps::ENTER_EVALUATE; + }); + } + + // lets unlock cv + { + std::lock_guard lock(*mutex); + test_step->store(TestSteps::RUN_EVALUATE); + } + cv->notify_all(); + + for (auto& inferRequest : inferRequests) { + inferRequest.wait(); + } + + // Don't throw when the infer requests are finished + EXPECT_NO_THROW(compiled_model.release_memory()); +} \ No newline at end of file
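
For reference, the core of the synchronization change is the non-blocking lock attempt in `release_memory()`. The sketch below reproduces that pattern in isolation (type names simplified, the plugin's `OPENVINO_ASSERT` swapped for a plain `std::runtime_error`); in the actual patch the mutex lives in the per-graph `GraphGuard`.

```cpp
#include <mutex>
#include <stdexcept>

// Simplified stand-in for the plugin's per-graph guard (hypothetical name).
struct BusyGraph {
    std::mutex _mutex;
    // ... graph state ...
};

// Mirrors the release_memory() logic: try to take the per-graph lock without
// blocking; if an infer request (or any other graph-modifying call) already
// holds it, fail loudly instead of waiting or racing.
void release_memory(BusyGraph& graph) {
    std::unique_lock<std::mutex> lock(graph._mutex, std::try_to_lock);
    if (!lock.owns_lock()) {
        throw std::runtime_error(
            "Attempt to call release_memory() on a compiled model in a busy state. "
            "Please ensure that all infer requests are completed before releasing memory.");
    }
    // Lock held: safe to release the network memory here.
}
```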
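
The same mutex is what an infer request holds for the whole duration of `infer()`. A minimal, self-contained sketch of the `GraphGuard`/`Lock` idiom (assuming a trimmed-down `Graph` type) shows why graph access is now tied to the lifetime of a lock object taken per call, rather than to a raw `Graph*` cached in the request:

```cpp
#include <mutex>

struct Graph { /* ... CPU graph state ... */ };

// The lock object carries a reference to the guarded graph, so callers can
// only reach the graph while the mutex is held.
struct GraphGuard : Graph {
    std::mutex _mutex;
    struct Lock : std::unique_lock<std::mutex> {
        explicit Lock(GraphGuard& graph) : std::unique_lock<std::mutex>(graph._mutex), _graph(graph) {}
        GraphGuard& _graph;
    };
};

// An infer request takes a fresh lock per infer() instead of caching Graph*:
void infer_once(GraphGuard& guard) {
    GraphGuard::Lock lock{guard};   // held for the whole inference
    Graph& graph = lock._graph;     // graph access is tied to the lock's lifetime
    (void)graph;                    // ... push inputs, run, pull outputs ...
}   // lock released here; a pending release_memory() can now acquire the mutex
```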
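
From the application side, the new contract is observable through the public API: `release_memory()` throws `ov::Exception` when it is called while a request still owns the graph lock, which is exactly what the new functional test checks. A hypothetical usage snippet (the `"model.xml"` path is a placeholder) illustrates the expected throw-and-retry flow:

```cpp
#include <openvino/openvino.hpp>

int main() {
    ov::Core core;
    auto compiled = core.compile_model("model.xml", "CPU");
    auto request = compiled.create_infer_request();

    request.start_async();
    try {
        compiled.release_memory();      // may throw: the graph is busy with inference
    } catch (const ov::Exception&) {
        request.wait();                 // let the running inference finish first
        compiled.release_memory();      // safe now: no request holds the graph lock
    }
    return 0;
}
```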
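
Finally, the new test makes the race deterministic with a condition-variable handshake between the custom op's `evaluate()` and the test body. The distilled sketch below shows just that handshake, with a plain thread standing in for the async infer request; the point marked in the main thread is where the test calls `release_memory()` and expects the throw:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

enum class TestSteps { INIT, ENTER_EVALUATE, RUN_EVALUATE };

int main() {
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<TestSteps> step{TestSteps::INIT};

    // Stand-in for SleepCustomOp::evaluate() running inside the infer request.
    std::thread op([&] {
        {
            std::lock_guard<std::mutex> lock(mtx);
            step.store(TestSteps::ENTER_EVALUATE);   // signal: inference has started
        }
        cv.notify_all();
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&] { return step.load() == TestSteps::RUN_EVALUATE; });
        // ... the real op then sleeps and returns, finishing the inference ...
    });

    {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&] { return step.load() == TestSteps::ENTER_EVALUATE; });
    }
    // Inference is provably in flight here: this is where the test expects
    // compiled_model.release_memory() to throw ov::Exception.

    {
        std::lock_guard<std::mutex> lock(mtx);
        step.store(TestSteps::RUN_EVALUATE);          // unpark evaluate()
    }
    cv.notify_all();
    op.join();
    return 0;
}
```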