Skip to content

Commit

Permalink
apacheGH-43758: [C++] Compute: More comment in RowEncoder (apache#43763)
Browse files Browse the repository at this point in the history
### Rationale for this change

Some comments for RowEncoder

### What changes are included in this PR?

Some comments for RowEncoder

### Are these changes tested?

Covered by existing

### Are there any user-facing changes?

no

* GitHub Issue: apache#43758

Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Rossi Sun <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
5 people authored Sep 2, 2024
1 parent 9ab9532 commit 44d3f76
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 63 deletions.
6 changes: 3 additions & 3 deletions cpp/src/arrow/compute/light_array_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ struct ARROW_EXPORT KeyColumnMetadata {
/// If this is true the column will have a validity buffer and
/// a data buffer and the third buffer will be unused.
bool is_fixed_length;
/// \brief True if this column is the null type
/// \brief True if this column is the null type (NA).
bool is_null_type;
/// \brief The number of bytes for each item
///
/// Zero has a special meaning, indicating a bit vector with one bit per value if it
/// isn't a null type column.
/// isn't a null type column. Generally, this means that the column is a boolean type.
///
/// For a varying-length binary column this represents the number of bytes per offset.
uint32_t fixed_length;
Expand Down Expand Up @@ -405,7 +405,7 @@ class ARROW_EXPORT ExecBatchBuilder {

int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }

static int num_rows_max() { return 1 << kLogNumRows; }
static constexpr int num_rows_max() { return 1 << kLogNumRows; }

private:
static constexpr int kLogNumRows = 15;
Expand Down
56 changes: 26 additions & 30 deletions cpp/src/arrow/compute/row/row_encoder_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,41 +145,37 @@ void FixedWidthKeyEncoder::AddLengthNull(int32_t* length) {

Status FixedWidthKeyEncoder::Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) {
auto handle_next_valid_value = [&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
};
auto handle_next_null_value = [&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
};
if (data.is_array()) {
ArraySpan viewed = data.array;
// The original type might not be FixedSizeBinaryType, but it would
// treat the input as binary data.
auto view_ty = fixed_size_binary(byte_width_);
viewed.type = view_ty.get();
VisitArraySpanInline<FixedSizeBinaryType>(
viewed,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
});
VisitArraySpanInline<FixedSizeBinaryType>(viewed, handle_next_valid_value,
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<arrow::internal::PrimitiveScalarBase>();
if (scalar.is_valid) {
const std::string_view data = scalar.view();
DCHECK_EQ(data.size(), static_cast<size_t>(byte_width_));
const std::string_view scalar_data = scalar.view();
DCHECK_EQ(scalar_data.size(), static_cast<size_t>(byte_width_));
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, data.data(), data.size());
encoded_ptr += byte_width_;
handle_next_valid_value(scalar_data);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -267,11 +263,11 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*

for (size_t i = 0; i < column_types.size(); ++i) {
const bool is_extension = column_types[i].id() == Type::EXTENSION;
const TypeHolder& type = is_extension
? arrow::internal::checked_pointer_cast<ExtensionType>(
column_types[i].GetSharedPtr())
->storage_type()
: column_types[i];
const TypeHolder& type =
is_extension
? arrow::internal::checked_cast<const ExtensionType*>(column_types[i].type)
->storage_type()
: column_types[i];

if (is_extension) {
extension_types_[i] = arrow::internal::checked_pointer_cast<ExtensionType>(
Expand Down Expand Up @@ -379,7 +375,7 @@ Result<ExecBatch> RowEncoder::Decode(int64_t num_rows, const int32_t* row_ids) {
ARROW_ASSIGN_OR_RAISE(out.values[i], ::arrow::internal::GetArrayView(
column_array_data, extension_types_[i]))
} else {
out.values[i] = column_array_data;
out.values[i] = std::move(column_array_data);
}
}

Expand Down
154 changes: 125 additions & 29 deletions cpp/src/arrow/compute/row/row_encoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,41 @@ struct ARROW_EXPORT KeyEncoder {

virtual ~KeyEncoder() = default;

// Increment the values in the lengths array by the length of the encoded key for the
// corresponding value in the given column.
//
// Generally, if the encoder is for a fixed-width type, each length is incremented
// by ExtraByteForNull + byte_width.
// If the encoder is for a variable-width type, each length is incremented by
// ExtraByteForNull + sizeof(Offset) + buffer_size.
// If the encoder is for the null type, each length is incremented by 0.
virtual void AddLength(const ExecValue& value, int64_t batch_length,
int32_t* lengths) = 0;

// Increment the length by the length of an encoded null value.
// This is a special case of AddLength, like `AddLength(Null-Scalar, 1, lengths)`.
virtual void AddLengthNull(int32_t* length) = 0;

// Encode the column into the encoded_bytes, which is an array of pointers to each row
// buffer.
//
// If value is an array, its length should be batch_length.
// If value is a scalar, the value is repeated batch_length times.
// NB: The pointers in encoded_bytes are advanced as values are encoded into them.
virtual Status Encode(const ExecValue&, int64_t batch_length,
uint8_t** encoded_bytes) = 0;

// Encode a null value into the encoded_bytes, which is an array of pointers to each row
// buffer.
//
// This is a special case of Encode, like `Encode(Null-Scalar, 1, encoded_bytes)`.
// NB: The pointers in encoded_bytes are advanced as values are encoded into them.
virtual void EncodeNull(uint8_t** encoded_bytes) = 0;

// Decode the encoded key from the encoded_bytes, which is an array of pointers to each
// row buffer, into an ArrayData.
//
// NB: The pointers in encoded_bytes are advanced as values are decoded from them.
virtual Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes,
int32_t length, MemoryPool*) = 0;

Expand Down Expand Up @@ -94,7 +119,7 @@ struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder {
MemoryPool* pool) override;

std::shared_ptr<DataType> type_;
int byte_width_;
const int byte_width_;
};

struct ARROW_EXPORT DictionaryKeyEncoder : FixedWidthKeyEncoder {
Expand All @@ -118,6 +143,7 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {
void AddLength(const ExecValue& data, int64_t batch_length, int32_t* lengths) override {
if (data.is_array()) {
int64_t i = 0;
ARROW_DCHECK_EQ(data.array.length, batch_length);
VisitArraySpanInline<T>(
data.array,
[&](std::string_view bytes) {
Expand All @@ -142,41 +168,34 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {

Status Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) override {
auto handle_next_valid_value = [&encoded_bytes](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
};
auto handle_next_null_value = [&encoded_bytes]() {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
};
if (data.is_array()) {
VisitArraySpanInline<T>(
data.array,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
});
DCHECK_EQ(data.length(), batch_length);
VisitArraySpanInline<T>(data.array, handle_next_valid_value,
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<BaseBinaryScalar>();
if (scalar.is_valid) {
const auto& bytes = *scalar.value;
const auto bytes = std::string_view{*scalar.value};
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
handle_next_valid_value(bytes);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -250,6 +269,68 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};

/// RowEncoder encodes ExecSpan to a variable length byte sequence
/// created by concatenating the encoded form of each column. The encoding
/// for each column depends on its data type.
///
/// This is used to encode columns into row-major format, which will be
/// beneficial for grouping and joining operations.
///
/// Unlike the row formats of DuckDB and arrow-rs, this row format currently
/// cannot be used for sorting because the encoded rows are not comparable.
///
/// # Key Column Encoding
///
/// The row format is composed of the KeyColumn encodings for each column,
/// and each column is encoded as follows:
/// 1. A null byte for each column, indicating whether the column is null.
/// "1" for null, "0" for non-null.
/// 2. The "fixed width" encoding for the column, which exists whether
/// the column is null or not.
/// 3. The "variable payload" encoding for the column, which exists only
/// for non-null string/binary columns.
/// For string/binary columns, the length of the payload is in
/// "fixed width" part, and the binary contents are in the
/// "variable payload" part.
/// 4. Specially, if all columns in a row are null, the caller may decide
/// to refer to kRowIdForNulls instead of actually encoding/decoding
/// it using any KeyEncoder. See the comment for encoded_nulls_.
///
/// The endianness of the encoded bytes is platform-dependent.
///
/// ## Null Type
///
/// Null Type is a special case, it doesn't occupy any space in the
/// encoded row.
///
/// ## Fixed Width Type
///
/// Fixed Width Type is encoded as a fixed-width byte sequence. For example:
/// ```
/// Int8: 5, null, 6
/// ```
/// Would be encoded as [0 5], [1 0], [0 6].
///
/// ### Dictionary Type
///
/// Dictionary Type is encoded as a fixed-width byte sequence using
/// dictionary indices; the dictionary should be identical for all
/// rows.
///
/// ## Variable Width Type
///
/// Variable Width Type is encoded as:
/// [null byte, variable-byte length, variable bytes]. For example:
///
/// String "abc" would be encoded as:
/// 0 ( 1 byte for not null) + 3 ( 4 bytes for length ) + "abc" (payload)
///
/// Null string would be encoded as:
/// 1 ( 1 byte for null) + 0 ( 4 bytes for length )
///
/// # Row Encoding
///
/// The row format is the concatenation of the encodings of each column.
class ARROW_EXPORT RowEncoder {
public:
static constexpr int kRowIdForNulls() { return -1; }
Expand All @@ -259,6 +340,9 @@ class ARROW_EXPORT RowEncoder {
Status EncodeAndAppend(const ExecSpan& batch);
Result<ExecBatch> Decode(int64_t num_rows, const int32_t* row_ids);

// Returns the encoded representation of the row at index i.
// If i is kRowIdForNulls, it returns the pre-encoded all-nulls
// row.
inline std::string encoded_row(int32_t i) const {
if (i == kRowIdForNulls()) {
return std::string(reinterpret_cast<const char*>(encoded_nulls_.data()),
Expand All @@ -270,14 +354,26 @@ class ARROW_EXPORT RowEncoder {
}

int32_t num_rows() const {
return offsets_.size() == 0 ? 0 : static_cast<int32_t>(offsets_.size() - 1);
return offsets_.empty() ? 0 : static_cast<int32_t>(offsets_.size() - 1);
}

private:
ExecContext* ctx_{nullptr};
std::vector<std::shared_ptr<KeyEncoder>> encoders_;
// offsets_ vector stores the starting position (offset) of each encoded row
// within the bytes_ vector. This allows for quick access to individual rows.
//
// If not empty, its size is num_rows + 1, and the last element is the total
// length of the bytes_ vector.
std::vector<int32_t> offsets_;
// The encoded bytes of all non "kRowIdForNulls" rows.
std::vector<uint8_t> bytes_;
// A pre-encoded constant row with all its columns being null. Useful when
// the caller is certain that an entire row is null and then uses kRowIdForNulls
// to refer to it.
//
// EncodeAndAppend would never append this row, but encoded_row and Decode would
// return this row when kRowIdForNulls is passed.
std::vector<uint8_t> encoded_nulls_;
std::vector<std::shared_ptr<ExtensionType>> extension_types_;
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/row/row_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct ARROW_EXPORT RowTableMetadata {
/// For a fixed-length binary row, common size of rows in bytes,
/// rounded up to the multiple of alignment.
///
/// For a varying-length binary, size of all encoded fixed-length key columns,
/// For a varying-length binary row, size of all encoded fixed-length key columns,
/// including lengths of varying-length columns, rounded up to the multiple of string
/// alignment.
uint32_t fixed_length;
Expand Down

0 comments on commit 44d3f76

Please sign in to comment.