Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40020: [C++] Change offset types to signed in row table related structures and APIs #39685

Closed
wants to merge 13 commits into from
39 changes: 19 additions & 20 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace acero {

int RowArrayAccessor::VarbinaryColumnId(const RowTableMetadata& row_metadata,
int column_id) {
ARROW_DCHECK(row_metadata.num_cols() > static_cast<uint32_t>(column_id));
ARROW_DCHECK(row_metadata.num_cols() > column_id);
ARROW_DCHECK(!row_metadata.is_fixed_length);
ARROW_DCHECK(!row_metadata.column_metadatas[column_id].is_fixed_length);

Expand Down Expand Up @@ -79,7 +79,7 @@ int RowArrayAccessor::NumRowsToSkip(const RowTableImpl& rows, int column_id, int

// Find the length of the requested varying length field in that row
//
uint32_t field_offset_within_row, field_length;
int32_t field_offset_within_row, field_length;
if (varbinary_column_id == 0) {
rows.metadata().first_varbinary_offset_and_length(
row_ptr, &field_offset_within_row, &field_length);
Expand All @@ -94,10 +94,9 @@ int RowArrayAccessor::NumRowsToSkip(const RowTableImpl& rows, int column_id, int
} else {
// Fixed length column
//
uint32_t field_length = rows.metadata().column_metadatas[column_id].fixed_length;
uint32_t num_bytes_skipped = 0;
while (num_rows_left > 0 &&
num_bytes_skipped < static_cast<uint32_t>(num_tail_bytes_to_skip)) {
int32_t field_length = rows.metadata().column_metadatas[column_id].fixed_length;
int32_t num_bytes_skipped = 0;
while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) {
num_bytes_skipped += field_length;
--num_rows_left;
}
Expand All @@ -122,8 +121,8 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
if (!is_fixed_length_column) {
int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id);
const uint8_t* row_ptr_base = rows.data(2);
const uint32_t* row_offsets = rows.offsets();
uint32_t field_offset_within_row, field_length;
const int32_t* row_offsets = rows.offsets();
int32_t field_offset_within_row, field_length;

if (varbinary_column_id == 0) {
// Case 1: This is the first varbinary column
Expand All @@ -149,15 +148,15 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
}

if (is_fixed_length_column) {
uint32_t field_offset_within_row = rows.metadata().encoded_field_offset(
int32_t field_offset_within_row = rows.metadata().encoded_field_offset(
rows.metadata().pos_after_encoding(column_id));
uint32_t field_length = rows.metadata().column_metadatas[column_id].fixed_length;
int32_t field_length = rows.metadata().column_metadatas[column_id].fixed_length;
// Bit column is encoded as a single byte
//
if (field_length == 0) {
field_length = 1;
}
uint32_t row_length = rows.metadata().fixed_length;
int32_t row_length = rows.metadata().fixed_length;

bool is_fixed_length_row = rows.metadata().is_fixed_length;
if (is_fixed_length_row) {
Expand All @@ -173,7 +172,7 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
// Case 4: This is a fixed length column in a varying length row
//
const uint8_t* row_ptr_base = rows.data(2) + field_offset_within_row;
const uint32_t* row_offsets = rows.offsets();
const int32_t* row_offsets = rows.offsets();
for (int i = 0; i < num_rows; ++i) {
uint32_t row_id = row_ids[i];
const uint8_t* row_ptr = row_ptr_base + row_offsets[row_id];
Expand Down Expand Up @@ -269,7 +268,7 @@ Status RowArray::DecodeSelected(ResizableArrayData* output, int column_id,
ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata column_metadata, output->column_metadata());

if (column_metadata.is_fixed_length) {
uint32_t fixed_length = column_metadata.fixed_length;
int32_t fixed_length = column_metadata.fixed_length;
switch (fixed_length) {
case 0:
RowArrayAccessor::Visit(rows_, column_id, num_rows_to_append, row_ids,
Expand Down Expand Up @@ -375,7 +374,7 @@ void RowArray::DebugPrintToFile(const char* filename, bool print_sorted) const {
}

for (int64_t row_id = 0; row_id < rows_.length(); ++row_id) {
for (uint32_t column_id = 0; column_id < rows_.metadata().num_cols(); ++column_id) {
for (int column_id = 0; column_id < rows_.metadata().num_cols(); ++column_id) {
bool is_null;
uint32_t row_id_cast = static_cast<uint32_t>(row_id);
RowArrayAccessor::VisitNulls(rows_, column_id, 1, &row_id_cast,
Expand Down Expand Up @@ -493,11 +492,11 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target,
num_rows = 0;
num_bytes = 0;
for (size_t i = 0; i < sources.size(); ++i) {
target->rows_.mutable_offsets()[num_rows] = static_cast<uint32_t>(num_bytes);
target->rows_.mutable_offsets()[num_rows] = static_cast<int32_t>(num_bytes);
num_rows += sources[i]->rows_.length();
num_bytes += sources[i]->rows_.offsets()[sources[i]->rows_.length()];
}
target->rows_.mutable_offsets()[num_rows] = static_cast<uint32_t>(num_bytes);
target->rows_.mutable_offsets()[num_rows] = static_cast<int32_t>(num_bytes);
}

return Status::OK();
Expand Down Expand Up @@ -565,15 +564,15 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
int64_t first_target_row_offset,
const int64_t* source_rows_permutation) {
int64_t num_source_rows = source.length();
uint32_t* target_offsets = target->mutable_offsets();
const uint32_t* source_offsets = source.offsets();
int32_t* target_offsets = target->mutable_offsets();
const int32_t* source_offsets = source.offsets();

// Permutation of source rows is optional.
//
if (!source_rows_permutation) {
int64_t target_row_offset = first_target_row_offset;
for (int64_t i = 0; i < num_source_rows; ++i) {
target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = static_cast<int32_t>(target_row_offset);
target_row_offset += source_offsets[i + 1] - source_offsets[i];
}
// We purposefully skip outputting of N+1 offset, to allow concurrent
Expand Down Expand Up @@ -604,7 +603,7 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
*target_row_ptr++ = *source_row_ptr++;
}

target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = static_cast<int32_t>(target_row_offset);
target_row_offset += length;
}
}
Expand Down
13 changes: 5 additions & 8 deletions cpp/src/arrow/acero/swiss_join_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ int RowArrayAccessor::Visit_avx2(const RowTableImpl& rows, int column_id, int nu
if (!is_fixed_length_column) {
int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id);
const uint8_t* row_ptr_base = rows.data(2);
const uint32_t* row_offsets = rows.offsets();
const int32_t* row_offsets = rows.offsets();

if (varbinary_column_id == 0) {
// Case 1: This is the first varbinary column
Expand All @@ -56,8 +56,7 @@ int RowArrayAccessor::Visit_avx2(const RowTableImpl& rows, int column_id, int nu
for (int i = 0; i < num_rows / unroll; ++i) {
__m256i row_id =
_mm256_loadu_si256(reinterpret_cast<const __m256i*>(row_ids) + i);
__m256i row_offset = _mm256_i32gather_epi32(
reinterpret_cast<const int*>(row_offsets), row_id, sizeof(uint32_t));
__m256i row_offset = _mm256_i32gather_epi32(row_offsets, row_id, sizeof(int32_t));
__m256i field_length = _mm256_sub_epi32(
_mm256_i32gather_epi32(
reinterpret_cast<const int*>(row_ptr_base),
Expand All @@ -78,8 +77,7 @@ int RowArrayAccessor::Visit_avx2(const RowTableImpl& rows, int column_id, int nu
for (int i = 0; i < num_rows / unroll; ++i) {
__m256i row_id =
_mm256_loadu_si256(reinterpret_cast<const __m256i*>(row_ids) + i);
__m256i row_offset = _mm256_i32gather_epi32(
reinterpret_cast<const int*>(row_offsets), row_id, sizeof(uint32_t));
__m256i row_offset = _mm256_i32gather_epi32(row_offsets, row_id, sizeof(int32_t));
__m256i end_array_offset =
_mm256_add_epi32(row_offset, varbinary_end_array_offset);

Expand Down Expand Up @@ -140,12 +138,11 @@ int RowArrayAccessor::Visit_avx2(const RowTableImpl& rows, int column_id, int nu
// Case 4: This is a fixed length column in varying length row
//
const uint8_t* row_ptr_base = rows.data(2);
const uint32_t* row_offsets = rows.offsets();
const int32_t* row_offsets = rows.offsets();
for (int i = 0; i < num_rows / unroll; ++i) {
__m256i row_id =
_mm256_loadu_si256(reinterpret_cast<const __m256i*>(row_ids) + i);
__m256i row_offset = _mm256_i32gather_epi32(
reinterpret_cast<const int*>(row_offsets), row_id, sizeof(uint32_t));
__m256i row_offset = _mm256_i32gather_epi32(row_offsets, row_id, sizeof(int32_t));
__m256i field_offset = _mm256_add_epi32(row_offset, field_offset_within_row);
process_8_values_fn(i * unroll, row_ptr_base, field_offset, field_length);
}
Expand Down
38 changes: 18 additions & 20 deletions cpp/src/arrow/compute/key_hash_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ void Hashing32::HashVarLenImp(uint32_t num_rows, const T* offsets,
}

void Hashing32::HashVarLen(int64_t hardware_flags, bool combine_hashes, uint32_t num_rows,
const uint32_t* offsets, const uint8_t* concatenated_keys,
const int32_t* offsets, const uint8_t* concatenated_keys,
uint32_t* hashes, uint32_t* hashes_temp_for_combine) {
uint32_t num_processed = 0;
#if defined(ARROW_HAVE_RUNTIME_AVX2)
Expand All @@ -245,16 +245,16 @@ void Hashing32::HashVarLen(int64_t hardware_flags, bool combine_hashes, uint32_t
}
#endif
if (combine_hashes) {
HashVarLenImp<uint32_t, true>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
HashVarLenImp<int32_t, true>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
} else {
HashVarLenImp<uint32_t, false>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
HashVarLenImp<int32_t, false>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
}
}

void Hashing32::HashVarLen(int64_t hardware_flags, bool combine_hashes, uint32_t num_rows,
const uint64_t* offsets, const uint8_t* concatenated_keys,
const int64_t* offsets, const uint8_t* concatenated_keys,
uint32_t* hashes, uint32_t* hashes_temp_for_combine) {
uint32_t num_processed = 0;
#if defined(ARROW_HAVE_RUNTIME_AVX2)
Expand All @@ -264,11 +264,11 @@ void Hashing32::HashVarLen(int64_t hardware_flags, bool combine_hashes, uint32_t
}
#endif
if (combine_hashes) {
HashVarLenImp<uint64_t, true>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
HashVarLenImp<int64_t, true>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
} else {
HashVarLenImp<uint64_t, false>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
HashVarLenImp<int64_t, false>(num_rows - num_processed, offsets + num_processed,
concatenated_keys, hashes + num_processed);
}
}

Expand Down Expand Up @@ -705,23 +705,21 @@ void Hashing64::HashVarLenImp(uint32_t num_rows, const T* offsets,
}
}

void Hashing64::HashVarLen(bool combine_hashes, uint32_t num_rows,
const uint32_t* offsets, const uint8_t* concatenated_keys,
uint64_t* hashes) {
void Hashing64::HashVarLen(bool combine_hashes, uint32_t num_rows, const int32_t* offsets,
const uint8_t* concatenated_keys, uint64_t* hashes) {
if (combine_hashes) {
HashVarLenImp<uint32_t, true>(num_rows, offsets, concatenated_keys, hashes);
HashVarLenImp<int32_t, true>(num_rows, offsets, concatenated_keys, hashes);
} else {
HashVarLenImp<uint32_t, false>(num_rows, offsets, concatenated_keys, hashes);
HashVarLenImp<int32_t, false>(num_rows, offsets, concatenated_keys, hashes);
}
}

void Hashing64::HashVarLen(bool combine_hashes, uint32_t num_rows,
const uint64_t* offsets, const uint8_t* concatenated_keys,
uint64_t* hashes) {
void Hashing64::HashVarLen(bool combine_hashes, uint32_t num_rows, const int64_t* offsets,
const uint8_t* concatenated_keys, uint64_t* hashes) {
if (combine_hashes) {
HashVarLenImp<uint64_t, true>(num_rows, offsets, concatenated_keys, hashes);
HashVarLenImp<int64_t, true>(num_rows, offsets, concatenated_keys, hashes);
} else {
HashVarLenImp<uint64_t, false>(num_rows, offsets, concatenated_keys, hashes);
HashVarLenImp<int64_t, false>(num_rows, offsets, concatenated_keys, hashes);
}
}

Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/compute/key_hash_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ class ARROW_EXPORT Hashing32 {
static const int64_t kStripeSize = 4 * sizeof(uint32_t);

static void HashVarLen(int64_t hardware_flags, bool combine_hashes, uint32_t num_rows,
const uint32_t* offsets, const uint8_t* concatenated_keys,
const int32_t* offsets, const uint8_t* concatenated_keys,
uint32_t* hashes, uint32_t* temp_hashes_for_combine);

static void HashVarLen(int64_t hardware_flags, bool combine_hashes, uint32_t num_rows,
const uint64_t* offsets, const uint8_t* concatenated_keys,
const int64_t* offsets, const uint8_t* concatenated_keys,
uint32_t* hashes, uint32_t* temp_hashes_for_combine);

static inline uint32_t Avalanche(uint32_t acc) {
Expand Down Expand Up @@ -140,11 +140,11 @@ class ARROW_EXPORT Hashing32 {
const uint8_t* concatenated_keys, uint32_t* hashes,
uint32_t* hashes_temp_for_combine);
static uint32_t HashVarLen_avx2(bool combine_hashes, uint32_t num_rows,
const uint32_t* offsets,
const int32_t* offsets,
const uint8_t* concatenated_keys, uint32_t* hashes,
uint32_t* hashes_temp_for_combine);
static uint32_t HashVarLen_avx2(bool combine_hashes, uint32_t num_rows,
const uint64_t* offsets,
const int64_t* offsets,
const uint8_t* concatenated_keys, uint32_t* hashes,
uint32_t* hashes_temp_for_combine);
#endif
Expand Down Expand Up @@ -178,10 +178,10 @@ class ARROW_EXPORT Hashing64 {
static const uint32_t kCombineConst = 0x9e3779b9UL;
static const int64_t kStripeSize = 4 * sizeof(uint64_t);

static void HashVarLen(bool combine_hashes, uint32_t num_rows, const uint32_t* offsets,
static void HashVarLen(bool combine_hashes, uint32_t num_rows, const int32_t* offsets,
const uint8_t* concatenated_keys, uint64_t* hashes);

static void HashVarLen(bool combine_hashes, uint32_t num_rows, const uint64_t* offsets,
static void HashVarLen(bool combine_hashes, uint32_t num_rows, const int64_t* offsets,
const uint8_t* concatenated_keys, uint64_t* hashes);

static inline uint64_t Avalanche(uint64_t acc);
Expand Down
20 changes: 10 additions & 10 deletions cpp/src/arrow/compute/key_hash_internal_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,28 +288,28 @@ uint32_t Hashing32::HashVarLenImp_avx2(uint32_t num_rows, const T* offsets,
}

uint32_t Hashing32::HashVarLen_avx2(bool combine_hashes, uint32_t num_rows,
const uint32_t* offsets,
const int32_t* offsets,
const uint8_t* concatenated_keys, uint32_t* hashes,
uint32_t* hashes_temp_for_combine) {
if (combine_hashes) {
return HashVarLenImp_avx2<uint32_t, true>(num_rows, offsets, concatenated_keys,
hashes, hashes_temp_for_combine);
return HashVarLenImp_avx2<int32_t, true>(num_rows, offsets, concatenated_keys, hashes,
hashes_temp_for_combine);
} else {
return HashVarLenImp_avx2<uint32_t, false>(num_rows, offsets, concatenated_keys,
hashes, hashes_temp_for_combine);
return HashVarLenImp_avx2<int32_t, false>(num_rows, offsets, concatenated_keys,
hashes, hashes_temp_for_combine);
}
}

uint32_t Hashing32::HashVarLen_avx2(bool combine_hashes, uint32_t num_rows,
const uint64_t* offsets,
const int64_t* offsets,
const uint8_t* concatenated_keys, uint32_t* hashes,
uint32_t* hashes_temp_for_combine) {
if (combine_hashes) {
return HashVarLenImp_avx2<uint64_t, true>(num_rows, offsets, concatenated_keys,
hashes, hashes_temp_for_combine);
return HashVarLenImp_avx2<int64_t, true>(num_rows, offsets, concatenated_keys, hashes,
hashes_temp_for_combine);
} else {
return HashVarLenImp_avx2<uint64_t, false>(num_rows, offsets, concatenated_keys,
hashes, hashes_temp_for_combine);
return HashVarLenImp_avx2<int64_t, false>(num_rows, offsets, concatenated_keys,
hashes, hashes_temp_for_combine);
}
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/key_hash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class TestVectorHash {
bool use_varlen_input, int min_length, int max_length) {
using ArrayType = typename TypeTraits<Type>::ArrayType;
using OffsetType = typename TypeTraits<Type>::OffsetType;
using offset_t = typename std::make_unsigned<typename OffsetType::c_type>::type;
using offset_t = typename OffsetType::c_type;

constexpr int min_num_unique = 100;
constexpr int max_num_unique = 1000;
Expand Down
Loading
Loading