diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index 08ef1c253675c..58cdae0683558 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -186,15 +186,19 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
   *ss << ']';
 }
 
+// Handle the input batch.
+// If this batch closes a segment, emit the aggregation result for that segment.
+// If this batch leaves a segment open, accumulate the batch into that segment.
 template <typename BatchHandler>
-Status HandleSegments(std::unique_ptr<GroupingSegmenter>& segmenter,
-                      const ExecBatch& batch, const std::vector<int>& ids,
-                      const BatchHandler& handle_batch) {
+Status HandleSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecBatch& batch,
+                      const std::vector<int>& ids, const BatchHandler& handle_batch) {
   int64_t offset = 0;
   ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids));
   ExecSpan segment_batch(segment_exec_batch);
+
   while (true) {
-    ARROW_ASSIGN_OR_RAISE(auto segment, segmenter->GetNextSegment(segment_batch, offset));
+    ARROW_ASSIGN_OR_RAISE(compute::SegmentPiece segment,
+                          segmenter->GetNextSegmentPiece(segment_batch, offset));
     if (segment.offset >= segment_batch.length) break;  // condition of no-next-segment
     ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
     offset = segment.offset + segment.length;
@@ -234,11 +238,12 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
  public:
   ScalarAggregateNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
                       std::shared_ptr<Schema> output_schema,
-                      std::unique_ptr<GroupingSegmenter> segmenter,
+                      std::unique_ptr<RowSegmenter> segmenter,
                       std::vector<int> segment_field_ids,
                       std::vector<std::vector<int>> target_fieldsets,
                       std::vector<Aggregate> aggs,
                       std::vector<const ScalarAggregateKernel*> kernels,
+                      std::vector<std::vector<TypeHolder>> kernel_intypes,
                       std::vector<std::vector<std::unique_ptr<KernelState>>> states)
       : ExecNode(plan, std::move(inputs), {"target"},
                  /*output_schema=*/std::move(output_schema)),
@@ -248,16 +253,8 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
         target_fieldsets_(std::move(target_fieldsets)),
         aggs_(std::move(aggs)),
         kernels_(std::move(kernels)),
-        states_(std::move(states)) {
-    const auto& input_schema = *this->inputs()[0]->output_schema();
-    for (size_t i = 0; i < kernels_.size(); ++i) {
-      std::vector<TypeHolder> in_types;
-      for (const auto& target : target_fieldsets_[i]) {
-        in_types.emplace_back(input_schema.field(target)->type().get());
-      }
-      in_typesets_.push_back(std::move(in_types));
-    }
-  }
+        kernel_intypes_(std::move(kernel_intypes)),
+        states_(std::move(states)) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {
@@ -282,7 +279,7 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     std::vector<int> segment_field_ids(segment_keys.size());
     std::vector<TypeHolder> segment_key_types(segment_keys.size());
     for (size_t i = 0; i < segment_keys.size(); i++) {
-      ARROW_ASSIGN_OR_RAISE(auto match, segment_keys[i].FindOne(input_schema));
+      ARROW_ASSIGN_OR_RAISE(FieldPath match, segment_keys[i].FindOne(input_schema));
       if (match.indices().size() > 1) {
         // ARROW-18369: Support nested references as segment ids
         return Status::Invalid("Nested references cannot be used as segment ids");
@@ -291,9 +288,10 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
       segment_key_types[i] = input_schema.field(match[0])->type().get();
     }
 
-    ARROW_ASSIGN_OR_RAISE(
-        auto segmenter, GroupingSegmenter::Make(std::move(segment_key_types), exec_ctx));
+    ARROW_ASSIGN_OR_RAISE(auto segmenter,
+                          RowSegmenter::Make(std::move(segment_key_types), exec_ctx));
 
+    std::vector<std::vector<TypeHolder>> kernel_intypes(aggregates.size());
     std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
     std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
     FieldVector fields(kernels.size() + segment_keys.size());
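Aside, for readers tracking the new API: the HandleSegments helper above is the sole driver of RowSegmenter in this node. A minimal sketch of the same driving pattern, assuming only the calls introduced in this diff (the function name and visit_piece callback are hypothetical, not part of the change):

// Sketch: walk the segment pieces of one batch (namespace arrow::compute
// and the usual Arrow includes assumed).
Status VisitSegmentPieces(
    std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan& batch,
    const std::function<Status(const SegmentPiece&)>& visit_piece) {
  int64_t offset = 0;
  while (true) {
    ARROW_ASSIGN_OR_RAISE(SegmentPiece piece,
                          segmenter->GetNextSegmentPiece(batch, offset));
    if (piece.offset >= batch.length) break;  // no further piece in this batch
    ARROW_RETURN_NOT_OK(visit_piece(piece));
    offset = piece.offset + piece.length;  // resume right after this piece
  }
  return Status::OK();
}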
@@ -324,7 +322,9 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
       for (const auto& target : target_fieldsets[i]) {
         in_types.emplace_back(input_schema.field(target)->type().get());
       }
-      ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, function->DispatchExact(in_types));
+      kernel_intypes[i] = in_types;
+      ARROW_ASSIGN_OR_RAISE(const Kernel* kernel,
+                            function->DispatchExact(kernel_intypes[i]));
       kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);
 
       if (aggregates[i].options == nullptr) {
@@ -338,13 +338,14 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
       KernelContext kernel_ctx{exec_ctx};
       states[i].resize(plan->query_context()->max_concurrency());
       RETURN_NOT_OK(Kernel::InitAll(
-          &kernel_ctx, KernelInitArgs{kernels[i], in_types, aggregates[i].options.get()},
+          &kernel_ctx,
+          KernelInitArgs{kernels[i], kernel_intypes[i], aggregates[i].options.get()},
           &states[i]));
 
       // pick one to resolve the kernel signature
       kernel_ctx.SetState(states[i][0].get());
       ARROW_ASSIGN_OR_RAISE(auto out_type, kernels[i]->signature->out_type().Resolve(
-                                               &kernel_ctx, in_types));
+                                               &kernel_ctx, kernel_intypes[i]));
 
       fields[i] = field(aggregate_options.aggregates[i].name, out_type.GetSharedPtr());
     }
@@ -356,7 +357,7 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     return plan->EmplaceNode<ScalarAggregateNode>(
         plan, std::move(inputs), schema(std::move(fields)), std::move(segmenter),
         std::move(segment_field_ids), std::move(target_fieldsets), std::move(aggregates),
-        std::move(kernels), std::move(states));
+        std::move(kernels), std::move(kernel_intypes), std::move(states));
   }
 
   const char* kind_name() const override { return "ScalarAggregateNode"; }
@@ -388,12 +389,21 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     auto thread_index = plan_->query_context()->GetThreadIndex();
 
     auto handler = [this, thread_index](const ExecBatch& full_batch,
-                                        const GroupingSegment& segment) {
-      if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult());
+                                        const SegmentPiece& segment) {
+      // (1) If the segment piece opens a new segment (it does not extend the
+      // previous one) and starts at the beginning of the batch, no data in this
+      // batch belongs to the current segment: output and reset kernel states.
+      if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
+
+      // (2) Accumulate the segment piece into the current segment's aggregation.
       auto exec_batch = full_batch.Slice(segment.offset, segment.length);
       RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index));
       RETURN_NOT_OK(GetScalarFields(&segmenter_values_, exec_batch, segment_field_ids_));
-      if (!segment.is_open) RETURN_NOT_OK(OutputResult());
+
+      // (3) If the segment piece closes the current segment, output that
+      // segment's aggregation.
+      if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
+
       return Status::OK();
     };
 
     RETURN_NOT_OK(HandleSegments(segmenter_, batch, segment_field_ids_, handler));
@@ -438,20 +448,20 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
   }
 
  private:
-  Status ResetAggregates() {
+  Status ResetKernelStates() {
     auto exec_ctx = plan()->query_context()->exec_context();
     for (size_t i = 0; i < kernels_.size(); ++i) {
-      const std::vector<TypeHolder>& in_types = in_typesets_[i];
       states_[i].resize(plan()->query_context()->max_concurrency());
       KernelContext kernel_ctx{exec_ctx};
       RETURN_NOT_OK(Kernel::InitAll(
-          &kernel_ctx, KernelInitArgs{kernels_[i], in_types, aggs_[i].options.get()},
+          &kernel_ctx,
+          KernelInitArgs{kernels_[i], kernel_intypes_[i], aggs_[i].options.get()},
          &states_[i]));
     }
     return Status::OK();
   }
 
-  Status OutputResult(bool is_last = false) {
+  Status OutputResult(bool is_last) {
     ExecBatch batch{{}, 1};
     batch.values.resize(kernels_.size() + segment_field_ids_.size());
@@ -474,12 +484,12 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     if (is_last) {
       ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
     } else {
-      ARROW_RETURN_NOT_OK(ResetAggregates());
+      ARROW_RETURN_NOT_OK(ResetKernelStates());
     }
     return Status::OK();
   }
 
-  std::unique_ptr<GroupingSegmenter> segmenter_;
+  std::unique_ptr<RowSegmenter> segmenter_;
   const std::vector<int> segment_field_ids_;
   std::vector<Datum> segmenter_values_;
@@ -487,7 +497,8 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
   const std::vector<Aggregate> aggs_;
   const std::vector<const ScalarAggregateKernel*> kernels_;
 
-  std::vector<std::vector<TypeHolder>> in_typesets_;
+  // Input type holders for each kernel, used for state initialization
+  std::vector<std::vector<TypeHolder>> kernel_intypes_;
   std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
 
   AtomicCounter input_counter_;
@@ -498,7 +509,7 @@ class GroupByNode : public ExecNode, public TracedNode {
  public:
   GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
               std::vector<int> key_field_ids, std::vector<int> segment_key_field_ids,
-              std::unique_ptr<GroupingSegmenter> segmenter,
+              std::unique_ptr<RowSegmenter> segmenter,
               std::vector<std::vector<TypeHolder>> agg_src_types,
               std::vector<std::vector<int>> agg_src_fieldsets,
               std::vector<Aggregate> aggs,
@@ -591,7 +602,7 @@ class GroupByNode : public ExecNode, public TracedNode {
     auto ctx = plan->query_context()->exec_context();
 
     ARROW_ASSIGN_OR_RAISE(auto segmenter,
-                          GroupingSegmenter::Make(std::move(segment_key_types), ctx));
+                          RowSegmenter::Make(std::move(segment_key_types), ctx));
 
     // Construct aggregates
     ARROW_ASSIGN_OR_RAISE(auto agg_kernels, GetKernels(ctx, aggs, agg_src_types));
@@ -630,12 +641,7 @@ class GroupByNode : public ExecNode, public TracedNode {
 
   Status ResetAggregates() {
     auto ctx = plan()->query_context()->exec_context();
-
-    ARROW_ASSIGN_OR_RAISE(agg_kernels_, GetKernels(ctx, aggs_, agg_src_types_));
-
-    ARROW_ASSIGN_OR_RAISE(auto agg_states,
-                          InitKernels(agg_kernels_, ctx, aggs_, agg_src_types_));
-
+    ARROW_RETURN_NOT_OK(InitKernels(agg_kernels_, ctx, aggs_, agg_src_types_));
     return Status::OK();
   }
 
@@ -797,7 +803,7 @@ class GroupByNode : public ExecNode, public TracedNode {
 
     DCHECK_EQ(input, inputs_[0]);
 
-    auto handler = [this](const ExecBatch& full_batch, const GroupingSegment& segment) {
+    auto handler = [this](const ExecBatch& full_batch, const SegmentPiece& segment) {
       if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult());
       auto exec_batch = full_batch.Slice(segment.offset, segment.length);
       auto batch = ExecSpan(exec_batch);
@@ -912,7 +918,7 @@ class GroupByNode : public ExecNode, public TracedNode {
   }
 
   int output_task_group_id_;
-  std::unique_ptr<GroupingSegmenter> segmenter_;
+  std::unique_ptr<RowSegmenter> segmenter_;
   std::vector<Datum> segmenter_values_;
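The InputReceived handler above is the heart of the segmented-aggregation protocol. Its control flow, restated as a sketch (Flush and ConsumePiece are hypothetical stand-ins for OutputResult(false) and the DoConsume/GetScalarFields pair, named here only for illustration):

Status HandlePiece(const SegmentPiece& piece) {
  if (!piece.extends && piece.offset == 0) {
    // The piece opens a brand-new segment at the start of the batch, so the
    // running aggregation belongs entirely to the previous segment.
    ARROW_RETURN_NOT_OK(Flush());
  }
  ARROW_RETURN_NOT_OK(ConsumePiece(piece));  // fold the piece into the state
  if (!piece.is_open) {
    ARROW_RETURN_NOT_OK(Flush());  // the piece closes its segment in-batch
  }
  return Status::OK();
}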
 
   const std::vector<int> key_field_ids_;
@@ -920,7 +926,7 @@ class GroupByNode : public ExecNode, public TracedNode {
   const std::vector<std::vector<TypeHolder>> agg_src_types_;
   const std::vector<std::vector<int>> agg_src_fieldsets_;
   const std::vector<Aggregate> aggs_;
-  std::vector<const HashAggregateKernel*> agg_kernels_;
+  const std::vector<const HashAggregateKernel*> agg_kernels_;
 
   AtomicCounter input_counter_;
   int total_output_batches_ = 0;
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 18d067cc65a49..a43d0655cd2f5 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -480,11 +480,11 @@ void TestGroupClassSupportedKeys() {
   ASSERT_RAISES(NotImplemented, GroupClass::Make({dense_union({field("", int32())})}));
 }
 
-void TestSegments(std::unique_ptr<GroupingSegmenter>& segmenter, const ExecSpan& batch,
-                  std::vector<GroupingSegment> expected_segments) {
+void TestSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan& batch,
+                  std::vector<SegmentPiece> expected_segments) {
   int64_t offset = 0;
   for (auto expected_segment : expected_segments) {
-    ASSERT_OK_AND_ASSIGN(auto segment, segmenter->GetNextSegment(batch, offset));
+    ASSERT_OK_AND_ASSIGN(auto segment, segmenter->GetNextSegmentPiece(batch, offset));
     ASSERT_EQ(expected_segment, segment);
     offset = segment.offset + segment.length;
   }
@@ -492,11 +492,9 @@ void TestSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan&
 }
 
 }  // namespace
 
-TEST(GroupingSegmenter, SupportedKeys) {
-  TestGroupClassSupportedKeys<GroupingSegmenter>();
-}
+TEST(RowSegmenter, SupportedKeys) { TestGroupClassSupportedKeys<RowSegmenter>(); }
 
-TEST(GroupingSegmenter, Basics) {
+TEST(RowSegmenter, Basics) {
   std::vector<TypeHolder> bad_types2 = {int32(), float32()};
   std::vector<TypeHolder> types2 = {int32(), int32()};
   std::vector<TypeHolder> bad_types1 = {float32()};
@@ -507,53 +505,53 @@
   ExecBatch batch0({}, 3);
   {
     SCOPED_TRACE("offset");
-    ASSERT_OK_AND_ASSIGN(auto segmenter, GroupingSegmenter::Make(types0));
+    ASSERT_OK_AND_ASSIGN(auto segmenter, RowSegmenter::Make(types0));
     ExecSpan span0(batch0);
     for (int64_t offset : {-1, 4}) {
       EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
                                       HasSubstr("invalid grouping segmenter offset"),
-                                      segmenter->GetNextSegment(span0, offset));
+                                      segmenter->GetNextSegmentPiece(span0, offset));
     }
   }
   {
     SCOPED_TRACE("types0 segmenting of batch2");
-    ASSERT_OK_AND_ASSIGN(auto segmenter, GroupingSegmenter::Make(types0));
+    ASSERT_OK_AND_ASSIGN(auto segmenter, RowSegmenter::Make(types0));
     ExecSpan span2(batch2);
     EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 0 "),
-                                    segmenter->GetNextSegment(span2, 0));
+                                    segmenter->GetNextSegmentPiece(span2, 0));
     ExecSpan span0(batch0);
     TestSegments(segmenter, span0, {{0, 3, true, true}, {3, 0, true, true}});
   }
   {
     SCOPED_TRACE("bad_types1 segmenting of batch1");
-    ASSERT_OK_AND_ASSIGN(auto segmenter, GroupingSegmenter::Make(bad_types1));
+    ASSERT_OK_AND_ASSIGN(auto segmenter, RowSegmenter::Make(bad_types1));
     ExecSpan span1(batch1);
     EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 0 of type "),
-                                    segmenter->GetNextSegment(span1, 0));
+                                    segmenter->GetNextSegmentPiece(span1, 0));
   }
   {
     SCOPED_TRACE("types1 segmenting of batch2");
-    ASSERT_OK_AND_ASSIGN(auto segmenter, GroupingSegmenter::Make(types1));
+    ASSERT_OK_AND_ASSIGN(auto segmenter, RowSegmenter::Make(types1));
     ExecSpan span2(batch2);
     EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 1 "),
-                                    segmenter->GetNextSegment(span2, 0));
+                                    segmenter->GetNextSegmentPiece(span2, 0));
     ExecSpan span1(batch1);
     TestSegments(segmenter, span1,
                  {{0, 2, false, true}, {2, 1, true, false}, {3, 0, true, true}});
   }
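The brace initializers passed to TestSegments are SegmentPiece values in declaration order: {offset, length, is_open, extends}. Assuming batch1 carries a single int32 key column with values such as [1, 1, 2] (the batch literals sit outside these hunks, so this is an illustrative guess), the three expectations above decode as:

// {0, 2, false, true}: rows [0, 2) share one key; the piece ends before the
//                      batch end (is_open = false) and trivially extends the
//                      empty preceding state.
// {2, 1, true, false}: row 2 starts a new key (extends = false) and touches
//                      the batch end, so the piece stays open.
// {3, 0, true, true}:  the empty tail piece; empty pieces report
//                      extends = true (kEmptyExtends in grouper.cc).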
   {
     SCOPED_TRACE("bad_types2 segmenting of batch2");
-    ASSERT_OK_AND_ASSIGN(auto segmenter, GroupingSegmenter::Make(bad_types2));
+    ASSERT_OK_AND_ASSIGN(auto segmenter, RowSegmenter::Make(bad_types2));
     ExecSpan span2(batch2);
     EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 1 of type "),
-                                    segmenter->GetNextSegment(span2, 0));
+                                    segmenter->GetNextSegmentPiece(span2, 0));
   }
   {
     SCOPED_TRACE("types2 segmenting of batch1");
-    ASSERT_OK_AND_ASSIGN(auto segmenter, GroupingSegmenter::Make(types2));
+    ASSERT_OK_AND_ASSIGN(auto segmenter, RowSegmenter::Make(types2));
     ExecSpan span1(batch1);
     EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 2 "),
-                                    segmenter->GetNextSegment(span1, 0));
+                                    segmenter->GetNextSegmentPiece(span1, 0));
     ExecSpan span2(batch2);
     TestSegments(segmenter, span2, {{0, 1, false, true},
diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc
index 0f11c71ccf554..ccad89add2130 100644
--- a/cpp/src/arrow/compute/row/grouper.cc
+++ b/cpp/src/arrow/compute/row/grouper.cc
@@ -62,8 +62,9 @@ inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0
 }
 
 template <typename T>
-Status CheckForGetNextSegment(const std::vector<T>& values, int64_t length,
-                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+Status CheckForGetNextSegmentPiece(const std::vector<T>& values, int64_t length,
+                                   int64_t offset,
+                                   const std::vector<TypeHolder>& key_types) {
   if (offset < 0 || offset > length) {
     return Status::Invalid("invalid grouping segmenter offset: ", offset);
   }
@@ -85,13 +86,13 @@ Status CheckForGetNextSegment(const std::vector<T>& values, int64_t length,
 
 template <typename Batch>
 enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
             Status>
-CheckForGetNextSegment(const Batch& batch, int64_t offset,
-                       const std::vector<TypeHolder>& key_types) {
-  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+CheckForGetNextSegmentPiece(const Batch& batch, int64_t offset,
+                            const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegmentPiece(batch.values, batch.length, offset, key_types);
 }
 
-struct BaseGroupingSegmenter : public GroupingSegmenter {
-  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+struct BaseRowSegmenter : public RowSegmenter {
+  explicit BaseRowSegmenter(const std::vector<TypeHolder>& key_types)
       : key_types_(key_types) {}
 
   const std::vector<TypeHolder>& key_types() const override { return key_types_; }
@@ -99,9 +100,9 @@ struct BaseGroupingSegmenter : public GroupingSegmenter {
   std::vector<TypeHolder> key_types_;
 };
 
-GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
-                            bool extends) {
-  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+SegmentPiece MakeSegmentPiece(int64_t batch_length, int64_t offset, int64_t length,
+                              bool extends) {
+  return SegmentPiece{offset, length, offset + length >= batch_length, extends};
 }
 
 int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
@@ -121,28 +122,29 @@ using ExtendFunc = std::function<bool(const void*)>;
 constexpr bool kDefaultExtends = true;
 constexpr bool kEmptyExtends = true;
 
-struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
-  static std::unique_ptr<GroupingSegmenter> Make() {
-    return std::make_unique<NoKeysGroupingSegmenter>();
+struct NoKeysSegmenter : public BaseRowSegmenter {
+  static std::unique_ptr<RowSegmenter> Make() {
+    return std::make_unique<NoKeysSegmenter>();
   }
 
-  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+  NoKeysSegmenter() : BaseRowSegmenter({}) {}
 
   Status Reset() override { return Status::OK(); }
 
-  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
-    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
-    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  Result<SegmentPiece> GetNextSegmentPiece(const ExecSpan& batch,
+                                           int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegmentPiece(batch, offset, {}));
+    return MakeSegmentPiece(batch.length, offset, batch.length - offset, kDefaultExtends);
   }
 };
 
-struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
-  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
-    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+struct SimpleKeySegmenter : public BaseRowSegmenter {
+  static Result<std::unique_ptr<RowSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeySegmenter>(key_type);
   }
 
-  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
-      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+  explicit SimpleKeySegmenter(TypeHolder key_type)
+      : BaseRowSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
 
   Status CheckType(const DataType& type) {
     if (!is_fixed_width(type)) {
@@ -168,43 +170,45 @@ struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
     return extends;
   }
 
-  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
-                                         int64_t length) {
+  Result<SegmentPiece> GetNextSegmentPiece(const Scalar& scalar, int64_t offset,
+                                           int64_t length) {
    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
     if (!scalar.is_valid) {
       return Status::Invalid("segmenting an invalid scalar");
     }
     auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
     bool extends = length > 0 ? Extend(data) : kEmptyExtends;
-    return MakeSegment(length, offset, length, extends);
+    return MakeSegmentPiece(length, offset, length, extends);
   }
 
-  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
-                                         const uint8_t* array_bytes, int64_t offset,
-                                         int64_t length) {
+  Result<SegmentPiece> GetNextSegmentPiece(const DataType& array_type,
+                                           const uint8_t* array_bytes, int64_t offset,
+                                           int64_t length) {
     RETURN_NOT_OK(CheckType(array_type));
     int64_t byte_width = array_type.byte_width();
     int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
                                           array_bytes, offset, length);
     bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
-    return MakeSegment(length, offset, match_length, extends);
+    return MakeSegmentPiece(length, offset, match_length, extends);
   }
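GetMatchLength, called just above, is essentially a run-length scan over fixed-width key bytes. A self-contained sketch of the idea (not the function's actual body, which also handles the match_bytes/array_bytes distinction):

#include <cstdint>
#include <cstring>

// Count how many rows starting at `offset` share the key bytes found at
// `offset`, for a fixed-width key of `width` bytes over `length` rows.
int64_t MatchLength(const uint8_t* bytes, int64_t width, int64_t offset,
                    int64_t length) {
  const uint8_t* first = bytes + offset * width;
  int64_t row = offset;
  while (row < length && std::memcmp(bytes + row * width, first, width) == 0) {
    ++row;
  }
  return row - offset;  // >= 1 whenever offset < length
}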
-  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
-    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+  Result<SegmentPiece> GetNextSegmentPiece(const ExecSpan& batch,
+                                           int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegmentPiece(batch, offset, {key_type_}));
     if (offset == batch.length) {
-      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+      return MakeSegmentPiece(batch.length, offset, 0, kEmptyExtends);
     }
     const auto& value = batch.values[0];
     if (value.is_scalar()) {
-      return GetNextSegment(*value.scalar, offset, batch.length);
+      return GetNextSegmentPiece(*value.scalar, offset, batch.length);
     }
     ARROW_DCHECK(value.is_array());
     const auto& array = value.array;
     if (array.GetNullCount() > 0) {
       return Status::NotImplemented("segmenting a nullable array");
     }
-    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+    return GetNextSegmentPiece(*array.type, GetValuesAsBytes(array), offset,
+                               batch.length);
   }
 
  private:
@@ -212,15 +216,15 @@ struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
   std::vector<uint8_t> save_key_data_;
 };
 
-struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
-  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+struct AnyKeysSegmenter : public BaseRowSegmenter {
+  static Result<std::unique_ptr<RowSegmenter>> Make(
       const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
     ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
-    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+    return std::make_unique<AnyKeysSegmenter>(key_types, ctx);
   }
 
-  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
-      : BaseGroupingSegmenter(key_types),
+  AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseRowSegmenter(key_types),
         ctx_(ctx),
         grouper_(nullptr),
         save_group_id_(kNoGroupId) {}
@@ -245,7 +249,7 @@
   Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
     if (!grouper_) return kNoGroupId;
     ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
-                                                        /*consume_length=*/1));
+                                                        /*length=*/1));
     if (!datum.is_array()) {
       return Status::Invalid("accessing unsupported datum kind ", datum.kind());
     }
@@ -257,10 +261,11 @@ struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
     return values[0];
   }
 
-  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
-    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+  Result<SegmentPiece> GetNextSegmentPiece(const ExecSpan& batch,
+                                           int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegmentPiece(batch, offset, key_types_));
     if (offset == batch.length) {
-      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+      return MakeSegmentPiece(batch.length, offset, 0, kEmptyExtends);
     }
     // ARROW-18311: make Grouper support Reset()
     // so it can be cached instead of recreated below
@@ -286,7 +291,7 @@
       }
       int64_t length = std::min(cursor, batch.length - offset);
       bool extends = length > 0 ? bound_extend(values) : kEmptyExtends;
-      return MakeSegment(batch.length, offset, length, extends);
+      return MakeSegmentPiece(batch.length, offset, length, extends);
     } else {
       return Status::Invalid("segmenting unsupported datum kind ", datum.kind());
     }
   }
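AnyKeysSegmenter has no fixed-width shortcut: per the hunks above, it consumes the batch suffix into a Grouper and cuts the piece where the mapped uint32 group id first changes. The cursor scan reduces to the following (a sketch under that reading; the real method also threads save_group_id_ across batches and clamps against the batch length):

// Given the group ids of rows [offset, batch.length), return the length of
// the leading run of identical ids, i.e. the length of the next piece.
int64_t LeadingGroupIdRun(const uint32_t* ids, int64_t num_ids) {
  int64_t cursor = 0;
  while (cursor < num_ids && ids[cursor] == ids[0]) ++cursor;
  return cursor;  // caller clamps with std::min(cursor, batch.length - offset)
}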
@@ -311,17 +316,17 @@ Status CheckForConsume(int64_t batch_length, int64_t& consume_offset,
 
 }  // namespace
 
-Result<std::unique_ptr<GroupingSegmenter>> GroupingSegmenter::Make(
+Result<std::unique_ptr<RowSegmenter>> RowSegmenter::Make(
     const std::vector<TypeHolder>& key_types, bool nullable_keys, ExecContext* ctx) {
   if (key_types.size() == 0) {
-    return NoKeysGroupingSegmenter::Make();
+    return NoKeysSegmenter::Make();
   } else if (!nullable_keys && key_types.size() == 1) {
     const DataType* type = key_types[0].type;
     if (type != NULLPTR && is_fixed_width(*type)) {
-      return SimpleKeyGroupingSegmenter::Make(key_types[0]);
+      return SimpleKeySegmenter::Make(key_types[0]);
    }
   }
-  return AnyKeysGroupingSegmenter::Make(key_types, ctx);
+  return AnyKeysSegmenter::Make(key_types, ctx);
 }
 
 namespace {
@@ -329,9 +334,8 @@ namespace {
 struct BaseGrouper : public Grouper {
   using Grouper::Consume;
 
-  Result<Datum> Consume(const ExecBatch& batch, int64_t consume_offset,
-                        int64_t consume_length) override {
-    return Consume(ExecSpan(batch), consume_offset, consume_length);
+  Result<Datum> Consume(const ExecBatch& batch, int64_t offset, int64_t length) override {
+    return Consume(ExecSpan(batch), offset, length);
   }
 };
 
@@ -352,14 +356,12 @@ struct GrouperNoKeysImpl : Grouper {
     RETURN_NOT_OK(builder->Finish(&array));
     return std::move(array);
   }
-  Result<Datum> Consume(const ExecSpan& batch, int64_t consume_offset,
-                        int64_t consume_length) override {
-    ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(consume_length, 0));
+  Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
+    ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(length, 0));
     return Datum(array);
   }
-  Result<Datum> Consume(const ExecBatch& batch, int64_t consume_offset,
-                        int64_t consume_length) override {
-    ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(consume_length, 0));
+  Result<Datum> Consume(const ExecBatch& batch, int64_t offset, int64_t length) override {
+    ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(length, 0));
     return Datum(array);
   }
   Result<ExecBatch> GetUniques() override {
@@ -425,11 +427,10 @@ struct GrouperImpl : public BaseGrouper {
 
   using BaseGrouper::Consume;
 
-  Result<Datum> Consume(const ExecSpan& batch, int64_t consume_offset,
-                        int64_t consume_length) override {
-    ARROW_RETURN_NOT_OK(CheckForConsume(batch.length, consume_offset, &consume_length));
-    if (consume_offset != 0 || consume_length != batch.length) {
-      auto batch_slice = batch.ToExecBatch().Slice(consume_offset, consume_length);
+  Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
+    ARROW_RETURN_NOT_OK(CheckForConsume(batch.length, offset, &length));
+    if (offset != 0 || length != batch.length) {
+      auto batch_slice = batch.ToExecBatch().Slice(offset, length);
       return Consume(ExecSpan(batch_slice), 0, -1);
     }
     std::vector<int32_t> offsets_batch(batch.length + 1);
@@ -606,11 +607,10 @@ struct GrouperFastImpl : public BaseGrouper {
 
   using BaseGrouper::Consume;
 
-  Result<Datum> Consume(const ExecSpan& batch, int64_t consume_offset,
-                        int64_t consume_length) override {
-    ARROW_RETURN_NOT_OK(CheckForConsume(batch.length, consume_offset, &consume_length));
-    if (consume_offset != 0 || consume_length != batch.length) {
-      auto batch_slice = batch.ToExecBatch().Slice(consume_offset, consume_length);
+  Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
+    ARROW_RETURN_NOT_OK(CheckForConsume(batch.length, offset, &length));
+    if (offset != 0 || length != batch.length) {
+      auto batch_slice = batch.ToExecBatch().Slice(offset, length);
       return Consume(ExecSpan(batch_slice), 0, -1);
     }
     // ARROW-14027: broadcast scalar arguments for now
diff --git a/cpp/src/arrow/compute/row/grouper.h b/cpp/src/arrow/compute/row/grouper.h
index 62a4d4258dbb0..16f753144d40b 100644
--- a/cpp/src/arrow/compute/row/grouper.h
+++ b/cpp/src/arrow/compute/row/grouper.h
@@ -30,23 +30,30 @@ namespace arrow {
 namespace compute {
 
-/// \brief A segment of contiguous rows for grouping
-struct ARROW_EXPORT GroupingSegment {
+/// \brief A segment piece.
+/// A segment is a chunk of contiguous rows that share the same segment key. (For
+/// example, in ordered time-series processing, the segment key can be "date", and a
+/// segment is the set of rows belonging to the same date.) A segment can span
+/// multiple exec batches. A segment piece is a chunk of contiguous rows sharing the
+/// same segment key within a given batch. When a segment spans batches, it is made
+/// up of multiple segment pieces. A segment piece never spans batches, so it is only
+/// meaningful together with the exec batch it was computed from.
+struct ARROW_EXPORT SegmentPiece {
   /// \brief the offset into the batch where the segment starts
   int64_t offset;
   /// \brief the length of the segment
   int64_t length;
-  /// \brief whether the segment may be extended by a next one
+  /// \brief whether the segment piece may be extended by a next one
   bool is_open;
-  /// \brief whether the segment extends a preceeding one
+  /// \brief whether the segment piece extends a preceding one
   bool extends;
 };
 
-inline bool operator==(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+inline bool operator==(const SegmentPiece& segment1, const SegmentPiece& segment2) {
   return segment1.offset == segment2.offset && segment1.length == segment2.length &&
          segment1.is_open == segment2.is_open && segment1.extends == segment2.extends;
 }
 
-inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+inline bool operator!=(const SegmentPiece& segment1, const SegmentPiece& segment2) {
   return !(segment1 == segment2);
 }
 
@@ -69,28 +76,28 @@ inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& s
 ///
 /// If the next call to the segmenter starts with `A A` then that segment would set the
 /// "extends" flag, which indicates whether the segment continues the last open batch.
-class ARROW_EXPORT GroupingSegmenter {
+class ARROW_EXPORT RowSegmenter {
  public:
-  virtual ~GroupingSegmenter() = default;
+  virtual ~RowSegmenter() = default;
 
   /// \brief Construct a GroupingSegmenter which segments on the specified key types
   ///
   /// \param[in] key_types the specified key types
   /// \param[in] nullable_keys whether values of the specified keys may be null
   /// \param[in] ctx the execution context to use
-  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+  static Result<std::unique_ptr<RowSegmenter>> Make(
       const std::vector<TypeHolder>& key_types, bool nullable_keys = false,
       ExecContext* ctx = default_exec_context());
 
   /// \brief Return the key types of this segmenter
   virtual const std::vector<TypeHolder>& key_types() const = 0;
 
-  /// \brief Reset this grouping segmenter
+  /// \brief Reset this segmenter
   virtual Status Reset() = 0;
 
-  /// \brief Get the next segment for the given batch starting from the given offset
-  virtual Result<GroupingSegment> GetNextSegment(const ExecSpan& batch,
-                                                 int64_t offset) = 0;
+  /// \brief Get the next segment piece for the given batch starting from the given offset
+  virtual Result<SegmentPiece> GetNextSegmentPiece(const ExecSpan& batch,
+                                                   int64_t offset) = 0;
 };
 
 /// Consumes batches of keys and yields batches of the group ids.
@@ -106,15 +113,15 @@ class ARROW_EXPORT Grouper {
   /// over a slice defined by an offset and length, which defaults to the batch length.
   /// Currently only uint32 indices will be produced, eventually the bit width will only
   /// be as wide as necessary.
-  virtual Result<Datum> Consume(const ExecSpan& batch, int64_t consume_offset = 0,
-                                int64_t consume_length = -1) = 0;
+  virtual Result<Datum> Consume(const ExecSpan& batch, int64_t offset = 0,
+                                int64_t length = -1) = 0;
 
   /// Consume a batch of keys, producing the corresponding group ids as an integer array,
   /// over a slice defined by an offset and length, which defaults to the batch length.
   /// Currently only uint32 indices will be produced, eventually the bit width will only
   /// be as wide as necessary.
-  virtual Result<Datum> Consume(const ExecBatch& batch, int64_t consume_offset = 0,
-                                int64_t consume_length = -1) = 0;
+  virtual Result<Datum> Consume(const ExecBatch& batch, int64_t offset = 0,
+                                int64_t length = -1) = 0;
 
   /// Get current unique keys. May be called multiple times.
   virtual Result<ExecBatch> GetUniques() = 0;
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 33b4e23376c28..e2d244ab9a84d 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 33b4e23376c28e489c6a08b9207829b29e4bffb8
+Subproject commit e2d244ab9a84d382e3a50f55db41f362e450428b
diff --git a/testing b/testing
index d2c73bf782463..ecab1162cbec8 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d2c73bf78246331d8e58b6f11aa8aa199cbb5929
+Subproject commit ecab1162cbec872e17d949ecc86181670aee045c
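For orientation, here is how the two renamed grouper.h classes compose in use. A hedged end-to-end sketch (the key types, the single-column layout, and the elided aggregation step are made up for illustration; the calls are the ones declared above):

#include "arrow/compute/row/grouper.h"

using arrow::compute::ExecSpan;
using arrow::compute::Grouper;
using arrow::compute::RowSegmenter;
using arrow::compute::SegmentPiece;

arrow::Status SegmentedGroupBy(const arrow::compute::ExecBatch& keys) {
  // A segmenter over the ordered segment key(s); a grouper over group-by keys.
  ARROW_ASSIGN_OR_RAISE(auto segmenter, RowSegmenter::Make({arrow::int32()}));
  ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make({arrow::int32()}));
  ExecSpan span(keys);
  int64_t offset = 0;
  while (true) {
    ARROW_ASSIGN_OR_RAISE(SegmentPiece piece,
                          segmenter->GetNextSegmentPiece(span, offset));
    if (piece.offset >= span.length) break;
    // Map just this piece to group ids via the offset/length Consume overload.
    ARROW_ASSIGN_OR_RAISE(arrow::Datum ids,
                          grouper->Consume(span, piece.offset, piece.length));
    if (!piece.is_open) {
      // Segment closed inside this batch: emit one result row per unique key
      // (aggregation elided); a fresh segment begins with the next piece.
      ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecBatch uniques,
                            grouper->GetUniques());
      ARROW_UNUSED(uniques);
    }
    ARROW_UNUSED(ids);
    offset = piece.offset + piece.length;
  }
  return arrow::Status::OK();
}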