Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Reduce spill in sort-based shuffle #6639

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 79 additions & 67 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ class LocalPartitionWriter::LocalSpiller {
public:
LocalSpiller(
std::shared_ptr<arrow::io::OutputStream> os,
const std::string& spillFile,
std::string spillFile,
uint32_t compressionThreshold,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
: os_(os),
spillFile_(spillFile),
spillFile_(std::move(spillFile)),
compressionThreshold_(compressionThreshold),
pool_(pool),
codec_(codec),
Expand Down Expand Up @@ -69,28 +69,24 @@ class LocalPartitionWriter::LocalSpiller {
return arrow::Status::OK();
}

arrow::Result<std::shared_ptr<Spill>> finish() {
if (finished_) {
return arrow::Status::Invalid("Calling toBlockPayload() on a finished SpillEvictor.");
}
arrow::Result<std::shared_ptr<Spill>> finish(bool close) {
ARROW_RETURN_IF(finished_, arrow::Status::Invalid("Calling finish() on a finished LocalSpiller."));
ARROW_RETURN_IF(os_->closed(), arrow::Status::Invalid("Spill file os has been closed."));

finished_ = true;
RETURN_NOT_OK(os_->Close());
diskSpill_->setSpillFile(std::move(spillFile_));
if (close) {
RETURN_NOT_OK(os_->Close());
}
diskSpill_->setSpillFile(spillFile_);
diskSpill_->setSpillTime(spillTime_);
diskSpill_->setCompressTime(compressTime_);
return std::move(diskSpill_);
}

// Reports whether finish() has already run on this spiller. finish() sets finished_ to true
// before returning the Spill; callers check this flag to decide whether a new spiller is needed.
bool finished() const {
return finished_;
}

// Returns the current value of spillTime_, i.e. the same value finish() hands to the
// resulting Spill through setSpillTime(). The time unit is not visible in this section.
int64_t getSpillTime() const {
return spillTime_;
}

// Returns the current value of compressTime_, i.e. the same value finish() hands to the
// resulting Spill through setCompressTime(). The time unit is not visible in this section.
int64_t getCompressTime() const {
return compressTime_;
}

private:
std::shared_ptr<arrow::io::OutputStream> os_;
std::string spillFile_;
Expand Down Expand Up @@ -442,9 +438,30 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
}
stopped_ = true;

RETURN_NOT_OK(finishSpill());
if (useSpillFileAsDataFile_) {
RETURN_NOT_OK(finishSpill(false));
// The last spill has been written to data file.
auto spill = std::move(spills_.back());
spills_.pop_back();

// Merge the remaining partitions from spills.
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid < numPartitions_; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
}

if (!useSpillFileAsDataFile_) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
while (auto payload = spill->nextPayload(pid)) {
partitionLengths_[pid] += payload->rawSize();
}
}
writeTime_ = spill->spillTime();
compressTime_ += spill->compressTime();
} else {
RETURN_NOT_OK(finishSpill(true));
// Open final data file.
// If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill.
RETURN_NOT_OK(openDataFile());
Expand Down Expand Up @@ -473,33 +490,24 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
partitionLengths_[pid] = endInFinalFile - startInFinalFile;
}

for (const auto& spill : spills_) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (spill->hasNextPayload(pid)) {
return arrow::Status::Invalid("Merging from spill is not exhausted.");
}
}
}

ARROW_ASSIGN_OR_RAISE(totalBytesWritten_, dataFileOs_->Tell());

