Introduce a Vec<u8> based row-wise representation for DataFusion #1708
Comments
Thanks @yjshen. Some other notes:
|
Here is the old implementation in case it is of value:
/// Appends a sequence of [u8] bytes for the value in `col[row]` to
/// `vec` to be used as a key into the hash map for a dictionary type
///
/// Note that ideally, for dictionary encoded columns, we would be
/// able to simply use the dictionary indices themselves (no need to
/// look up values) or possibly simply build the hash table entirely
/// on the dictionary indexes.
///
/// This approach would likely work (very) well for the common case,
/// but it also has to handle the case where the dictionary itself
/// is not the same across all record batches (and thus indexes in one
/// record batch may not correspond to the same index in another)
fn dictionary_create_key_for_col<K: ArrowDictionaryKeyType>(
col: &ArrayRef,
row: usize,
vec: &mut Vec<u8>,
) -> Result<()> {
let dict_col = col.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
// look up the index in the values dictionary
let keys_col = dict_col.keys();
let values_index = keys_col.value(row).to_usize().ok_or_else(|| {
DataFusionError::Internal(format!(
"Can not convert index to usize in dictionary of type creating group by value {:?}",
keys_col.data_type()
))
})?;
create_key_for_col(dict_col.values(), values_index, vec)
}
/// Appends a sequence of [u8] bytes for the value in `col[row]` to
/// `vec` to be used as a key into the hash map.
///
/// NOTE: This function does not check col.is_valid(). Caller must do so
fn create_key_for_col(col: &ArrayRef, row: usize, vec: &mut Vec<u8>) -> Result<()> {
match col.data_type() {
DataType::Boolean => {
let array = col.as_any().downcast_ref::<BooleanArray>().unwrap();
vec.extend_from_slice(&[array.value(row) as u8]);
}
DataType::Float32 => {
let array = col.as_any().downcast_ref::<Float32Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Float64 => {
let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::UInt8 => {
let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::UInt16 => {
let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::UInt32 => {
let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::UInt64 => {
let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Int8 => {
let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Int16 => {
let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Int32 => {
let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Int64 => {
let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
let array = col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
let array = col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
let array = col
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Utf8 => {
let array = col.as_any().downcast_ref::<StringArray>().unwrap();
let value = array.value(row);
// store the size
vec.extend_from_slice(&value.len().to_le_bytes());
// store the string value
vec.extend_from_slice(value.as_bytes());
}
DataType::LargeUtf8 => {
let array = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
let value = array.value(row);
// store the size
vec.extend_from_slice(&value.len().to_le_bytes());
// store the string value
vec.extend_from_slice(value.as_bytes());
}
DataType::Date32 => {
let array = col.as_any().downcast_ref::<Date32Array>().unwrap();
vec.extend_from_slice(&array.value(row).to_le_bytes());
}
DataType::Dictionary(index_type, _) => match **index_type {
DataType::Int8 => {
dictionary_create_key_for_col::<Int8Type>(col, row, vec)?;
}
DataType::Int16 => {
dictionary_create_key_for_col::<Int16Type>(col, row, vec)?;
}
DataType::Int32 => {
dictionary_create_key_for_col::<Int32Type>(col, row, vec)?;
}
DataType::Int64 => {
dictionary_create_key_for_col::<Int64Type>(col, row, vec)?;
}
DataType::UInt8 => {
dictionary_create_key_for_col::<UInt8Type>(col, row, vec)?;
}
DataType::UInt16 => {
dictionary_create_key_for_col::<UInt16Type>(col, row, vec)?;
}
DataType::UInt32 => {
dictionary_create_key_for_col::<UInt32Type>(col, row, vec)?;
}
DataType::UInt64 => {
dictionary_create_key_for_col::<UInt64Type>(col, row, vec)?;
}
_ => {
return Err(DataFusionError::Internal(format!(
"Unsupported GROUP BY type (dictionary index type not supported creating key) {}",
col.data_type(),
)))
}
},
_ => {
// This is internal because we should have caught this before.
return Err(DataFusionError::Internal(format!(
"Unsupported GROUP BY type creating key {}",
col.data_type(),
)));
}
}
Ok(())
}
/// Create a key `Vec<u8>` that is used as key for the hashmap
///
/// This looks like
/// [null_byte][col_value_bytes][null_byte][col_value_bytes]
///
/// Note that relatively uncommon patterns (e.g. not 0x00) are chosen
/// for the null_byte to make debugging easier. The actual values are
/// arbitrary.
///
/// For a NULL value in a column, the key looks like
/// [0xFE]
///
/// For a Non-NULL value in a column, this looks like:
/// [0xFF][byte representation of column value]
///
/// Example of a key with no NULL values:
/// ```text
/// 0xFF byte at the start of each column
/// signifies the value is non-null
/// │
///
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ┐
///
/// │ string len │ 0x1234
/// { ▼ (as usize le) "foo" ▼(as u16 le)
/// k1: "foo" ╔ ═┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═┌──┬──┐
/// k2: 0x1234u16 FF║03│00│00│00│00│00│00│00│"f│"o│"o│FF║34│12│
/// } ╚ ═└──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═└──┴──┘
/// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
/// ```
///
/// Example of a key with NULL values:
///
///```text
/// 0xFE byte at the start of k1 column
/// ┌ ─ signifies the value is NULL
///
/// └ ┐
/// 0x1234
/// { ▼ (as u16 le)
/// k1: NULL ╔ ═╔ ═┌──┬──┐
/// k2: 0x1234u16 FE║FF║12│34│
/// } ╚ ═╚ ═└──┴──┘
/// 0 1 2 3
///```
pub(crate) fn create_key(
group_by_keys: &[ArrayRef],
row: usize,
vec: &mut Vec<u8>,
) -> Result<()> {
vec.clear();
for col in group_by_keys {
if !col.is_valid(row) {
vec.push(0xFE);
} else {
vec.push(0xFF);
create_key_for_col(col, row, vec)?
}
}
Ok(())
}
|
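For context, here is a minimal sketch (not part of the original comment) of how `create_key` above would be driven by a grouping loop, assuming the same imports and `Result` type as the snippet and reusing one key buffer across rows:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, StringArray, UInt16Array};

// Build the two example columns from the key diagram: k1 = "foo", k2 = 0x1234u16.
fn example_group_columns() -> Vec<ArrayRef> {
    vec![
        Arc::new(StringArray::from(vec!["foo"])) as ArrayRef,
        Arc::new(UInt16Array::from(vec![0x1234u16])) as ArrayRef,
    ]
}

// Reuse a single buffer across rows, as the hash aggregate loop does.
fn keys_for_all_rows(cols: &[ArrayRef], num_rows: usize) -> Result<Vec<Vec<u8>>> {
    let mut key = Vec::new();
    let mut out = Vec::with_capacity(num_rows);
    for row in 0..num_rows {
        create_key(cols, row, &mut key)?; // clears and refills `key`
        out.push(key.clone());            // or hash / look up `key` directly
    }
    Ok(out)
}
```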
After some code/doc checking into the existing systems, the three systems' row layouts are:
- PostgreSQL: var-length tuple. Check Data Alignment in PostgreSQL, Column Storage Internals, and the code sample on page 16 for more details.
- DuckDB: fixed-length tuple. Check the source code and the external-sorting section of the related blog post for more details.
- SparkSQL: var-length tuple. Check the source code for more details. |
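To make the fixed-length vs. var-length distinction concrete, here is a toy sketch (illustrative only, not the actual layout of any of the three systems) that encodes the row `(42_i32, "foo")` both ways:

```rust
/// Var-length tuple (PostgreSQL / Spark family): fields are written back to
/// back, so strings cost only a length prefix plus their bytes, but the offset
/// of a later field depends on the data stored in earlier fields.
fn encode_var_length(a: i32, s: &str) -> Vec<u8> {
    let mut row = Vec::new();
    row.extend_from_slice(&a.to_le_bytes());
    row.extend_from_slice(&(s.len() as u32).to_le_bytes());
    row.extend_from_slice(s.as_bytes());
    row
}

/// Fixed-length tuple (DuckDB family): every field occupies a fixed slot, so
/// all field offsets are known from the schema alone; strings that do not fit
/// the slot would spill to a separate area referenced from the slot (elided).
fn encode_fixed_length(a: i32, s: &str) -> Vec<u8> {
    const SLOT: usize = 8;
    let mut row = vec![0u8; 2 * SLOT];
    row[0..4].copy_from_slice(&a.to_le_bytes());
    let bytes = s.as_bytes();
    let n = bytes.len().min(SLOT);
    row[SLOT..SLOT + n].copy_from_slice(&bytes[..n]); // inline short strings only
    row
}
```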
Given
|
Thanks @yjshen for the detailed research, it looks like postgres's design might be better assuming we only access row values sequentially the majority of the time. I think this is the case for our current hash aggregate and sort implementation? |
Thanks for the research/overview!
Taking inspiration from DuckDB / PostgreSQL sounds reasonable to me.
I am wondering whether for certain operations, e.g. hash aggregate on fixed-size
input, the data is better stored in a columnar format (mutable arrays,
with offsets), which allows faster (vectorized) operations (batch
updating of state values) and faster (free) conversion to a columnar array.
A row-based format (like we have now, or a more "advanced" one) would
spend some extra time on:
* converting values to the row-wise format
* interpreting the row-wise format (accessing cells based on data types)
* generating columnar data
The story is probably very different for sorting; I still need to read the
DuckDB post in detail.
Regarding "For join, whether hash-based or sort-based, would suffer from similar
problems as above": I think this isn't the case for hash join, as there is no need to
have a row representation (we can keep the left-side data in columnar
format in memory; we don't mutate the data).
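As a rough illustration of the columnar-state idea (hypothetical types, not DataFusion code): the hash table only maps a key to a dense group index, while the per-group aggregate states live in plain vectors that are updated in a tight loop and can be handed off as columns when the aggregation finishes.

```rust
use std::collections::HashMap;

struct ColumnarSumState {
    group_index: HashMap<Vec<u8>, usize>, // key -> dense group id
    sums: Vec<i64>,                       // one mutable "column" per aggregate
    counts: Vec<u64>,
}

impl ColumnarSumState {
    fn update_batch(&mut self, keys: &[Vec<u8>], values: &[i64]) {
        // Resolve each row's group index first...
        let mut idx = Vec::with_capacity(keys.len());
        for k in keys {
            let next = self.sums.len();
            let i = *self.group_index.entry(k.clone()).or_insert(next);
            if i == next {
                self.sums.push(0);
                self.counts.push(0);
            }
            idx.push(i);
        }
        // ...then update the state columns in a simple, vectorizable loop.
        for (&i, &v) in idx.iter().zip(values.iter()) {
            self.sums[i] += v;
            self.counts[i] += 1;
        }
    }
}
```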
|
💯 with what @Dandandan and @houqp said; Thank you for writing this up @yjshen ❤️
I agree with @Dandandan that for HashAggregate this would be super helpful -- the group keys and aggregates could be computed "in place" (so producing the output would be free). Sorting is indeed different because the sort key is different from what appears in the output: the sort keys are computed from the sort expressions, whereas the grouping values themselves are produced in the output. P.S. For what it is worth, I think DuckDB has a short-string optimization, so the key may look somewhat different for short strings.
|
Depending on when people want to try a "mutable array" approach for improving hash performance, I can probably whip up something (based on arrow-rs), so let me know |
Thank you all for your comments and for fixing my mistakes ❤️
I agree that for a small hashtable, i.e., one whose whole data structure can be CPU-cache resident or only a few times larger, the columnar organization gives us all the benefits at no cost. However, as the group-by key cardinality grows, the bottleneck of hash aggregation or hash-join row concatenation becomes dominated by memory access patterns. A columnar-structured hashtable would cause N times the extra cache misses/loads, since we are constantly accessing one tuple at a time. The problem is identified as the memory wall. As a result, quite a lot of systems use a row-wise hashtable even though they utilize a vectorized execution model: Vertica, Typer and Tectorwise, Operator Fusion, and also the latest DuckDB.
For sort, I think the problem is similar; re-ordering causes a random access pattern in memory for each column. If there are many payload columns, this will be slow.
The row <-> columnar conversion comes with a price; we need extra computation. But considering that we always need to buffer intermediate data in memory, and that we compose/decompose while the data is cache resident, the conversion could be pretty efficient.
I'm not proposing to change all these data structures to row-wise at once, but to point out existing theory and practice on using a row-wise organization for these pipeline breakers' states. We should always implement and benchmark before deciding on these performance-critical code paths, as we did in the past.
For the row structure, I'd like a combination of all three systems:
The reason for the above-proposed structure: we can determine each attribute's offset from the schema alone, and we pay the bill of some extra padding space in exchange for better computation performance. |
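A minimal sketch of the schema-determined-offsets idea (illustrative widths and alignment, not a concrete layout proposal): given each field's fixed width, the padded offsets and total row width can be computed once from the schema, so any cell is addressable without decoding the fields before it.

```rust
/// Compute padded field offsets and the total row width for fixed field widths.
fn padded_offsets(field_widths: &[usize], align: usize) -> (Vec<usize>, usize) {
    let mut offsets = Vec::with_capacity(field_widths.len());
    let mut cursor = 0usize;
    for &w in field_widths {
        // round the cursor up to the next multiple of `align`
        cursor = (cursor + align - 1) / align * align;
        offsets.push(cursor);
        cursor += w;
    }
    // pad the row width as well, so rows can be laid end to end
    let row_width = (cursor + align - 1) / align * align;
    (offsets, row_width)
}

// e.g. widths [8, 4, 2] with 4-byte alignment -> offsets [0, 8, 12], row width 16
```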
One thing we might consider is not storing the group key values directly in the hash table, but separately. Something like:
The current hash aggregate code takes this approach -- but instead of using mutable/appendable arrays it uses a Vec of ScalarValue.
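One possible shape of "group keys stored separately from the hash table" (hypothetical names, a sketch rather than the suggested design): key bytes are appended once to a contiguous arena, and the table itself holds only small fixed-size entries.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

struct GroupKeyStore {
    arena: Vec<u8>,                  // all key bytes, back to back
    bounds: Vec<(usize, usize)>,     // (offset, len) of each group's key
    table: HashMap<u64, Vec<usize>>, // key hash -> candidate group ids
}

impl GroupKeyStore {
    fn new() -> Self {
        Self { arena: Vec::new(), bounds: Vec::new(), table: HashMap::new() }
    }

    /// Look at a group's key without any per-group allocation.
    fn key(&self, group: usize) -> &[u8] {
        let (off, len) = self.bounds[group];
        &self.arena[off..off + len]
    }

    /// Return the existing group id for `key`, or append it as a new group.
    fn intern(&mut self, key: &[u8]) -> usize {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        let candidates = self.table.entry(h.finish()).or_default();
        for &g in candidates.iter() {
            let (off, len) = self.bounds[g];
            if &self.arena[off..off + len] == key {
                return g; // seen before
            }
        }
        let group = self.bounds.len();
        self.bounds.push((self.arena.len(), key.len()));
        self.arena.extend_from_slice(key);
        candidates.push(group);
        group
    }
}
```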
I agree with this 💯, and among other reasons it is why I enjoy working with you (and the rest of the people on this chain!). BTW the DuckDB sorting blog https://duckdb.org/2021/08/27/external-sorting.html (and the paper it references by Goetz Graefe) have a good treatment of sorting (specifically the calculation of sort keys and then a secondary memory shuffle to move the original data around correctly). |
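A tiny sketch of that two-phase pattern (hedged, not the DuckDB implementation): sort compact (key, row id) pairs first, then move the wide payload exactly once using the resulting permutation.

```rust
fn sort_then_gather(sort_keys: &[u64], payload: &[String]) -> Vec<String> {
    // phase 1: order row ids by the compact sort key
    let mut perm: Vec<usize> = (0..sort_keys.len()).collect();
    perm.sort_unstable_by_key(|&i| sort_keys[i]);

    // phase 2: a single gather pass re-orders the (possibly wide) payload
    perm.iter().map(|&i| payload[i].clone()).collect()
}
```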
@alamb highlighted this thread internally and I saw a couple of interesting points. I work on IOx's Read Buffer, which is an in-memory columnar engine that currently implements DataFusion's table provider (so it currently only supports scans with predicate pushdown etc). I have experimented with a prototype that can do grouping/aggregation directly on encoded columnar data (e.g., on integer representations of RLE/dictionary encodings) [1], and I found a couple of the things already mentioned in this thread. Anyway, just some anecdotal thoughts :-). Whilst there are some significant constraints the Read Buffer can take advantage of that DataFusion can't, based on my experience playing around with similar ideas, I suspect the direction @yjshen has proposed here will bring a significant improvement to grouping performance 👍
[1]: Because all group columns in the Read Buffer are dictionary or RLE encoded such that the encoded representations have the same ordinal properties |
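For illustration only (not IOx code), grouping directly on dictionary codes can look like this: the per-row work is integer hashing/comparison, and the dictionary is consulted only once per distinct group when materializing the output.

```rust
use std::collections::HashMap;

fn group_counts_on_codes<'a>(codes: &[u32], dict: &[&'a str]) -> HashMap<&'a str, usize> {
    // aggregate on the small integer code domain...
    let mut counts: HashMap<u32, usize> = HashMap::new();
    for &c in codes {
        *counts.entry(c).or_insert(0) += 1;
    }
    // ...and translate codes back to values only at the end
    counts.into_iter().map(|(c, n)| (dict[c as usize], n)).collect()
}
```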
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Many pipeline-breaking operators are inherently row-based:
For sort, which shuffles records around, re-ordering causes random memory access patterns for each column in the current columnar organization. Performance deteriorates as the number of columns grows. Besides, a compound sort key also requires us to access different columns.
A row-based representation, on the other hand, avoids this problem (where performance deteriorates as the number of payload columns grows). We can check here for more explanations.
For the hashtable entries in which we buffer aggregation state, we are already utilizing a row-based format indirectly -- we use Vec<ScalarValue> as the state for each key. A vector of ScalarValue is mostly stored contiguously in memory but faces two kinds of inefficiency: 1. memory overhead introduced by the ScalarValue enum (16 bytes per field according to @alamb); 2. strings and other non-primitive values are stored elsewhere on the heap and accessed through pointers. I quote these two great diagrams above from @alamb. Thanks again!
For join, whether hash-based or sort-based, it would suffer from similar problems as above.
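To see the per-field enum overhead concretely, here is a toy example (this ToyValue is NOT DataFusion's ScalarValue; sizes vary by version and target):

```rust
use std::mem::size_of;

// A toy value enum standing in for a ScalarValue-like type.
enum ToyValue {
    Int64(Option<i64>),
    Float64(Option<f64>),
    Utf8(Option<String>),
}

fn main() {
    // Every cell pays for the largest variant plus the discriminant,
    // and strings live behind a pointer in a separate heap allocation.
    println!("size_of::<ToyValue>() = {}", size_of::<ToyValue>()); // 32 on a typical 64-bit target
    println!("size_of::<i64>()      = {}", size_of::<i64>());      // 8

    // Three fields of state stored as a Vec of enums:
    let row = vec![
        ToyValue::Int64(Some(1)),
        ToyValue::Float64(Some(2.0)),
        ToyValue::Utf8(Some("foo".to_string())),
    ];
    // inline bytes only, before counting any heap string data
    println!("enum-based row state: {} bytes", row.len() * size_of::<ToyValue>());
}
```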
Describe the solution you'd like
1. A Vec<u8> based representation for tuples that stores all columns contiguously in memory, for row-logic operations.
2. The ability to convert between the columnar in-memory format and the vec<u8> tuple efficiently.
We could refer to PostgreSQL / DuckDB / Spark for the row format design. But note that Spark's UnsafeRow incurs a lot of memory overhead due to its 8-byte alignment.
Describe alternatives you've considered
Not to introduce the row representation, and keep using Vec<ScalarValue> with its overhead.