Commit 7959f9c

Fix AssertQueryBuilder::singleThreaded(true)

zhztheplayer committed Aug 13, 2024
1 parent f340734 commit 7959f9c
Showing 9 changed files with 286 additions and 83 deletions.
69 changes: 63 additions & 6 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
@@ -226,7 +226,7 @@ class FakeMemoryOperatorFactory : public Operator::PlanNodeTranslator {
uint32_t maxDrivers_{1};
};

class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase {
protected:
static void SetUpTestCase() {
exec::test::HiveConnectorTestBase::SetUpTestCase();
@@ -252,7 +252,6 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
fuzzerOpts_.stringLength = 1024;
fuzzerOpts_.allowLazyVector = false;
vector_ = makeRowVector(rowType_, fuzzerOpts_);
executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(32);
numAddedPools_ = 0;
}

@@ -298,13 +297,55 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {

static inline FakeMemoryOperatorFactory* fakeOperatorFactory_;
std::unique_ptr<memory::MemoryManager> memoryManager_;
SharedArbitrator* arbitrator_;
SharedArbitrator* arbitrator_{nullptr};
RowTypePtr rowType_;
VectorFuzzer::Options fuzzerOpts_;
RowVectorPtr vector_;
std::atomic_uint64_t numAddedPools_{0};
};

namespace {
std::unique_ptr<folly::Executor> newMultiThreadedExecutor() {
return std::make_unique<folly::CPUThreadPoolExecutor>(32);
}

struct TestParam {
bool isSingleThreaded{false};
};
} // namespace

/// A test fixture that runs cases in multi-threaded execution mode.
class SharedArbitrationTest : public SharedArbitrationTestBase {
protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
executor_ = newMultiThreadedExecutor();
}
};

/// A test fixture that runs cases in both single-threaded and
/// multi-threaded execution modes.
class SharedArbitrationTestWithThreadingModes
: public testing::WithParamInterface<TestParam>,
public SharedArbitrationTestBase {
public:
static std::vector<TestParam> getTestParams() {
return std::vector<TestParam>({{false}, {true}});
}

protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
isSingleThreaded_ = GetParam().isSingleThreaded;
if (isSingleThreaded_) {
executor_ = nullptr;
} else {
executor_ = newMultiThreadedExecutor();
}
}

bool isSingleThreaded_{false};
};

DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) {
const std::vector<RowVectorPtr> vectors =
createVectors(rowType_, 32, 32 << 20);
@@ -430,7 +471,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) {
ASSERT_EQ(taskPausedCount, 1);
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
@@ -495,6 +536,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(orderByQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.orderBy({"c0 ASC NULLS LAST"}, false)
@@ -511,6 +553,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
@@ -530,7 +573,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
}
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
DEBUG_ONLY_TEST_P(
SharedArbitrationTestWithThreadingModes,
reclaimToAggregation) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
@@ -595,6 +640,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(aggregationQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0", "c1"}, {"array_agg(c2)"})
@@ -612,6 +658,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
@@ -631,7 +678,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
}
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
DEBUG_ONLY_TEST_P(
SharedArbitrationTestWithThreadingModes,
reclaimToJoinBuilder) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
@@ -697,6 +746,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(joinQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder(planNodeIdGenerator)
.values(vectors)
.project({"c0 AS t0", "c1 AS t1", "c2 AS t2"})
@@ -724,6 +774,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
@@ -1300,6 +1351,12 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) {
ASSERT_EQ(arbitrator_->stats().numShrinks, numRootPools);
}
}

VELOX_INSTANTIATE_TEST_SUITE_P(
SharedArbitrationTestWithThreadingModes,
SharedArbitrationTestWithThreadingModes,
testing::ValuesIn(
SharedArbitrationTestWithThreadingModes::getTestParams()));
} // namespace facebook::velox::memory

int main(int argc, char** argv) {
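The fixture split above (a multi-threaded SharedArbitrationTest plus a SharedArbitrationTestWithThreadingModes parameterized over TestParam) is standard GoogleTest value-parameterization. Below is a minimal standalone sketch of the same pattern using plain GoogleTest rather than Velox's DEBUG_ONLY_TEST_P/VELOX_INSTANTIATE_TEST_SUITE_P macros; ThreadingModeTest, numThreads_, and the assertion body are illustrative stand-ins, not Velox code.

```cpp
// Standalone GoogleTest analogue of the threading-mode fixture in this diff.
// Only the parameterization pattern mirrors the real code; the fixture name
// and members are hypothetical.
#include <gtest/gtest.h>

#include <vector>

struct TestParam {
  bool isSingleThreaded{false};
};

class ThreadingModeTest : public testing::TestWithParam<TestParam> {
 protected:
  void SetUp() override {
    isSingleThreaded_ = GetParam().isSingleThreaded;
    // Mirrors the fixture above: leave the executor unset (0 threads)
    // in single-threaded mode, use a thread pool otherwise.
    numThreads_ = isSingleThreaded_ ? 0 : 32;
  }

  bool isSingleThreaded_{false};
  int numThreads_{0};
};

TEST_P(ThreadingModeTest, runsInBothModes) {
  // In the real tests this is where AssertQueryBuilder(...)
  //     .singleThreaded(isSingleThreaded_)...assertResults(...) runs.
  EXPECT_EQ(numThreads_ == 0, isSingleThreaded_);
}

INSTANTIATE_TEST_SUITE_P(
    ThreadingModeTest,
    ThreadingModeTest,
    testing::ValuesIn(std::vector<TestParam>{{false}, {true}}));
```

Each TEST_P body then runs once per TestParam, which is how the reclaimTo* cases above get exercised in both execution modes.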
19 changes: 17 additions & 2 deletions velox/exec/Driver.cpp
@@ -355,14 +355,29 @@ void Driver::pushdownFilters(int operatorIndex) {
op->clearDynamicFilters();
}

RowVectorPtr Driver::next(std::shared_ptr<BlockingState>& blockingState) {
RowVectorPtr Driver::next(ContinueFuture* future) {
enqueueInternal();
auto self = shared_from_this();
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(
self->driverCtx()->threadDebugInfo);
ScopedDriverThreadContext scopedDriverThreadContext(*self->driverCtx());
std::shared_ptr<BlockingState> blockingState;
RowVectorPtr result;
auto stop = runInternal(self, blockingState, result);
const auto stop = runInternal(self, blockingState, result);

if (blockingState != nullptr) {
VELOX_DCHECK_NULL(result);
*future = blockingState->future();
return nullptr;
}

if (stop == StopReason::kPause) {
VELOX_DCHECK_NULL(result);
if (!task()->pauseRequested(future)) {
*future = ContinueFuture::makeEmpty();
}
return nullptr;
}

// We get kBlock if 'result' was produced; kAtEnd if pipeline has finished
// processing and no more results will be produced; kAlreadyTerminated on
4 changes: 2 additions & 2 deletions velox/exec/Driver.h
@@ -359,15 +359,15 @@ class Driver : public std::enable_shared_from_this<Driver> {

/// Run the pipeline until it produces a batch of data or gets blocked.
/// Return the data produced or nullptr if pipeline finished processing and
/// will not produce more data. Return nullptr and set 'blockingState' if
/// will not produce more data. Return nullptr and set 'future' if
/// pipeline got blocked.
///
/// This API supports execution of a Task synchronously in the caller's
/// thread. The caller must use either this API or 'enqueue', but not both.
/// When using 'enqueue', the last operator in the pipeline (sink) must not
/// return any data from Operator::getOutput(). When using 'next', the last
/// operator must produce data that will be returned to caller.
RowVectorPtr next(std::shared_ptr<BlockingState>& blockingState);
RowVectorPtr next(ContinueFuture* future);

/// Invoked to initialize the operators from this driver once on its first
/// execution.
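Taken together, the Driver.cpp and Driver.h changes redefine how a blocked or paused driver reports back to a synchronous caller: next() now returns data, or returns nullptr and (when blocked or paused) sets a future the caller must wait on before polling again. The sketch below is a schematic, Velox-free rendering of that contract using only the standard library; FakeDriver, Batch, and the immediate promise fulfillment are assumptions for illustration, not Velox behavior.

```cpp
// Schematic of the next(future) contract: data, or nullptr plus an
// optional future to wait on. Standard-library stand-ins throughout.
#include <future>
#include <optional>
#include <vector>

using Batch = std::vector<int>;            // stand-in for RowVectorPtr
using ContinueFuture = std::future<void>;  // stand-in for Velox's ContinueFuture

struct FakeDriver {
  int step{0};

  // Emits one batch, then reports "blocked" once, then finishes.
  std::optional<Batch> next(ContinueFuture* future) {
    switch (step++) {
      case 0:
        return Batch{1, 2, 3};  // produced a batch of data
      case 1: {
        // Blocked (or paused): no data; hand back a future that is
        // fulfilled when the driver may continue. Fulfilled immediately
        // here so the sketch terminates.
        std::promise<void> promise;
        *future = promise.get_future();
        promise.set_value();
        return std::nullopt;
      }
      default:
        return std::nullopt;  // at end: no data and no future
    }
  }
};

// Caller-side loop, mirroring what Task::next() does per driver.
int main() {
  FakeDriver driver;
  for (;;) {
    ContinueFuture future;
    if (auto batch = driver.next(&future)) {
      continue;  // consume the batch, then poll again
    }
    if (future.valid()) {
      future.wait();  // blocked or paused: wait, then retry
      continue;
    }
    break;  // finished: no data and no future
  }
  return 0;
}
```

This is also why the Task.cpp hunk below replaces the per-driver BlockingState with a ContinueFuture: Task::next() only needs something awaitable, regardless of whether the driver was blocked by an operator or paused for memory arbitration.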
40 changes: 35 additions & 5 deletions velox/exec/Task.cpp
@@ -648,14 +648,14 @@ RowVectorPtr Task::next(ContinueFuture* future) {

++runnableDrivers;

std::shared_ptr<BlockingState> blockingState;
auto result = drivers_[i]->next(blockingState);
ContinueFuture driverFuture = ContinueFuture::makeEmpty();
auto result = drivers_[i]->next(&driverFuture);
if (result) {
return result;
}

if (blockingState) {
futures[i] = blockingState->future();
if (driverFuture.valid()) {
futures[i] = std::move(driverFuture);
}

if (error()) {
@@ -898,6 +898,12 @@ void Task::initializePartitionOutput() {
void Task::resume(std::shared_ptr<Task> self) {
std::vector<std::shared_ptr<Driver>> offThreadDrivers;
{
std::vector<ContinuePromise> resumePromises;
auto guard = folly::makeGuard([&]() {
for (auto& promise : resumePromises) {
promise.setValue();
}
});
std::lock_guard<std::timed_mutex> l(self->mutex_);
// Setting pause requested must be atomic with the resuming so that
// suspended sections do not go back on thread during resume.
@@ -917,13 +923,17 @@ void Task::resume(std::shared_ptr<Task> self) {
continue;
}
VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated());
if (!driver->state().hasBlockingFuture) {
if (!driver->state().hasBlockingFuture &&
driver->task()->queryCtx()->isExecutorSupplied()) {
if (driver->state().endExecTimeMs != 0) {
driver->state().totalPauseTimeMs +=
getCurrentTimeMs() - driver->state().endExecTimeMs;
}
// Do not continue a Driver that is blocked on external
// event. The Driver gets enqueued by the promise realization.
//
// Do not continue the driver if no executor is supplied, which
// usually happens in single-threaded execution.
Driver::enqueue(driver);
}
}
@@ -953,6 +963,7 @@ void Task::resume(std::shared_ptr<Task> self) {
offThreadDrivers.push_back(std::move(driver));
}
}
resumePromises.swap(self->resumePromises_);
}

// Get the stats and free the resources of Drivers that were not on thread.
@@ -2736,6 +2747,25 @@ ContinueFuture Task::requestPause() {
return makeFinishFutureLocked("Task::requestPause");
}

bool Task::pauseRequested(ContinueFuture* future) {
if (FOLLY_LIKELY(future == nullptr)) {
// Once 'pauseRequested_' is set, it will not be cleared until
// Task::resume(). It is therefore OK to read it without a mutex
// from a thread that this flag concerns.
return pauseRequested_;
}

std::lock_guard<std::timed_mutex> l(mutex_);
if (!pauseRequested_) {
VELOX_CHECK(resumePromises_.empty());
*future = ContinueFuture::makeEmpty();
return false;
}
resumePromises_.emplace_back("Task::isPaused");
*future = resumePromises_.back().getSemiFuture();
return true;
}

void Task::createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId) {
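The new pauseRequested(future)/resume() pair is a promise-vector handshake: while the task is paused, each caller registers a ContinuePromise under the task mutex and receives the matching future; resume() swaps the vector out while holding the lock and fulfills the promises after releasing it (the folly::makeGuard above is declared before the lock_guard, so it runs once the lock has been dropped). Here is a minimal standard-library sketch of that pattern; PauseGate is a hypothetical class, and nothing below is Velox code.

```cpp
// Minimal sketch of the pause/resume promise handshake (not Velox code).
#include <future>
#include <mutex>
#include <thread>
#include <vector>

class PauseGate {
 public:
  void pause() {
    std::lock_guard<std::mutex> l(mutex_);
    paused_ = true;
  }

  // Returns true if paused; if so and 'future' is non-null, stores a
  // future that resume() will fulfill.
  bool pauseRequested(std::future<void>* future) {
    std::lock_guard<std::mutex> l(mutex_);
    if (!paused_) {
      return false;
    }
    if (future != nullptr) {
      resumePromises_.emplace_back();
      *future = resumePromises_.back().get_future();
    }
    return true;
  }

  void resume() {
    std::vector<std::promise<void>> promises;
    {
      std::lock_guard<std::mutex> l(mutex_);
      paused_ = false;
      promises.swap(resumePromises_);  // take the promises under the lock
    }
    for (auto& promise : promises) {
      promise.set_value();  // fulfill after releasing the lock
    }
  }

 private:
  std::mutex mutex_;
  bool paused_{false};
  std::vector<std::promise<void>> resumePromises_;
};

int main() {
  PauseGate gate;
  gate.pause();
  std::future<void> future;
  if (gate.pauseRequested(&future)) {
    std::thread resumer([&] { gate.resume(); });
    future.wait();  // returns once resume() fulfills the promise
    resumer.join();
  }
  return 0;
}
```

In the diff, this is what lets a single-threaded Driver::next() observe a pause, return nullptr with the resume future, and have the caller simply wait on it; there is no executor to re-enqueue the driver, which is also why resume() now skips Driver::enqueue() when no executor is supplied.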
14 changes: 7 additions & 7 deletions velox/exec/Task.h
@@ -618,12 +618,10 @@ class Task : public std::enable_shared_from_this<Task> {
/// 'this' at the time of requesting yield. Returns 0 if yield not requested.
int32_t yieldIfDue(uint64_t startTimeMicros);

/// Once 'pauseRequested_' is set, it will not be cleared until
/// task::resume(). It is therefore OK to read it without a mutex
/// from a thread that this flag concerns.
bool pauseRequested() const {
return pauseRequested_;
}
/// Checks if the task has been requested to pause. If so and 'future' is
/// not null, 'future' is set to a future that is fulfilled once the task
/// is resumed.
bool pauseRequested(ContinueFuture* future = nullptr);

std::timed_mutex& mutex() {
return mutex_;
@@ -1155,7 +1153,9 @@ class Task : public std::enable_shared_from_this<Task> {
// terminate(). They are fulfilled when the last thread stops
// running for 'this'.
std::vector<ContinuePromise> threadFinishPromises_;

// Promises for the futures returned to callers of pauseRequested().
// They are fulfilled when `resume()` is called for this task.
std::vector<ContinuePromise> resumePromises_;

// Base spill directory for this task.
std::string spillDirectory_;
