Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Aug 13, 2024
1 parent e0c444d commit 25e300f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 86 deletions.
9 changes: 4 additions & 5 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,15 @@ RowVectorPtr Driver::next(ContinueFuture* future) {
const auto stop = runInternal(self, blockingState, result);

if (blockingState != nullptr) {
VELOX_DCHECK_NULL(result);
VELOX_CHECK_NULL(result);
*future = blockingState->future();
return nullptr;
}

if (stop == StopReason::kPause) {
VELOX_DCHECK_NULL(result);
if (!task()->pauseRequested(future)) {
*future = ContinueFuture::makeEmpty();
}
VELOX_CHECK_NULL(result);
const auto paused = task()->pauseRequested(future);
VELOX_CHECK_EQ(paused, future->valid());
return nullptr;
}

Expand Down
28 changes: 14 additions & 14 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,14 +896,21 @@ void Task::initializePartitionOutput() {

// static
void Task::resume(std::shared_ptr<Task> self) {
std::vector<std::shared_ptr<Driver>> offThreadDrivers;
{
std::vector<std::shared_ptr<Driver>> offThreadDrivers;
std::vector<ContinuePromise> resumePromises;
auto guard = folly::makeGuard([&]() {
SCOPE_EXIT {
// Get the stats and free the resources of Drivers that were not on
// thread.
for (auto& driver : offThreadDrivers) {
self->driversClosedByTask_.emplace_back(driver);
driver->closeByTask();
}
// Fulfill resume futures.
for (auto& promise : resumePromises) {
promise.setValue();
}
});
};
std::lock_guard<std::timed_mutex> l(self->mutex_);
// Setting pause requested must be atomic with the resuming so that
// suspended sections do not go back on thread during resume.
Expand Down Expand Up @@ -965,12 +972,6 @@ void Task::resume(std::shared_ptr<Task> self) {
}
resumePromises.swap(self->resumePromises_);
}

// Get the stats and free the resources of Drivers that were not on thread.
for (auto& driver : offThreadDrivers) {
self->driversClosedByTask_.emplace_back(driver);
driver->closeByTask();
}
}

void Task::validateGroupedExecutionLeafNodes() {
Expand Down Expand Up @@ -2749,16 +2750,15 @@ ContinueFuture Task::requestPause() {

bool Task::pauseRequested(ContinueFuture* future) {
if (FOLLY_LIKELY(future == nullptr)) {
// Once 'pauseRequested_' is set, it will not be cleared until
// task::resume(). It is therefore OK to read it without a mutex
// from a thread that this flag concerns.
// It is ok to return 'pauseRequested_' without a mutex lock if user doesn't
// want to wait for task to resume.
return pauseRequested_;
}

std::lock_guard<std::timed_mutex> l(mutex_);
if (!pauseRequested_) {
VELOX_CHECK(resumePromises_.empty())
*future = ContinueFuture::makeEmpty();
VELOX_CHECK(resumePromises_.empty());
VELOX_CHECK(!future->valid());
return false;
}
resumePromises_.emplace_back("Task::isPaused");
Expand Down
138 changes: 75 additions & 63 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,81 @@ DEBUG_ONLY_TEST_F(TaskPauseTest, raceBetweenTaskPauseAndTerminate) {
taskThread_.join();
}

DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFuture) {
// Test for trivial wait on Task::pauseRequested future.
testPause();
folly::EventCount taskResumeAllowedWait;
std::atomic<bool> taskResumeAllowed{false};
std::thread observeThread([&]() {
ContinueFuture future = ContinueFuture::makeEmpty();
const bool requested = task_->pauseRequested(&future);
ASSERT_TRUE(requested);
taskResumeAllowed = true;
taskResumeAllowedWait.notifyAll();
future.wait();
ASSERT_FALSE(task_->pauseRequested());
});

taskResumeAllowedWait.await([&]() { return taskResumeAllowed.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
Task::resume(task_->shared_from_this());

taskThread_.join();
observeThread.join();

ASSERT_EQ(task_->numTotalDrivers(), 1);
ASSERT_EQ(task_->numFinishedDrivers(), 1);
ASSERT_EQ(task_->numRunningDrivers(), 0);
}

DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFutureAfterTaskTerminated) {
testPause();
folly::EventCount taskCancelAllowedWait;
std::atomic<bool> taskCancelAllowed{false};
folly::EventCount taskCanceledWait;
std::atomic<bool> taskCanceled{false};
folly::EventCount taskResumeAllowedWait;
std::atomic<bool> taskResumeAllowed{false};
folly::EventCount taskResumedWait;
std::atomic<bool> taskResumed{false};
std::thread observeThread([&]() {
ContinueFuture future = ContinueFuture::makeEmpty();
const bool requested = task_->pauseRequested(&future);
ASSERT_TRUE(requested);
taskCancelAllowed = true;
taskCancelAllowedWait.notifyAll();
taskCanceledWait.await([&]() { return taskCanceled.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
ASSERT_FALSE(future.isReady());
taskResumeAllowed = true;
taskResumeAllowedWait.notifyAll();
taskResumedWait.await([&]() { return taskResumed.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
ASSERT_TRUE(future.isReady());
future.wait();
ASSERT_FALSE(task_->pauseRequested());
});
taskCancelAllowedWait.await([&]() { return taskCancelAllowed.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
ASSERT_EQ(task_->numTotalDrivers(), 1);
ASSERT_EQ(task_->numFinishedDrivers(), 0);
ASSERT_EQ(task_->numRunningDrivers(), 1);
std::vector<Driver*> drivers{};
task_->requestCancel().wait();
ASSERT_EQ(task_->numTotalDrivers(), 1);
ASSERT_EQ(task_->numFinishedDrivers(), 0);
ASSERT_EQ(task_->numRunningDrivers(), 0);
taskCanceled = true;
taskCanceledWait.notifyAll();
taskResumeAllowedWait.await([&]() { return taskResumeAllowed.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
Task::resume(task_->shared_from_this());
taskResumed = true;
taskResumedWait.notifyAll();
taskThread_.join();
observeThread.join();
}

DEBUG_ONLY_TEST_F(TaskTest, driverCounters) {
auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
Expand Down Expand Up @@ -1694,69 +1769,6 @@ TEST_F(TaskTest, spillDirNotCreated) {
ASSERT_FALSE(fs->exists(tmpDirectoryPath));
}

DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFuture) {
// Test for trivial wait on Task::pauseRequested future.
testPause();
folly::EventCount taskResumeAllowedWait;
std::atomic<bool> taskResumeAllowed{false};
std::thread observeThread([&]() {
ContinueFuture future = ContinueFuture::makeEmpty();
bool requested = task_->pauseRequested(&future);
ASSERT_TRUE(requested);
taskResumeAllowed = true;
taskResumeAllowedWait.notifyAll();
future.wait();
ASSERT_TRUE(!task_->pauseRequested());
});

taskResumeAllowedWait.await([&]() { return taskResumeAllowed.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
Task::resume(task_->shared_from_this());

taskThread_.join();
observeThread.join();

ASSERT_EQ(task_->numTotalDrivers(), 1);
ASSERT_EQ(task_->numFinishedDrivers(), 1);
ASSERT_EQ(task_->numRunningDrivers(), 0);
}

DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFutureNeverFulfilled) {
// Test for Task::pauseRequested future to throw when task is never resumed.
testPause();
folly::EventCount taskAbortAllowedWait;
std::atomic<bool> taskAbortAllowed{false};
std::thread observeThread([&]() {
ContinueFuture future = ContinueFuture::makeEmpty();
bool requested = task_->pauseRequested(&future);
ASSERT_TRUE(requested);
taskAbortAllowed = true;
taskAbortAllowedWait.notifyAll();
future.wait();
ASSERT_TRUE(future.hasException());
ASSERT_EQ(
std::string(future.result().exception().what()),
"folly::BrokenPromise: Broken promise for type name `folly::Unit`");
});
taskAbortAllowedWait.await([&]() { return taskAbortAllowed.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
ASSERT_EQ(task_->numTotalDrivers(), 1);
ASSERT_EQ(task_->numFinishedDrivers(), 0);
ASSERT_EQ(task_->numRunningDrivers(), 1);
std::vector<Driver*> drivers{};
task_->testingVisitDrivers(
[&](Driver* driver) -> void { drivers.push_back(driver); });
for (const auto& driver : drivers) {
Task::removeDriver(task_->shared_from_this(), driver);
}
ASSERT_EQ(task_->numTotalDrivers(), 1);
ASSERT_EQ(task_->numFinishedDrivers(), 1);
ASSERT_EQ(task_->numRunningDrivers(), 0);
cursor_.reset();
taskThread_.join();
observeThread.join();
}

DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) {
auto probeVector = makeRowVector(
{"t_c0"}, {makeFlatVector<int32_t>(10, [](auto row) { return row; })});
Expand Down
11 changes: 7 additions & 4 deletions velox/exec/tests/utils/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,16 +371,19 @@ class SingleThreadedTaskCursor : public TaskCursorBase {
while (true) {
ContinueFuture future = ContinueFuture::makeEmpty();
RowVectorPtr next = task_->next(&future);
if (!future.valid()) {
// Task is not blocked. Update `next_` with whatever returned by task.
if (next != nullptr) {
next_ = next;
break;
return true;
}
// When next is returned from task as a null pointer.
if (!future.valid()) {
VELOX_CHECK(!task_->isRunning());
return false;
}
// Task is blocked for some reason. Wait and try again.
VELOX_CHECK_NULL(next)
future.wait();
}
return next_ != nullptr;
};

RowVectorPtr& current() override {
Expand Down

0 comments on commit 25e300f

Please sign in to comment.