Skip to content

Commit

Permalink
Reserving CPU resource in CPU inference (openvinotoolkit#27321)
Browse files Browse the repository at this point in the history
### Details:
- Add property `ov::hint::enable_cpu_reservation` to reserve CPU
resource in CPU inference
- `ov::hint::enable_cpu_reservation` defaults to false, user can
explicitly set it to true to enable CPU reservation.
 - update proc_type_table before stream scheduling in compile_model()

### Tickets:
 - *CVS-155268*
 - *openvinotoolkit#27083

---------

Co-authored-by: Shen, Wanglei <[email protected]>
Co-authored-by: Chen Peter <[email protected]>
  • Loading branch information
3 people authored Jan 16, 2025
1 parent 513dcc5 commit c849f72
Show file tree
Hide file tree
Showing 36 changed files with 2,048 additions and 501 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ All parameters must be set before calling ``ov::Core::compile_model()`` in order
- ``ov::hint::num_request``
- ``ov::hint::scheduling_core_type``
- ``ov::hint::enable_hyper_threading``
- ``ov::hint::enable_cpu_reservation``
- ``ov::hint::enable_cpu_pinning``
- ``ov::num_streams``
- ``ov::inference_num_threads``
Expand Down
7 changes: 7 additions & 0 deletions src/inference/dev_api/openvino/runtime/system_conf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ OPENVINO_RUNTIME_API std::vector<std::vector<int>> get_proc_type_table();
*/
OPENVINO_RUNTIME_API int get_current_socket_id();

/**
* @brief Returns the numa node ID in cpu mapping table of the currently running thread.
* @ingroup ov_dev_api_system_conf
* @return numa node ID in cpu mapping
*/
OPENVINO_RUNTIME_API int get_current_numa_node_id();

/**
* @brief Returns a table of original number of processor types without filtering other plugins occupying CPU
* resources. The difference from get_proc_type_table: This is used to get the configuration of current machine. For
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class OPENVINO_RUNTIME_API CPUStreamsExecutor : public IStreamsExecutor {

std::vector<int> get_rank() override;

void cpu_reset() override;

private:
struct Impl;
std::unique_ptr<Impl> _impl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <memory>
#include <string>
#include <vector>
#include <mutex>

#include "openvino/runtime/common.hpp"
#include "openvino/runtime/properties.hpp"
Expand Down Expand Up @@ -89,14 +90,15 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
ov::hint::SchedulingCoreType::ANY_CORE; //!< PCORE_ONLY and ECORE_ONLY are valid in hybrid core machine,
//!< ANY_CORE is valid in all machines. Core type priority:
//!< physical PCore, ECore, logical PCore
bool _cpu_reservation = false; //!< Whether to reserve current cores which will not be used by other plugin.
//!< If it is true, cpu_pinning defaults to true.
bool _cpu_reservation = false; //!< Whether to reserve current cores which will not be used by other plugin or
//!< compiled model. If it is true, cpu_pinning defaults to true.
bool _cpu_pinning = false; //!< Whether to bind threads to cores.
bool _cores_limit = true; //!< Whether to limit the number of streams and threads by the number of cpu cores
std::vector<std::vector<int>> _streams_info_table = {};
std::vector<std::vector<int>> _stream_processor_ids;
int _sub_streams = 0;
std::vector<int> _rank = {};
bool _add_lock = true;

/**
* @brief Get and reserve cpu ids based on configuration and hardware information,
Expand All @@ -109,6 +111,8 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
*/
void update_executor_config();

void update_executor_config(bool lock);

/**
* @brief Set _streams_info_table and _cpu_reservation in cpu streams executor config when nstreams = 0,
* that is, only create one thread with TBB
Expand Down Expand Up @@ -136,7 +140,8 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
bool cpu_pinning = false,
bool cores_limit = true,
std::vector<std::vector<int>> streams_info_table = {},
std::vector<int> rank = {})
std::vector<int> rank = {},
bool add_lock = true)
: _name{std::move(name)},
_streams{streams},
_threads_per_stream{threads_per_stream},
Expand All @@ -145,8 +150,9 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
_cpu_pinning{cpu_pinning},
_cores_limit{cores_limit},
_streams_info_table{std::move(streams_info_table)},
_rank{rank} {
update_executor_config();
_rank{rank},
_add_lock(add_lock) {
update_executor_config(_add_lock);
}

// These APIs which includes set_property and get_property can not be removed until they will never be called by
Expand Down Expand Up @@ -266,12 +272,19 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
*/
virtual std::vector<int> get_rank() = 0;

/**
* @brief Reset cpu map table when user set enable_cpu_reservation = true
*/
virtual void cpu_reset() = 0;

/**
* @brief Execute the task in the current thread using streams executor configuration and constraints
* @param task A task to start
*/
virtual void execute(Task task) = 0;
};

