diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 06b4f143082f6..b7ace28bf47c1 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -226,7 +226,7 @@ class FakeMemoryOperatorFactory : public Operator::PlanNodeTranslator { uint32_t maxDrivers_{1}; }; -class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { +class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase { protected: static void SetUpTestCase() { exec::test::HiveConnectorTestBase::SetUpTestCase(); @@ -252,7 +252,6 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { fuzzerOpts_.stringLength = 1024; fuzzerOpts_.allowLazyVector = false; vector_ = makeRowVector(rowType_, fuzzerOpts_); - executor_ = std::make_unique(32); numAddedPools_ = 0; } @@ -298,13 +297,55 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { static inline FakeMemoryOperatorFactory* fakeOperatorFactory_; std::unique_ptr memoryManager_; - SharedArbitrator* arbitrator_; + SharedArbitrator* arbitrator_{nullptr}; RowTypePtr rowType_; VectorFuzzer::Options fuzzerOpts_; RowVectorPtr vector_; std::atomic_uint64_t numAddedPools_{0}; }; +namespace { +std::unique_ptr newMultiThreadedExecutor() { + return std::make_unique(32); +} + +struct TestParam { + bool isSingleThreaded{false}; +}; +} // namespace + +/// A test fixture that runs cases within multi-threaded execution mode. +class SharedArbitrationTest : public SharedArbitrationTestBase { + protected: + void SetUp() override { + SharedArbitrationTestBase::SetUp(); + executor_ = newMultiThreadedExecutor(); + } +}; +/// A test fixture that runs cases within both single-threaded and +/// multi-threaded execution modes. +class SharedArbitrationTestWithThreadingModes + : public testing::WithParamInterface, + public SharedArbitrationTestBase { + public: + static std::vector getTestParams() { + return std::vector({{false}, {true}}); + } + + protected: + void SetUp() override { + SharedArbitrationTestBase::SetUp(); + isSingleThreaded_ = GetParam().isSingleThreaded; + if (isSingleThreaded_) { + executor_ = nullptr; + } else { + executor_ = newMultiThreadedExecutor(); + } + } + + bool isSingleThreaded_{false}; +}; + DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) { const std::vector vectors = createVectors(rowType_, 32, 32 << 20); @@ -430,7 +471,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) { ASSERT_EQ(taskPausedCount, 1); } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { +DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -495,6 +536,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(orderByQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .orderBy({"c0 ASC NULLS LAST"}, false) @@ -511,6 +553,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -531,7 +574,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { } } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { +DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToAggregation) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -596,6 +639,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(aggregationQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) @@ -613,6 +657,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -633,7 +678,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { } } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { +DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToJoinBuilder) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -699,6 +744,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(joinQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder(planNodeIdGenerator) .values(vectors) .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"}) @@ -726,6 +772,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) + .singleThreaded(isSingleThreaded_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -1307,6 +1354,12 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { ASSERT_EQ(arbitrator_->stats().numReleases, numRootPools); } } + +VELOX_INSTANTIATE_TEST_SUITE_P( + SharedArbitrationTestWithThreadingModes, + SharedArbitrationTestWithThreadingModes, + testing::ValuesIn( + SharedArbitrationTestWithThreadingModes::getTestParams())); } // namespace facebook::velox::memory int main(int argc, char** argv) { diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 8b4da99c1da04..0de9ef5ec22d8 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -356,13 +356,28 @@ void Driver::pushdownFilters(int operatorIndex) { } RowVectorPtr Driver::next(std::shared_ptr& blockingState) { - enqueueInternal(); auto self = shared_from_this(); facebook::velox::process::ScopedThreadDebugInfo scopedInfo( self->driverCtx()->threadDebugInfo); ScopedDriverThreadContext scopedDriverThreadContext(*self->driverCtx()); RowVectorPtr result; - auto stop = runInternal(self, blockingState, result); + + StopReason stop; + + // Spin wait when task is paused. + while (true) { + if (task()->pauseRequested()) { + continue; + } + enqueueInternal(); + stop = runInternal(self, blockingState, result); + if (stop == StopReason::kPause) { + VELOX_DCHECK_NULL(blockingState) + VELOX_DCHECK_NULL(result) + continue; + } + break; + } // We get kBlock if 'result' was produced; kAtEnd if pipeline has finished // processing and no more results will be produced; kAlreadyTerminated on diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 741b9f8649619..fdc50496af1dd 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -917,13 +917,17 @@ void Task::resume(std::shared_ptr self) { continue; } VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated()); - if (!driver->state().hasBlockingFuture) { + if (!driver->state().hasBlockingFuture && + driver->task()->queryCtx()->isExecutorSupplied()) { if (driver->state().endExecTimeMs != 0) { driver->state().totalPauseTimeMs += getCurrentTimeMs() - driver->state().endExecTimeMs; } // Do not continue a Driver that is blocked on external // event. The Driver gets enqueued by the promise realization. + // + // Do not continue the driver if no executor is supplied, + // This usually happens in single-threaded execution. Driver::enqueue(driver); } } diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp index 3e4b3467354fc..0575b8e4e5dff 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.cpp +++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp @@ -63,7 +63,13 @@ AssertQueryBuilder& AssertQueryBuilder::destination(int32_t destination) { } AssertQueryBuilder& AssertQueryBuilder::singleThreaded(bool singleThreaded) { - params_.singleThreaded = singleThreaded; + if (singleThreaded) { + params_.singleThreaded = true; + executor_ = nullptr; + return *this; + } + params_.singleThreaded = false; + executor_ = newExecutor(); return *this; } diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index be7bbd3871682..1358af608ede1 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -179,9 +179,13 @@ class AssertQueryBuilder { std::pair, std::vector> readCursor(); + static std::unique_ptr newExecutor() { + return std::make_unique( + std::thread::hardware_concurrency()); + } + // Used by the created task as the default driver executor. - std::unique_ptr executor_{ - new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())}; + std::unique_ptr executor_{newExecutor()}; DuckDbQueryRunner* const duckDbQueryRunner_; CursorParameters params_; std::unordered_map configs_;