-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-33484: [C++][Compute] Implement Grouper::Reset
#41352
Changes from all commits
c232972
7c34623
12a0410
0cbdb27
536dbce
184392e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -217,18 +217,18 @@ struct SimpleKeySegmenter : public BaseRowSegmenter { | |
struct AnyKeysSegmenter : public BaseRowSegmenter { | ||
static Result<std::unique_ptr<RowSegmenter>> Make( | ||
const std::vector<TypeHolder>& key_types, ExecContext* ctx) { | ||
ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx)); // check types | ||
return std::make_unique<AnyKeysSegmenter>(key_types, ctx); | ||
ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_types, ctx)); // check types | ||
return std::make_unique<AnyKeysSegmenter>(key_types, ctx, std::move(grouper)); | ||
} | ||
|
||
AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx) | ||
AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx, | ||
std::unique_ptr<Grouper> grouper) | ||
: BaseRowSegmenter(key_types), | ||
ctx_(ctx), | ||
grouper_(nullptr), | ||
grouper_(std::move(grouper)), | ||
save_group_id_(kNoGroupId) {} | ||
|
||
Status Reset() override { | ||
grouper_ = nullptr; | ||
ARROW_RETURN_NOT_OK(grouper_->Reset()); | ||
save_group_id_ = kNoGroupId; | ||
return Status::OK(); | ||
} | ||
|
@@ -245,7 +245,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter { | |
// first row of a new segment to see if it extends the previous segment. | ||
template <typename Batch> | ||
Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) { | ||
if (!grouper_) return kNoGroupId; | ||
ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset, | ||
/*length=*/1)); | ||
if (!datum.is_array()) { | ||
|
@@ -264,9 +263,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter { | |
if (offset == batch.length) { | ||
return MakeSegment(batch.length, offset, 0, kEmptyExtends); | ||
} | ||
// ARROW-18311: make Grouper support Reset() | ||
// so it can be reset instead of recreated below | ||
// | ||
// the group id must be computed prior to resetting the grouper, since it is compared | ||
// to save_group_id_, and after resetting the grouper produces incomparable group ids | ||
ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset)); | ||
|
@@ -276,7 +272,7 @@ struct AnyKeysSegmenter : public BaseRowSegmenter { | |
return extends; | ||
}; | ||
// resetting drops grouper's group-ids, freeing-up memory for the next segment | ||
ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_)); // TODO: reset it | ||
ARROW_RETURN_NOT_OK(grouper_->Reset()); | ||
// GH-34475: cache the grouper-consume result across invocations of GetNextSegment | ||
ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset)); | ||
if (datum.is_array()) { | ||
|
@@ -299,7 +295,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter { | |
} | ||
|
||
private: | ||
ExecContext* const ctx_; | ||
std::unique_ptr<Grouper> grouper_; | ||
group_id_t save_group_id_; | ||
}; | ||
|
@@ -354,6 +349,7 @@ struct GrouperNoKeysImpl : Grouper { | |
RETURN_NOT_OK(builder->Finish(&array)); | ||
return std::move(array); | ||
} | ||
Status Reset() override { return Status::OK(); } | ||
Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override { | ||
ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(length, 0)); | ||
return Datum(array); | ||
|
@@ -419,6 +415,14 @@ struct GrouperImpl : public Grouper { | |
return std::move(impl); | ||
} | ||
|
||
Status Reset() override { | ||
map_.clear(); | ||
offsets_.clear(); | ||
key_bytes_.clear(); | ||
num_groups_ = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From what I saw in https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/kernels/row_encoder_internal.h, none of the encoders holds states that would change during the lifespan of the grouper. |
||
return Status::OK(); | ||
} | ||
|
||
Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override { | ||
ARROW_RETURN_NOT_OK(CheckAndCapLengthForConsume(batch.length, offset, &length)); | ||
if (offset != 0 || length != batch.length) { | ||
|
@@ -595,7 +599,17 @@ struct GrouperFastImpl : public Grouper { | |
return std::move(impl); | ||
} | ||
|
||
~GrouperFastImpl() { map_.cleanup(); } | ||
Status Reset() override { | ||
rows_.Clean(); | ||
rows_minibatch_.Clean(); | ||
map_.cleanup(); | ||
RETURN_NOT_OK(map_.init(encode_ctx_.hardware_flags, ctx_->memory_pool())); | ||
pitrou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// TODO: It is now assumed that the dictionaries_ are identical to the first batch | ||
// throughout the grouper's lifespan so no resetting is needed. But if we want to | ||
// support different dictionaries for different batches, we need to reset the | ||
// dictionaries_ here. | ||
return Status::OK(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think we should enlarge the size of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think all the temp vectors allocated by the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, i see, thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a DCHECK that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah that's probably reasonable. But considering current temp stack doesn't have a public method for size/empty check, and I have other open PRs for temp stack restructure, I'd add the necessary methods after other PRs are done and get this one rebased. |
||
} | ||
|
||
Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override { | ||
ARROW_RETURN_NOT_OK(CheckAndCapLengthForConsume(batch.length, offset, &length)); | ||
|
@@ -838,8 +852,7 @@ struct GrouperFastImpl : public Grouper { | |
return out; | ||
} | ||
|
||
static constexpr int log_minibatch_max_ = 10; | ||
static constexpr int minibatch_size_max_ = 1 << log_minibatch_max_; | ||
static constexpr int minibatch_size_max_ = arrow::util::MiniBatch::kMiniBatchLength; | ||
static constexpr int minibatch_size_min_ = 128; | ||
int minibatch_size_; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should something be checked about
segment.length
here? Or is undetermined?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The last
segment.length
is supposed to be0
. Updated.