Skip to content

Commit

Permalink
Add previous row to compare peer group
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Aug 20, 2024
1 parent c5eb526 commit d338045
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 33 deletions.
3 changes: 2 additions & 1 deletion velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ RowsStreamingWindowBuild::RowsStreamingWindowBuild(
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
this);
pool_ = pool;
}

void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {
Expand All @@ -37,7 +38,7 @@ void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {

if (windowPartitions_.size() <= inputPartition_) {
windowPartitions_.push_back(std::make_shared<WindowPartition>(
data_.get(), inversedInputChannels_, sortKeyInfo_));
data_.get(), pool_, inversedInputChannels_, sortKeyInfo_));
}

windowPartitions_[inputPartition_]->addRows(inputRows_);
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class RowsStreamingWindowBuild : public WindowBuild {

// Holds all the built window partitions.
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;

velox::memory::MemoryPool* pool_;
};

} // namespace facebook::velox::exec
82 changes: 53 additions & 29 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ WindowPartition::WindowPartition(
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo,
bool partial,
bool complete)
bool complete,
velox::memory::MemoryPool* pool)
: partial_(partial),
data_(data),
partition_(rows),
complete_(complete),
inputMapping_(inputMapping),
sortKeyInfo_(sortKeyInfo) {
sortKeyInfo_(sortKeyInfo),
pool_(pool) {
VELOX_CHECK_NE(partial_, complete_);
VELOX_CHECK_NE(complete_, partition_.empty());

Expand All @@ -43,13 +45,21 @@ WindowPartition::WindowPartition(
const folly::Range<char**>& rows,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
: WindowPartition(data, rows, inputMapping, sortKeyInfo, false, true) {}
: WindowPartition(
data,
rows,
inputMapping,
sortKeyInfo,
false,
true,
nullptr) {}

WindowPartition::WindowPartition(
RowContainer* data,
velox::memory::MemoryPool* pool,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
: WindowPartition(data, {}, inputMapping, sortKeyInfo, true, false) {}
: WindowPartition(data, {}, inputMapping, sortKeyInfo, true, false, pool) {}

void WindowPartition::addRows(const std::vector<char*>& rows) {
checkPartial();
Expand All @@ -67,23 +77,13 @@ void WindowPartition::eraseRows(vector_size_t numRows) {

void WindowPartition::removeProcessedRows(vector_size_t numRows) {
checkPartial();

if (complete_ && rows_.size() == numRows) {
eraseRows(numRows);
} else {
eraseRows(numRows - 1);
}
eraseRows(numRows);
startRow_ += numRows;
}

vector_size_t WindowPartition::numRowsForProcessing(
vector_size_t partitionOffset) const {
if (partial_) {
if (startRow_ > 0) {
VELOX_CHECK(!partition_.empty());
return partition_.size() - 1;
}

return partition_.size();
} else {
return partition_.size() - partitionOffset;
Expand Down Expand Up @@ -173,13 +173,15 @@ WindowPartition::extractNulls(
: std::nullopt;
}

bool WindowPartition::compareRowsWithSortKeys(const char* lhs, const char* rhs)
const {
bool WindowPartition::compareRowsWithSortKeys(
const char* lhs,
const char* rhs,
RowContainer* data) const {
if (lhs == rhs) {
return false;
}
for (auto& key : sortKeyInfo_) {
if (auto result = data_->compare(
if (auto result = data->compare(
lhs,
rhs,
key.first,
Expand All @@ -193,12 +195,14 @@ bool WindowPartition::compareRowsWithSortKeys(const char* lhs, const char* rhs)
vector_size_t WindowPartition::findPeerRowEndIndex(
vector_size_t startRow,
vector_size_t lastRow,
const std::function<bool(const char*, const char*)>& peerCompare) {
const std::function<bool(const char*, const char*, RowContainer* data)>&
peerCompare) {
auto peerEnd = startRow;
while (peerEnd <= lastRow) {
if (peerCompare(
partition_[startRow - startRow_],
partition_[peerEnd - startRow_])) {
partition_[peerEnd - startRow_],
data_)) {
break;
}
++peerEnd;
Expand All @@ -213,8 +217,9 @@ std::pair<vector_size_t, vector_size_t> WindowPartition::computePeerBuffers(
vector_size_t prevPeerEnd,
vector_size_t* rawPeerStarts,
vector_size_t* rawPeerEnds) {
const auto peerCompare = [&](const char* lhs, const char* rhs) -> bool {
return compareRowsWithSortKeys(lhs, rhs);
const auto peerCompare =
[&](const char* lhs, const char* rhs, RowContainer* data) -> bool {
return compareRowsWithSortKeys(lhs, rhs, data);
};

VELOX_CHECK_LE(end, numRows() + startRow_);
Expand All @@ -226,13 +231,22 @@ std::pair<vector_size_t, vector_size_t> WindowPartition::computePeerBuffers(
size_t next = start;
size_t index{0};
if (partial_ && start > 0) {
const auto peerGroup = peerCompare(partition_[0], partition_[1]);

// The first row is the last row in previous batch so delete it after used
// for the first peer group detection. Correspondingly, we need to decrement
// 'lastPartitionRow'.
eraseRows(1);
--lastPartitionRow;
VELOX_CHECK(previousRow_);
// 'rowContainer' is a temporary container used to check whether the first
// row of the current batch is in the same peer group as the last row of the
// previous batch.
auto rowContainer =
std::make_unique<RowContainer>(data_->keyTypes(), pool_);
auto serialized =
BaseVector::create<FlatVector<StringView>>(VARBINARY(), 1, pool_);
data_->extractSerializedRows(
folly::Range(rows_.data() + (start - startRow_), 1), serialized);

auto firstRow = rowContainer->newRow();
rowContainer->storeSerializedRow(*serialized, 0, firstRow);
auto secondRow = rowContainer->newRow();
rowContainer->storeSerializedRow(*previousRow_, 0, secondRow);

auto peerGroup = peerCompare(secondRow, firstRow, rowContainer.get());

if (!peerGroup) {
peerEnd = findPeerRowEndIndex(start, lastPartitionRow, peerCompare);
Expand Down Expand Up @@ -264,6 +278,16 @@ std::pair<vector_size_t, vector_size_t> WindowPartition::computePeerBuffers(
rawPeerEnds[index] = peerEnd - 1;
}
VELOX_CHECK_EQ(index, end - start);

if (partial_) {
// Save the last row of the current batch so that the next batch can check
// whether its first row belongs to the same peer group.
previousRow_ =
BaseVector::create<FlatVector<StringView>>(VARBINARY(), 1, pool_);
data_->extractSerializedRows(
folly::Range(rows_.data() + (end - 1 - startRow_), 1), previousRow_);
}

return {peerStart, peerEnd};
}

Expand Down
15 changes: 12 additions & 3 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class WindowPartition {
/// set for the constructed window partition.
WindowPartition(
RowContainer* data,
velox::memory::MemoryPool* pool,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>&
sortKeyInfo);
Expand Down Expand Up @@ -175,14 +176,19 @@ class WindowPartition {
const std::vector<std::pair<column_index_t, core::SortOrder>>&
sortKeyInfo,
bool partial,
bool complete);
bool compareRowsWithSortKeys(const char* lhs, const char* rhs) const;
bool complete,
velox::memory::MemoryPool* pool);
bool compareRowsWithSortKeys(
const char* lhs,
const char* rhs,
RowContainer* data) const;

// Finds the index of the last peer row in range of ['startRow', 'lastRow'].
vector_size_t findPeerRowEndIndex(
vector_size_t startRow,
vector_size_t lastRow,
const std::function<bool(const char*, const char*)>& peerCompare);
const std::function<bool(const char*, const char*, RowContainer* data)>&
peerCompare);

// Removes 'numRows' from 'data_' and 'rows_'.
void eraseRows(vector_size_t numRows);
Expand Down Expand Up @@ -266,5 +272,8 @@ class WindowPartition {
// partial partition during the data processing but always zero for
// non-partial partition.
vector_size_t startRow_{0};

std::shared_ptr<FlatVector<StringView>> previousRow_ = nullptr;
velox::memory::MemoryPool* pool_;
};
} // namespace facebook::velox::exec

0 comments on commit d338045

Please sign in to comment.