From a616578994991a97a753e9599a25413bf448a8ca Mon Sep 17 00:00:00 2001 From: "Rossi(Ruoxi) Sun" Date: Thu, 21 Dec 2023 14:14:45 -0800 Subject: [PATCH] GH-32570: [C++] Fix the issue of `ExecBatchBuilder` when appending consecutive tail rows with the same id may exceed buffer boundary (#39234) ### Rationale for this change Addressed in https://github.com/apache/arrow/issues/32570#issuecomment-1856473812 ### What changes are included in this PR? 1. Skip consecutive rows with the same id when calculating rows to skip when appending to `ExecBatchBuilder`. 2. Fix the bug that column offset is neglected when calculating rows to skip. ### Are these changes tested? Yes. New UT included and the change is also protected by the existing case mentioned in the issue. ### Are there any user-facing changes? No. **This PR contains a "Critical Fix".** Because #32570 is labeled critical, and causes a crash even when the API contract is upheld. * Closes: #32570 Authored-by: zanmato Signed-off-by: Antoine Pitrou --- cpp/src/arrow/compute/light_array.cc | 7 ++++-- cpp/src/arrow/compute/light_array.h | 4 +++- cpp/src/arrow/compute/light_array_test.cc | 26 +++++++++++++++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 4e8b2b2d7cc3a..93a054de1957c 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -398,9 +398,12 @@ int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr& column, } else { --num_rows_left; int row_id_removed = row_ids[num_rows_left]; - const uint32_t* offsets = - reinterpret_cast(column->buffers[1]->data()); + const int32_t* offsets = column->GetValues(1); num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed]; + // Skip consecutive rows with the same id + while (num_rows_left > 0 && row_id_removed == row_ids[num_rows_left - 1]) { + --num_rows_left; + } } } diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 87f6b6c76a12c..84aa86d64bb62 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -416,7 +416,9 @@ class ARROW_EXPORT ExecBatchBuilder { // without checking buffer bounds (useful with SIMD or fixed size memory loads // and stores). // - // The sequence of row_ids provided must be non-decreasing. + // The sequence of row_ids provided must be non-decreasing. In case of consecutive rows + // with the same row id, they are skipped all at once because they occupy the same + // space. // static int NumRowsToSkip(const std::shared_ptr& column, int num_rows, const uint16_t* row_ids, int num_tail_bytes_to_skip); diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index 4e33f7b578ea8..52121530fe91d 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -471,6 +471,32 @@ TEST(ExecBatchBuilder, AppendBatchesSomeRows) { ASSERT_EQ(0, pool->bytes_allocated()); } +TEST(ExecBatchBuilder, AppendBatchDupRows) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + // Case of cross-word copying for the last row, which may exceed the buffer boundary. + // This is a simplified case of GH-32570 + { + // 64-byte data fully occupying one minimal 64-byte aligned memory region. + ExecBatch batch_string = JSONToExecBatch({binary()}, R"([["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["ABCDEF0"], + ["123456789"]])"); // 9-byte tail row, larger than a word. + ASSERT_EQ(batch_string[0].array()->buffers[1]->capacity(), 64); + ASSERT_EQ(batch_string[0].array()->buffers[2]->capacity(), 64); + ExecBatchBuilder builder; + uint16_t row_ids[2] = {4, 4}; + ASSERT_OK(builder.AppendSelected(pool, batch_string, 2, row_ids, /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + ExecBatch batch_string_appended = + JSONToExecBatch({binary()}, R"([["123456789"], ["123456789"]])"); + ASSERT_EQ(batch_string_appended, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + TEST(ExecBatchBuilder, AppendBatchesSomeCols) { std::unique_ptr owned_pool = MemoryPool::CreateDefault(); MemoryPool* pool = owned_pool.get();