From 9b1c2e088b952ab88fceca913658b6adb01e81d2 Mon Sep 17 00:00:00 2001 From: Andrew Tulloch Date: Mon, 22 Jul 2019 19:48:55 -0700 Subject: [PATCH] [Runtime] [ThreadPool] Make SpscTaskQueue::Pop(..) spin_count configurable (#3577) In cases where we have multiple models or threadpools active, spinning around `sched_yield()` may not be desirable, as it prevents the OS from effectively scheduling other threads. Thus, allow users to conditionally disable this behaviour (via an environment variable `TVM_THREAD_POOL_SPIN_COUNT`, similar to existing environment flags for the thread pool such as `TVM_BIND_THREADS`, etc). This substantially improves tail latencies in some of our multi-tenant workloads in practice. Unit tests have been added - on my laptop, running: ``` TVM_THREAD_POOL_SPIN_COUNT=0 ./build/threading_backend_test; TVM_THREAD_POOL_SPIN_COUNT=1 ./build/threading_backend_test; ./build/threading_backend_test; ``` gives https://gist.github.com/ajtulloch/1805ca6cbaa27f5d442d23f9d0021ce6 (i.e. 97ms -> <1ms after this change) --- src/runtime/thread_pool.cc | 25 ++++++++-- tests/cpp/threading_backend_test.cc | 71 +++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 4 deletions(-) create mode 100644 tests/cpp/threading_backend_test.cc diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index c1d97f02e61a..2e101364db2a 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,6 +44,19 @@ const constexpr int kL1CacheBytes = 64; namespace tvm { namespace runtime { +namespace { + +constexpr uint32_t kDefaultSpinCount = 300000; // spin iterations used when TVM_THREAD_POOL_SPIN_COUNT is unset + +uint32_t GetSpinCount() { // returns the spin count from the environment, or kDefaultSpinCount + const char* val = getenv("TVM_THREAD_POOL_SPIN_COUNT"); + if (!val) { + return kDefaultSpinCount; + } + return atoi(val); // NOTE(review): atoi gives 0 for non-numeric text, and a negative value wraps when converted to uint32_t +} + +} // namespace // stride in the page, fit to cache line. constexpr int kSyncStride = 64 / sizeof(std::atomic); @@ -176,7 +189,7 @@ class SpscTaskQueue { * \param spin_count The number of iterations to spin before sleep. * \return Whether pop is successful (true) or we need to exit now (false). */ - bool Pop(Task* output, uint32_t spin_count = 300000) { + bool Pop(Task* output, uint32_t spin_count) { // Busy wait a bit when the queue is empty. // If a new task comes to the queue quickly, this wait avoid the worker from sleeping. // The default spin count is set by following the typical omp convention @@ -335,7 +348,11 @@ class ThreadPool { SpscTaskQueue* queue = queues_[worker_id].get(); SpscTaskQueue::Task task; ParallelLauncher::ThreadLocal()->is_worker = true; - while (queue->Pop(&task)) { + // Read the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) once, the + // first time a worker reaches this point; it is kept in a function-local static. + // TODO(tulloch): should we make this configurable via standard APIs? 
+ static const uint32_t spin_count = GetSpinCount(); // same type GetSpinCount() returns and Pop() takes; never modified after init + while (queue->Pop(&task, spin_count)) { CHECK(task.launcher != nullptr); TVMParallelGroupEnv* penv = &(task.launcher->env); void* cdata = task.launcher->cdata; diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc new file mode 100644 index 000000000000..508705c2630a --- /dev/null +++ b/tests/cpp/threading_backend_test.cc @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include <atomic> +#include <memory> +#include <thread> + +#include <gtest/gtest.h> +#include <tvm/runtime/c_backend_api.h> + +constexpr size_t N = 128; // number of integers summed by each parallel launch + +static FTVMParallelLambda atomic_add_task_id = [](int task_id, TVMParallelGroupEnv* penv, + void* cdata) -> int { + auto* data = reinterpret_cast<std::atomic<size_t>*>(cdata); + const size_t N_per_task = (N + penv->num_task - 1) / penv->num_task; // ceil(N / num_task) + for (size_t i = task_id * N_per_task; i < N && i < (task_id + 1) * N_per_task; ++i) { + data->fetch_add(i, std::memory_order_relaxed); + } + return 0; +}; + +TEST(ThreadingBackend, TVMBackendParallelLaunch) { + std::atomic<size_t> acc(0); + TVMBackendParallelLaunch(atomic_add_task_id, &acc, 0); + EXPECT_EQ(acc.load(std::memory_order_relaxed), N * (N - 1) / 2); // sum of 0..N-1 +} + +TEST(ThreadingBackend, TVMBackendParallelLaunchMultipleThreads) { + // TODO(tulloch) use parameterised tests when available. + size_t num_jobs_per_thread = 3; + size_t max_num_threads = 2; + + for (size_t num_threads = 1; num_threads < max_num_threads; ++num_threads) { // NOTE(review): `<` stops at max_num_threads - 1, so only one launcher thread is ever started + std::vector<std::unique_ptr<std::thread>> ts; + for (size_t i = 0; i < num_threads; ++i) { + ts.emplace_back(new std::thread([&]() { + for (size_t j = 0; j < num_jobs_per_thread; ++j) { + std::atomic<size_t> acc(0); + TVMBackendParallelLaunch(atomic_add_task_id, &acc, 0); + EXPECT_EQ(acc.load(std::memory_order_relaxed), N * (N - 1) / 2); + } + })); + } + for (auto& t : ts) { + t->join(); + } + } +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + testing::FLAGS_gtest_death_test_style = "threadsafe"; + return RUN_ALL_TESTS(); +}