diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 7f3e9201f124..2bfc4908d2f6 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -266,9 +266,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { - ScopedTimer timer(&sortTime_); + // Count copy row time into sortTime_. + Timer sortTime{}; // Serialize [begin, end) - uint64_t offset = 0; + int64_t offset = 0; char* addr; uint32_t size; @@ -278,14 +279,9 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ addr = pageAddresses_[pageIndex.first] + pageIndex.second; size = *(RowSizeType*)addr; if (offset + size > kSortedBufferSize) { - VELOX_CHECK(offset > 0); - auto payload = std::make_unique( - index - begin, - nullptr, - std::vector>{std::make_shared(rawBuffer_, offset)}); - updateSpillMetrics(payload); - RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + sortTime.stop(); + RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset)); + sortTime.start(); begin = index; offset = 0; } @@ -293,27 +289,37 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ offset += size; index++; } + sortTime.stop(); + RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset)); + + sortTime_ += sortTime.realTimeUsed(); + return arrow::Status::OK(); +} + +arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength) { + VELOX_CHECK(rawLength > 0); auto payload = std::make_unique( - end - begin, + numRows, nullptr, - std::vector>{std::make_shared(rawBuffer_, offset)}); + std::vector>{std::make_shared(rawBuffer_, rawLength)}); updateSpillMetrics(payload); RETURN_NOT_OK( partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); return arrow::Status::OK(); } -uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) { +uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) { // Check how many rows can be handled. if (pages_.empty()) { return 0; } auto remainingBytes = pages_.back()->size() - pageCursor_; if (fixedRowSize_) { - return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), rows); + return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows); } auto beginIter = rowSizePrefixSum_.begin() + 1 + offset; - auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes); + auto bytesWritten = rowSizePrefixSum_[offset]; + auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten); return iter - beginIter; } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 34fbfd243df4..1626573a7dc6 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -77,7 +77,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end); - uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows); + arrow::Status evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength); + + uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows); void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired); diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index af9d5a58db0d..7cbfbcd79cc9 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -407,7 +407,22 @@ TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) { shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}}); } -TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) { +TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) { + if (GetParam().shuffleWriterType != kSortShuffle) { + return; + } + ASSERT_NOT_OK(initShuffleWriterOptions()); + auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get()); + + // Set memLimit to 0 to force allocate a new buffer for each row. + ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0)); + + auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8}}); + auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9}}); + shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}}); +} + +TEST_F(VeloxHashShuffleWriterMemoryTest, memoryLeak) { ASSERT_NOT_OK(initShuffleWriterOptions()); std::shared_ptr pool = std::make_shared(); shuffleWriterOptions_.bufferSize = 4; @@ -425,7 +440,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) { ASSERT_TRUE(pool->bytes_allocated() == 0); } -TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) { +TEST_F(VeloxHashShuffleWriterMemoryTest, spillFailWithOutOfMemory) { ASSERT_NOT_OK(initShuffleWriterOptions()); std::shared_ptr pool = std::make_shared(0); shuffleWriterOptions_.bufferSize = 4; @@ -438,7 +453,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) { ASSERT_TRUE(status.IsOutOfMemory()); } -TEST_F(VeloxShuffleWriterMemoryTest, kInit) { +TEST_F(VeloxHashShuffleWriterMemoryTest, kInit) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.bufferSize = 4; auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get()); @@ -508,7 +523,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInit) { ASSERT_NOT_OK(shuffleWriter->stop()); } -TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) { +TEST_F(VeloxHashShuffleWriterMemoryTest, kInitSingle) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.partitioning = Partitioning::kSingle; shuffleWriterOptions_.bufferSize = 4; @@ -530,7 +545,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) { ASSERT_NOT_OK(shuffleWriter->stop()); } -TEST_F(VeloxShuffleWriterMemoryTest, kSplit) { +TEST_F(VeloxHashShuffleWriterMemoryTest, kSplit) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.bufferSize = 4; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); @@ -552,7 +567,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplit) { ASSERT_NOT_OK(shuffleWriter->stop()); } -TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) { +TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.partitioning = Partitioning::kSingle; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); @@ -570,7 +585,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) { ASSERT_NOT_OK(shuffleWriter->stop()); } -TEST_F(VeloxShuffleWriterMemoryTest, kStop) { +TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) { for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.partitioning = partitioning; @@ -592,7 +607,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) { } } -TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) { +TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.bufferSize = 4; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); @@ -613,7 +628,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) { ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); })); } -TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) { +TEST_F(VeloxHashShuffleWriterMemoryTest, evictPartitionBuffers) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.bufferSize = 4; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); @@ -635,7 +650,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) { ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0); } -TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) { +TEST_F(VeloxHashShuffleWriterMemoryTest, kUnevictableSingle) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.partitioning = Partitioning::kSingle; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get()); @@ -657,7 +672,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) { ASSERT_EQ(evicted, 0); } -TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) { +TEST_F(VeloxHashShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.bufferReallocThreshold = 1; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index d32e3272186b..102c73ca49fa 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -196,9 +196,12 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase inputVectorComplex_ = makeRowVector(childrenComplex_); } - arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) { + arrow::Status splitRowVector( + VeloxShuffleWriter& shuffleWriter, + facebook::velox::RowVectorPtr vector, + int64_t memLimit = ShuffleWriter::kMinMemLimit) { std::shared_ptr cb = std::make_shared(vector); - return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit); + return shuffleWriter.write(cb, memLimit); } // Create multiple local dirs and join with comma. @@ -533,7 +536,7 @@ class RoundRobinPartitioningShuffleWriter : public MultiplePartitioningShuffleWr } }; -class VeloxShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test { +class VeloxHashShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test { protected: static void SetUpTestCase() { facebook::velox::memory::MemoryManager::testingSetInstance({});