GH-37895: [C++] Feature: support concatenate recordbatches. #37896
Conversation
cpp/src/arrow/record_batch.cc
Outdated
if (cols == 0) {
  // special case: null batch, no data, just length
  for (size_t i = 0; i < batches.size(); ++i) {
    length += batches[i]->num_rows();
  }
} else {
I don't think you need a special case for zero columns:
Suggested change:

- if (cols == 0) {
-   // special case: null batch, no data, just length
-   for (size_t i = 0; i < batches.size(); ++i) {
-     length += batches[i]->num_rows();
-   }
- } else {
+ // special case: null batch, no data, just length
+ for (size_t i = 0; i < batches.size(); ++i) {
+   length += batches[i]->num_rows();
+ }
Then the loop for (int col = 0; col < cols; ++col) will just never be entered if cols == 0.
No, my pseudocode is below, with the else branch:
if (cols == 0) {
  // special case: null batch, no data, just length
  for (size_t i = 0; i < batches.size(); ++i) {
    length += batches[i]->num_rows();
  }
} else {
  for (int col = 0; col < cols; ++col) {
    // ... concatenate each column ...
  }
}
The special case of zero columns is handled here to support count(*): such a RecordBatch stores no data, only the row count. The scenario is triggered when these small batches need to be merged.
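For illustration, a minimal sketch of such a length-only batch, assuming the stock Arrow C++ API (the construction in a real engine may differ):

#include <arrow/api.h>

// A zero-column RecordBatch: empty schema and no arrays, but a nonzero row
// count. count(*)-style plans only need this row count, not any column data.
std::shared_ptr<arrow::RecordBatch> MakeLengthOnlyBatch(int64_t num_rows) {
  auto empty_schema = arrow::schema(arrow::FieldVector{});
  return arrow::RecordBatch::Make(empty_schema, num_rows, arrow::ArrayVector{});
}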
Will the following change make any difference to the result?
Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
    const RecordBatchVector& batches, MemoryPool* pool) {
  int64_t length = 0;
  size_t n = batches.size();
  if (n == 0) {
    return Status::Invalid("Must pass at least one recordbatch");
  }
  int cols = batches[0]->num_columns();
  auto schema = batches[0]->schema();
  for (int i = 0; i < batches.size(); ++i) {
    length += batches[i]->num_rows();
    if (!schema->Equals(batches[i]->schema())) {
      return Status::Invalid(
          "Schema of RecordBatch index ", i, " is ", batches[i]->schema()->ToString(),
          ", which does not match index 0 recordbatch schema: ", schema->ToString());
    }
  }
  std::vector<std::shared_ptr<Array>> concatenated_columns;
  for (int col = 0; col < cols; ++col) {
    ArrayVector column_arrays;
    for (const auto& batch : batches) {
      column_arrays.emplace_back(std::move(batch->column(col)));
    }
    ARROW_ASSIGN_OR_RAISE(auto concatenated_column, Concatenate(column_arrays, pool));
    concatenated_columns.emplace_back(std::move(concatenated_column));
  }
  return RecordBatch::Make(std::move(schema), length, std::move(concatenated_columns));
}
I assumed:
- Schema checking can be moved out of the nested loop.
- The result length is just the sum of all batch lengths, no matter how many (or zero) columns there are.
- Each column has the same length in a RecordBatch, so no checking of column length is needed here.
- Variable names like column_data, data, and columns are ambiguous, so I changed them to more descriptive names.
- More data can be std::moved to avoid copies.
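A quick usage sketch under the same assumptions (batch1 and batch2 are hypothetical std::shared_ptr<RecordBatch> values with identical schemas):

// Merge two batches into one; errors propagate via Result/Status.
RecordBatchVector batches = {batch1, batch2};
ARROW_ASSIGN_OR_RAISE(auto merged,
                      ConcatenateRecordBatches(batches, default_memory_pool()));
// merged->num_rows() == batch1->num_rows() + batch2->num_rows()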
The code looks very clean, but it does not handle the case of cols = 0; in that case, the length is not obtained.
A batch may have cols = 0 and an empty schema but still have a length. This kind of batch is used to implement count(*): it stores no actual columns, only the number of rows.
for (int i = 0; i < batches.size(); ++i) {
length += batches[i]->num_rows();
...
}
Doesn't this include the cols=0 case?
Sorry, you are correct, it is handled here
Thanks for the suggestion; the change has been submitted. Note one adjustment:

column_arrays.emplace_back(std::move(batch->column(col)));

was changed to

column_arrays.emplace_back(batch->column(col));
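For context, this is my reading of the Arrow headers rather than anything stated in the thread: RecordBatch::column(int) returns its std::shared_ptr<Array> by value, so the call already yields a temporary and the extra std::move is redundant:

// column(col) returns std::shared_ptr<Array> by value, so the returned
// temporary already binds as an rvalue; no std::move is needed.
column_arrays.emplace_back(batch->column(col));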
+1. Thanks! But it will need @bkietz's approval before merging.
I think the C++ Windows checks are failing due to this change but I couldn't figure out why...
+1, I am also very confused: macOS and Linux compile correctly, but Windows does not.
I found that ARROW_EXPORT is missing; this should be the reason.
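For reference, a sketch of what the fix presumably looks like in record_batch.h (the exact merged declaration may differ slightly):

// On Windows (MSVC), symbols are not exported from the Arrow DLL unless
// annotated; ARROW_EXPORT expands to the dllexport/dllimport attribute, so
// without it the new function links on Linux/macOS but fails on Windows.
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
    const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool());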
CI failure seems unrelated.
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit c5bce96. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 8 possible false positives for unstable benchmarks that are known to sometimes produce them.
* Closes: apache#37895

Authored-by: light-city <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
Rationale for this change
User scenario: when executing an Acero plan, many small batches may be produced by aggregation and hash-join nodes. In addition, in an MPP database the data is distributed across segments, and when there are many segments each segment's batches are small. To improve performance, we want to merge these fragmented small batches into one large batch and process them together.
What changes are included in this PR?
record_batch.cc
record_batch.h
record_batch_test.cc
Are these changes tested?
Yes, see record_batch_test.cc.
Are there any user-facing changes?
Yes.
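To illustrate the user-facing change, a minimal end-to-end sketch (MakeBatch and Demo are hypothetical helper names; the API is arrow::ConcatenateRecordBatches as added by this PR):

#include <arrow/api.h>

#include <iostream>

// Hypothetical helper: build a one-column Int64 batch from the given values.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> MakeBatch(
    const std::vector<int64_t>& values) {
  arrow::Int64Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
  auto schema = arrow::schema({arrow::field("x", arrow::int64())});
  return arrow::RecordBatch::Make(schema, array->length(), {array});
}

arrow::Status Demo() {
  ARROW_ASSIGN_OR_RAISE(auto a, MakeBatch({1, 2}));
  ARROW_ASSIGN_OR_RAISE(auto b, MakeBatch({3, 4, 5}));
  // The new API: merge fragmented small batches into one large batch.
  ARROW_ASSIGN_OR_RAISE(auto merged, arrow::ConcatenateRecordBatches({a, b}));
  std::cout << "rows: " << merged->num_rows() << std::endl;  // rows: 5
  return arrow::Status::OK();
}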