Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Jul 30, 2024
1 parent c5f6469 commit c8f398d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 31 deletions.
29 changes: 12 additions & 17 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,28 +355,23 @@ void Driver::pushdownFilters(int operatorIndex) {
op->clearDynamicFilters();
}

RowVectorPtr Driver::next(std::shared_ptr<BlockingState>& blockingState) {
RowVectorPtr Driver::next(ContinueFuture* future) {
enqueueInternal();
auto self = shared_from_this();
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(
self->driverCtx()->threadDebugInfo);
ScopedDriverThreadContext scopedDriverThreadContext(*self->driverCtx());
std::shared_ptr<BlockingState> blockingState;
RowVectorPtr result;

StopReason stop;

// Spin wait when task is paused.
while (true) {
if (task()->pauseRequested()) {
continue;
}
enqueueInternal();
stop = runInternal(self, blockingState, result);
if (stop == StopReason::kPause) {
VELOX_DCHECK_NULL(blockingState)
VELOX_DCHECK_NULL(result)
continue;
}
break;
auto stop = runInternal(self, blockingState, result);

if (stop == StopReason::kPause) {
VELOX_DCHECK_NULL(blockingState)
VELOX_DCHECK_NULL(result)
ContinueFuture resumeFuture;
task()->pauseRequested(&resumeFuture);
*future = std::move(resumeFuture);
return nullptr;
}

// We get kBlock if 'result' was produced; kAtEnd if pipeline has finished
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,15 @@ class Driver : public std::enable_shared_from_this<Driver> {

/// Run the pipeline until it produces a batch of data or gets blocked.
/// Return the data produced or nullptr if pipeline finished processing and
/// will not produce more data. Return nullptr and set 'blockingState' if
/// will not produce more data. Return nullptr and set 'future' if
/// pipeline got blocked.
///
/// This API supports execution of a Task synchronously in the caller's
/// thread. The caller must use either this API or 'enqueue', but not both.
/// When using 'enqueue', the last operator in the pipeline (sink) must not
/// return any data from Operator::getOutput(). When using 'next', the last
/// operator must produce data that will be returned to caller.
RowVectorPtr next(std::shared_ptr<BlockingState>& blockingState);
RowVectorPtr next(ContinueFuture* future);

/// Invoked to initialize the operators from this driver once on its first
/// execution.
Expand Down
33 changes: 29 additions & 4 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,14 +648,14 @@ RowVectorPtr Task::next(ContinueFuture* future) {

++runnableDrivers;

std::shared_ptr<BlockingState> blockingState;
auto result = drivers_[i]->next(blockingState);
ContinueFuture driverFuture;
auto result = drivers_[i]->next(&driverFuture);
if (result) {
return result;
}

if (blockingState) {
futures[i] = blockingState->future();
if (driverFuture.valid()) {
futures[i] = std::move(driverFuture);
}

if (error()) {
Expand Down Expand Up @@ -898,6 +898,12 @@ void Task::initializePartitionOutput() {
void Task::resume(std::shared_ptr<Task> self) {
std::vector<std::shared_ptr<Driver>> offThreadDrivers;
{
std::vector<ContinuePromise> resumePromises;
auto guard = folly::makeGuard([&]() {
for (auto& promise : resumePromises) {
promise.setValue();
}
});
std::lock_guard<std::timed_mutex> l(self->mutex_);
// Setting pause requested must be atomic with the resuming so that
// suspended sections do not go back on thread during resume.
Expand Down Expand Up @@ -957,6 +963,7 @@ void Task::resume(std::shared_ptr<Task> self) {
offThreadDrivers.push_back(std::move(driver));
}
}
resumePromises.swap(self->resumePromises_);
}

// Get the stats and free the resources of Drivers that were not on thread.
Expand Down Expand Up @@ -2740,6 +2747,24 @@ ContinueFuture Task::requestPause() {
return makeFinishFutureLocked("Task::requestPause");
}

bool Task::pauseRequested(ContinueFuture* future) {
if (FOLLY_LIKELY(future == nullptr)) {
// Once 'pauseRequested_' is set, it will not be cleared until
// task::resume(). It is therefore OK to read it without a mutex
// from a thread that this flag concerns.
return pauseRequested_;
}
std::lock_guard<std::timed_mutex> l(mutex_);
if (!pauseRequested_) {
VELOX_CHECK(resumePromises_.empty())
*future = ContinueFuture ::makeEmpty();
return false;
}
resumePromises_.emplace_back("Task::isPaused");
*future = resumePromises_.back().getSemiFuture();
return true;
}

void Task::createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId) {
Expand Down
13 changes: 6 additions & 7 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -618,12 +618,9 @@ class Task : public std::enable_shared_from_this<Task> {
/// 'this' at the time of requesting yield. Returns 0 if yield not requested.
int32_t yieldIfDue(uint64_t startTimeMicros);

/// Once 'pauseRequested_' is set, it will not be cleared until
/// task::resume(). It is therefore OK to read it without a mutex
/// from a thread that this flag concerns.
bool pauseRequested() const {
return pauseRequested_;
}
/// Check if the task is requested to pause. If true, future will be set once
/// th task is resumed if it's not nullptr.
bool pauseRequested(ContinueFuture* future = nullptr);

std::timed_mutex& mutex() {
return mutex_;
Expand Down Expand Up @@ -1155,7 +1152,9 @@ class Task : public std::enable_shared_from_this<Task> {
// terminate(). They are fulfilled when the last thread stops
// running for 'this'.
std::vector<ContinuePromise> threadFinishPromises_;

// Promises for the futures returned to callers of pauseRequested().
// They are fulfilled when `resume` is called for this task.
std::vector<ContinuePromise> resumePromises_;
// Base spill directory for this task.
std::string spillDirectory_;

Expand Down
13 changes: 12 additions & 1 deletion velox/exec/tests/utils/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,18 @@ class SingleThreadedTaskCursor : public TaskCursorBase {
if (!task_->isRunning()) {
return false;
}
next_ = task_->next();
while (true) {
ContinueFuture future = ContinueFuture::makeEmpty();
RowVectorPtr next = task_->next(&future);
if (!future.valid()) {
// Task is not blocked. Update `next_` with whatever returned by task.
next_ = next;
break;
}
// Task is blocked for some reason. Wait and try again.
VELOX_CHECK_NULL(next)
future.wait();
}
return next_ != nullptr;
};

Expand Down

0 comments on commit c8f398d

Please sign in to comment.