// Close Final file. Clear buffered resources.
RETURN_NOT_OK(clearResource());
} else {
auto spill = std::move(spills_.back());
}
// Close Final file. Clear buffered resources.
RETURN_NOT_OK(clearResource());
// Check all spills are merged.
auto s = 0;
for (const auto& spill : spills_) {
compressTime_ += spill->compressTime();
spillTime_ += spill->spillTime();
for (auto pid = 0; pid < numPartitions_; ++pid) {
uint64_t length = 0;
while (auto payload = spill->nextPayload(pid)) {
length += payload->rawSize();
if (spill->hasNextPayload(pid)) {
return arrow::Status::Invalid(
"Merging from spill " + std::to_string(s) + " is not exhausted. pid: " + std::to_string(pid));
}
partitionLengths_[pid] = length;
}
totalBytesWritten_ = std::filesystem::file_size(dataFile_);
writeTime_ = spillTime_;
spillTime_ = 0;
DLOG(INFO) << "Use spill file as data file: " << dataFile_;
++s;
}
spills_.clear();

// Populate shuffle writer metrics.
RETURN_NOT_OK(populateMetrics(metrics));
return arrow::Status::OK();
Expand All @@ -508,27 +516,29 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) {
if (!spiller_ || spiller_->finished()) {
std::string spillFile;
if (isFinal && useSpillFileAsDataFile()) {
std::shared_ptr<arrow::io::OutputStream> os;
if (isFinal) {
RETURN_NOT_OK(openDataFile());
spillFile = dataFile_;
os = dataFileOs_;
useSpillFileAsDataFile_ = true;
} else {
ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
}
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
spiller_ = std::make_unique<LocalSpiller>(
os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get());
}
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::finishSpill() {
arrow::Status LocalPartitionWriter::finishSpill(bool close) {
// Finish the spiller. No compression, no spill.
if (spiller_ && !spiller_->finished()) {
auto spiller = std::move(spiller_);
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish());
spillTime_ += spiller->getSpillTime();
compressTime_ += spiller->getCompressTime();
ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish(close));
}
return arrow::Status::OK();
}
Expand All @@ -543,18 +553,29 @@ arrow::Status LocalPartitionWriter::evict(
rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize();

if (evictType == Evict::kSortSpill) {
if (partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill());
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill(true));
}
lastEvictPid_ = partitionId;

RETURN_NOT_OK(requestSpill(isFinal));

auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
if (!isFinal) {
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
} else {
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
}
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}

Expand Down Expand Up @@ -586,8 +607,8 @@ arrow::Status LocalPartitionWriter::evict(
arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) {
rawPartitionLengths_[partitionId] += blockPayload->rawSize();

if (partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill());
if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill(true));
}
lastEvictPid_ = partitionId;

Expand All @@ -598,7 +619,7 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<

arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
// Finish last spiller.
RETURN_NOT_OK(finishSpill());
RETURN_NOT_OK(finishSpill(true));

int64_t reclaimed = 0;
// Reclaim memory from payloadCache.
Expand Down Expand Up @@ -629,7 +650,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
// This is not accurate. When the evicted partition buffers are not copied, the merged ones
// are resized from the original buffers thus allocated from partitionBufferPool.
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
RETURN_NOT_OK(finishSpill());
RETURN_NOT_OK(finishSpill(true));
}
*actual = reclaimed;
return arrow::Status::OK();
Expand All @@ -646,18 +667,9 @@ arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric
metrics->totalEvictTime += spillTime_;
metrics->totalWriteTime += writeTime_;
metrics->totalBytesEvicted += totalBytesEvicted_;
metrics->totalBytesWritten += totalBytesWritten_;
metrics->totalBytesWritten += std::filesystem::file_size(dataFile_);
metrics->partitionLengths = std::move(partitionLengths_);
metrics->rawPartitionLengths = std::move(rawPartitionLengths_);
return arrow::Status::OK();
}

