diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 9eaf70010021a..7997d7fbb9e19 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -226,7 +226,7 @@ class FakeMemoryOperatorFactory : public Operator::PlanNodeTranslator { uint32_t maxDrivers_{1}; }; -class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { +class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase { protected: static void SetUpTestCase() { exec::test::HiveConnectorTestBase::SetUpTestCase(); @@ -252,7 +252,6 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { fuzzerOpts_.stringLength = 1024; fuzzerOpts_.allowLazyVector = false; vector_ = makeRowVector(rowType_, fuzzerOpts_); - executor_ = std::make_unique(32); numAddedPools_ = 0; } @@ -298,13 +297,55 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { static inline FakeMemoryOperatorFactory* fakeOperatorFactory_; std::unique_ptr memoryManager_; - SharedArbitrator* arbitrator_; + SharedArbitrator* arbitrator_{nullptr}; RowTypePtr rowType_; VectorFuzzer::Options fuzzerOpts_; RowVectorPtr vector_; std::atomic_uint64_t numAddedPools_{0}; }; +namespace { +std::unique_ptr newMultiThreadedExecutor() { + return std::make_unique(32); +} + +struct TestParam { + bool isSingleThreaded{false}; +}; +} // namespace + +/// A test fixture that runs cases within multi-threaded execution mode. +class SharedArbitrationTest : public SharedArbitrationTestBase { + protected: + void SetUp() override { + SharedArbitrationTestBase::SetUp(); + executor_ = newMultiThreadedExecutor(); + } +}; +/// A test fixture that runs cases within both single-threaded and +/// multi-threaded execution modes. +class SharedArbitrationTestWithThreadingModes + : public testing::WithParamInterface, + public SharedArbitrationTestBase { + public: + static std::vector getTestParams() { + return std::vector({{false}, {true}}); + } + + protected: + void SetUp() override { + SharedArbitrationTestBase::SetUp(); + isSingleThreaded_ = GetParam().isSingleThreaded; + if (isSingleThreaded_) { + executor_ = nullptr; + } else { + executor_ = newMultiThreadedExecutor(); + } + } + + bool isSingleThreaded_{false}; +}; + DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) { const std::vector vectors = createVectors(rowType_, 32, 32 << 20); @@ -430,7 +471,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) { ASSERT_EQ(taskPausedCount, 1); } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { +DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -495,6 +536,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(orderByQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .orderBy({"c0 ASC NULLS LAST"}, false) @@ -511,6 +553,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -530,7 +573,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { } } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithThreadingModes, + reclaimToAggregation) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -595,6 +640,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(aggregationQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) @@ -612,6 +658,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -631,7 +678,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { } } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithThreadingModes, + reclaimToJoinBuilder) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -697,6 +746,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(joinQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder(planNodeIdGenerator) .values(vectors) .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"}) @@ -724,6 +774,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -1300,6 +1351,12 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { ASSERT_EQ(arbitrator_->stats().numShrinks, numRootPools); } } + +VELOX_INSTANTIATE_TEST_SUITE_P( + SharedArbitrationTestWithThreadingModes, + SharedArbitrationTestWithThreadingModes, + testing::ValuesIn( + SharedArbitrationTestWithThreadingModes::getTestParams())); } // namespace facebook::velox::memory int main(int argc, char** argv) { diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 8b4da99c1da04..1e7e4f392ae25 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -355,14 +355,29 @@ void Driver::pushdownFilters(int operatorIndex) { op->clearDynamicFilters(); } -RowVectorPtr Driver::next(std::shared_ptr& blockingState) { +RowVectorPtr Driver::next(ContinueFuture* future) { enqueueInternal(); auto self = shared_from_this(); facebook::velox::process::ScopedThreadDebugInfo scopedInfo( self->driverCtx()->threadDebugInfo); ScopedDriverThreadContext scopedDriverThreadContext(*self->driverCtx()); + std::shared_ptr blockingState; RowVectorPtr result; - auto stop = runInternal(self, blockingState, result); + const auto stop = runInternal(self, blockingState, result); + + if (blockingState != nullptr) { + VELOX_DCHECK_NULL(result); + *future = blockingState->future(); + return nullptr; + } + + if (stop == StopReason::kPause) { + VELOX_DCHECK_NULL(result); + if (!task()->pauseRequested(future)) { + *future = ContinueFuture::makeEmpty(); + } + return nullptr; + } // We get kBlock if 'result' was produced; kAtEnd if pipeline has finished // processing and no more results will be produced; kAlreadyTerminated on diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 46fe88dbb99b8..7ceb696aab459 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -359,7 +359,7 @@ class Driver : public std::enable_shared_from_this { /// Run the pipeline until it produces a batch of data or gets blocked. /// Return the data produced or nullptr if pipeline finished processing and - /// will not produce more data. Return nullptr and set 'blockingState' if + /// will not produce more data. Return nullptr and set 'future' if /// pipeline got blocked. /// /// This API supports execution of a Task synchronously in the caller's @@ -367,7 +367,7 @@ class Driver : public std::enable_shared_from_this { /// When using 'enqueue', the last operator in the pipeline (sink) must not /// return any data from Operator::getOutput(). When using 'next', the last /// operator must produce data that will be returned to caller. - RowVectorPtr next(std::shared_ptr& blockingState); + RowVectorPtr next(ContinueFuture* future); /// Invoked to initialize the operators from this driver once on its first /// execution. diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index dfe0e57a6aada..004cbd09b5f17 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -648,14 +648,14 @@ RowVectorPtr Task::next(ContinueFuture* future) { ++runnableDrivers; - std::shared_ptr blockingState; - auto result = drivers_[i]->next(blockingState); + ContinueFuture driverFuture = ContinueFuture::makeEmpty(); + auto result = drivers_[i]->next(&driverFuture); if (result) { return result; } - if (blockingState) { - futures[i] = blockingState->future(); + if (driverFuture.valid()) { + futures[i] = std::move(driverFuture); } if (error()) { @@ -898,6 +898,12 @@ void Task::initializePartitionOutput() { void Task::resume(std::shared_ptr self) { std::vector> offThreadDrivers; { + std::vector resumePromises; + auto guard = folly::makeGuard([&]() { + for (auto& promise : resumePromises) { + promise.setValue(); + } + }); std::lock_guard l(self->mutex_); // Setting pause requested must be atomic with the resuming so that // suspended sections do not go back on thread during resume. @@ -917,13 +923,17 @@ void Task::resume(std::shared_ptr self) { continue; } VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated()); - if (!driver->state().hasBlockingFuture) { + if (!driver->state().hasBlockingFuture && + driver->task()->queryCtx()->isExecutorSupplied()) { if (driver->state().endExecTimeMs != 0) { driver->state().totalPauseTimeMs += getCurrentTimeMs() - driver->state().endExecTimeMs; } // Do not continue a Driver that is blocked on external // event. The Driver gets enqueued by the promise realization. + // + // Do not continue the driver if no executor is supplied, + // This usually happens in single-threaded execution. Driver::enqueue(driver); } } @@ -953,6 +963,7 @@ void Task::resume(std::shared_ptr self) { offThreadDrivers.push_back(std::move(driver)); } } + resumePromises.swap(self->resumePromises_); } // Get the stats and free the resources of Drivers that were not on thread. @@ -2736,6 +2747,25 @@ ContinueFuture Task::requestPause() { return makeFinishFutureLocked("Task::requestPause"); } +bool Task::pauseRequested(ContinueFuture* future) { + if (FOLLY_LIKELY(future == nullptr)) { + // Once 'pauseRequested_' is set, it will not be cleared until + // task::resume(). It is therefore OK to read it without a mutex + // from a thread that this flag concerns. + return pauseRequested_; + } + + std::lock_guard l(mutex_); + if (!pauseRequested_) { + VELOX_CHECK(resumePromises_.empty()) + *future = ContinueFuture::makeEmpty(); + return false; + } + resumePromises_.emplace_back("Task::isPaused"); + *future = resumePromises_.back().getSemiFuture(); + return true; +} + void Task::createExchangeClientLocked( int32_t pipelineId, const core::PlanNodeId& planNodeId) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 66c302a2ad421..dae427eb97c91 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -618,12 +618,10 @@ class Task : public std::enable_shared_from_this { /// 'this' at the time of requesting yield. Returns 0 if yield not requested. int32_t yieldIfDue(uint64_t startTimeMicros); - /// Once 'pauseRequested_' is set, it will not be cleared until - /// task::resume(). It is therefore OK to read it without a mutex - /// from a thread that this flag concerns. - bool pauseRequested() const { - return pauseRequested_; - } + /// Check if the task is requested to pause. If it is true and 'future' is not + /// null, a task resume future is returned which will be fulfilled once the + /// task is resumed. + bool pauseRequested(ContinueFuture* future = nullptr); std::timed_mutex& mutex() { return mutex_; @@ -1155,7 +1153,9 @@ class Task : public std::enable_shared_from_this { // terminate(). They are fulfilled when the last thread stops // running for 'this'. std::vector threadFinishPromises_; - + // Promises for the futures returned to callers of pauseRequested(). + // They are fulfilled when `resume()` is called for this task. + std::vector resumePromises_; // Base spill directory for this task. std::string spillDirectory_; diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index ca078e62df8f1..f0d53773682af 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1373,77 +1373,94 @@ DEBUG_ONLY_TEST_F(TaskTest, findPeerOperators) { } } -DEBUG_ONLY_TEST_F(TaskTest, raceBetweenTaskPauseAndTerminate) { - const std::vector values = {makeRowVector( - {"t_c0", "t_c1"}, - { - makeFlatVector({1, 2, 3, 4}), - makeFlatVector({10, 20, 30, 40}), - })}; +class TaskPauseTest : public TaskTest { + public: + void testPause() { + const std::vector values = {makeRowVector( + {"t_c0", "t_c1"}, + { + makeFlatVector({1, 2, 3, 4}), + makeFlatVector({10, 20, 30, 40}), + })}; - auto planNodeIdGenerator = std::make_shared(); - CursorParameters params; - params.planNode = - PlanBuilder(planNodeIdGenerator).values(values, true).planNode(); - params.queryCtx = core::QueryCtx::create(driverExecutor_.get()); - params.maxDrivers = 1; + auto planNodeIdGenerator = std::make_shared(); + CursorParameters params; + params.planNode = + PlanBuilder(planNodeIdGenerator).values(values, true).planNode(); + params.queryCtx = core::QueryCtx::create(driverExecutor_.get()); + params.maxDrivers = 1; - auto cursor = TaskCursor::create(params); - auto* task = cursor->task().get(); - folly::EventCount taskPauseWait; - std::atomic taskPaused{false}; + cursor_ = TaskCursor::create(params); + task_ = cursor_->task().get(); + folly::EventCount taskPauseWait; + std::atomic taskPaused{false}; - folly::EventCount taskPauseStartWait; - std::atomic taskPauseStarted{false}; + folly::EventCount taskPauseStartWait; + std::atomic taskPauseStarted{false}; - // Set up a testvalue to trigger task abort when hash build tries to reserve - // memory. - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Driver::runInternal::addInput", - std::function([&](Operator* testOp) { - if (taskPauseStarted.exchange(true)) { - return; - } - taskPauseStartWait.notifyAll(); - taskPauseWait.await([&]() { return taskPaused.load(); }); - })); + // Set up a testvalue to trigger task abort when hash build tries to reserve + // memory. + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function([&](Operator* testOp) { + if (taskPauseStarted.exchange(true)) { + return; + } + taskPauseStartWait.notifyAll(); + taskPauseWait.await([&]() { return taskPaused.load(); }); + })); - std::thread taskThread([&]() { - try { - while (cursor->moveNext()) { - }; - } catch (VeloxRuntimeError&) { - } - }); + taskThread_ = std::thread([&]() { + try { + while (cursor_->moveNext()) { + }; + } catch (VeloxRuntimeError&) { + } + }); + + taskPauseStartWait.await([&]() { return taskPauseStarted.load(); }); - taskPauseStartWait.await([&]() { return taskPauseStarted.load(); }); + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 0); + ASSERT_EQ(task_->numRunningDrivers(), 1); - ASSERT_EQ(task->numTotalDrivers(), 1); - ASSERT_EQ(task->numFinishedDrivers(), 0); - ASSERT_EQ(task->numRunningDrivers(), 1); + auto pauseFuture = task_->requestPause(); + taskPaused = true; + taskPauseWait.notifyAll(); + pauseFuture.wait(); - auto pauseFuture = task->requestPause(); - taskPaused = true; - taskPauseWait.notifyAll(); - pauseFuture.wait(); + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 0); + ASSERT_EQ(task_->numRunningDrivers(), 1); + ASSERT_TRUE(task_->pauseRequested()); + } + + void TearDown() override { + cursor_.reset(); + HiveConnectorTestBase::TearDown(); + } - ASSERT_EQ(task->numTotalDrivers(), 1); - ASSERT_EQ(task->numFinishedDrivers(), 0); - ASSERT_EQ(task->numRunningDrivers(), 1); + protected: + Task* task_{nullptr}; + std::unique_ptr cursor_{}; + std::thread taskThread_{}; +}; - task->requestAbort().wait(); +DEBUG_ONLY_TEST_F(TaskPauseTest, raceBetweenTaskPauseAndTerminate) { + testPause(); + task_->requestAbort().wait(); - ASSERT_EQ(task->numTotalDrivers(), 1); - ASSERT_EQ(task->numFinishedDrivers(), 0); - ASSERT_EQ(task->numRunningDrivers(), 0); + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 0); + ASSERT_EQ(task_->numRunningDrivers(), 0); - Task::resume(task->shared_from_this()); + Task::resume(task_->shared_from_this()); - ASSERT_EQ(task->numTotalDrivers(), 1); - ASSERT_EQ(task->numFinishedDrivers(), 1); - ASSERT_EQ(task->numRunningDrivers(), 0); + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 1); + ASSERT_EQ(task_->numRunningDrivers(), 0); - taskThread.join(); + taskThread_.join(); } DEBUG_ONLY_TEST_F(TaskTest, driverCounters) { @@ -1677,6 +1694,69 @@ TEST_F(TaskTest, spillDirNotCreated) { ASSERT_FALSE(fs->exists(tmpDirectoryPath)); } +DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFuture) { + // Test for trivial wait on Task::pauseRequested future. + testPause(); + folly::EventCount taskResumeAllowedWait; + std::atomic taskResumeAllowed{false}; + std::thread observeThread([&]() { + ContinueFuture future = ContinueFuture::makeEmpty(); + bool requested = task_->pauseRequested(&future); + ASSERT_TRUE(requested); + taskResumeAllowed = true; + taskResumeAllowedWait.notifyAll(); + future.wait(); + ASSERT_TRUE(!task_->pauseRequested()); + }); + + taskResumeAllowedWait.await([&]() { return taskResumeAllowed.load(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + Task::resume(task_->shared_from_this()); + + taskThread_.join(); + observeThread.join(); + + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 1); + ASSERT_EQ(task_->numRunningDrivers(), 0); +} + +DEBUG_ONLY_TEST_F(TaskPauseTest, resumeFutureNeverFulfilled) { + // Test for Task::pauseRequested future to throw when task is never resumed. + testPause(); + folly::EventCount taskAbortAllowedWait; + std::atomic taskAbortAllowed{false}; + std::thread observeThread([&]() { + ContinueFuture future = ContinueFuture::makeEmpty(); + bool requested = task_->pauseRequested(&future); + ASSERT_TRUE(requested); + taskAbortAllowed = true; + taskAbortAllowedWait.notifyAll(); + future.wait(); + ASSERT_TRUE(future.hasException()); + ASSERT_EQ( + std::string(future.result().exception().what()), + "folly::BrokenPromise: Broken promise for type name `folly::Unit`"); + }); + taskAbortAllowedWait.await([&]() { return taskAbortAllowed.load(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 0); + ASSERT_EQ(task_->numRunningDrivers(), 1); + std::vector drivers{}; + task_->testingVisitDrivers( + [&](Driver* driver) -> void { drivers.push_back(driver); }); + for (const auto& driver : drivers) { + Task::removeDriver(task_->shared_from_this(), driver); + } + ASSERT_EQ(task_->numTotalDrivers(), 1); + ASSERT_EQ(task_->numFinishedDrivers(), 1); + ASSERT_EQ(task_->numRunningDrivers(), 0); + cursor_.reset(); + taskThread_.join(); + observeThread.join(); +} + DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { auto probeVector = makeRowVector( {"t_c0"}, {makeFlatVector(10, [](auto row) { return row; })}); diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp index 3e4b3467354fc..0575b8e4e5dff 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.cpp +++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp @@ -63,7 +63,13 @@ AssertQueryBuilder& AssertQueryBuilder::destination(int32_t destination) { } AssertQueryBuilder& AssertQueryBuilder::singleThreaded(bool singleThreaded) { - params_.singleThreaded = singleThreaded; + if (singleThreaded) { + params_.singleThreaded = true; + executor_ = nullptr; + return *this; + } + params_.singleThreaded = false; + executor_ = newExecutor(); return *this; } diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index be7bbd3871682..1358af608ede1 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -179,9 +179,13 @@ class AssertQueryBuilder { std::pair, std::vector> readCursor(); + static std::unique_ptr newExecutor() { + return std::make_unique( + std::thread::hardware_concurrency()); + } + // Used by the created task as the default driver executor. - std::unique_ptr executor_{ - new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())}; + std::unique_ptr executor_{newExecutor()}; DuckDbQueryRunner* const duckDbQueryRunner_; CursorParameters params_; std::unordered_map configs_; diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 15da6cf261fdd..84e96aaef892d 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -368,7 +368,18 @@ class SingleThreadedTaskCursor : public TaskCursorBase { if (!task_->isRunning()) { return false; } - next_ = task_->next(); + while (true) { + ContinueFuture future = ContinueFuture::makeEmpty(); + RowVectorPtr next = task_->next(&future); + if (!future.valid()) { + // Task is not blocked. Update `next_` with whatever returned by task. + next_ = next; + break; + } + // Task is blocked for some reason. Wait and try again. + VELOX_CHECK_NULL(next) + future.wait(); + } return next_ != nullptr; };