Skip to content

Commit

Permalink
GH-36708: [C++] Fully calculate null-counts so the REE allocations make sense (#36740)
Browse files Browse the repository at this point in the history

### Rationale for this change

When `has_validity_buffer` is true, we expect a validity buffer to be allocated. However, if `null_count` is calculated and turns out to be 0, `ArrayData::Make()` silently removes the validity buffer from the physical array, so the assumption that the buffer exists no longer holds and the kernel crashes.

Forcing `null_count` calculation with `input.GetNullCount()` ensures `has_validity_buffer` won't be `true` if the `null_count` on the input ends up being 0.

### What changes are included in this PR?

The fix and tests to reproduce it.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: #36708

Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
felipecrv authored Jul 22, 2023
1 parent b557e85 commit a2a3b95
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 59 deletions.
18 changes: 10 additions & 8 deletions cpp/src/arrow/compute/kernels/ree_util_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(

Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, int64_t length,
int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
MemoryPool* pool, int64_t data_buffer_size) {
std::vector<std::shared_ptr<Buffer>> values_data_buffers;
std::shared_ptr<Buffer> validity_buffer = NULLPTR;
if (has_validity_buffer) {
Expand All @@ -79,20 +79,22 @@ Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
} else {
values_data_buffers = {std::move(validity_buffer), std::move(values_buffer)};
}
return ArrayData::Make(value_type, length, std::move(values_data_buffers), null_count);
auto data = ArrayData::Make(value_type, length, std::move(values_data_buffers),
kUnknownNullCount);
DCHECK(!(has_validity_buffer && length > 0) || data->buffers[0]);
return data;
}

Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
int64_t logical_length, int64_t physical_length, int64_t physical_null_count,
MemoryPool* pool, int64_t data_buffer_size) {
int64_t logical_length, int64_t physical_length, MemoryPool* pool,
int64_t data_buffer_size) {
ARROW_ASSIGN_OR_RAISE(
auto run_ends_data,
PreallocateRunEndsArray(ree_type->run_end_type(), physical_length, pool));
ARROW_ASSIGN_OR_RAISE(
auto values_data,
PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, physical_length,
physical_null_count, pool, data_buffer_size));
ARROW_ASSIGN_OR_RAISE(auto values_data, PreallocateValuesArray(
ree_type->value_type(), has_validity_buffer,
physical_length, pool, data_buffer_size));

return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
{std::move(run_ends_data), std::move(values_data)},
Expand Down
27 changes: 24 additions & 3 deletions cpp/src/arrow/compute/kernels/ree_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,18 +333,39 @@ Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
MemoryPool* pool);

/// \brief Preallocate the physical values array for a run-end encoded array
///
/// data_buffer_size is passed here pre-calculated so this function doesn't have
/// to be template-specialized for each type.
///
/// The null_count is left as kUnknownNullCount (or 0 if length is 0) and, if
/// after writing the values, the caller knows the null count, it can be set.
///
/// \post if has_validity_buffer and length > 0, then data.buffers[0] != NULLPTR
///
/// \param has_validity_buffer a validity buffer must be allocated
/// \param length the length of the values array
/// \param data_buffer_size the size of the data buffer for string and binary types
Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, int64_t length,
int64_t null_count, MemoryPool* pool, int64_t data_buffer_size);
MemoryPool* pool, int64_t data_buffer_size);

/// \brief Preallocate the ArrayData for the run-end encoded version
/// of the flat input array
///
/// The top-level null_count is set to 0 (REEs keep all the data in child
/// arrays). The null_count of the values array (child_data[1]) is left as
/// kUnknownNullCount (or 0 if physical_length is 0) and, if after writing
/// the values, the caller knows the null count, it can be set.
///
/// \post if has_validity_buffer and physical_length > 0, then
/// data.child_data[1]->buffers[0] != NULLPTR
///
/// \param data_buffer_size the size of the data buffer for string and binary types
Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
int64_t logical_length, int64_t physical_length, int64_t physical_null_count,
MemoryPool* pool, int64_t data_buffer_size);
int64_t logical_length, int64_t physical_length, MemoryPool* pool,
int64_t data_buffer_size);

/// \brief Writes a single run-end to the first slot of the pre-allocated
/// run-end encoded array in out
Expand Down
25 changes: 16 additions & 9 deletions cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ class RunEndEncodeImpl {
ARROW_ASSIGN_OR_RAISE(
auto output_array_data,
ree_util::PreallocateREEArray(std::move(ree_type), has_validity_buffer,
input_length, 0, 0, ctx_->memory_pool(), 0));
/*logical_length=*/input_length,
/*physical_length=*/0, ctx_->memory_pool(),
/*data_buffer_size=*/0));
output_->value = std::move(output_array_data);
return Status::OK();
}
Expand All @@ -196,17 +198,22 @@ class RunEndEncodeImpl {
/*output_run_ends=*/NULLPTR);
std::tie(num_valid_runs, num_output_runs, data_buffer_size) =
counting_loop.CountNumberOfRuns();
const auto physical_null_count = num_output_runs - num_valid_runs;
DCHECK(!has_validity_buffer || physical_null_count > 0)
<< "has_validity_buffer is expected to imply physical_null_count > 0";