// Decides whether the spill output can double as the final data file.
// That is only the case when nothing else is pending: no payload cache, no merger,
// no active spiller and no previously finished spills. When the answer is yes, the
// decision is also recorded in useSpillFileAsDataFile_ as a side effect.
bool LocalPartitionWriter::useSpillFileAsDataFile() {
  const bool nothingPending = !payloadCache_ && !merger_ && !spiller_ && spills_.empty();
  if (nothingPending) {
    useSpillFileAsDataFile_ = true;
  }
  return nothingPending;
}

} // namespace gluten
7 changes: 2 additions & 5 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class LocalPartitionWriter : public PartitionWriter {

arrow::Status requestSpill(bool isFinal);

arrow::Status finishSpill();
arrow::Status finishSpill(bool close);

std::string nextSpilledFileDir();

Expand All @@ -95,8 +95,6 @@ class LocalPartitionWriter : public PartitionWriter {

arrow::Status populateMetrics(ShuffleWriterMetrics* metrics);

bool useSpillFileAsDataFile();

std::string dataFile_;
std::vector<std::string> localDirs_;

Expand All @@ -113,10 +111,9 @@ class LocalPartitionWriter : public PartitionWriter {
std::shared_ptr<arrow::io::OutputStream> dataFileOs_;

int64_t totalBytesEvicted_{0};
int64_t totalBytesWritten_{0};
std::vector<int64_t> partitionLengths_;
std::vector<int64_t> rawPartitionLengths_;

uint32_t lastEvictPid_{0};
int32_t lastEvictPid_{-1};
};
} // namespace gluten
16 changes: 16 additions & 0 deletions cpp/core/shuffle/Spill.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,23 @@ void Spill::setSpillFile(const std::string& spillFile) {
spillFile_ = spillFile;
}

// Records the spill time for this Spill, overwriting any previous value.
// LocalSpiller::finish() calls this with the spiller's own spillTime_.
void Spill::setSpillTime(int64_t spillTime) {
spillTime_ = spillTime;
}

// Records the compression time for this Spill, overwriting any previous value.
// LocalSpiller::finish() calls this with the spiller's own compressTime_.
void Spill::setCompressTime(int64_t compressTime) {
compressTime_ = compressTime;
}

// Returns the spill file name stored by setSpillFile(). The string is returned by value,
// so each call produces a copy of spillFile_.
std::string Spill::spillFile() const {
return spillFile_;
}

// Returns the value last stored through setSpillTime().
int64_t Spill::spillTime() const {
return spillTime_;
}

// Returns the value last stored through setCompressTime().
int64_t Spill::compressTime() const {
return compressTime_;
}
} // namespace gluten
10 changes: 10 additions & 0 deletions cpp/core/shuffle/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,16 @@ class Spill final {

void setSpillFile(const std::string& spillFile);

void setSpillTime(int64_t spillTime);

void setCompressTime(int64_t compressTime);

std::string spillFile() const;

int64_t spillTime() const;

int64_t compressTime() const;

private:
struct PartitionPayload {
uint32_t partitionId{};
Expand All @@ -65,6 +73,8 @@ class Spill final {
std::list<PartitionPayload> partitionPayloads_{};
std::shared_ptr<arrow::io::MemoryMappedFile> inputStream_{};
std::string spillFile_;
// Timings stored through setSpillTime() / setCompressTime(). They are zero-initialized so that
// spillTime() / compressTime() return a defined value on a Spill whose setters were never
// called, instead of reading an indeterminate int64_t.
int64_t spillTime_{0};
int64_t compressTime_{0};

arrow::io::InputStream* rawIs_;

Expand Down
6 changes: 4 additions & 2 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,20 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u
}
}

arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) {
arrow::Status VeloxSortShuffleWriter::maybeSpill(uint32_t nextRows) {
if ((uint64_t)offset_ + nextRows > std::numeric_limits<uint32_t>::max()) {
RETURN_NOT_OK(evictAllPartitions());
}
return arrow::Status::OK();
}

arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
VELOX_CHECK(offset_ > 0);
EvictGuard evictGuard{evictState_};

auto numRecords = offset_;
// offset_ is used for checking spillable data.
offset_ = 0;
int32_t begin = 0;
{
ScopedTimer timer(&sortTime_);
Expand Down Expand Up @@ -257,7 +260,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
pageCursor_ = 0;

// Reset and reallocate array_ to minimal size. Allocating array_ can trigger a spill.
offset_ = 0;
initArray();
}
return arrow::Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

void insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows);

arrow::Status maybeSpill(int32_t nextRows);
arrow::Status maybeSpill(uint32_t nextRows);

arrow::Status evictAllPartitions();

Expand Down
Loading