diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index 550d43de2b17..d88b13a60ebb 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -1094,10 +1094,24 @@ void HashBuild::reclaim(
   VELOX_CHECK(canReclaim());
   auto* driver = operatorCtx_->driver();
   VELOX_CHECK_NOT_NULL(driver);
-  VELOX_CHECK(!nonReclaimableSection_);
 
   TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);
 
+  const auto& task = driver->task();
+  const std::vector<Operator*> operators =
+      task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);
+  // Worst case scenario reservation was performed in ensureTableFits() when
+  // accounting for NextRowVector for duplicated rows, i.e. we assume every
+  // single row has duplicates. That is normally not the case. So when the
+  // query is under memory pressure, the excessive (in most cases) reservations
+  // can be returned.
+  for (auto i = 0; i <= operators.size(); i++) {
+    auto* memoryPool = i == 0 ? pool() : operators[i - 1]->pool();
+    const auto oldReservedBytes = memoryPool->reservedBytes();
+    memoryPool->release();
+    stats.reclaimedBytes += (oldReservedBytes - memoryPool->reservedBytes());
+  }
+
   if (exceededMaxSpillLevelLimit_) {
     return;
   }
@@ -1120,10 +1134,7 @@ void HashBuild::reclaim(
     return;
   }
 
-  const auto& task = driver->task();
   VELOX_CHECK(task->pauseRequested());
-  const std::vector<Operator*> operators =
-      task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);
   for (auto* op : operators) {
     HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
     VELOX_CHECK_NOT_NULL(buildOp);
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index 449a2aaece91..9f46464cf2b5 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -643,8 +643,11 @@ uint64_t Operator::MemoryReclaimer::reclaim(
       "facebook::velox::exec::Operator::MemoryReclaimer::reclaim", pool);
 
   // NOTE: we can't reclaim memory from an operator which is under
+  // non-reclaimable section, except for HashBuild operator. If it is HashBuild
+  // operator, we allow it to enter HashBuild::reclaim because there is a good
+  // chance we can release some unused reserved memory even if it's in
   // non-reclaimable section.
-  if (op_->nonReclaimableSection_) {
+  if (op_->nonReclaimableSection_ && op_->operatorType() != "HashBuild") {
     // TODO: reduce the log frequency if it is too verbose.
     ++stats.numNonReclaimableAttempts;
     RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index 74578b37ba26..37601f0f913c 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -5925,22 +5925,29 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) {
     ASSERT_EQ(reclaimable, enableSpilling);
     if (enableSpilling) {
       ASSERT_GE(reclaimableBytes, 0);
+      op->reclaim(
+          folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
+          reclaimerStats_);
     } else {
       ASSERT_EQ(reclaimableBytes, 0);
+      VELOX_ASSERT_THROW(
+          op->reclaim(
+              folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
+              reclaimerStats_),
+          "");
     }
-    VELOX_ASSERT_THROW(
-        op->reclaim(
-            folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
-            reclaimerStats_),
-        "");
 
     driverWait.notify();
     Task::resume(task);
     task.reset();
 
     taskThread.join();
+
+    if (enableSpilling) {
+      ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
+    } else {
+      ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{0});
+    }
   }
-  ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{0});
 }
 
 DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) {
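
Reviewer note: below is a minimal, self-contained sketch of the accounting the new HashBuild::reclaim() loop performs when it returns the worst-case reservations made in ensureTableFits(). MockPool and the byte values are hypothetical stand-ins for illustration only; just reservedBytes()/release() mirror the memory pool calls used in the patch.

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Stand-in for a pool that over-reserved for the worst case (every build row
// having duplicates); release() drops the unused part of the reservation.
struct MockPool {
  uint64_t reserved; // bytes currently reserved
  uint64_t used;     // bytes actually in use
  uint64_t reservedBytes() const { return reserved; }
  void release() { reserved = used; }
};

int main() {
  // The build operator's own pool plus its peer build operators' pools.
  std::vector<MockPool> pools = {{1 << 20, 300 << 10}, {2 << 20, 1 << 20}};

  uint64_t reclaimedBytes = 0;
  for (auto& pool : pools) {
    const auto oldReservedBytes = pool.reservedBytes();
    pool.release();
    // Only the delta actually given back is credited to the reclaim stats.
    reclaimedBytes += oldReservedBytes - pool.reservedBytes();
  }
  std::cout << "reclaimed " << reclaimedBytes << " bytes\n";
  return 0;
}
```

Since release() only trims the unused portion of each reservation, this path is safe to run even while the operator is inside a non-reclaimable section, which is why Operator::MemoryReclaimer::reclaim() now lets HashBuild through that check.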