ARROW_ASSIGN_OR_RAISE(
auto output_array_data,
ree_util::PreallocateREEArray(
std::move(ree_type), has_validity_buffer, input_length, num_output_runs,
num_output_runs - num_valid_runs, ctx_->memory_pool(), data_buffer_size));
std::move(ree_type), has_validity_buffer, /*logical_length=*/input_length,
/*physical_length=*/num_output_runs, ctx_->memory_pool(), data_buffer_size));

// Initialize the output pointers
auto* output_run_ends =
output_array_data->child_data[0]->template GetMutableValues<RunEndCType>(1, 0);
auto* output_values_array_data = output_array_data->child_data[1].get();
// Set the null_count on the physical array
output_values_array_data->null_count = physical_null_count;

// Second pass: write the runs
RunEndEncodingLoop<RunEndType, ValueType, has_validity_buffer> writing_loop(
Expand Down Expand Up @@ -254,7 +261,7 @@ struct RunEndEncodeExec {
return RunEndEncodeNullArray(TypeTraits<RunEndType>::type_singleton(), ctx,
input_array, result);
} else {
const bool has_validity_buffer = input_array.MayHaveNulls();
const bool has_validity_buffer = input_array.GetNullCount() > 0;
if (has_validity_buffer) {
return RunEndEncodeImpl<RunEndType, ValueType, true>(ctx, input_array, result)
.Exec();
Expand Down Expand Up @@ -398,10 +405,10 @@ class RunEndDecodeImpl {
}
}

ARROW_ASSIGN_OR_RAISE(auto output_array_data,
ree_util::PreallocateValuesArray(
ree_type->value_type(), has_validity_buffer, length,
kUnknownNullCount, ctx_->memory_pool(), data_buffer_size));
ARROW_ASSIGN_OR_RAISE(
auto output_array_data,
ree_util::PreallocateValuesArray(ree_type->value_type(), has_validity_buffer,
length, ctx_->memory_pool(), data_buffer_size));

int64_t output_null_count = 0;
if (length > 0) {
Expand Down Expand Up @@ -435,7 +442,7 @@ struct RunEndDecodeExec {
return RunEndDecodeNullREEArray(ctx, input_array, result);
} else {
const bool has_validity_buffer =
arrow::ree_util::ValuesArray(input_array).MayHaveNulls();
arrow::ree_util::ValuesArray(input_array).GetNullCount() > 0;
if (has_validity_buffer) {
return RunEndDecodeImpl<RunEndType, ValueType, true>(ctx, input_array, result)
.Exec();
Expand Down
102 changes: 63 additions & 39 deletions cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ struct REETestData {
std::vector<std::string> inputs_json,
std::vector<std::string> expected_values_json,
std::vector<std::string> expected_run_ends_json,
int64_t input_offset = 0) {
int64_t input_offset = 0,
bool force_validity_bitmap = false) {
std::vector<std::shared_ptr<Array>> inputs;
inputs.reserve(inputs_json.size());
for (const auto& input_json : inputs_json) {
inputs.push_back(ArrayFromJSON(data_type, input_json));
auto chunk = ArrayFromJSON(data_type, input_json);
auto& data = chunk->data();
if (force_validity_bitmap && !data->HasValidityBitmap()) {
EXPECT_OK_AND_ASSIGN(auto validity, AllocateBitmap(data->length));
memset(validity->mutable_data(), 0xFF, validity->size());
data->buffers[0] = std::move(validity);
}
inputs.push_back(std::move(chunk));
}
auto chunked_input = std::make_shared<ChunkedArray>(std::move(inputs));

Expand Down Expand Up @@ -165,47 +173,52 @@ class TestRunEndEncodeDecode : public ::testing::TestWithParam<
DCHECK(datum.is_chunked_array());
return datum.chunked_array();
}
};

TEST_P(TestRunEndEncodeDecode, EncodeDecodeArray) {
auto [data, run_end_type] = GetParam();

ASSERT_OK_AND_ASSIGN(
Datum encoded_datum,
RunEndEncode(data.InputDatum(), RunEndEncodeOptions{run_end_type}));

auto encoded = AsChunkedArray(encoded_datum);
ASSERT_OK(encoded->ValidateFull());
ASSERT_EQ(data.input->length(), encoded->length());
void TestEncodeDecodeArray(REETestData& data,
const std::shared_ptr<DataType>& run_end_type) {
ASSERT_OK_AND_ASSIGN(
Datum encoded_datum,
RunEndEncode(data.InputDatum(), RunEndEncodeOptions{run_end_type}));

auto encoded = AsChunkedArray(encoded_datum);
ASSERT_OK(encoded->ValidateFull());
ASSERT_EQ(data.input->length(), encoded->length());

for (int i = 0; i < encoded->num_chunks(); i++) {
auto& chunk = encoded->chunk(i);
auto run_ends_array = MakeArray(chunk->data()->child_data[0]);
auto values_array = MakeArray(chunk->data()->child_data[1]);
ASSERT_OK(chunk->ValidateFull());
ASSERT_ARRAYS_EQUAL(*ArrayFromJSON(run_end_type, data.expected_run_ends_json[i]),
*run_ends_array);
ASSERT_ARRAYS_EQUAL(*values_array, *data.expected_values[i]);
ASSERT_EQ(chunk->data()->buffers.size(), 1);
ASSERT_EQ(chunk->data()->buffers[0], NULLPTR);
ASSERT_EQ(chunk->data()->child_data.size(), 2);
ASSERT_EQ(run_ends_array->data()->buffers[0], NULLPTR);
ASSERT_EQ(run_ends_array->length(), data.expected_values[i]->length());
ASSERT_EQ(run_ends_array->offset(), 0);
ASSERT_EQ(chunk->data()->length, data.input->chunk(i)->length());
ASSERT_EQ(chunk->data()->offset, 0);
ASSERT_EQ(*chunk->data()->type,
RunEndEncodedType(run_end_type, data.input->type()));
ASSERT_EQ(chunk->data()->null_count, 0);
}

for (int i = 0; i < encoded->num_chunks(); i++) {
auto& chunk = encoded->chunk(i);
auto run_ends_array = MakeArray(chunk->data()->child_data[0]);
auto values_array = MakeArray(chunk->data()->child_data[1]);
ASSERT_OK(chunk->ValidateFull());
ASSERT_ARRAYS_EQUAL(*ArrayFromJSON(run_end_type, data.expected_run_ends_json[i]),
*run_ends_array);
ASSERT_ARRAYS_EQUAL(*values_array, *data.expected_values[i]);
ASSERT_EQ(chunk->data()->buffers.size(), 1);
ASSERT_EQ(chunk->data()->buffers[0], NULLPTR);
ASSERT_EQ(chunk->data()->child_data.size(), 2);
ASSERT_EQ(run_ends_array->data()->buffers[0], NULLPTR);
ASSERT_EQ(run_ends_array->length(), data.expected_values[i]->length());
ASSERT_EQ(run_ends_array->offset(), 0);
ASSERT_EQ(chunk->data()->length, data.input->chunk(i)->length());
ASSERT_EQ(chunk->data()->offset, 0);
ASSERT_EQ(*chunk->data()->type, RunEndEncodedType(run_end_type, data.input->type()));
ASSERT_EQ(chunk->data()->null_count, 0);
ASSERT_OK_AND_ASSIGN(Datum decoded_datum, data.chunked
? RunEndDecode(encoded)
: RunEndDecode(encoded->chunk(0)));
auto decoded = AsChunkedArray(decoded_datum);
ASSERT_OK(decoded->ValidateFull());
for (int i = 0; i < decoded->num_chunks(); i++) {
ASSERT_ARRAYS_EQUAL(*decoded->chunk(i), *data.input->chunk(i));
}
}
};

ASSERT_OK_AND_ASSIGN(Datum decoded_datum, data.chunked
? RunEndDecode(encoded)
: RunEndDecode(encoded->chunk(0)));
auto decoded = AsChunkedArray(decoded_datum);
ASSERT_OK(decoded->ValidateFull());
for (int i = 0; i < decoded->num_chunks(); i++) {
ASSERT_ARRAYS_EQUAL(*decoded->chunk(i), *data.input->chunk(i));
}
// Parameterized round-trip test: for every (test data, run-end type) pair
// supplied by GetParam(), delegate to the shared TestEncodeDecodeArray helper.
TEST_P(TestRunEndEncodeDecode, EncodeDecodeArray) {
auto [data, run_end_type] = GetParam();
TestEncodeDecodeArray(data, run_end_type);
}

// Encoding an input with an offset results in a completely new encoded array without an
Expand Down Expand Up @@ -254,6 +267,17 @@ TEST_P(TestRunEndEncodeDecode, DecodeWithOffset) {
}
}

// GH-36708: regression test for inputs that carry a validity bitmap but
// contain no nulls. force_validity_bitmap=true requests such a bitmap on every
// input chunk before the encode/decode round trip is checked.
// The test builds its own int32 data and uses int32 run ends; it does not read
// GetParam().
TEST_P(TestRunEndEncodeDecode, InputWithValidityAndNoNulls) {
auto data =
REETestData::JSONChunked(int32(),
/*inputs=*/{"[1, 1, 2, 2, 2, 3]", "[4, 5, 5, 5, 6, 6]"},
/*expected_values=*/{"[1, 2, 3]", "[4, 5, 6]"},
/*expected_run_ends=*/{"[2, 5, 6]", "[1, 4, 6]"},
/*input_offset=*/0, /*force_validity_bitmap=*/true);
TestEncodeDecodeArray(data, int32());
}

// This test creates a run-end encoded array with an offset in the child array, which
// removes the first run in the test data. It's a no-op for chunked input.
TEST_P(TestRunEndEncodeDecode, DecodeWithOffsetInChildArray) {
Expand Down

0 comments on commit a2a3b95

Please sign in to comment.