Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): make memory engine as non-local table #16955

Merged
merged 7 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ pub trait Table: Sync + Send {
false
}

/// Whether this table supports distributed `INSERT ... SELECT`
/// execution (the insert pipeline being dispatched to multiple nodes).
/// Defaults to `false`; engines that can ingest concurrently on
/// several nodes (e.g. Fuse) override this to return `true`.
fn support_distributed_insert(&self) -> bool {
false
}

/// whether table has the exact number of total rows
fn has_exact_total_row_count(&self) -> bool {
false
Expand Down
13 changes: 7 additions & 6 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Interpreter for InsertInterpreter {
}
InsertInputSource::SelectPlan(plan) => {
let table1 = table.clone();
let (mut select_plan, select_column_bindings, metadata) = match plan.as_ref() {
let (select_plan, select_column_bindings, metadata) = match plan.as_ref() {
Plan::Query {
s_expr,
metadata,
Expand All @@ -166,10 +166,11 @@ impl Interpreter for InsertInterpreter {
dml_build_update_stream_req(self.ctx.clone(), metadata).await?;

// here we remove the last exchange merge plan to trigger distribute insert
let insert_select_plan = match select_plan {
PhysicalPlan::Exchange(ref mut exchange) => {
// insert can be dispatched to different nodes
let insert_select_plan = match (select_plan, table.support_distributed_insert()) {
(PhysicalPlan::Exchange(ref mut exchange), true) => {
// insert can be dispatched to different nodes if table support_distributed_insert
let input = exchange.input.clone();

exchange.input = Box::new(PhysicalPlan::DistributedInsertSelect(Box::new(
DistributedInsertSelect {
// TODO(leiysky): we reuse the id of exchange here,
Expand All @@ -183,9 +184,9 @@ impl Interpreter for InsertInterpreter {
cast_needed: self.check_schema_cast(plan)?,
},
)));
select_plan
PhysicalPlan::Exchange(exchange.clone())
}
other_plan => {
(other_plan, _) => {
// insert should wait until all nodes finished
PhysicalPlan::DistributedInsertSelect(Box::new(DistributedInsertSelect {
// TODO: we reuse the id of other plan here,
Expand Down
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ impl Table for FuseTable {
true
}

/// Fuse tables support distributed `INSERT ... SELECT`: the insert
/// interpreter keeps the exchange plan and pushes a
/// `DistributedInsertSelect` below it only when this returns `true`.
fn support_distributed_insert(&self) -> bool {
true
}

fn has_exact_total_row_count(&self) -> bool {
true
}
Expand Down
18 changes: 6 additions & 12 deletions src/query/storages/memory/src/memory_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ use std::any::Any;
use std::sync::Arc;

use databend_common_catalog::plan::PartInfo;
use databend_common_catalog::plan::PartInfoPtr;

/// Memory table lazy partition information.
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct MemoryPartInfo {
pub total: usize,
pub part_start: usize,
pub part_end: usize,
}
pub struct MemoryPartInfo {}

#[typetag::serde(name = "memory")]
#[typetag::serde(name = "memory_part")]
impl PartInfo for MemoryPartInfo {
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -42,11 +40,7 @@ impl PartInfo for MemoryPartInfo {
}

impl MemoryPartInfo {
pub fn create(start: usize, end: usize, total: usize) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(MemoryPartInfo {
total,
part_start: start,
part_end: end,
}))
pub fn create() -> PartInfoPtr {
Arc::new(Box::new(MemoryPartInfo {}))
}
}
53 changes: 20 additions & 33 deletions src/query/storages/memory/src/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,31 +123,6 @@ impl MemoryTable {

Arc::new(Mutex::new(read_data_blocks))
}

pub fn generate_memory_parts(start: usize, workers: usize, total: usize) -> Partitions {
let part_size = total / workers;
let part_remain = total % workers;

let mut partitions = Vec::with_capacity(workers);
if part_size == 0 {
partitions.push(MemoryPartInfo::create(start, total, total));
} else {
for part in 0..workers {
let mut part_begin = part * part_size;
if part == 0 && start > 0 {
part_begin = start;
}
let mut part_end = (part + 1) * part_size;
if part == (workers - 1) && part_remain > 0 {
part_end += part_remain;
}

partitions.push(MemoryPartInfo::create(part_begin, part_end, total));
}
}

Partitions::create(PartitionsShuffleKind::Seq, partitions)
}
}

#[async_trait::async_trait]
Expand All @@ -160,6 +135,12 @@ impl Table for MemoryTable {
&self.table_info
}

/// A memory table is reported as non-local so its partitions can be
/// broadcast to every node in the cluster. Data is still inserted on
/// only one node per query, because `commit_insert` does not yet
/// support distributed transactions.
fn is_local(&self) -> bool {
false
}

fn support_column_projection(&self) -> bool {
true
}
Expand All @@ -176,8 +157,7 @@ impl Table for MemoryTable {
_dry_run: bool,
) -> Result<(PartStatistics, Partitions)> {
let blocks = self.blocks.read();

let statistics = match push_downs {
let mut statistics = match push_downs {
Some(push_downs) => {
let projection_filter: Box<dyn Fn(usize) -> bool> = match push_downs.projection {
Some(prj) => {
Expand Down Expand Up @@ -214,12 +194,19 @@ impl Table for MemoryTable {
}
};

let parts = Self::generate_memory_parts(
0,
ctx.get_settings().get_max_threads()? as usize,
blocks.len(),
);
Ok((statistics, parts))
let cluster = ctx.get_cluster();
if !cluster.is_empty() {
statistics.read_bytes = statistics.read_bytes.max(cluster.nodes.len());
statistics.read_rows = statistics.read_rows.max(cluster.nodes.len());
statistics.partitions_total = statistics.partitions_total.max(cluster.nodes.len());
statistics.partitions_scanned = statistics.partitions_scanned.max(cluster.nodes.len());
}

let parts = vec![MemoryPartInfo::create()];
return Ok((
statistics,
Partitions::create(PartitionsShuffleKind::Broadcast, parts),
));
}

fn read_data(
Expand Down
5 changes: 5 additions & 0 deletions src/query/storages/null/src/null_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ impl Table for NullTable {
&self.table_info
}

/// Null tables keep no data, so it is safe to mark them non-local
/// and let any node in the cluster serve them.
fn is_local(&self) -> bool {
false
}

#[async_backtrace::framed]
async fn read_partitions(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,24 @@ select count(*) from test_order;
----
4000000

statement ok
create or replace table test_memory engine = Memory as select number, (number + 1) d from numbers(100000);

query I
select count() from test_memory
----
100000

statement ok
insert into test_memory select number, sum(number) from numbers(100000) group by number;

query I
select count() from test_memory
----
200000

statement ok
drop table test_memory;

statement ok
set enable_distributed_copy_into = 0;