static std::mutex _streams_executor_mutex;

} // namespace threading
} // namespace ov
17 changes: 17 additions & 0 deletions src/inference/include/openvino/runtime/properties.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,23 @@ static constexpr Property<std::set<ModelDistributionPolicy>> model_distribution_
*/
static constexpr Property<bool> enable_cpu_pinning{"ENABLE_CPU_PINNING"};

/**
* @brief This property allows CPU reservation during inference.
* @ingroup ov_runtime_cpp_prop_api
*
* Cpu Reservation means reserve cpus which will not be used by other plugin or compiled model. Developer can use this
* property to enable or disable CPU reservation during inference on Windows and Linux. MacOS does not support CPU
* reservation, and this property is always disabled. This property defaults to false.
*
* The following code is example to use this property.
*
* @code
* ie.set_property(ov::hint::enable_cpu_reservation(true));
* ie.set_property(ov::hint::enable_cpu_reservation(false));
* @endcode
*/
static constexpr Property<bool> enable_cpu_reservation{"ENABLE_CPU_RESERVATION"};

/**
* @brief This property define if using hyper threading during inference.
* @ingroup ov_runtime_cpp_prop_api
Expand Down
26 changes: 18 additions & 8 deletions src/inference/src/dev/threading/cpu_streams_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,6 @@ struct CPUStreamsExecutor::Impl {
_impl->_streamIdQueue.push(_streamId);
}
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
if (_impl->_config.get_name().find("StreamsExecutor") == std::string::npos) {
try {
set_cpu_used(_cpu_ids, NOT_USED);
} catch (const ov::Exception&) {
// Destructor should not throw - catch needed for static analysis.
// CPU::CPU() won't throw here as cpu_info() is called from Stream constructor.
}
}
if (nullptr != _observer) {
_observer->observe(false);
}
Expand Down Expand Up @@ -345,6 +337,7 @@ struct CPUStreamsExecutor::Impl {
_exectorMgr = executor_manager();
auto numaNodes = get_available_numa_nodes();
int streams_num = _config.get_streams();
auto processor_ids = _config.get_stream_processor_ids();
if (streams_num != 0) {
std::copy_n(std::begin(numaNodes),
std::min<std::size_t>(streams_num, numaNodes.size()),
Expand All @@ -353,6 +346,10 @@ struct CPUStreamsExecutor::Impl {
_usedNumaNodes = std::move(numaNodes);
}
for (auto streamId = 0; streamId < streams_num; ++streamId) {
if (_config.get_cpu_reservation()) {
std::lock_guard<std::mutex> lock(_cpu_ids_mutex);
_cpu_ids_all.insert(_cpu_ids_all.end(), processor_ids[streamId].begin(), processor_ids[streamId].end());
}
_threads.emplace_back([this, streamId] {
openvino::itt::threadName(_config.get_name() + "_" + std::to_string(streamId));
for (bool stopped = false; !stopped;) {
Expand Down Expand Up @@ -457,6 +454,8 @@ struct CPUStreamsExecutor::Impl {
CustomThreadLocal _streams;
std::shared_ptr<ExecutorManager> _exectorMgr;
bool _isExit = false;
std::vector<int> _cpu_ids_all;
std::mutex _cpu_ids_mutex;
};

int CPUStreamsExecutor::get_stream_id() {
Expand Down Expand Up @@ -492,9 +491,20 @@ std::vector<int> CPUStreamsExecutor::get_rank() {
return stream->_rank;
}

void CPUStreamsExecutor::cpu_reset() {
if (!_impl->_cpu_ids_all.empty()) {
set_cpu_used(_impl->_cpu_ids_all, NOT_USED);
{
std::lock_guard<std::mutex> lock(_impl->_cpu_ids_mutex);
_impl->_cpu_ids_all.clear();
}
}
}

CPUStreamsExecutor::CPUStreamsExecutor(const IStreamsExecutor::Config& config) : _impl{new Impl{config}} {}

CPUStreamsExecutor::~CPUStreamsExecutor() {
cpu_reset();
{
std::lock_guard<std::mutex> lock(_impl->_mutex);
_impl->_isStopped = true;
Expand Down
37 changes: 27 additions & 10 deletions src/inference/src/dev/threading/istreams_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,32 @@ void IStreamsExecutor::Config::update_executor_config() {
const auto proc_type_table = get_proc_type_table();
bool streams_info_available = false;

if (proc_type_table.empty()) {
return;
}

if (_cpu_reservation && !_cpu_pinning) {
_cpu_pinning = true;
if (proc_type_table.empty() || proc_type_table[0][ALL_PROC] == 0) {
if (_cpu_reservation) {
OPENVINO_THROW("[ Config ] proc_type_table is empty. No CPU resources available!");
} else {
return;
}
}

if (!_streams_info_table.empty()) {
streams_info_available = true;
std::vector<int> threads_proc_type(HYPER_THREADING_PROC + 1, 0);
int threads_all = 0;
for (size_t i = 0; i < _streams_info_table.size(); i++) {
if (_streams_info_table[i][NUMBER_OF_STREAMS] > 0) {
threads_proc_type[_streams_info_table[i][PROC_TYPE]] +=
int num_threads =
_streams_info_table[i][THREADS_PER_STREAM] * _streams_info_table[i][NUMBER_OF_STREAMS];
threads_proc_type[_streams_info_table[i][PROC_TYPE]] += num_threads;
threads_all += num_threads;
}
}
if (threads_all == 0) {
OPENVINO_THROW("streams_info_table is invalid!");
}
for (size_t i = ALL_PROC; i < threads_proc_type.size(); i++) {
if (threads_proc_type[i] > proc_type_table[0][i]) {
streams_info_available = false;
break;
OPENVINO_THROW("Not enough CPU resources!");
}
}
}
Expand Down Expand Up @@ -269,7 +274,7 @@ void IStreamsExecutor::Config::update_executor_config() {
}
}

if (_cpu_pinning) {
if (_cpu_pinning || _cpu_reservation) {
reserve_available_cpus(_streams_info_table, _stream_processor_ids, _cpu_reservation ? CPU_USED : NOT_USED);
}

Expand Down Expand Up @@ -319,6 +324,17 @@ void IStreamsExecutor::Config::update_executor_config() {
#endif
}

void IStreamsExecutor::Config::update_executor_config(bool lock) {
if (lock) {
{
std::lock_guard<std::mutex> lock{_streams_executor_mutex};
update_executor_config();
}
} else {
update_executor_config();
}
}

void IStreamsExecutor::Config::set_config_zero_stream() {
std::vector<std::vector<int>> proc_type_table = get_proc_type_table();
int core_type = MAIN_CORE_PROC;
Expand All @@ -333,6 +349,7 @@ void IStreamsExecutor::Config::set_config_zero_stream() {
socket_id = std::max(0, proc_type_table[0][PROC_SOCKET_ID]);
}
_streams_info_table.push_back({1, core_type, 1, numa_id, socket_id});
_cpu_reservation = false;
_cpu_pinning = false;
}

Expand Down
32 changes: 0 additions & 32 deletions src/inference/src/os/cpu_map_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,6 @@ class CPU {
std::mutex _cpu_mutex;
int _socket_idx = 0;

private:
/**
* @brief Sort proc_type_table by CPU ID on which application is running. The numa node containing this CPU ID
* will move to first row.
* @param[in] _processor_id CPU ID on which application is running.
* @param[in] _proc_type_table summary table of number of processors per type
* @param[in] _cpu_mapping_table CPU mapping table for each processor
* @return
*/
void sort_table_by_cpu_id(const int _processor_id,
std::vector<std::vector<int>>& _proc_type_table,
const std::vector<std::vector<int>>& _cpu_mapping_table) {
int current_numa_node = 0;
int current_socket = 0;

for (auto& row : _cpu_mapping_table) {
if (_processor_id == row[CPU_MAP_PROCESSOR_ID]) {
current_numa_node = row[CPU_MAP_NUMA_NODE_ID];
current_socket = row[CPU_MAP_SOCKET_ID];
break;
}
}
for (size_t i = 1; i < _proc_type_table.size(); i++) {
if ((current_numa_node == _proc_type_table[i][PROC_NUMA_NODE_ID]) &&
(current_socket == _proc_type_table[i][PROC_SOCKET_ID])) {
std::rotate(_proc_type_table.begin() + 1, _proc_type_table.begin() + i, _proc_type_table.end());
break;
}
}
};

friend class LinuxSortProcTableTests;
};

CPU& cpu_info();
Expand Down
5 changes: 0 additions & 5 deletions src/inference/src/os/lin/lin_system_conf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,6 @@ CPU::CPU() {
OPENVINO_THROW("CPU affinity check failed. No CPU is eligible to run inference.");
};

if (_proc_type_table.size() > 1) {
int cur_processor_id = sched_getcpu();
sort_table_by_cpu_id(cur_processor_id, _proc_type_table, _cpu_mapping_table);
}

_org_proc_type_table = _proc_type_table;

cpu_debug();
Expand Down
5 changes: 0 additions & 5 deletions src/inference/src/os/win/win_system_conf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ CPU::CPU() {
}
}

if (_proc_type_table.size() > 1) {
int cur_processor_id = GetCurrentProcessorNumber();
sort_table_by_cpu_id(cur_processor_id, _proc_type_table, _cpu_mapping_table);
}

cpu_debug();
}

Expand Down
Loading

0 comments on commit c849f72

Please sign in to comment.