Skip to content

Commit

Permalink
refactor: avoid memory allocation during partition write requests (#1208)
Browse files Browse the repository at this point in the history

## Rationale
Some memory allocations performed while partitioning a write request can be avoided.

## Detailed Changes
Introduce an enum called `PartitionedRows` as the return type of
`PartitionRule.locate_partitions_for_write` to avoid unnecessary memory
allocation.

## Test Plan
Covered by existing tests.
  • Loading branch information
ShiKaiWi authored Sep 18, 2023
1 parent d86f639 commit ff6d27b
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 193 deletions.
8 changes: 4 additions & 4 deletions analytic_engine/src/table/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub type MemTableVec = Vec<MemTableState>;
struct MemTableView {
/// The memtable for sampling timestamp to suggest segment duration.
///
/// This memtable is special and may contains data in differnt segment, so
/// This memtable is special and may contains data in different segment, so
/// can not be moved into immutable memtable set.
sampling_mem: Option<SamplingMemTable>,
/// Mutable memtables arranged by its time range.
Expand Down Expand Up @@ -399,7 +399,7 @@ impl MutableMemTableSet {
Self(BTreeMap::new())
}

/// Get memtale by timestamp for write
/// Get memtable by timestamp for write
fn memtable_for_write(&self, timestamp: Timestamp) -> Option<&MemTableState> {
// Find the first memtable whose end time (exclusive) > timestamp
if let Some((_, memtable)) = self
Expand Down Expand Up @@ -607,8 +607,8 @@ impl TableVersion {

/// Switch all mutable memtables
///
/// Returns the maxium `SequenceNumber` in the mutable memtables needs to be
/// freezed.
/// Returns the maximum `SequenceNumber` in the mutable memtables that need
/// to be frozen.
pub fn switch_memtables(&self) -> Option<SequenceNumber> {
self.inner.write().unwrap().memtable_view.switch_memtables()
}
Expand Down
4 changes: 2 additions & 2 deletions components/arena/src/arena_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{alloc::Layout, ptr::NonNull, sync::Arc};
/// The trait itself provides and enforces no guarantee about alignment. It's
/// implementation's responsibility to cover.
///
/// All memory-relavent methods (`alloc()` etc.) are not "unsafe". Compare with
/// All memory-relevant methods (`alloc()` etc.) are not "unsafe". Compare with
/// "deallocate" which is not included in this trait, allocating is safer
/// and not likely to run into UB. However in fact, playing with raw pointer is
/// always dangerous and needs to be careful for both who implements and uses
Expand Down Expand Up @@ -69,7 +69,7 @@ impl BasicStats {
}
}

/// Collect memory usage from Arean
/// Collect memory usage from Arena.
pub trait Collector {
/// Called when `bytes` bytes memory is allocated in arena.
fn on_alloc(&self, bytes: usize);
Expand Down
2 changes: 1 addition & 1 deletion components/skiplist/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl<C: KeyComparator, A: Arena<Stats = BasicStats> + Clone> Skiplist<C, A> {
}

// We always insert from the base level and up. After you add a node in base
// leve, we cannot create a node in the level above because it would
// level, we cannot create a node in the level above because it would
// have discovered the node in the base level
let x: &mut Node = unsafe { &mut *node_ptr };
for i in 0..=height {
Expand Down
179 changes: 109 additions & 70 deletions partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ use std::{collections::HashMap, fmt};
use analytic_engine::{table::support_pushdown, TableOptions};
use async_trait::async_trait;
use common_types::{
row::{Row, RowGroupBuilder},
row::{Row, RowGroup, RowGroupBuilder},
schema::Schema,
};
use futures::{stream::FuturesUnordered, StreamExt};
use generic_error::BoxError;
use snafu::ResultExt;
use table_engine::{
partition::{
format_sub_partition_table_name, rule::df_adapter::DfPartitionRuleAdapter, PartitionInfo,
format_sub_partition_table_name,
rule::{
df_adapter::DfPartitionRuleAdapter, PartitionedRow, PartitionedRows,
PartitionedRowsIter,
},
PartitionInfo,
},
remote::{
model::{
Expand Down Expand Up @@ -64,13 +69,23 @@ pub struct TableData {
pub struct PartitionTableImpl {
/// Table-level data; the code in this file reads its table name, table
/// schema and partition info.
table_data: TableData,
/// Remote engine through which write requests are sent to the sub tables.
remote_engine: RemoteEngineRef,
/// Partition rule built once in `new` from the table's partition info and
/// schema, then reused to locate the partitions of each write request.
partition_rule: DfPartitionRuleAdapter,
}

impl PartitionTableImpl {
/// Creates a partition table from its table data and the remote engine.
///
/// The partition rule is derived here, once, from the partition info and
/// the table schema held by `table_data`, so it does not have to be rebuilt
/// for every write request.
///
/// # Errors
///
/// Returns a `CreatePartitionRule` error if the rule adapter cannot be
/// built.
pub fn new(table_data: TableData, remote_engine: RemoteEngineRef) -> Result<Self> {
    let partition_info = table_data.partition_info.clone();
    let rule_result = DfPartitionRuleAdapter::new(partition_info, &table_data.table_schema);
    let partition_rule = rule_result.box_err().context(CreatePartitionRule)?;

    Ok(Self {
        table_data,
        remote_engine,
        partition_rule,
    })
}

Expand All @@ -84,6 +99,86 @@ impl PartitionTableImpl {
table: format_sub_partition_table_name(&self.table_data.table_name, &partition_name),
}
}

/// Writes a whole row group to the sub table of partition `partition_id`.
///
/// The row group is forwarded as-is in one remote write request, and the
/// value produced by that remote write is returned.
///
/// # Errors
///
/// Returns a `WriteBatch` error carrying this table's name if the remote
/// write fails.
async fn write_single_row_group(
    &self,
    partition_id: usize,
    row_group: RowGroup,
) -> Result<usize> {
    let request = RemoteWriteRequest {
        table: self.get_sub_table_ident(partition_id),
        write_request: WriteRequest { row_group },
    };

    let write_result = self.remote_engine.write(request).await;
    write_result.box_err().with_context(|| WriteBatch {
        tables: vec![self.table_data.table_name.clone()],
    })
}

/// Writes rows that were located in several partitions.
///
/// The rows yielded by `partitioned_rows` are grouped by partition id, one
/// row group (sharing `schema`) is built per partition, and all resulting
/// requests are sent to the remote engine as a single batch. The returned
/// value is the sum of the row counts reported by the batch results.
///
/// # Errors
///
/// * `Write` if a row group cannot be built for a sub table.
/// * `WriteBatch` if the batch call itself fails, or if any individual
///   batch result is an error (in which case the tables of that result are
///   reported).
async fn write_partitioned_row_groups(
    &self,
    schema: Schema,
    partitioned_rows: PartitionedRowsIter,
) -> Result<usize> {
    // Bucket the rows by the partition each one was located in.
    let mut rows_by_partition: HashMap<_, Vec<_>> = HashMap::new();
    for PartitionedRow { partition_id, row } in partitioned_rows {
        rows_by_partition.entry(partition_id).or_default().push(row);
    }

    // Build one remote write request per partition.
    let mut requests = Vec::with_capacity(rows_by_partition.len());
    for (partition_id, rows) in rows_by_partition {
        let sub_table = self.get_sub_table_ident(partition_id);
        let row_group = RowGroupBuilder::with_rows(schema.clone(), rows)
            .box_err()
            .with_context(|| Write {
                table: sub_table.table.clone(),
            })?
            .build();

        requests.push(RemoteWriteRequest {
            table: sub_table,
            write_request: WriteRequest { row_group },
        });
    }

    // Send every request in one batch through the remote engine.
    let batch_results = self
        .remote_engine
        .write_batch(requests)
        .await
        .box_err()
        .with_context(|| WriteBatch {
            tables: vec![self.table_data.table_name.clone()],
        })?;

    // Add up the written rows, stopping at the first failed result.
    let mut written_total = 0;
    for WriteBatchResult {
        table_idents,
        result,
    } in batch_results
    {
        written_total += result.with_context(|| WriteBatch {
            tables: table_idents
                .into_iter()
                .map(|ident| ident.table)
                .collect::<Vec<_>>(),
        })?;
    }

    Ok(written_total as usize)
}
}

impl fmt::Debug for PartitionTableImpl {
Expand Down Expand Up @@ -137,83 +232,27 @@ impl Table for PartitionTableImpl {
.with_label_values(&["total"])
.start_timer();

// Build partition rule.
let df_partition_rule = match self.partition_info() {
None => UnexpectedWithMsg {
msg: "partition table partition info can't be empty",
}
.fail()?,
Some(partition_info) => {
DfPartitionRuleAdapter::new(partition_info, &self.table_data.table_schema)
.box_err()
.context(CreatePartitionRule)?
}
};

// Split write request.
let partitions = {
let schema = request.row_group.schema().clone();
let partition_rows = {
let _locate_timer = PARTITION_TABLE_WRITE_DURATION_HISTOGRAM
.with_label_values(&["locate"])
.start_timer();
df_partition_rule
.locate_partitions_for_write(&request.row_group)
self.partition_rule
.locate_partitions_for_write(request.row_group)
.box_err()
.context(LocatePartitions)?
};

let mut split_rows = HashMap::new();
let schema = request.row_group.schema().clone();
for (partition, row) in partitions.into_iter().zip(request.row_group.into_iter()) {
split_rows
.entry(partition)
.or_insert_with(Vec::new)
.push(row);
}

// Insert split write request through remote engine.
let mut request_batch = Vec::with_capacity(split_rows.len());
for (partition, rows) in split_rows {
let sub_table_ident = self.get_sub_table_ident(partition);
let row_group = RowGroupBuilder::with_rows(schema.clone(), rows)
.box_err()
.with_context(|| Write {
table: sub_table_ident.table.clone(),
})?
.build();

let request = RemoteWriteRequest {
table: sub_table_ident,
write_request: WriteRequest { row_group },
};
request_batch.push(request);
}

let batch_results = self
.remote_engine
.write_batch(request_batch)
.await
.box_err()
.context(WriteBatch {
tables: vec![self.table_data.table_name.clone()],
})?;
let mut total_rows = 0;
for batch_result in batch_results {
let WriteBatchResult {
table_idents,
result,
} = batch_result;

let written_rows = result.with_context(|| {
let tables = table_idents
.into_iter()
.map(|ident| ident.table)
.collect::<Vec<_>>();
WriteBatch { tables }
})?;
total_rows += written_rows;
match partition_rows {
PartitionedRows::Single {
partition_id,
row_group,
} => self.write_single_row_group(partition_id, row_group).await,
PartitionedRows::Multiple(iter) => {
self.write_partitioned_row_groups(schema, iter).await
}
}

Ok(total_rows as usize)
}

async fn read(&self, _request: ReadRequest) -> Result<SendableRecordBatchStream> {
Expand Down
9 changes: 6 additions & 3 deletions table_engine/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ pub enum Error {
))]
InvalidPartitionInfoEncodingVersion { version: u8, backtrace: Backtrace },

#[snafu(display("Partition info could not be empty"))]
EmptyPartitionInfo {},
#[snafu(display("Partition info could not be empty.\nBacktrace:\n{backtrace}"))]
EmptyPartitionInfo { backtrace: Backtrace },

#[snafu(display("Column in the partition key is not found.\nBacktrace:\n{backtrace}"))]
InvalidPartitionKey { backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -281,7 +284,7 @@ impl TryFrom<ceresdbproto::cluster::PartitionInfo> for PartitionInfo {
Ok(Self::Random(random_partition_info))
}
},
None => Err(Error::EmptyPartitionInfo {}),
None => EmptyPartitionInfo {}.fail(),
}
}
}
Expand Down
27 changes: 16 additions & 11 deletions table_engine/src/partition/rule/df_adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use datafusion::logical_expr::Expr;
use self::extractor::{KeyExtractor, NoopExtractor};
use crate::partition::{
rule::{
df_adapter::extractor::FilterExtractorRef, factory::PartitionRuleFactory, PartitionRuleRef,
df_adapter::extractor::FilterExtractorRef, factory::PartitionRuleFactory, PartitionRulePtr,
PartitionedRows,
},
BuildPartitionRule, PartitionInfo, Result,
};
Expand All @@ -30,7 +31,7 @@ mod extractor;
/// Partition rule's adapter for datafusion
pub struct DfPartitionRuleAdapter {
/// Partition rule
rule: PartitionRuleRef,
rule: PartitionRulePtr,

/// `PartitionFilter` extractor for datafusion `Expr`
extractor: FilterExtractorRef,
Expand All @@ -44,18 +45,17 @@ impl DfPartitionRuleAdapter {
Ok(Self { rule, extractor })
}

pub fn columns(&self) -> Vec<String> {
self.rule.columns()
pub fn columns(&self) -> &[String] {
self.rule.involved_columns()
}

pub fn locate_partitions_for_write(&self, row_group: &RowGroup) -> Result<Vec<usize>> {
self.rule.locate_partitions_for_write(row_group)
pub fn locate_partitions_for_write(&self, row_group: RowGroup) -> Result<PartitionedRows> {
self.rule.location_partitions_for_write(row_group)
}

pub fn locate_partitions_for_read(&self, filters: &[Expr]) -> Result<Vec<usize>> {
// Extract partition filters from datafusion filters.
let columns = self.columns();
let partition_filters = self.extractor.extract(filters, &columns);
let partition_filters = self.extractor.extract(filters, self.columns());

// Locate partitions from filters.
self.rule.locate_partitions_for_read(&partition_filters)
Expand Down Expand Up @@ -84,6 +84,7 @@ mod tests {
time::Timestamp,
};
use datafusion::logical_expr::{col, lit};
use itertools::Itertools;

use super::*;
use crate::partition::{
Expand Down Expand Up @@ -232,9 +233,13 @@ mod tests {
// Basic flow
let key_rule_adapter =
DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap();
let partitions = key_rule_adapter
.locate_partitions_for_write(&row_group)
let partitioned_rows = key_rule_adapter
.locate_partitions_for_write(row_group)
.unwrap();
let partition_ids = match partitioned_rows {
PartitionedRows::Multiple(iter) => iter.map(|v| v.partition_id).collect_vec(),
_ => panic!("invalid partitioned rows"),
};

// Expected
let partition_keys_1 = test_datums[0].clone();
Expand All @@ -245,7 +250,7 @@ mod tests {
let expected_2 = compute_partition(partition_key_refs_2, partition_num);
let expecteds = vec![expected_1, expected_2];

assert_eq!(partitions, expecteds);
assert_eq!(partition_ids, expecteds);
}

fn build_schema() -> Schema {
Expand Down
Loading

0 comments on commit ff6d27b

Please sign in to comment.