Skip to content

Commit

Permalink
Merge branch 'guozhong/multi_realized_by_ctput' into ywang2/fix_cachi…
Browse files Browse the repository at this point in the history
…ng_test_failure
  • Loading branch information
yangwang201911 authored Mar 28, 2023
2 parents 92a3d05 + 6490c2e commit fb6b33b
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 168 deletions.
5 changes: 2 additions & 3 deletions src/plugins/auto/auto_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,12 @@ void AutoSchedule::init(const ScheduleContext::Ptr& sContext) {
std::vector<Task> otherDevicesloads;
std::vector<Task> cpuLoads;
if (_pCTPUTLoadContext) {
for (size_t i = 0; i < _autoSContext->_devicePriorities.size(); i++) {
for (size_t i = 0; i < _nCTputDeviceNums; i++) {
auto* contextPtr = &_pCTPUTLoadContext[i];
auto modelPath = _autoSContext->_modelPath;
auto network = _autoSContext->_network;
_pCTPUTLoadContext[i].task = std::bind(loadDeviceTask, contextPtr, modelPath, network, isCumulative);
if (i == _autoSContext->_devicePriorities.size() - 1 &&
if (i == _nCTputDeviceNums - 1 &&
_pCTPUTLoadContext[i].deviceInfo.deviceName.find("CPU") != std::string::npos) {
cpuLoads.push_back(_pCTPUTLoadContext[i].task);
} else {
Expand Down Expand Up @@ -791,7 +791,6 @@ IInferPtr AutoSchedule::CreateInferRequest() {
so = _passthroughExeNet._so;
syncRequestImpl->setPointerToSo(so);
} else if (std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest()) {
// cumulative case, load to MULTI:*
auto sharedMultiRequest = std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest();
if (sharedMultiRequest._ptr->getPointerToSo())
syncRequestImpl->setPointerToSo(sharedMultiRequest._ptr->getPointerToSo());
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/auto/auto_schedule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class AutoSchedule : public MultiSchedule {
static bool RunPipelineTask(IE::Task& inferPipelineTask, NotBusyPriorityWorkerRequests& idleWorkerRequests,
const DeviceName& preferred_device);
DeviceMap<NotBusyPriorityWorkerRequests> _idleWorkerRequests;
AutoScheduleContext::Ptr _autoSContext;

private:
/**
Expand All @@ -78,7 +79,6 @@ class AutoSchedule : public MultiSchedule {
std::promise<void> _firstLoadPromise;
bool _exitFlag = {false};
size_t _cpuHelpInferCount = 0;
AutoScheduleContext::Ptr _autoSContext;
size_t _nCTputDeviceNums;
};

Expand Down
154 changes: 27 additions & 127 deletions src/plugins/auto/bind_multi_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,140 +6,40 @@
#include "async_infer_request.hpp"
#include "plugin.hpp"
#include "bind_multi_schedule.hpp"
#include "multi_executable_network.hpp"
// ------------------------------MultiSchedule----------------------------
namespace MultiDevicePlugin {

thread_local IE::IInferRequestInternal* BinderMultiSchedule::_sharedRequest = nullptr;

void BinderMultiSchedule::init(const ScheduleContext::Ptr& sContext) {
MultiSchedule::init(sContext);
AutoSchedule::init(sContext);
LOG_INFO_TAG("enable bind buffer for AUTO");
}

Pipeline BinderMultiSchedule::GetPipeline(const IInferPtr& syncInferRequest, WorkerInferRequest** workerInferRequest) {
Pipeline pipeline = {
// if the request is coming with device-specific remote blobs make sure it is scheduled to the specific device only:
Stage {
/*TaskExecutor*/ std::make_shared<IE::ImmediateExecutor>(), /*task*/ [this, &syncInferRequest, workerInferRequest]() {
// by default, no preferred device:
_thisPreferredDeviceName = "";
auto execNetwork = _multiSContext->_executableNetwork.lock();
// if any input is remote (e.g. was set with SetBlob), let's use the corresponding device
for (const auto& it : execNetwork->GetInputsInfo()) {
auto b = syncInferRequest->GetBlob(it.first);
auto r = b->as<IE::RemoteBlob>();
if (r) {
const auto name = r->getDeviceName();
const auto res = std::find_if(
_multiSContext->_devicePrioritiesInitial.cbegin(),
_multiSContext->_devicePrioritiesInitial.cend(),
[&name](const MultiDevicePlugin::DeviceInformation & d) {
return (d.defaultDeviceID.empty() ? d.deviceName : (d.deviceName + "." +
d.defaultDeviceID)) == name;
});
if (_multiSContext->_devicePrioritiesInitial.cend() == res) {
IE_THROW() <<
"None of the devices (for which current MULTI-device configuration was "
"initialized) supports a remote blob created on the device named " << name;
} else {
// it is ok to take the c_str() here (as pointed in the executable_network.hpp we need to use const char*)
// as the original strings are from the "persistent" vector (with the right lifetime)
_thisPreferredDeviceName = res->deviceName.c_str();
break;
}
}
}
_thisWorkerInferRequest = *workerInferRequest;
_sharedRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>(syncInferRequest)->GetSharedRequest()._ptr.get();
}},
// as the scheduling algo may select any device, this stage accepts the scheduling decision (actual workerRequest)
// then sets the device-agnostic blobs to the actual (device-specific) request
Stage {
/*TaskExecutor*/std::dynamic_pointer_cast<IE::ITaskExecutor>(shared_from_this()), /*task*/ [&syncInferRequest, workerInferRequest]() {
*workerInferRequest = _thisWorkerInferRequest;
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>(syncInferRequest);
multiSyncInferRequest->SetBlobsToAnotherRequest(_thisWorkerInferRequest->_inferRequest);
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_startTimes.push_back(std::chrono::steady_clock::now());
});
}},
// final task in the pipeline:
Stage {
/*TaskExecutor*/std::make_shared<ThisRequestExecutor>(workerInferRequest), /*task*/ [this, &syncInferRequest, workerInferRequest]() {
if (nullptr != (*workerInferRequest)->_exceptionPtr) {
std::rethrow_exception((*workerInferRequest)->_exceptionPtr);
}
if (_multiSContext->_needPerfCounters) {
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>
(syncInferRequest);
multiSyncInferRequest->_scheduledRequest =
(*workerInferRequest)->_inferRequest;
}
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_endTimes.push_back(std::chrono::steady_clock::now());
});
}}
};
return pipeline;
}

// Tries to hand `inferPipelineTask` to an idle worker request.
// When `preferred_device` is non-empty, only the device with that exact name is considered.
// Returns true if RunPipelineTask reported success for some device; otherwise the task is
// pushed to a pending queue and false is returned.
bool BinderMultiSchedule::ScheduleToWorkerInferRequest(IE::Task inferPipelineTask, DeviceName preferred_device) {
std::vector<DeviceInformation> devices;
// take a copy of the device priority list while holding the context mutex,
// so the loop below iterates over the copy rather than the shared list
devices = [&] {
std::lock_guard<std::mutex> lock(_multiSContext->_mutex);
return _multiSContext->_devicePriorities;
}();
for (auto&& device : devices) {
// skip every device except the preferred one (if a preference was given)
if (!preferred_device.empty() && (device.deviceName != preferred_device)) {
continue;
}
// first device whose idle-request queue accepts the task wins
if (RunPipelineTask(inferPipelineTask, _idleWorkerRequests[device.deviceName], preferred_device)) {
return true;
}
}
// no vacant requests this time, storing the task to the respective queue
if (!preferred_device.empty()) {
// device-bound task: goes to the queue kept for that specific device
_inferPipelineTasksDeviceSpecific[preferred_device]->push(std::move(inferPipelineTask));
} else {
// no device preference: goes to the common queue
_inferPipelineTasks.push(std::move(inferPipelineTask));
}
return false;
}

bool BinderMultiSchedule::RunPipelineTask(IE::Task& inferPipelineTask,
NotBusyWorkerRequests& idleWorkerRequests,
const DeviceName& preferred_device) {
WorkerInferRequest* workerRequestPtr = nullptr;
WorkerInferRequest* headWorker = nullptr;
bool flag = false;
while (idleWorkerRequests.try_pop(workerRequestPtr)) {
if (flag && workerRequestPtr == headWorker)
break;
if (!flag) {
headWorker = workerRequestPtr;
flag = true;
}
IdleGuard<NotBusyWorkerRequests> idleGuard{workerRequestPtr, idleWorkerRequests};
if (_sharedRequest == workerRequestPtr->_inferRequest._ptr.get()) {
_thisWorkerInferRequest = workerRequestPtr;
{
auto capturedTask = std::move(inferPipelineTask);
Pipeline pipeline;
struct RequestExecutor : ITaskExecutor {
explicit RequestExecutor(InferenceEngine::SoIInferRequestInternal& inferRequest) : _inferRequest(inferRequest) {
_inferRequest->SetCallback([this](std::exception_ptr exceptionPtr) mutable {
_exceptionPtr = exceptionPtr;
auto capturedTask = std::move(_task);
capturedTask();
}
idleGuard.Release();
return true;
});
}
}
return false;
}

void BinderMultiSchedule::run(IE::Task inferPipelineTask) {
if (_thisWorkerInferRequest) {
auto capturedTask = std::move(inferPipelineTask);
capturedTask();
} else {
ScheduleToWorkerInferRequest(std::move(inferPipelineTask), _thisPreferredDeviceName);
}
void run(InferenceEngine::Task task) override {
_task = std::move(task);
_inferRequest->StartAsync();
};
InferenceEngine::SoIInferRequestInternal& _inferRequest;
std::exception_ptr _exceptionPtr;
InferenceEngine::Task _task;
};
auto requestExecutor = std::make_shared<RequestExecutor>(
std::static_pointer_cast<MultiDeviceInferRequest>(syncInferRequest)->GetSharedRequest());
pipeline.emplace_back(requestExecutor, [requestExecutor] {
if (nullptr != requestExecutor->_exceptionPtr) {
std::rethrow_exception(requestExecutor->_exceptionPtr);
}
});
return pipeline;
}

BinderMultiSchedule::~BinderMultiSchedule() {
Expand All @@ -153,7 +53,7 @@ IInferPtr BinderMultiSchedule::CreateInferRequestImpl(
SoInfer request_to_share_blobs_with;
// borrowing device-specific blobs from the underlying requests for the device-agnostic, user-facing requests
// this allows to potentially save on the data-copy later (if the requests are scheduled in the same order)
for (const auto& device : _multiSContext->_devicePrioritiesInitial) {
for (const auto& device : _autoSContext->_devicePrioritiesInitial) {
auto& dev_requests = _workerRequests[device.deviceName];
if ((num - sum) < dev_requests.size()) {
request_to_share_blobs_with = dev_requests.at(num - sum)._inferRequest;
Expand All @@ -177,7 +77,7 @@ IInferPtr BinderMultiSchedule::CreateInferRequestImpl(IE::InputsDataMap networkI
size_t sum = 0;
// borrowing device-specific blobs from the underlying requests for the device-agnostic, user-facing requests
// this allows to potentially save on the data-copy later (if the requests are scheduled in the same order)
for (const auto& device : _multiSContext->_devicePrioritiesInitial) {
for (const auto& device : _autoSContext->_devicePrioritiesInitial) {
auto& dev_requests = _workerRequests[device.deviceName];
if ((num - sum) < dev_requests.size()) {
request_to_share_blobs_with = dev_requests.at(num - sum)._inferRequest;
Expand Down
12 changes: 2 additions & 10 deletions src/plugins/auto/bind_multi_schedule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
///////////////////////////////////////////////////////////////////////////////////////////////////
#pragma once

#include "multi_schedule.hpp"
#include "auto_schedule.hpp"

#ifdef MULTIUNITTEST
#define MOCKTESTMACRO virtual
Expand All @@ -15,22 +15,14 @@
#endif

namespace MultiDevicePlugin {
class BinderMultiSchedule : public MultiSchedule {
class BinderMultiSchedule : public AutoSchedule {
public:
using Ptr = std::shared_ptr<BinderMultiSchedule>;
IInferPtr CreateInferRequestImpl(IE::InputsDataMap networkInputs, IE::OutputsDataMap networkOutputs) override;
IE::IInferRequestInternal::Ptr CreateInferRequestImpl(const std::vector<std::shared_ptr<const ov::Node>>& inputs,
const std::vector<std::shared_ptr<const ov::Node>>& outputs) override;
void run(IE::Task inferTask) override;
void init(const ScheduleContext::Ptr& sContext) override;
Pipeline GetPipeline(const IInferPtr& syncRequestImpl, WorkerInferRequest** WorkerInferRequest) override;
virtual ~BinderMultiSchedule();

protected:
static bool RunPipelineTask(IE::Task& inferPipelineTask, NotBusyWorkerRequests& idleWorkerRequests, const DeviceName& preferred_device);
bool ScheduleToWorkerInferRequest(IE::Task, DeviceName preferred_device = "") override;

protected:
thread_local static IE::IInferRequestInternal* _sharedRequest;
};
} // namespace MultiDevicePlugin
1 change: 0 additions & 1 deletion src/plugins/auto/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ class MultiScheduleContext : public ScheduleContext {
std::mutex _mutex;
bool _needPerfCounters;
bool _batchingDisabled = {false};
bool _bindBuffer = false;
bool _startupfallback = true;
bool _runtimeFallback = true;
virtual ~MultiScheduleContext() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/auto/multi_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ IInferPtr MultiSchedule::CreateInferRequest() {
if (!so)
so = _passthroughExeNet._so;
syncRequestImpl->setPointerToSo(so);
} else if (_multiSContext->_bindBuffer) {
} else if (std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest()) {
auto sharedRequest = std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest();
if (sharedRequest._ptr->getPointerToSo())
syncRequestImpl->setPointerToSo(sharedRequest._ptr->getPointerToSo());
Expand Down
20 changes: 14 additions & 6 deletions src/plugins/auto/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,18 +481,26 @@ IExecutableNetworkInternal::Ptr MultiDeviceInferencePlugin::LoadNetworkImpl(cons
autoSContext->_plugin = this;
autoSContext->_core = GetCore();
autoSContext->_LogTag = _LogTag;
autoSContext->_bindBuffer = loadConfig.get_property(ov::intel_auto::device_bind_buffer);
autoSContext->_startupfallback = loadConfig.get_property(ov::intel_auto::enable_startup_fallback);
autoSContext->_runtimeFallback = loadConfig.get_property(ov::intel_auto::enable_runtime_fallback);
auto execNetwork = std::make_shared<AutoExecutableNetwork>(autoSContext, std::make_shared<AutoSchedule>());
IExecutableNetworkInternal::Ptr impl;
// enable bind only in cumulative_throughput mode
if (loadConfig.get_property(ov::intel_auto::device_bind_buffer) &&
autoSContext->_performanceHint == "CUMULATIVE_THROUGHPUT") {
LOG_INFO_TAG("runtime fallback set to disabled in binder mode");
autoSContext->_runtimeFallback = false;
impl = std::make_shared<AutoExecutableNetwork>(autoSContext, std::make_shared<BinderMultiSchedule>());
} else {
impl = std::make_shared<AutoExecutableNetwork>(autoSContext, std::make_shared<AutoSchedule>());
}
if (!modelPath.empty()) {
SetExeNetworkInfo(execNetwork,
SetExeNetworkInfo(impl,
autoSContext->_hwExecutableNetwork->GetInputsInfo(),
autoSContext->_hwExecutableNetwork->GetOutputsInfo());
execNetwork->setInputs(autoSContext->_hwExecutableNetwork->getInputs());
execNetwork->setOutputs(autoSContext->_hwExecutableNetwork->getOutputs());
impl->setInputs(autoSContext->_hwExecutableNetwork->getInputs());
impl->setOutputs(autoSContext->_hwExecutableNetwork->getOutputs());
}
return execNetwork;
return impl;
}

QueryNetworkResult MultiDeviceInferencePlugin::QueryNetwork(const CNNNetwork& network,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,12 @@ INSTANTIATE_TEST_SUITE_P(smoke_HETERO_OVClassLoadNetworkWithSecondaryPropertiesT
// IE Class load and check network with ov::device::properties
INSTANTIATE_TEST_SUITE_P(smoke_CPU_OVClassLoadNetworkAndCheckWithSecondaryPropertiesTest,
OVClassLoadNetworkAndCheckSecondaryPropertiesTest,
::testing::Combine(::testing::Values("CPU", "MULTI:CPU"),
::testing::Combine(::testing::Values("CPU"),
::testing::ValuesIn(configsDeviceProperties)));

INSTANTIATE_TEST_SUITE_P(smoke_CPU_OVClassLoadNetworkAndCheckWithSecondaryPropertiesDoubleTest,
OVClassLoadNetworkAndCheckSecondaryPropertiesTest,
::testing::Combine(::testing::Values("CPU", "MULTI:CPU"),
::testing::Combine(::testing::Values("CPU"),
::testing::ValuesIn(configsDevicePropertiesDouble)));
INSTANTIATE_TEST_SUITE_P(
smoke_OVClassLoadNetworkTest, OVClassLoadNetworkTest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ std::vector<std::string> disabledTestPatterns() {
R"(.*Behavior.*ExecutableNetworkBaseTest.*canSetConfigToExecNetWithIncorrectConfig.*)",
R"(.*Hetero.*Behavior.*ExecutableNetworkBaseTest.*ExecGraphInfo.*)",
R"(.*Hetero.*Behavior.*ExecutableNetworkBaseTest.*CanCreateTwoExeNetworksAndCheckFunction.*)",
R"(.*Hetero.*Behavior.*ExecutableNetworkBaseTest.*canLoadCorrectNetworkToGetMetricAndCheckConfig.*)",
// AutoExecutableNetwork does not support GetConfig
R"(.*(Auto|Multi).*Behavior.*ExecutableNetworkBaseTest.*canLoadCorrectNetworkToGetExecutableAndCheckConfig.*)",
// AutoExecutableNetwork GetMetric does not support key ov::num_streams
R"(.*smoke_CPU.*OVClassLoadNetworkAndCheckSecondaryProperties.*LoadNetworkAndCheckSecondaryPropertiesTest.*)",

// CPU does not support dynamic rank
// Issue: CVS-66778
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ TEST_P(MultiDevice_Test, cannotInferRemoteBlobIfNotInitializedForDevice) {
}

TEST_P(MultiDevice_Bind_oversubsciption_test, oversubsciptionOfInferRequest) {
GTEST_SKIP();
InferenceEngine::CNNNetwork net(fn_ptr);
auto ie = PluginCache::get().ie();
// load a network to the GPU to make sure we have a remote context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ TEST_P(ExecutableNetworkBaseTest, canLoadCorrectNetworkToGetExecutableAndCheckCo
auto execNet = ie->LoadNetwork(cnnNet, target_device, configuration);
for (const auto& configItem : configuration) {
InferenceEngine::Parameter param;
ASSERT_NO_THROW(param = execNet.GetConfig(configItem.first));
try {
param = execNet.GetConfig(configItem.first);
} catch (InferenceEngine::Exception&) {
ASSERT_NO_THROW(param = execNet.GetMetric(configItem.first));
}
ASSERT_FALSE(param.empty());
ASSERT_EQ(param, InferenceEngine::Parameter(configItem.second));
}
Expand Down Expand Up @@ -294,16 +298,6 @@ TEST_P(ExecutableNetworkBaseTest, pluginDoesNotChangeOriginalNetwork) {
compare_functions(cnnNet.getFunction(), referenceNetwork);
}

// Loads the network on `target_device` with `configuration`, then checks that every
// configured key can be read back through GetMetric without throwing, is non-empty,
// and compares equal to the value that was passed in.
TEST_P(ExecutableNetworkBaseTest, canLoadCorrectNetworkToGetMetricAndCheckConfig) {
auto execNet = ie->LoadNetwork(cnnNet, target_device, configuration);
for (const auto& configItem : configuration) {
InferenceEngine::Parameter param;
// the key is queried via GetMetric here (not GetConfig)
ASSERT_NO_THROW(param = execNet.GetMetric(configItem.first));
ASSERT_FALSE(param.empty());
ASSERT_EQ(param, InferenceEngine::Parameter(configItem.second));
}
}

class ExecNetSetPrecision : public BehaviorTestsUtils::BehaviorTestsBasicBase,
public BehaviorTestsUtils::IEExecutableNetworkTestBase {
protected:
Expand Down

0 comments on commit fb6b33b

Please sign in to comment.