Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
fixup

fixup

fixup

fixup

debug

Fix AssertQueryBuilder::singleThreaded(true)
  • Loading branch information
zhztheplayer committed Jul 30, 2024
1 parent 2fb811a commit c5f6469
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 12 deletions.
65 changes: 59 additions & 6 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class FakeMemoryOperatorFactory : public Operator::PlanNodeTranslator {
uint32_t maxDrivers_{1};
};

class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase {
protected:
static void SetUpTestCase() {
exec::test::HiveConnectorTestBase::SetUpTestCase();
Expand All @@ -252,7 +252,6 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
fuzzerOpts_.stringLength = 1024;
fuzzerOpts_.allowLazyVector = false;
vector_ = makeRowVector(rowType_, fuzzerOpts_);
executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(32);
numAddedPools_ = 0;
}

Expand Down Expand Up @@ -298,13 +297,55 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {

static inline FakeMemoryOperatorFactory* fakeOperatorFactory_;
std::unique_ptr<memory::MemoryManager> memoryManager_;
SharedArbitrator* arbitrator_;
SharedArbitrator* arbitrator_{nullptr};
RowTypePtr rowType_;
VectorFuzzer::Options fuzzerOpts_;
RowVectorPtr vector_;
std::atomic_uint64_t numAddedPools_{0};
};

namespace {
std::unique_ptr<folly::Executor> newMultiThreadedExecutor() {
return std::make_unique<folly::CPUThreadPoolExecutor>(32);
}

struct TestParam {
bool isSingleThreaded{false};
};
} // namespace

/// A test fixture that runs cases within multi-threaded execution mode.
class SharedArbitrationTest : public SharedArbitrationTestBase {
protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
executor_ = newMultiThreadedExecutor();
}
};
/// A test fixture that runs cases within both single-threaded and
/// multi-threaded execution modes.
class SharedArbitrationTestWithThreadingModes
: public testing::WithParamInterface<TestParam>,
public SharedArbitrationTestBase {
public:
static std::vector<TestParam> getTestParams() {
return std::vector<TestParam>({{false}, {true}});
}

protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
isSingleThreaded_ = GetParam().isSingleThreaded;
if (isSingleThreaded_) {
executor_ = nullptr;
} else {
executor_ = newMultiThreadedExecutor();
}
}

bool isSingleThreaded_{false};
};

DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) {
const std::vector<RowVectorPtr> vectors =
createVectors(rowType_, 32, 32 << 20);
Expand Down Expand Up @@ -430,7 +471,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) {
ASSERT_EQ(taskPausedCount, 1);
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
Expand Down Expand Up @@ -495,6 +536,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(orderByQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.orderBy({"c0 ASC NULLS LAST"}, false)
Expand All @@ -511,6 +553,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand All @@ -531,7 +574,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
}
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToAggregation) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
Expand Down Expand Up @@ -596,6 +639,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(aggregationQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0", "c1"}, {"array_agg(c2)"})
Expand All @@ -613,6 +657,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand All @@ -633,7 +678,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
}
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToJoinBuilder) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
Expand Down Expand Up @@ -699,6 +744,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(joinQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder(planNodeIdGenerator)
.values(vectors)
.project({"c0 AS t0", "c1 AS t1", "c2 AS t2"})
Expand Down Expand Up @@ -726,6 +772,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand Down Expand Up @@ -1307,6 +1354,12 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) {
ASSERT_EQ(arbitrator_->stats().numReleases, numRootPools);
}
}

VELOX_INSTANTIATE_TEST_SUITE_P(
SharedArbitrationTestWithThreadingModes,
SharedArbitrationTestWithThreadingModes,
testing::ValuesIn(
SharedArbitrationTestWithThreadingModes::getTestParams()));
} // namespace facebook::velox::memory

int main(int argc, char** argv) {
Expand Down
19 changes: 17 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,28 @@ void Driver::pushdownFilters(int operatorIndex) {
}

RowVectorPtr Driver::next(std::shared_ptr<BlockingState>& blockingState) {
enqueueInternal();
auto self = shared_from_this();
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(
self->driverCtx()->threadDebugInfo);
ScopedDriverThreadContext scopedDriverThreadContext(*self->driverCtx());
RowVectorPtr result;
auto stop = runInternal(self, blockingState, result);

StopReason stop;

// Spin wait when task is paused.
while (true) {
if (task()->pauseRequested()) {
continue;
}
enqueueInternal();
stop = runInternal(self, blockingState, result);
if (stop == StopReason::kPause) {
VELOX_DCHECK_NULL(blockingState)
VELOX_DCHECK_NULL(result)
continue;
}
break;
}

// We get kBlock if 'result' was produced; kAtEnd if pipeline has finished
// processing and no more results will be produced; kAlreadyTerminated on
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -917,13 +917,17 @@ void Task::resume(std::shared_ptr<Task> self) {
continue;
}
VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated());
if (!driver->state().hasBlockingFuture) {
if (!driver->state().hasBlockingFuture &&
driver->task()->queryCtx()->isExecutorSupplied()) {
if (driver->state().endExecTimeMs != 0) {
driver->state().totalPauseTimeMs +=
getCurrentTimeMs() - driver->state().endExecTimeMs;
}
// Do not continue a Driver that is blocked on external
// event. The Driver gets enqueued by the promise realization.
//
// Do not continue the driver if no executor is supplied,
// This usually happens in single-threaded execution.
Driver::enqueue(driver);
}
}
Expand Down
8 changes: 7 additions & 1 deletion velox/exec/tests/utils/AssertQueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ AssertQueryBuilder& AssertQueryBuilder::destination(int32_t destination) {
}

AssertQueryBuilder& AssertQueryBuilder::singleThreaded(bool singleThreaded) {
params_.singleThreaded = singleThreaded;
if (singleThreaded) {
params_.singleThreaded = true;
executor_ = nullptr;
return *this;
}
params_.singleThreaded = false;
executor_ = newExecutor();
return *this;
}

Expand Down
8 changes: 6 additions & 2 deletions velox/exec/tests/utils/AssertQueryBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,13 @@ class AssertQueryBuilder {
std::pair<std::unique_ptr<TaskCursor>, std::vector<RowVectorPtr>>
readCursor();

static std::unique_ptr<folly::Executor> newExecutor() {
return std::make_unique<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency());
}

// Used by the created task as the default driver executor.
std::unique_ptr<folly::Executor> executor_{
new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())};
std::unique_ptr<folly::Executor> executor_{newExecutor()};
DuckDbQueryRunner* const duckDbQueryRunner_;
CursorParameters params_;
std::unordered_map<std::string, std::string> configs_;
Expand Down

0 comments on commit c5f6469

Please sign in to comment.