Skip to content

Commit

Permalink
[Runtime] [ThreadPool] Make SpscTaskQueue::Pop(..) spin_count configu…
Browse files Browse the repository at this point in the history
…rable (#3577)

In cases where we have multiple models or threadpools active, spinning around
`sched_yield()` may not be desirable, as it prevents the OS from effectively
scheduling other threads.

Thus, allow users to conditionally disable this behaviour (via an environment
variable `TVM_THREAD_POOL_SPIN_COUNT`, similar to existing environment flags for
the thread pool such as `TVM_BIND_THREADS`, etc).

This substantially improves tail latencies in some of our multi-tenant
workloads in practice.

Unit tests have been added - on my laptop, running:

```
TVM_THREAD_POOL_SPIN_COUNT=0 ./build/threading_backend_test;
TVM_THREAD_POOL_SPIN_COUNT=1 ./build/threading_backend_test;
./build/threading_backend_test;
```

gives https://gist.github.com/ajtulloch/1805ca6cbaa27f5d442d23f9d0021ce6 (i.e.
97ms -> <1ms after this change)
  • Loading branch information
ajtulloch authored and tqchen committed Jul 23, 2019
1 parent 19eb829 commit 9b1c2e0
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/runtime/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -44,6 +44,19 @@ const constexpr int kL1CacheBytes = 64;

namespace tvm {
namespace runtime {
namespace {

constexpr uint32_t kDefaultSpinCount = 300000;

uint32_t GetSpinCount() {
const char* val = getenv("TVM_THREAD_POOL_SPIN_COUNT");
if (!val) {
return kDefaultSpinCount;
}
return atoi(val);
}

} // namespace

// stride in the page, fit to cache line.
constexpr int kSyncStride = 64 / sizeof(std::atomic<int>);
Expand Down Expand Up @@ -176,7 +189,7 @@ class SpscTaskQueue {
* \param spin_count The number of iterations to spin before sleep.
* \return Whether pop is successful (true) or we need to exit now (false).
*/
bool Pop(Task* output, uint32_t spin_count = 300000) {
bool Pop(Task* output, uint32_t spin_count) {
// Busy wait a bit when the queue is empty.
// If a new task comes to the queue quickly, this wait avoid the worker from sleeping.
// The default spin count is set by following the typical omp convention
Expand Down Expand Up @@ -335,7 +348,11 @@ class ThreadPool {
SpscTaskQueue* queue = queues_[worker_id].get();
SpscTaskQueue::Task task;
ParallelLauncher::ThreadLocal()->is_worker = true;
while (queue->Pop(&task)) {
// Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on
// the global first use of the ThreadPool.
// TODO(tulloch): should we make this configurable via standard APIs?
static size_t spin_count = GetSpinCount();
while (queue->Pop(&task, spin_count)) {
CHECK(task.launcher != nullptr);
TVMParallelGroupEnv* penv = &(task.launcher->env);
void* cdata = task.launcher->cdata;
Expand Down
71 changes: 71 additions & 0 deletions tests/cpp/threading_backend_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <atomic>
#include <memory>
#include <thread>

#include <gtest/gtest.h>
#include <tvm/runtime/c_backend_api.h>

constexpr size_t N = 128;

static FTVMParallelLambda atomic_add_task_id = [](int task_id, TVMParallelGroupEnv* penv,
void* cdata) -> int {
auto* data = reinterpret_cast<std::atomic<size_t>*>(cdata);
const size_t N_per_task = (N + penv->num_task - 1) / penv->num_task;
for (size_t i = task_id * N_per_task; i < N && i < (task_id + 1) * N_per_task; ++i) {
data->fetch_add(i, std::memory_order_relaxed);
}
return 0;
};

TEST(ThreadingBackend, TVMBackendParallelLaunch) {
std::atomic<size_t> acc(0);
TVMBackendParallelLaunch(atomic_add_task_id, &acc, 0);
EXPECT_EQ(acc.load(std::memory_order_relaxed), N * (N - 1) / 2);
}

TEST(ThreadingBackend, TVMBackendParallelLaunchMultipleThreads) {
// TODO(tulloch) use parameterised tests when available.
size_t num_jobs_per_thread = 3;
size_t max_num_threads = 2;

for (size_t num_threads = 1; num_threads < max_num_threads; ++num_threads) {
std::vector<std::unique_ptr<std::thread>> ts;
for (size_t i = 0; i < num_threads; ++i) {
ts.emplace_back(new std::thread([&]() {
for (size_t j = 0; j < num_jobs_per_thread; ++j) {
std::atomic<size_t> acc(0);
TVMBackendParallelLaunch(atomic_add_task_id, &acc, 0);
EXPECT_EQ(acc.load(std::memory_order_relaxed), N * (N - 1) / 2);
}
}));
}
for (auto& t : ts) {
t->join();
}
}
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
testing::FLAGS_gtest_death_test_style = "threadsafe";
return RUN_ALL_TESTS();
}

0 comments on commit 9b1c2e0

Please sign in to comment.