Skip to content

Commit

Permalink
Release excessively reserved memory in HashBuild even if non-reclaima…
Browse files Browse the repository at this point in the history
…ble (facebookincubator#10782)

Summary:
When hash build is under table building stage, we reserve excessive amount of memory to account for the worst case scenario duplicate rows for NextRowVector (i.e. we assume every row in build table has duplicates, which in most cases is not true). Other than let the query fail because the current stage is unreclaimable, we can perform a desperate try to release the unused reserved memory, giving the query a chance to succeed.

Pull Request resolved: facebookincubator#10782

Reviewed By: xiaoxmeng

Differential Revision: D61510068

Pulled By: tanjialiang

fbshipit-source-id: 1d62804d22ab11d08080e7cf872da0656cbd1010
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Aug 20, 2024
1 parent 9e48da6 commit 55888da
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
19 changes: 15 additions & 4 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1094,10 +1094,24 @@ void HashBuild::reclaim(
VELOX_CHECK(canReclaim());
auto* driver = operatorCtx_->driver();
VELOX_CHECK_NOT_NULL(driver);
VELOX_CHECK(!nonReclaimableSection_);

TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);

const auto& task = driver->task();
const std::vector<Operator*> operators =
task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);
// Worst case scenario reservation was performed in ensureTableFits() when
// accounting for NextRowVector for duplicated rows, i.e. we assume every
// single row has duplicates. That is normally not the case. So when the
// query is under memory pressure, the excessive (in most cases) reservations
// can be returned.
for (auto i = 0; i <= operators.size(); i++) {
auto* memoryPool = i == 0 ? pool() : operators[i - 1]->pool();
const auto oldReservedBytes = memoryPool->reservedBytes();
memoryPool->release();
stats.reclaimedBytes += (oldReservedBytes - memoryPool->reservedBytes());
}

if (exceededMaxSpillLevelLimit_) {
return;
}
Expand All @@ -1120,10 +1134,7 @@ void HashBuild::reclaim(
return;
}

const auto& task = driver->task();
VELOX_CHECK(task->pauseRequested());
const std::vector<Operator*> operators =
task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);
for (auto* op : operators) {
HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
VELOX_CHECK_NOT_NULL(buildOp);
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,11 @@ uint64_t Operator::MemoryReclaimer::reclaim(
"facebook::velox::exec::Operator::MemoryReclaimer::reclaim", pool);

// NOTE: we can't reclaim memory from an operator which is under
// non-reclaimable section, except for HashBuild operator. If it is HashBuild
// operator, we allow it to enter HashBuild::reclaim because there is a good
// chance we can release some unused reserved memory even if it's in
// non-reclaimable section.
if (op_->nonReclaimableSection_) {
if (op_->nonReclaimableSection_ && op_->operatorType() != "HashBuild") {
// TODO: reduce the log frequency if it is too verbose.
++stats.numNonReclaimableAttempts;
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
Expand Down
19 changes: 13 additions & 6 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5925,22 +5925,29 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) {
ASSERT_EQ(reclaimable, enableSpilling);
if (enableSpilling) {
ASSERT_GE(reclaimableBytes, 0);
op->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_);
} else {
ASSERT_EQ(reclaimableBytes, 0);
VELOX_ASSERT_THROW(
op->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_),
"");
}
VELOX_ASSERT_THROW(
op->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_),
"");

driverWait.notify();
Task::resume(task);
task.reset();

taskThread.join();
if (enableSpilling) {
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
} else {
ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{0});
}
}
ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{0});
}

DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) {
Expand Down

0 comments on commit 55888da

Please sign in to comment.