Replace old CPU streams executor logic #16164

Merged
364 changes: 11 additions & 353 deletions src/inference/src/threading/ie_cpu_streams_executor.cpp
@@ -19,380 +19,38 @@

#include "ie_parallel_custom_arena.hpp"
#include "ie_system_conf.h"
#include "openvino/runtime/threading/cpu_streams_executor.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
#include "threading/ie_executor_manager.hpp"
#include "threading/ie_istreams_executor.hpp"
#include "threading/ie_thread_affinity.hpp"
#include "threading/ie_thread_local.hpp"

using namespace openvino;

namespace InferenceEngine {
struct CPUStreamsExecutor::Impl {
struct Stream {
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
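// Pins the stream's TBB workers: on scheduler entry each worker is bound to a
// vacant core derived from its arena slot, the stream offset and the binding
// step; on scheduler exit the thread is re-pinned to the full process mask.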
struct Observer : public custom::task_scheduler_observer {
CpuSet _mask;
int _ncpus = 0;
int _threadBindingStep = 0;
int _offset = 0;
int _cpuIdxOffset = 0;
Observer(custom::task_arena& arena,
CpuSet mask,
int ncpus,
const int streamId,
const int threadsPerStream,
const int threadBindingStep,
const int threadBindingOffset,
const int cpuIdxOffset = 0)
: custom::task_scheduler_observer(arena),
_mask{std::move(mask)},
_ncpus(ncpus),
_threadBindingStep(threadBindingStep),
_offset{streamId * threadsPerStream + threadBindingOffset},
_cpuIdxOffset(cpuIdxOffset) {}
void on_scheduler_entry(bool) override {
PinThreadToVacantCore(_offset + tbb::this_task_arena::current_thread_index(),
_threadBindingStep,
_ncpus,
_mask,
_cpuIdxOffset);
}
void on_scheduler_exit(bool) override {
PinCurrentThreadByMask(_ncpus, _mask);
}
~Observer() override = default;
};
#endif
explicit Stream(Impl* impl) : _impl(impl) {
{
std::lock_guard<std::mutex> lock{_impl->_streamIdMutex};
if (_impl->_streamIdQueue.empty()) {
_streamId = _impl->_streamId++;
} else {
_streamId = _impl->_streamIdQueue.front();
_impl->_streamIdQueue.pop();
}
}
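// Map this stream to a NUMA node: with a fixed stream count the streams are
// split evenly across the used nodes (e.g. 4 streams on 2 nodes: streams 0-1
// go to node 0, streams 2-3 to node 1); otherwise round-robin over all nodes.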
_numaNodeId = _impl->_config._streams
? _impl->_usedNumaNodes.at((_streamId % _impl->_config._streams) /
((_impl->_config._streams + _impl->_usedNumaNodes.size() - 1) /
_impl->_usedNumaNodes.size()))
: _impl->_usedNumaNodes.at(_streamId % _impl->_usedNumaNodes.size());
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
const auto concurrency = (0 == _impl->_config._threadsPerStream) ? custom::task_arena::automatic
: _impl->_config._threadsPerStream;
if (ThreadBindingType::HYBRID_AWARE == _impl->_config._threadBindingType) {
if (Config::PreferredCoreType::ROUND_ROBIN != _impl->_config._threadPreferredCoreType) {
if (Config::PreferredCoreType::ANY == _impl->_config._threadPreferredCoreType) {
_taskArena.reset(new custom::task_arena{concurrency});
} else {
const auto selected_core_type =
Config::PreferredCoreType::BIG == _impl->_config._threadPreferredCoreType
? custom::info::core_types().back() // running on Big cores only
: custom::info::core_types().front(); // running on Little cores only
_taskArena.reset(new custom::task_arena{custom::task_arena::constraints{}
.set_core_type(selected_core_type)
.set_max_concurrency(concurrency)});
}
} else {
// assigning the stream to a core type in round-robin fashion,
// wrapping around total_streams (i.e. how many streams all core types can
// handle together). Binding priority: big cores, then logical big cores, then small cores
const auto total_streams = _impl->total_streams_on_core_types.back().second;
const auto big_core_streams = _impl->total_streams_on_core_types.front().second;
const auto hybrid_core = _impl->total_streams_on_core_types.size() > 1;
const auto phy_core_streams =
_impl->_config._big_core_streams == 0
? 0
: _impl->num_big_core_phys / _impl->_config._threads_per_stream_big;
const auto streamId_wrapped = _streamId % total_streams;
const auto& selected_core_type =
std::find_if(
_impl->total_streams_on_core_types.cbegin(),
_impl->total_streams_on_core_types.cend(),
[streamId_wrapped](const decltype(_impl->total_streams_on_core_types)::value_type& p) {
return p.second > streamId_wrapped;
})
->first;
const auto small_core = hybrid_core && selected_core_type == 0;
const auto logic_core = !small_core && streamId_wrapped >= phy_core_streams;
const auto small_core_skip = small_core && _impl->_config._threads_per_stream_small == 3 &&
_impl->_config._small_core_streams > 1;
const auto max_concurrency =
small_core ? _impl->_config._threads_per_stream_small : _impl->_config._threads_per_stream_big;
// Special handling of _threads_per_stream_small == 3
const auto small_core_id = small_core_skip ? 0 : streamId_wrapped - big_core_streams;
const auto stream_id =
hybrid_core
? (small_core ? small_core_id
: (logic_core ? streamId_wrapped - phy_core_streams : streamId_wrapped))
: streamId_wrapped;
const auto thread_binding_step = hybrid_core ? (small_core ? _impl->_config._threadBindingStep : 2)
: _impl->_config._threadBindingStep;
// Special handling of _threads_per_stream_small == 3: skip 4 cores per stream
// (four cores share one L2 cache on the small cores), so stream_id = 0 and cpu_idx_offset grows by 4 per stream
const auto small_core_offset =
small_core_skip ? _impl->_config._small_core_offset + (streamId_wrapped - big_core_streams) * 4
: _impl->_config._small_core_offset;
const auto cpu_idx_offset =
hybrid_core
// To avoid conflicts with system scheduling, the default cpu id on big cores starts from 1
? (small_core ? small_core_offset : (logic_core ? 0 : 1))
: 0;
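// Worked example: with total_streams_on_core_types = {{BIG, 4}, {SMALL, 8}},
// stream id 5 wraps to 5 and matches the small-core entry (the first prefix
// sum greater than 5), so the arena gets _threads_per_stream_small slots.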

_taskArena.reset(new custom::task_arena{custom::task_arena::constraints{}
.set_core_type(selected_core_type)
.set_max_concurrency(max_concurrency)});
CpuSet processMask;
int ncpus = 0;
std::tie(processMask, ncpus) = GetProcessMask();
if (nullptr != processMask) {
_observer.reset(new Observer{*_taskArena,
std::move(processMask),
ncpus,
stream_id,
max_concurrency,
thread_binding_step,
_impl->_config._threadBindingOffset,
cpu_idx_offset});
_observer->observe(true);
}
}
} else if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) {
_taskArena.reset(new custom::task_arena{custom::task_arena::constraints{_numaNodeId, concurrency}});
} else if ((0 != _impl->_config._threadsPerStream) ||
(ThreadBindingType::CORES == _impl->_config._threadBindingType)) {
_taskArena.reset(new custom::task_arena{concurrency});
if (ThreadBindingType::CORES == _impl->_config._threadBindingType) {
CpuSet processMask;
int ncpus = 0;
std::tie(processMask, ncpus) = GetProcessMask();
if (nullptr != processMask) {
_observer.reset(new Observer{*_taskArena,
std::move(processMask),
ncpus,
_streamId,
_impl->_config._threadsPerStream,
_impl->_config._threadBindingStep,
_impl->_config._threadBindingOffset});
_observer->observe(true);
}
}
}
#elif IE_THREAD == IE_THREAD_OMP
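// With OpenMP the stream fixes its own thread count and, unless the user
// already controls affinity through OpenMP environment variables, pins each
// of its threads to a vacant core from the process mask.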
omp_set_num_threads(_impl->_config._threadsPerStream);
if (!checkOpenMpEnvVars(false) && (ThreadBindingType::NONE != _impl->_config._threadBindingType)) {
CpuSet processMask;
int ncpus = 0;
std::tie(processMask, ncpus) = GetProcessMask();
if (nullptr != processMask) {
parallel_nt(_impl->_config._threadsPerStream, [&](int threadIndex, int threadsPerStream) {
int thrIdx = _streamId * _impl->_config._threadsPerStream + threadIndex +
_impl->_config._threadBindingOffset;
PinThreadToVacantCore(thrIdx, _impl->_config._threadBindingStep, ncpus, processMask);
});
}
}
#elif IE_THREAD == IE_THREAD_SEQ
if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) {
InferenceEngine::PinCurrentThreadToSocket(_numaNodeId);
} else if (ThreadBindingType::CORES == _impl->_config._threadBindingType) {
CpuSet processMask;
int ncpus = 0;
std::tie(processMask, ncpus) = GetProcessMask();
if (nullptr != processMask) {
PinThreadToVacantCore(_streamId + _impl->_config._threadBindingOffset,
_impl->_config._threadBindingStep,
ncpus,
processMask);
}
}
#endif
}
~Stream() {
{
std::lock_guard<std::mutex> lock{_impl->_streamIdMutex};
_impl->_streamIdQueue.push(_streamId);
}
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
if (nullptr != _observer) {
_observer->observe(false);
}
#endif
}

Impl* _impl = nullptr;
int _streamId = 0;
int _numaNodeId = 0;
bool _execute = false;
std::queue<Task> _taskQueue;
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
std::unique_ptr<custom::task_arena> _taskArena;
std::unique_ptr<Observer> _observer;
#endif
};

explicit Impl(const Config& config)
: _config{config},
_streams([this] {
return std::make_shared<Impl::Stream>(this);
}) {
_exectorMgr = executorManager();
auto numaNodes = getAvailableNUMANodes();
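// With a fixed stream count use at most _streams NUMA nodes; otherwise keep
// all available nodes.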
if (_config._streams != 0) {
std::copy_n(std::begin(numaNodes),
std::min(static_cast<std::size_t>(_config._streams), numaNodes.size()),
std::back_inserter(_usedNumaNodes));
} else {
_usedNumaNodes = numaNodes;
}
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
if (ThreadBindingType::HYBRID_AWARE == config._threadBindingType) {
const auto core_types = custom::info::core_types();
const auto num_core_phys = getNumberOfCPUCores();
num_big_core_phys = getNumberOfCPUCores(true);
const auto num_small_core_phys = num_core_phys - num_big_core_phys;
int sum = 0;
// reversed order, so BIG cores are first
for (auto iter = core_types.rbegin(); iter < core_types.rend(); iter++) {
const auto& type = *iter;
// calculating the #streams per core type
const int num_streams_for_core_type =
type == 0 ? std::max(1,
std::min(config._small_core_streams,
config._threads_per_stream_small == 0
? 0
: num_small_core_phys / config._threads_per_stream_small))
: std::max(1,
std::min(config._big_core_streams,
config._threads_per_stream_big == 0
? 0
: num_big_core_phys / config._threads_per_stream_big * 2));
sum += num_streams_for_core_type;
// prefix sum, so the core type for a given stream id is deduced just as an upper_bound
// (note that the vector keeps its elements in descending order, so the big cores are
// populated first)
total_streams_on_core_types.push_back({type, sum});
}
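// Worked example: 8 big + 8 small physical cores, 4 threads per stream on
// each core type, big_core_streams = 4 and small_core_streams = 2 give the
// prefix sums {{BIG, 4}, {SMALL, 6}}: stream ids 0-3 run on big cores, 4-5
// on small cores.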
}
#endif
for (auto streamId = 0; streamId < _config._streams; ++streamId) {
_threads.emplace_back([this, streamId] {
openvino::itt::threadName(_config._name + "_" + std::to_string(streamId));
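// Each worker blocks until a task is queued or the executor is stopped, then
// runs the task inside its stream (and its task_arena, if one exists).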
for (bool stopped = false; !stopped;) {
Task task;
{
std::unique_lock<std::mutex> lock(_mutex);
_queueCondVar.wait(lock, [&] {
return !_taskQueue.empty() || (stopped = _isStopped);
});
if (!_taskQueue.empty()) {
task = std::move(_taskQueue.front());
_taskQueue.pop();
}
}
if (task) {
Execute(task, *(_streams.local()));
}
}
});
}
}

void Enqueue(Task task) {
{
std::lock_guard<std::mutex> lock(_mutex);
_taskQueue.emplace(std::move(task));
}
_queueCondVar.notify_one();
}

void Execute(const Task& task, Stream& stream) {
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
auto& arena = stream._taskArena;
if (nullptr != arena) {
arena->execute(std::move(task));
} else {
task();
}
#else
task();
#endif
}

void Defer(Task task) {
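// Runs the task on the caller's own stream; tasks submitted re-entrantly
// while one is already running are queued and drained in FIFO order rather
// than executed recursively.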
auto& stream = *(_streams.local());
stream._taskQueue.push(std::move(task));
if (!stream._execute) {
stream._execute = true;
try {
while (!stream._taskQueue.empty()) {
Execute(stream._taskQueue.front(), stream);
stream._taskQueue.pop();
}
} catch (...) {
}
stream._execute = false;
}
}

Config _config;
std::mutex _streamIdMutex;
int _streamId = 0;
std::queue<int> _streamIdQueue;
std::vector<std::thread> _threads;
std::mutex _mutex;
std::condition_variable _queueCondVar;
std::queue<Task> _taskQueue;
bool _isStopped = false;
std::vector<int> _usedNumaNodes;
ThreadLocal<std::shared_ptr<Stream>> _streams;
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
// stream id mapping to the core type
// stored in the reversed order (so the big cores, with the highest core_type_id value, are populated first)
// every entry is the core type and #streams that this AND ALL EARLIER entries can handle (prefix sum)
// (so mapping is actually just an upper_bound: core type is deduced from the entry for which the id < #streams)
using StreamIdToCoreTypes = std::vector<std::pair<custom::core_type_id, int>>;
StreamIdToCoreTypes total_streams_on_core_types;
int num_big_core_phys;
#endif
ExecutorManager::Ptr _exectorMgr;
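// New implementation: Impl becomes a thin subclass of
// ov::threading::CPUStreamsExecutor, so all of the stream, arena and pinning
// logic above now lives behind the ov::threading API.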
struct CPUStreamsExecutor::Impl : public ov::threading::CPUStreamsExecutor {
Impl(const InferenceEngine::IStreamsExecutor::Config& config) : ov::threading::CPUStreamsExecutor(config) {}
};

int CPUStreamsExecutor::GetStreamId() {
auto stream = _impl->_streams.local();
return stream->_streamId;
return _impl->get_stream_id();
}

int CPUStreamsExecutor::GetNumaNodeId() {
auto stream = _impl->_streams.local();
return stream->_numaNodeId;
return _impl->get_numa_node_id();
}

CPUStreamsExecutor::CPUStreamsExecutor(const Config& config) : _impl{new Impl{config}} {}
CPUStreamsExecutor::CPUStreamsExecutor(const Config& config) : _impl{new Impl(config)} {}

CPUStreamsExecutor::~CPUStreamsExecutor() {
{
std::lock_guard<std::mutex> lock(_impl->_mutex);
_impl->_isStopped = true;
}
_impl->_queueCondVar.notify_all();
for (auto& thread : _impl->_threads) {
if (thread.joinable()) {
thread.join();
}
}
}
CPUStreamsExecutor::~CPUStreamsExecutor() {}

void CPUStreamsExecutor::Execute(Task task) {
_impl->Defer(std::move(task));
_impl->execute(std::move(task));
}

void CPUStreamsExecutor::run(Task task) {
if (0 == _impl->_config._streams) {
_impl->Defer(std::move(task));
} else {
_impl->Enqueue(std::move(task));
}
_impl->run(std::move(task));
}

} // namespace InferenceEngine
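
For reference, a minimal usage sketch of the public API that this change preserves. The header path and the Config field values here are illustrative assumptions; only _name and _streams are fields taken from the code above, and Task is the usual std::function<void()> alias.

#include "threading/ie_cpu_streams_executor.hpp"  // assumed header location

int main() {
    // Hypothetical configuration; _name and _streams are the fields used above.
    InferenceEngine::IStreamsExecutor::Config config;
    config._name = "sketch";
    config._streams = 2;

    InferenceEngine::CPUStreamsExecutor executor{config};
    // run() enqueues the task onto one of the executor's streams; with
    // _streams == 0 it would execute on the caller's own stream instead.
    executor.run([] { /* inference work */ });
}   // the destructor stops the underlying ov::threading executor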