From 25e300fc093670f65715f2a683eae184900a82c8 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 13 Aug 2024 11:55:59 +0800 Subject: [PATCH] fixup --- velox/exec/Driver.cpp | 9 +- velox/exec/Task.cpp | 28 +++--- velox/exec/tests/TaskTest.cpp | 138 ++++++++++++++++-------------- velox/exec/tests/utils/Cursor.cpp | 11 ++- 4 files changed, 100 insertions(+), 86 deletions(-) diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 1e7e4f392ae25..f511d247a1178 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -366,16 +366,15 @@ RowVectorPtr Driver::next(ContinueFuture* future) { const auto stop = runInternal(self, blockingState, result); if (blockingState != nullptr) { - VELOX_DCHECK_NULL(result); + VELOX_CHECK_NULL(result); *future = blockingState->future(); return nullptr; } if (stop == StopReason::kPause) { - VELOX_DCHECK_NULL(result); - if (!task()->pauseRequested(future)) { - *future = ContinueFuture::makeEmpty(); - } + VELOX_CHECK_NULL(result); + const auto paused = task()->pauseRequested(future); + VELOX_CHECK_EQ(paused, future->valid()); return nullptr; } diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 004cbd09b5f17..01f72bfaaf5c4 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -896,14 +896,21 @@ void Task::initializePartitionOutput() { // static void Task::resume(std::shared_ptr self) { - std::vector> offThreadDrivers; { + std::vector> offThreadDrivers; std::vector resumePromises; - auto guard = folly::makeGuard([&]() { + SCOPE_EXIT { + // Get the stats and free the resources of Drivers that were not on + // thread. + for (auto& driver : offThreadDrivers) { + self->driversClosedByTask_.emplace_back(driver); + driver->closeByTask(); + } + // Fulfill resume futures. for (auto& promise : resumePromises) { promise.setValue(); } - }); + }; std::lock_guard l(self->mutex_); // Setting pause requested must be atomic with the resuming so that // suspended sections do not go back on thread during resume. @@ -965,12 +972,6 @@ void Task::resume(std::shared_ptr self) { } resumePromises.swap(self->resumePromises_); } - - // Get the stats and free the resources of Drivers that were not on thread. - for (auto& driver : offThreadDrivers) { - self->driversClosedByTask_.emplace_back(driver); - driver->closeByTask(); - } } void Task::validateGroupedExecutionLeafNodes() { @@ -2749,16 +2750,15 @@ ContinueFuture Task::requestPause() { bool Task::pauseRequested(ContinueFuture* future) { if (FOLLY_LIKELY(future == nullptr)) { - // Once 'pauseRequested_' is set, it will not be cleared until - // task::resume(). It is therefore OK to read it without a mutex - // from a thread that this flag concerns. + // It is ok to return 'pauseRequested_' without a mutex lock if user doesn't + // want to wait for task to resume. return pauseRequested_; } std::lock_guard l(mutex_); if (!pauseRequested_) { - VELOX_CHECK(resumePromises_.empty()) - *future = ContinueFuture::makeEmpty(); + VELOX_CHECK(resumePromises_.empty()); + VELOX_CHECK(!future->valid()); return false; } resumePromises_.emplace_back("Task::isPaused"); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index f0d53773682af..242774728fe40 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1463,6 +1463,81 @@ DEBUG_ONLY_TEST_F(TaskPauseTest, raceBetweenTaskPauseAndTerminate) { taskThread_.join(); } +DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFuture) { + // Test for trivial wait on Task::pauseRequested future. + testPause(); + folly::EventCount taskResumeAllowedWait; + std::atomic taskResumeAllowed{false}; + std::thread observeThread([&]() { + ContinueFuture future = ContinueFuture::makeEmpty(); + const bool requested = task_->pauseRequested(&future); + ASSERT_TRUE(requested); + taskResumeAllowed = true; + taskResumeAllowedWait.notifyAll(); + future.wait(); + ASSERT_FALSE(task_->pauseRequested()); + }); + + taskResumeAllowedWait.await([&]() { return taskResumeAllowed.load(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + Task::resume(task_->shared_from_this()); + + taskThread_.join(); + observeThread.join(); + + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 1); + ASSERT_EQ(task_->numRunningDrivers(), 0); +} + +DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFutureAfterTaskTerminated) { + testPause(); + folly::EventCount taskCancelAllowedWait; + std::atomic taskCancelAllowed{false}; + folly::EventCount taskCanceledWait; + std::atomic taskCanceled{false}; + folly::EventCount taskResumeAllowedWait; + std::atomic taskResumeAllowed{false}; + folly::EventCount taskResumedWait; + std::atomic taskResumed{false}; + std::thread observeThread([&]() { + ContinueFuture future = ContinueFuture::makeEmpty(); + const bool requested = task_->pauseRequested(&future); + ASSERT_TRUE(requested); + taskCancelAllowed = true; + taskCancelAllowedWait.notifyAll(); + taskCanceledWait.await([&]() { return taskCanceled.load(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + ASSERT_FALSE(future.isReady()); + taskResumeAllowed = true; + taskResumeAllowedWait.notifyAll(); + taskResumedWait.await([&]() { return taskResumed.load(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + ASSERT_TRUE(future.isReady()); + future.wait(); + ASSERT_FALSE(task_->pauseRequested()); + }); + taskCancelAllowedWait.await([&]() { return taskCancelAllowed.load(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 0); + ASSERT_EQ(task_->numRunningDrivers(), 1); + std::vector drivers{}; + task_->requestCancel().wait(); + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 0); + ASSERT_EQ(task_->numRunningDrivers(), 0); + taskCanceled = true; + taskCanceledWait.notifyAll(); + taskResumeAllowedWait.await([&]() { return taskResumeAllowed.load(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + Task::resume(task_->shared_from_this()); + taskResumed = true; + taskResumedWait.notifyAll(); + taskThread_.join(); + observeThread.join(); +} + DEBUG_ONLY_TEST_F(TaskTest, driverCounters) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), @@ -1694,69 +1769,6 @@ TEST_F(TaskTest, spillDirNotCreated) { ASSERT_FALSE(fs->exists(tmpDirectoryPath)); } -DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFuture) { - // Test for trivial wait on Task::pauseRequested future. - testPause(); - folly::EventCount taskResumeAllowedWait; - std::atomic taskResumeAllowed{false}; - std::thread observeThread([&]() { - ContinueFuture future = ContinueFuture::makeEmpty(); - bool requested = task_->pauseRequested(&future); - ASSERT_TRUE(requested); - taskResumeAllowed = true; - taskResumeAllowedWait.notifyAll(); - future.wait(); - ASSERT_TRUE(!task_->pauseRequested()); - }); - - taskResumeAllowedWait.await([&]() { return taskResumeAllowed.load(); }); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT - Task::resume(task_->shared_from_this()); - - taskThread_.join(); - observeThread.join(); - - ASSERT_EQ(task_->numTotalDrivers(), 1); - ASSERT_EQ(task_->numFinishedDrivers(), 1); - ASSERT_EQ(task_->numRunningDrivers(), 0); -} - -DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFutureNeverFulfilled) { - // Test for Task::pauseRequested future to throw when task is never resumed. - testPause(); - folly::EventCount taskAbortAllowedWait; - std::atomic taskAbortAllowed{false}; - std::thread observeThread([&]() { - ContinueFuture future = ContinueFuture::makeEmpty(); - bool requested = task_->pauseRequested(&future); - ASSERT_TRUE(requested); - taskAbortAllowed = true; - taskAbortAllowedWait.notifyAll(); - future.wait(); - ASSERT_TRUE(future.hasException()); - ASSERT_EQ( - std::string(future.result().exception().what()), - "folly::BrokenPromise: Broken promise for type name `folly::Unit`"); - }); - taskAbortAllowedWait.await([&]() { return taskAbortAllowed.load(); }); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT - ASSERT_EQ(task_->numTotalDrivers(), 1); - ASSERT_EQ(task_->numFinishedDrivers(), 0); - ASSERT_EQ(task_->numRunningDrivers(), 1); - std::vector drivers{}; - task_->testingVisitDrivers( - [&](Driver* driver) -> void { drivers.push_back(driver); }); - for (const auto& driver : drivers) { - Task::removeDriver(task_->shared_from_this(), driver); - } - ASSERT_EQ(task_->numTotalDrivers(), 1); - ASSERT_EQ(task_->numFinishedDrivers(), 1); - ASSERT_EQ(task_->numRunningDrivers(), 0); - cursor_.reset(); - taskThread_.join(); - observeThread.join(); -} - DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { auto probeVector = makeRowVector( {"t_c0"}, {makeFlatVector(10, [](auto row) { return row; })}); diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 84e96aaef892d..e303b0ae88a9e 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -371,16 +371,19 @@ class SingleThreadedTaskCursor : public TaskCursorBase { while (true) { ContinueFuture future = ContinueFuture::makeEmpty(); RowVectorPtr next = task_->next(&future); - if (!future.valid()) { - // Task is not blocked. Update `next_` with whatever returned by task. + if (next != nullptr) { next_ = next; - break; + return true; + } + // When next is returned from task as a null pointer. + if (!future.valid()) { + VELOX_CHECK(!task_->isRunning()); + return false; } // Task is blocked for some reason. Wait and try again. VELOX_CHECK_NULL(next) future.wait(); } - return next_ != nullptr; }; RowVectorPtr& current() override {