From 3b6adcd65168f9cd26d139ebe1089b0548e7f961 Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Tue, 15 Oct 2024 14:30:13 +0800
Subject: [PATCH 01/11] update.

---
 src/plugins/auto/src/schedule.cpp | 47 ++++++++++++++++++++++---------
 src/plugins/auto/src/schedule.hpp |  3 +-
 2 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index 96a815cc21c8c6..92ee3a84375187 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -9,6 +9,7 @@ namespace auto_plugin {
 thread_local WorkerInferRequest* Schedule::m_this_worker_infer_request = nullptr;
 // TODO: revert to the plain variable (see header file), when we moved to the next CentOS 8.x in our support matrix
 thread_local const char* Schedule::m_this_preferred_device_name = "";
+int32_t Schedule::m_need_retry_times = 0;
 
 void Schedule::launch(const ScheduleContext::Ptr& context) {
     m_context = context;
@@ -55,18 +56,27 @@ bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
                                  NotBusyPriorityWorkerRequests& idle_workerrequests,
                                  const DeviceName& preferred_device) {
     WorkerInferRequest* worker_request_ptr = nullptr;
+    static int index = 0;
     std::pair<int, WorkerInferRequest*> worker;
-    if (idle_workerrequests.try_pop(worker)) {
-        worker_request_ptr = worker.second;
-        IdleGuard idle_guard{worker_request_ptr, idle_workerrequests};
-        m_this_worker_infer_request = worker_request_ptr;
-        {
-            auto captured_task = std::move(pipeline_task);
-            captured_task();
+    std::cout << "------- start try pop -------\n";
+    do {
+        std::cout << "------- [Need retry: " << m_need_retry_times << "] try pop index: " << index << std::endl;
+        if (idle_workerrequests.try_pop(worker)) {
+            std::cout << "------- [Need retry: " << m_need_retry_times << "] popped index: " << index++ << std::endl;
+            worker_request_ptr = worker.second;
+            IdleGuard idle_guard{worker_request_ptr, idle_workerrequests};
+            m_this_worker_infer_request = worker_request_ptr;
+            {
+                auto captured_task = std::move(pipeline_task);
+                captured_task();
+            }
+            idle_guard.release();
+            return true;
+        } else {
+            std::cout << "1234567 [Need retry: " << m_need_retry_times << "] Failed to pop index: " << index
+                      << std::endl;
         }
-        idle_guard.release();
-        return true;
-    }
+    } while (m_need_retry_times-- > 0);
     return false;
 }
 
@@ -85,8 +95,11 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         OPENVINO_THROW("Every device used with AUTO should support query optimal_number_of_infer_requests property from compiled model ",
                        iie.what());
     }
-    const auto num_requests = (m_context->m_device_priorities.end() == it_numrequests ||
-        it_numrequests->num_requests_per_devices == -1) ? optimal_num : it_numrequests->num_requests_per_devices;
+    auto num_requests =
+        (m_context->m_device_priorities.end() == it_numrequests || it_numrequests->num_requests_per_devices == -1)
+            ? optimal_num
+            : it_numrequests->num_requests_per_devices;
+    num_requests = num_requests < 2 ? 2 : num_requests;
     auto& worker_requests = m_worker_requests[device];
     auto& idle_worker_requests = m_idle_worker_requests[device];
     worker_requests.resize(num_requests);
@@ -128,10 +141,13 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                 } else {
                     stop_retry_and_continue();
                 }
+                static int index = 0;
+                std::cout << "------- try push index: " << ++index << std::endl;
                 // try to return the request to the idle list (fails if the overall object destruction has began)
-                if (idleGuard.release()->try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
+                if (index % 2 && idleGuard.release()->try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
                     // let's try to pop a task, as we know there is at least one idle request, schedule if succeeded
                     // if no device-agnostic tasks, let's try pop the device specific task, schedule if succeeded
+                    std::cout << "------- pushed index: " << index << std::endl;
                     ov::threading::Task t;
                     do {
                         m_infer_pipeline_tasks.try_pop(t);
@@ -139,6 +155,11 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                     do {
                         m_infer_pipeline_tasks_device_specific[device]->try_pop(t);
                     } while (t && schedule_to_worker_infer_request(std::move(t), device));
+                } else {
+                    m_need_retry_times++;
+                    std::cout << "-------[Need to retry: " << m_need_retry_times
+                              << " ] Failed to try push index: " << index << std::endl;
+                    std::this_thread::sleep_for(std::chrono::seconds(1));
                 }
             }
         });
diff --git a/src/plugins/auto/src/schedule.hpp b/src/plugins/auto/src/schedule.hpp
index 99efa3138cef00..0568e88b220118 100644
--- a/src/plugins/auto/src/schedule.hpp
+++ b/src/plugins/auto/src/schedule.hpp
@@ -25,7 +25,8 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     // have to use the const char* ptr rather than std::string due to a bug in old gcc versions,
     // the bug is e.g. manifesting on the old CentOS (and it's 4.8.x gcc) used in our testing
     // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=81880
-    static thread_local const char* m_this_preferred_device_name;
+    static thread_local const char* m_this_preferred_device_name; 
+    static std::int32_t m_need_retry_times;
 
 protected:
     virtual void init() = 0;

From 15f9def04cadf8d45c628f5eee0dce760345197b Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Mon, 28 Oct 2024 16:17:46 +0800
Subject: [PATCH 02/11] enable sync for handling worker infer requests.
---
 src/plugins/auto/src/auto_schedule.cpp       |  8 ++-
 src/plugins/auto/src/cumulative_schedule.cpp |  6 +-
 src/plugins/auto/src/schedule.cpp            | 73 +++++++++++---------
 src/plugins/auto/src/schedule.hpp            | 10 ++-
 4 files changed, 59 insertions(+), 38 deletions(-)

diff --git a/src/plugins/auto/src/auto_schedule.cpp b/src/plugins/auto/src/auto_schedule.cpp
index ea5f2159179824..f7389fa2fc7bce 100644
--- a/src/plugins/auto/src/auto_schedule.cpp
+++ b/src/plugins/auto/src/auto_schedule.cpp
@@ -212,10 +212,12 @@ void AutoSchedule::init() {
         // initialize containers before run async task
         m_idle_worker_requests[device.device_name];
         m_worker_requests[device.device_name];
+        m_worker_requests_cvs[device.device_name];
         m_infer_pipeline_tasks_device_specific[device.device_name] = nullptr;
     }
     m_idle_worker_requests["CPU_HELP"];
     m_worker_requests["CPU_HELP"];
+    m_worker_requests_cvs["CPU_HELP"];
     m_infer_pipeline_tasks_device_specific["CPU_HELP"] = nullptr;
     m_executor->run(m_compile_context[CPU].m_task);
     m_executor->run(m_compile_context[ACTUALDEVICE].m_task);
@@ -486,7 +488,11 @@ bool AutoSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
         if (!preferred_device.empty() && (device.device_name != preferred_device)) {
             continue;
         }
-        if (run_pipeline_task(pipeline_task, m_idle_worker_requests[device.device_name], preferred_device)) {
+        if (run_pipeline_task(pipeline_task,
+                              m_idle_worker_requests[device.device_name],
+                              preferred_device,
+                              m_worker_requests_cvs[device.device_name],
+                              m_worker_infer_mutex)) {
             return true;
         }
     }
diff --git a/src/plugins/auto/src/cumulative_schedule.cpp b/src/plugins/auto/src/cumulative_schedule.cpp
index a607205e17d1e5..adabfb9826c670 100644
--- a/src/plugins/auto/src/cumulative_schedule.cpp
+++ b/src/plugins/auto/src/cumulative_schedule.cpp
@@ -247,7 +247,11 @@ bool CumuSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
         }
         auto selected_device_name =
             preferred_device.empty() ? schedule_to_next_device(devices, current_device_index) : preferred_device;
-        if (run_pipeline_task(pipeline_task, m_idle_worker_requests[selected_device_name], preferred_device)) {
+        if (run_pipeline_task(pipeline_task,
+                              m_idle_worker_requests[selected_device_name],
+                              preferred_device,
+                              m_worker_requests_cvs[selected_device_name],
+                              m_worker_infer_mutex)) {
             return true;
         } else {
             current_device_index++;
diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index 92ee3a84375187..ddfa2fd3dc1506 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -9,7 +9,6 @@ namespace auto_plugin {
 thread_local WorkerInferRequest* Schedule::m_this_worker_infer_request = nullptr;
 // TODO: revert to the plain variable (see header file), when we moved to the next CentOS 8.x in our support matrix
 thread_local const char* Schedule::m_this_preferred_device_name = "";
-int32_t Schedule::m_need_retry_times = 0;
 
 void Schedule::launch(const ScheduleContext::Ptr& context) {
     m_context = context;
@@ -53,30 +52,36 @@ void Schedule::run(ov::threading::Task pipeline_task) {
 }
 
 bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
-                                 NotBusyPriorityWorkerRequests& idle_workerrequests,
-                                 const DeviceName& preferred_device) {
+                                     NotBusyPriorityWorkerRequests& idle_workerrequests,
+                                     const DeviceName& preferred_device,
+                                     std::condition_variable& idle_workerrequests_cv,
+                                     std::mutex& worker_infer_mutex) {
     WorkerInferRequest* worker_request_ptr = nullptr;
     static int index = 0;
     std::pair<int, WorkerInferRequest*> worker;
+    std::unique_lock<std::mutex> lck(worker_infer_mutex);
     std::cout << "------- start try pop -------\n";
-    do {
-        std::cout << "------- [Need retry: " << m_need_retry_times << "] try pop index: " << index << std::endl;
-        if (idle_workerrequests.try_pop(worker)) {
-            std::cout << "------- [Need retry: " << m_need_retry_times << "] popped index: " << index++ << std::endl;
-            worker_request_ptr = worker.second;
-            IdleGuard idle_guard{worker_request_ptr, idle_workerrequests};
-            m_this_worker_infer_request = worker_request_ptr;
-            {
-                auto captured_task = std::move(pipeline_task);
-                captured_task();
-            }
-            idle_guard.release();
-            return true;
-        } else {
-            std::cout << "1234567 [Need retry: " << m_need_retry_times << "] Failed to pop index: " << index
-                      << std::endl;
-        }
-    } while (m_need_retry_times-- > 0);
+    std::cout << "------- try pop index: " << index << std::endl;
+    if (!idle_workerrequests.try_pop(worker)) {
+        std::cout << "------- pop failed and will wait......" << std::endl;
+        idle_workerrequests_cv.wait(lck, [&idle_workerrequests, &worker] {
+            return idle_workerrequests.try_pop(worker);
+        });
+    }
+    if (worker.second) {
+        std::cout << "------- popped index: " << index++ << std::endl;
+        worker_request_ptr = worker.second;
+        IdleGuard idle_guard{worker_request_ptr, idle_workerrequests};
+        m_this_worker_infer_request = worker_request_ptr;
+        {
+            auto captured_task = std::move(pipeline_task);
+            captured_task();
+        }
+        idle_guard.release();
+        return true;
+    } else {
+        std::cout << "------- Failed to pop index: " << index << std::endl;
+    }
     return false;
 }
 
@@ -99,9 +104,9 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         (m_context->m_device_priorities.end() == it_numrequests || it_numrequests->num_requests_per_devices == -1)
             ? optimal_num
             : it_numrequests->num_requests_per_devices;
-    num_requests = num_requests < 2 ? 2 : num_requests;
     auto& worker_requests = m_worker_requests[device];
     auto& idle_worker_requests = m_idle_worker_requests[device];
+    auto& worker_requests_cv = m_worker_requests_cvs[device];
     worker_requests.resize(num_requests);
     m_infer_pipeline_tasks_device_specific[device] = std::unique_ptr<TaskQueue>(new TaskQueue);
     auto* idle_workerrequests_ptr = &(idle_worker_requests);
@@ -111,9 +116,11 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         worker_request.m_inferrequest = {compiled_model->create_infer_request(), compiled_model._so};
         auto* worker_request_ptr = &worker_request;
         worker_request_ptr->m_index = num++;
-        OPENVINO_ASSERT(idle_worker_requests.try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr)) == true);
+        OPENVINO_ASSERT(
+            idle_worker_requests.try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr)) == true);
         worker_request.m_inferrequest->set_callback(
-            [worker_request_ptr, this, device, idle_workerrequests_ptr](std::exception_ptr exception_ptr) mutable {
+            [worker_request_ptr, this, device, idle_workerrequests_ptr, &worker_requests_cv](
+                std::exception_ptr exception_ptr) mutable {
                 IdleGuard idleGuard{worker_request_ptr, *idle_workerrequests_ptr};
                 worker_request_ptr->m_exception_ptr = std::move(exception_ptr);
                 {
@@ -142,11 +149,15 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                 } else {
                     stop_retry_and_continue();
                 }
                 static int index = 0;
-                std::cout << "------- try push index: " << ++index << std::endl;
-                // try to return the request to the idle list (fails if the overall object destruction has began)
-                if (index % 2 && idleGuard.release()->try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
-                    // let's try to pop a task, as we know there is at least one idle request, schedule if succeeded
-                    // if no device-agnostic tasks, let's try pop the device specific task, schedule if succeeded
+                std::cout << "------- try push index: " << index++ << std::endl;
+                // try to return the request to the idle list (fails if the overall object destruction has
+                // began)
+                //std::this_thread::sleep_for(std::chrono::seconds(5));
+                if (idleGuard.release()->try_push(
+                        std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
+                    // let's try to pop a task, as we know there is at least one idle request, schedule if
+                    // succeeded if no device-agnostic tasks, let's try pop the device specific task, schedule
+                    // if succeeded
                     std::cout << "------- pushed index: " << index << std::endl;
                     ov::threading::Task t;
                     do {
                         m_infer_pipeline_tasks.try_pop(t);
@@ -155,11 +166,7 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                     do {
                         m_infer_pipeline_tasks_device_specific[device]->try_pop(t);
                     } while (t && schedule_to_worker_infer_request(std::move(t), device));
-                } else {
-                    m_need_retry_times++;
-                    std::cout << "-------[Need to retry: " << m_need_retry_times
-                              << " ] Failed to try push index: " << index << std::endl;
-                    std::this_thread::sleep_for(std::chrono::seconds(1));
+                    worker_requests_cv.notify_all();
                 }
             }
         });
diff --git a/src/plugins/auto/src/schedule.hpp b/src/plugins/auto/src/schedule.hpp
index 0568e88b220118..f7ae523ea986b8 100644
--- a/src/plugins/auto/src/schedule.hpp
+++ b/src/plugins/auto/src/schedule.hpp
@@ -26,12 +26,14 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     // the bug is e.g. manifesting on the old CentOS (and it's 4.8.x gcc) used in our testing
     // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=81880
     static thread_local const char* m_this_preferred_device_name; 
-    static std::int32_t m_need_retry_times;
 
 protected:
     virtual void init() = 0;
-    static bool run_pipeline_task(ov::threading::Task& pipeline_task, NotBusyPriorityWorkerRequests& idle_worker_request,
-                                  const DeviceName& preferred_device);
+    static bool run_pipeline_task(ov::threading::Task& pipeline_task,
+                                  NotBusyPriorityWorkerRequests& idle_worker_request,
+                                  const DeviceName& preferred_device,
+                                  std::condition_variable& idle_worker_request_cv,
+                                  std::mutex& mutex);
     virtual void generate_workers(const std::string& device, const SoCompiledModel& compiled_model);
     virtual void try_to_compile_model(AutoCompileContext& context, const std::shared_ptr<ov::Model>& model) = 0;
     virtual bool schedule_to_worker_infer_request(ov::threading::Task, DeviceName preferred_device = "") = 0;
@@ -41,6 +43,7 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     std::shared_ptr m_executor;
     DeviceMap<NotBusyPriorityWorkerRequests> m_idle_worker_requests;
     DeviceMap<std::vector<WorkerInferRequest>> m_worker_requests;
+    DeviceMap<std::condition_variable> m_worker_requests_cvs;
     TaskQueue m_infer_pipeline_tasks;
     DeviceMap<std::unique_ptr<TaskQueue>> m_infer_pipeline_tasks_device_specific;
     SoCompiledModel m_passthrough_compiled_model;
@@ -51,6 +54,7 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     mutable std::atomic m_request_id = {0};
     std::mutex m_dev_infer_mutex;
     std::unordered_map m_dev_infer;
+    std::mutex m_worker_infer_mutex;
 };
 } // namespace auto_plugin

From e9c7d043522b8a1297c12d4ce27dc8efac6297cf Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Tue, 29 Oct 2024 14:49:14 +0800
Subject: [PATCH 03/11] update.

---
 src/plugins/auto/src/schedule.cpp | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index ddfa2fd3dc1506..133347c4527d05 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -57,19 +57,14 @@ bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
                                      std::condition_variable& idle_workerrequests_cv,
                                      std::mutex& worker_infer_mutex) {
     WorkerInferRequest* worker_request_ptr = nullptr;
-    static int index = 0;
     std::pair<int, WorkerInferRequest*> worker;
     std::unique_lock<std::mutex> lck(worker_infer_mutex);
-    std::cout << "------- start try pop -------\n";
-    std::cout << "------- try pop index: " << index << std::endl;
     if (!idle_workerrequests.try_pop(worker)) {
-        std::cout << "------- pop failed and will wait......" << std::endl;
         idle_workerrequests_cv.wait(lck, [&idle_workerrequests, &worker] {
             return idle_workerrequests.try_pop(worker);
         });
     }
     if (worker.second) {
-        std::cout << "------- popped index: " << index++ << std::endl;
         worker_request_ptr = worker.second;
         IdleGuard idle_guard{worker_request_ptr, idle_workerrequests};
         m_this_worker_infer_request = worker_request_ptr;
@@ -79,8 +74,6 @@ bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
         }
         idle_guard.release();
         return true;
-    } else {
-        std::cout << "------- Failed to pop index: " << index << std::endl;
     }
     return false;
 }
@@ -148,17 +141,11 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                 } else {
                     stop_retry_and_continue();
                 }
-                static int index = 0;
-                std::cout << "------- try push index: " << index++ << std::endl;
-                // try to return the request to the idle list (fails if the overall object destruction has
-                // began)
-                //std::this_thread::sleep_for(std::chrono::seconds(5));
                 if (idleGuard.release()->try_push(
                         std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
                     // let's try to pop a task, as we know there is at least one idle request, schedule if
                     // succeeded if no device-agnostic tasks, let's try pop the device specific task, schedule
                     // if succeeded
-                    std::cout << "------- pushed index: " << index << std::endl;
                     ov::threading::Task t;
                     do {
                         m_infer_pipeline_tasks.try_pop(t);

From eb727a475b1340e73306324233374f2cb6e5c70a Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Tue, 12 Nov 2024 16:35:52 +0800
Subject: [PATCH 04/11] update.

---
 src/plugins/auto/src/schedule.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index c5e839948e1476..d29e824de234a7 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -141,6 +141,7 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                 } else {
                     stop_retry_and_continue();
                 }
+                std::unique_lock<std::mutex> lck(m_worker_infer_mutex);
                 if (idleGuard.release()->try_push(
                         std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
                     // let's try to pop a task, as we know there is at least one idle request, schedule if

From 3507dfebf1593f0def4bd60529099cc06a6a6868 Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Tue, 12 Nov 2024 16:39:29 +0800
Subject: [PATCH 05/11] update.

---
 src/plugins/auto/src/auto_schedule.cpp       | 6 +++---
 src/plugins/auto/src/cumulative_schedule.cpp | 2 +-
 src/plugins/auto/src/schedule.cpp            | 2 +-
 src/plugins/auto/src/schedule.hpp            | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/plugins/auto/src/auto_schedule.cpp b/src/plugins/auto/src/auto_schedule.cpp
index d87e4b4f418212..a9ef0df8f17307 100644
--- a/src/plugins/auto/src/auto_schedule.cpp
+++ b/src/plugins/auto/src/auto_schedule.cpp
@@ -212,12 +212,12 @@ void AutoSchedule::init() {
         // initialize containers before run async task
         m_idle_worker_requests[device.device_name];
         m_worker_requests[device.device_name];
-        m_worker_requests_cvs[device.device_name];
+        m_worker_requests_conds[device.device_name];
         m_infer_pipeline_tasks_device_specific[device.device_name] = nullptr;
     }
     m_idle_worker_requests["CPU_HELP"];
     m_worker_requests["CPU_HELP"];
-    m_worker_requests_cvs["CPU_HELP"];
+    m_worker_requests_conds["CPU_HELP"];
     m_infer_pipeline_tasks_device_specific["CPU_HELP"] = nullptr;
     m_executor->run(m_compile_context[CPU].m_task);
     m_executor->run(m_compile_context[ACTUALDEVICE].m_task);
@@ -493,7 +493,7 @@ bool AutoSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
         if (run_pipeline_task(pipeline_task,
                               m_idle_worker_requests[device.device_name],
                               preferred_device,
-                              m_worker_requests_cvs[device.device_name],
+                              m_worker_requests_conds[device.device_name],
                               m_worker_infer_mutex)) {
             return true;
         }
diff --git a/src/plugins/auto/src/cumulative_schedule.cpp b/src/plugins/auto/src/cumulative_schedule.cpp
index adabfb9826c670..89672acd8a9073 100644
--- a/src/plugins/auto/src/cumulative_schedule.cpp
+++ b/src/plugins/auto/src/cumulative_schedule.cpp
@@ -250,7 +250,7 @@ bool CumuSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
         if (run_pipeline_task(pipeline_task,
                               m_idle_worker_requests[selected_device_name],
                               preferred_device,
-                              m_worker_requests_cvs[selected_device_name],
+                              m_worker_requests_conds[selected_device_name],
                               m_worker_infer_mutex)) {
             return true;
         } else {
diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index d29e824de234a7..4457ae55383ecb 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -99,7 +99,7 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
             : it_numrequests->num_requests_per_devices;
     auto& worker_requests = m_worker_requests[device];
     auto& idle_worker_requests = m_idle_worker_requests[device];
-    auto& worker_requests_cv = m_worker_requests_cvs[device];
+    auto& worker_requests_cv = m_worker_requests_conds[device];
     worker_requests.resize(num_requests);
     m_infer_pipeline_tasks_device_specific[device] = std::unique_ptr<TaskQueue>(new TaskQueue);
     auto* idle_workerrequests_ptr = &(idle_worker_requests);
diff --git a/src/plugins/auto/src/schedule.hpp b/src/plugins/auto/src/schedule.hpp
index f7ae523ea986b8..eaa2915e27a45b 100644
--- a/src/plugins/auto/src/schedule.hpp
+++ b/src/plugins/auto/src/schedule.hpp
@@ -43,7 +43,7 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     std::shared_ptr m_executor;
     DeviceMap<NotBusyPriorityWorkerRequests> m_idle_worker_requests;
     DeviceMap<std::vector<WorkerInferRequest>> m_worker_requests;
-    DeviceMap<std::condition_variable> m_worker_requests_cvs;
+    DeviceMap<std::condition_variable> m_worker_requests_conds;
     TaskQueue m_infer_pipeline_tasks;
     DeviceMap<std::unique_ptr<TaskQueue>> m_infer_pipeline_tasks_device_specific;
     SoCompiledModel m_passthrough_compiled_model;

From fdc866b441f3951b0f7d0264569abbfd766d29e9 Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Tue, 12 Nov 2024 16:41:25 +0800
Subject: [PATCH 06/11] update.

---
 src/plugins/auto/src/schedule.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/plugins/auto/src/schedule.hpp b/src/plugins/auto/src/schedule.hpp
index eaa2915e27a45b..794accc9f0e1df 100644
--- a/src/plugins/auto/src/schedule.hpp
+++ b/src/plugins/auto/src/schedule.hpp
@@ -25,7 +25,7 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     // have to use the const char* ptr rather than std::string due to a bug in old gcc versions,
     // the bug is e.g. manifesting on the old CentOS (and it's 4.8.x gcc) used in our testing
     // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=81880
-    static thread_local const char* m_this_preferred_device_name; 
+    static thread_local const char* m_this_preferred_device_name;
 
 protected:
     virtual void init() = 0;

From 47d40e14fce8bfd9574beb6a76c2fd1ec1120309 Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Thu, 14 Nov 2024 09:54:40 +0800
Subject: [PATCH 07/11] update.

---
 src/plugins/auto/src/cumulative_schedule.cpp |  1 +
 src/plugins/auto/src/schedule.cpp            | 12 +++++++-----
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/plugins/auto/src/cumulative_schedule.cpp b/src/plugins/auto/src/cumulative_schedule.cpp
index 89672acd8a9073..153e2c09e52d75 100644
--- a/src/plugins/auto/src/cumulative_schedule.cpp
+++ b/src/plugins/auto/src/cumulative_schedule.cpp
@@ -148,6 +148,7 @@ void CumuSchedule::init() {
         // initialize containers before run async task, if not initialized, it will hang during infer
         m_idle_worker_requests[device.device_name];
         m_worker_requests[device.device_name];
+        m_worker_requests_conds[device.device_name];
         m_infer_pipeline_tasks_device_specific[device.device_name] = nullptr;
     }
     // load devices other than CPU first
diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index 4457ae55383ecb..ae12e45c810488 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -58,11 +58,13 @@ bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
                                      std::mutex& worker_infer_mutex) {
     WorkerInferRequest* worker_request_ptr = nullptr;
     std::pair<int, WorkerInferRequest*> worker;
-    std::unique_lock<std::mutex> lck(worker_infer_mutex);
-    if (!idle_workerrequests.try_pop(worker)) {
-        idle_workerrequests_cv.wait(lck, [&idle_workerrequests, &worker] {
-            return idle_workerrequests.try_pop(worker);
-        });
+    {
+        std::unique_lock<std::mutex> lck(worker_infer_mutex);
+        if (!idle_workerrequests.try_pop(worker)) {
+            idle_workerrequests_cv.wait(lck, [&idle_workerrequests, &worker] {
+                return idle_workerrequests.try_pop(worker);
+            });
+        }
     }
     if (worker.second) {
         worker_request_ptr = worker.second;

From 81f346cccf08b0425b7ec5d274ca1b2f8904fde6 Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Wed, 4 Dec 2024 15:20:15 +0800
Subject: [PATCH 08/11] remove the condition variable control and increase the number of worker requests to at least 2 per device to avoid deadlock.
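
[Editorial sketch, not part of the original commit message: the window this commit describes can be reproduced with a toy model of the idle-worker queue, using hypothetical names rather than the plugin's real types. With a single worker request, any submission that arrives while that one worker is checked out finds the idle queue empty; the earlier patches covered that window with a condition variable, this commit instead guarantees a second worker.]

#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

struct IdleQueue {                         // stand-in for the idle worker-request queue
    std::mutex m;
    std::queue<int> slots;                 // ids of idle worker requests
    bool try_pop(int& id) {
        std::lock_guard<std::mutex> g(m);
        if (slots.empty())
            return false;
        id = slots.front();
        slots.pop();
        return true;
    }
    void push(int id) {
        std::lock_guard<std::mutex> g(m);
        slots.push(id);
    }
};

// Mirrors run_pipeline_task's contract: true if an idle worker was found and used.
bool run_task(IdleQueue& idle) {
    int worker = -1;
    if (!idle.try_pop(worker))
        return false;                      // no idle worker: the task would have to be parked
    std::this_thread::sleep_for(std::chrono::microseconds(50));  // pretend to run inference
    idle.push(worker);                     // hand the worker back to the idle list
    return true;
}

// Two threads submit tasks against `workers` idle slots and count how many submissions
// found no idle worker. With one slot there is a window where the only worker is checked
// out, so misses typically occur; with two slots each submitter can always find a free worker.
int count_misses(int workers) {
    IdleQueue idle;
    for (int i = 0; i < workers; ++i)
        idle.push(i);
    std::atomic<int> missed{0};
    auto submitter = [&] {
        for (int i = 0; i < 200; ++i)
            if (!run_task(idle))
                ++missed;
    };
    std::thread a(submitter), b(submitter);
    a.join();
    b.join();
    return missed.load();
}

int main() {
    std::cout << "misses with 1 worker:  " << count_misses(1) << '\n';
    std::cout << "misses with 2 workers: " << count_misses(2) << '\n';
}

In the plugin a failed pop does not drop the task, it is queued and rescheduled when a worker is pushed back; the commit's point is that with a single worker per device that hand-off leaves room for the hang it works around.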
---
 src/plugins/auto/src/auto_schedule.cpp       |  8 +---
 src/plugins/auto/src/cumulative_schedule.cpp |  7 +---
 src/plugins/auto/src/schedule.cpp            | 43 +++++++++-------------
 src/plugins/auto/src/schedule.hpp            |  9 +---
 4 files changed, 19 insertions(+), 48 deletions(-)

diff --git a/src/plugins/auto/src/auto_schedule.cpp b/src/plugins/auto/src/auto_schedule.cpp
index a9ef0df8f17307..c504e8e4457870 100644
--- a/src/plugins/auto/src/auto_schedule.cpp
+++ b/src/plugins/auto/src/auto_schedule.cpp
@@ -212,12 +212,10 @@ void AutoSchedule::init() {
         // initialize containers before run async task
         m_idle_worker_requests[device.device_name];
         m_worker_requests[device.device_name];
-        m_worker_requests_conds[device.device_name];
         m_infer_pipeline_tasks_device_specific[device.device_name] = nullptr;
     }
     m_idle_worker_requests["CPU_HELP"];
     m_worker_requests["CPU_HELP"];
-    m_worker_requests_conds["CPU_HELP"];
     m_infer_pipeline_tasks_device_specific["CPU_HELP"] = nullptr;
     m_executor->run(m_compile_context[CPU].m_task);
     m_executor->run(m_compile_context[ACTUALDEVICE].m_task);
@@ -490,11 +488,7 @@ bool AutoSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
         if (!preferred_device.empty() && (device.device_name != preferred_device)) {
             continue;
         }
-        if (run_pipeline_task(pipeline_task,
-                              m_idle_worker_requests[device.device_name],
-                              preferred_device,
-                              m_worker_requests_conds[device.device_name],
-                              m_worker_infer_mutex)) {
+        if (run_pipeline_task(pipeline_task, m_idle_worker_requests[device.device_name], preferred_device)) {
             return true;
         }
     }
diff --git a/src/plugins/auto/src/cumulative_schedule.cpp b/src/plugins/auto/src/cumulative_schedule.cpp
index 153e2c09e52d75..a607205e17d1e5 100644
--- a/src/plugins/auto/src/cumulative_schedule.cpp
+++ b/src/plugins/auto/src/cumulative_schedule.cpp
@@ -148,7 +148,6 @@ void CumuSchedule::init() {
         // initialize containers before run async task, if not initialized, it will hang during infer
         m_idle_worker_requests[device.device_name];
         m_worker_requests[device.device_name];
-        m_worker_requests_conds[device.device_name];
         m_infer_pipeline_tasks_device_specific[device.device_name] = nullptr;
     }
     // load devices other than CPU first
@@ -248,11 +247,7 @@ bool CumuSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
         }
         auto selected_device_name =
             preferred_device.empty() ? schedule_to_next_device(devices, current_device_index) : preferred_device;
-        if (run_pipeline_task(pipeline_task,
-                              m_idle_worker_requests[selected_device_name],
-                              preferred_device,
-                              m_worker_requests_conds[selected_device_name],
-                              m_worker_infer_mutex)) {
+        if (run_pipeline_task(pipeline_task, m_idle_worker_requests[selected_device_name], preferred_device)) {
             return true;
         } else {
             current_device_index++;
diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index ae12e45c810488..198b115fdb5c79 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -53,20 +53,10 @@ void Schedule::run(ov::threading::Task pipeline_task) {
 }
 
 bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
                                      NotBusyPriorityWorkerRequests& idle_workerrequests,
-                                     const DeviceName& preferred_device,
-                                     std::condition_variable& idle_workerrequests_cv,
-                                     std::mutex& worker_infer_mutex) {
+                                       const DeviceName& preferred_device) {
     WorkerInferRequest* worker_request_ptr = nullptr;
     std::pair<int, WorkerInferRequest*> worker;
-    {
-        std::unique_lock<std::mutex> lck(worker_infer_mutex);
-        if (!idle_workerrequests.try_pop(worker)) {
-            idle_workerrequests_cv.wait(lck, [&idle_workerrequests, &worker] {
-                return idle_workerrequests.try_pop(worker);
-            });
-        }
-    }
-    if (worker.second) {
+    if (idle_workerrequests.try_pop(worker)) {
         worker_request_ptr = worker.second;
         IdleGuard idle_guard{worker_request_ptr, idle_workerrequests};
         m_this_worker_infer_request = worker_request_ptr;
@@ -95,13 +85,15 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         OPENVINO_THROW("Every device used with AUTO should support query optimal_number_of_infer_requests property from compiled model ",
                        iie.what());
     }
-    auto num_requests =
-        (m_context->m_device_priorities.end() == it_numrequests || it_numrequests->num_requests_per_devices == -1)
-            ? optimal_num
-            : it_numrequests->num_requests_per_devices;
+    auto num_requests = (m_context->m_device_priorities.end() == it_numrequests ||
+        it_numrequests->num_requests_per_devices == -1) ? optimal_num : it_numrequests->num_requests_per_devices;
+    // If the user creates only one infer request, we need to ensure at least 2 requests per device.
+    // This is necessary to handle the case where a request worker is popped from the idle queue before being pushed back.
+    // Without at least 2 requests, there could be a situation where no requests are available for inference,
+    // leading to potential deadlocks.
+    num_requests = num_requests <= 1 ? 2 : num_requests;
     auto& worker_requests = m_worker_requests[device];
     auto& idle_worker_requests = m_idle_worker_requests[device];
-    auto& worker_requests_cv = m_worker_requests_conds[device];
     worker_requests.resize(num_requests);
     m_infer_pipeline_tasks_device_specific[device] = std::unique_ptr<TaskQueue>(new TaskQueue);
     auto* idle_workerrequests_ptr = &(idle_worker_requests);
@@ -111,9 +103,7 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         worker_request.m_inferrequest = {compiled_model->create_infer_request(), compiled_model._so};
         auto* worker_request_ptr = &worker_request;
         worker_request_ptr->m_index = num++;
-        OPENVINO_ASSERT(
-            idle_worker_requests.try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr)) == true);
+        OPENVINO_ASSERT(idle_worker_requests.try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr)) == true);
         worker_request.m_inferrequest->set_callback(
-            [worker_request_ptr, this, device, idle_workerrequests_ptr, &worker_requests_cv](
-                std::exception_ptr exception_ptr) mutable {
+            [worker_request_ptr, this, device, idle_workerrequests_ptr](std::exception_ptr exception_ptr) mutable {
                 IdleGuard idleGuard{worker_request_ptr, *idle_workerrequests_ptr};
                 worker_request_ptr->m_exception_ptr = std::move(exception_ptr);
                 {
@@ -143,12 +133,10 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                 } else {
                     stop_retry_and_continue();
                 }
-                std::unique_lock<std::mutex> lck(m_worker_infer_mutex);
-                if (idleGuard.release()->try_push(
-                        std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
-                    // let's try to pop a task, as we know there is at least one idle request, schedule if
-                    // succeeded if no device-agnostic tasks, let's try pop the device specific task, schedule
-                    // if succeeded
+                // try to return the request to the idle list (fails if the overall object destruction has began)
+                if (idleGuard.release()->try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
+                    // let's try to pop a task, as we know there is at least one idle request, schedule if succeeded
+                    // if no device-agnostic tasks, let's try pop the device specific task, schedule if succeeded
                     ov::threading::Task t;
                     do {
                         m_infer_pipeline_tasks.try_pop(t);
@@ -158,7 +146,6 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                     do {
                         m_infer_pipeline_tasks_device_specific[device]->try_pop(t);
                     } while (t && schedule_to_worker_infer_request(std::move(t), device));
-                    worker_requests_cv.notify_all();
                 }
             }
         });
diff --git a/src/plugins/auto/src/schedule.hpp b/src/plugins/auto/src/schedule.hpp
index 794accc9f0e1df..99efa3138cef00 100644
--- a/src/plugins/auto/src/schedule.hpp
+++ b/src/plugins/auto/src/schedule.hpp
@@ -29,11 +29,8 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
 protected:
     virtual void init() = 0;
-    static bool run_pipeline_task(ov::threading::Task& pipeline_task,
-                                  NotBusyPriorityWorkerRequests& idle_worker_request,
-                                  const DeviceName& preferred_device,
-                                  std::condition_variable& idle_worker_request_cv,
-                                  std::mutex& mutex);
+    static bool run_pipeline_task(ov::threading::Task& pipeline_task, NotBusyPriorityWorkerRequests& idle_worker_request,
+                                  const DeviceName& preferred_device);
     virtual void generate_workers(const std::string& device, const SoCompiledModel& compiled_model);
     virtual void try_to_compile_model(AutoCompileContext& context, const std::shared_ptr<ov::Model>& model) = 0;
     virtual bool schedule_to_worker_infer_request(ov::threading::Task, DeviceName preferred_device = "") = 0;
@@ -43,7 +40,6 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     std::shared_ptr m_executor;
     DeviceMap<NotBusyPriorityWorkerRequests> m_idle_worker_requests;
     DeviceMap<std::vector<WorkerInferRequest>> m_worker_requests;
-    DeviceMap<std::condition_variable> m_worker_requests_conds;
     TaskQueue m_infer_pipeline_tasks;
     DeviceMap<std::unique_ptr<TaskQueue>> m_infer_pipeline_tasks_device_specific;
     SoCompiledModel m_passthrough_compiled_model;
@@ -54,7 +50,6 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     mutable std::atomic m_request_id = {0};
     std::mutex m_dev_infer_mutex;
     std::unordered_map m_dev_infer;
-    std::mutex m_worker_infer_mutex;
 };
 } // namespace auto_plugin

From 976aa4792c660e51b30da6c1aa92c01050d89450 Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Wed, 4 Dec 2024 15:40:33 +0800
Subject: [PATCH 09/11] update.

---
 src/plugins/auto/src/schedule.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index 198b115fdb5c79..9f59d00311f152 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -53,7 +53,7 @@ void Schedule::run(ov::threading::Task pipeline_task) {
 
 bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
                                      NotBusyPriorityWorkerRequests& idle_workerrequests,
-                                       const DeviceName& preferred_device) {
+                                     const DeviceName& preferred_device) {
     WorkerInferRequest* worker_request_ptr = nullptr;
     std::pair<int, WorkerInferRequest*> worker;
     if (idle_workerrequests.try_pop(worker)) {

From 2ef471a26bf1a8dc2b32314e404acf60f6476cea Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Wed, 4 Dec 2024 15:46:04 +0800
Subject: [PATCH 10/11] update.

---
 src/plugins/auto/src/schedule.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index 9f59d00311f152..fd2b112e707a36 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -52,8 +52,8 @@ void Schedule::run(ov::threading::Task pipeline_task) {
 }
 
 bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
-                                     NotBusyPriorityWorkerRequests& idle_workerrequests,
-                                     const DeviceName& preferred_device) {
+                                 NotBusyPriorityWorkerRequests& idle_workerrequests,
+                                 const DeviceName& preferred_device) {
     WorkerInferRequest* worker_request_ptr = nullptr;
     std::pair<int, WorkerInferRequest*> worker;
     if (idle_workerrequests.try_pop(worker)) {

From 10f6a9f2ac536dc21b08bc3bd6fd3b4ea158ffe0 Mon Sep 17 00:00:00 2001
From: "Wang, Yang"
Date: Tue, 10 Dec 2024 15:48:18 +0800
Subject: [PATCH 11/11] update number of infer requests for throughput mode.

---
 src/plugins/auto/src/schedule.cpp | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/plugins/auto/src/schedule.cpp b/src/plugins/auto/src/schedule.cpp
index fd2b112e707a36..0d7bd4b0ecda3c 100644
--- a/src/plugins/auto/src/schedule.cpp
+++ b/src/plugins/auto/src/schedule.cpp
@@ -85,13 +85,13 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         OPENVINO_THROW("Every device used with AUTO should support query optimal_number_of_infer_requests property from compiled model ",
                        iie.what());
     }
-    auto num_requests = (m_context->m_device_priorities.end() == it_numrequests ||
-        it_numrequests->num_requests_per_devices == -1) ? optimal_num : it_numrequests->num_requests_per_devices;
-    // If the user creates only one infer request, we need to ensure at least 2 requests per device.
-    // This is necessary to handle the case where a request worker is popped from the idle queue before being pushed back.
-    // Without at least 2 requests, there could be a situation where no requests are available for inference,
-    // leading to potential deadlocks.
-    num_requests = num_requests <= 1 ? 2 : num_requests;
+    auto num_requests =
+        (m_context->m_device_priorities.end() == it_numrequests || it_numrequests->num_requests_per_devices == -1)
+            ? optimal_num
+            : it_numrequests->num_requests_per_devices;
+    num_requests = num_requests <= 1 && m_context->m_performance_hint == ov::hint::PerformanceMode::THROUGHPUT
+                       ? 2
+                       : num_requests;
     auto& worker_requests = m_worker_requests[device];
     auto& idle_worker_requests = m_idle_worker_requests[device];
     worker_requests.resize(num_requests);
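
A closing note on the policy the series converges on (an illustration added for this write-up, not code taken from the patches): the explicit per-device request count wins when the user sets one; otherwise the device's optimal_number_of_infer_requests is used, and only under the THROUGHPUT performance hint is a result of one padded up to two, so that one worker can sit in the idle queue while the other is checked out. Below is a minimal standalone restatement with hypothetical names and a simplified hint enum; the real code also treats a missing per-device entry the same as -1.

#include <cstdint>
#include <iostream>

enum class PerformanceMode { LATENCY, THROUGHPUT, CUMULATIVE_THROUGHPUT };

int32_t choose_num_workers(int32_t optimal_num,          // device's optimal_number_of_infer_requests
                           int32_t requests_per_device,  // user setting, -1 means "not set"
                           PerformanceMode hint) {
    int32_t num_requests = (requests_per_device == -1) ? optimal_num : requests_per_device;
    // Only throughput mode pads a single request up to two; latency keeps the single worker.
    if (num_requests <= 1 && hint == PerformanceMode::THROUGHPUT)
        num_requests = 2;
    return num_requests;
}

int main() {
    std::cout << choose_num_workers(1, -1, PerformanceMode::THROUGHPUT) << '\n';  // 2
    std::cout << choose_num_workers(1, -1, PerformanceMode::LATENCY) << '\n';     // 1
    std::cout << choose_num_workers(4, -1, PerformanceMode::THROUGHPUT) << '\n';  // 4
    std::cout << choose_num_workers(4, 8, PerformanceMode::THROUGHPUT) << '\n';   // 8
}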