diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index e8549ae85025b..9b4b90314b049 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -543,6 +543,41 @@ def test_order_multi_pk(): check_rows_data([17,17,17,17],row[4],"Insert") drop_table_subscription() +def test_explain_cursor(): + print(f"test_explain_cursor") + create_table_subscription() + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + execute_insert("insert into t5 values(1,1,1,1)",conn) + execute_insert("flush",conn) + execute_insert("insert into t5 values(2,2,2,2)",conn) + execute_insert("flush",conn) + execute_insert("declare cur subscription cursor for sub5 full",conn) + execute_insert("insert into t5 values(3,3,3,3)",conn) + execute_insert("flush",conn) + execute_insert("insert into t5 values(4,4,4,4)",conn) + execute_insert("flush",conn) + plan = execute_query("explain fetch next from cur",conn) + assert plan[0][0] == "BatchExchange { order: [t5.v1 ASC, t5.v2 ASC], dist: Single }" + assert plan[1][0] == "└─BatchScan { table: t5, columns: [v1, v2, v3, v4] }" + execute_query("fetch next from cur",conn) + plan = execute_query("explain fetch next from cur",conn) + assert plan[0][0] == "BatchExchange { order: [t5.v1 ASC, t5.v2 ASC], dist: Single }" + assert plan[1][0] == "└─BatchScan { table: t5, columns: [v1, v2, v3, v4], scan_ranges: [(v1, v2) > (Int32(1), Int32(1))] }" + execute_query("fetch next from cur",conn) + execute_query("fetch next from cur",conn) + plan = execute_query("explain fetch next from cur",conn) + print(plan) + assert plan[0][0] == "BatchExchange { order: [t5.v1 ASC, t5.v2 ASC], dist: Single }" + assert "└─BatchLogSeqScan { table: t5, columns: [v1, v2, v3, v4, op]" in plan[1][0] + assert "scan_ranges: [(v1, v2) > (Int32(3), Int32(3))] }" in plan[1][0] + execute_query("fetch next from cur",conn) + drop_table_subscription() + if __name__ == "__main__": test_cursor_snapshot() test_cursor_op() @@ -559,3 +594,4 @@ def test_order_multi_pk(): test_order_mv() test_order_multi_pk() test_block_cursor() + test_explain_cursor() diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 2373b7d483e30..284c47ce08ef4 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -140,6 +140,7 @@ message LogRowSeqScanNode { common.BatchQueryEpoch old_epoch = 4; common.BatchQueryEpoch new_epoch = 5; bool ordered = 6; + repeated ScanRange scan_ranges = 7; } message InsertNode { diff --git a/src/batch/executors/src/executor/log_row_seq_scan.rs b/src/batch/executors/src/executor/log_row_seq_scan.rs index 05ab69013db62..256be5c347f89 100644 --- a/src/batch/executors/src/executor/log_row_seq_scan.rs +++ b/src/batch/executors/src/executor/log_row_seq_scan.rs @@ -33,8 +33,11 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::collect_data_chunk; use risingwave_storage::{dispatch_state_store, StateStore}; -use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; +use super::{ + BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange, +}; use crate::error::{BatchError, Result}; +use crate::executor::build_scan_ranges_from_pb; use crate::monitor::BatchMetrics; pub struct LogRowSeqScanExecutor { @@ -52,6 +55,7 @@ pub struct LogRowSeqScanExecutor { new_epoch: u64, version_id: HummockVersionId, ordered: bool, + scan_ranges: Vec, } impl LogRowSeqScanExecutor { @@ -64,6 +68,7 @@ impl 
LogRowSeqScanExecutor { identity: String, metrics: Option, ordered: bool, + scan_ranges: Vec, ) -> Self { let mut schema = table.schema().clone(); schema.fields.push(Field::with_name( @@ -80,6 +85,7 @@ impl LogRowSeqScanExecutor { new_epoch, version_id, ordered, + scan_ranges, } } } @@ -137,6 +143,9 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { let old_epoch = old_epoch.epoch; let new_epoch = new_epoch.epoch; + let scan_ranges = + build_scan_ranges_from_pb(&log_store_seq_scan_node.scan_ranges, table_desc)?; + dispatch_state_store!(source.context().state_store(), state_store, { let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); Ok(Box::new(LogRowSeqScanExecutor::new( @@ -148,6 +157,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { source.plan_node().get_identity().clone(), metrics, log_store_seq_scan_node.ordered, + scan_ranges, ))) }) } @@ -178,6 +188,7 @@ impl LogRowSeqScanExecutor { version_id, schema, ordered, + scan_ranges, .. } = *self; let table = std::sync::Arc::new(table); @@ -189,20 +200,23 @@ impl LogRowSeqScanExecutor { // Range Scan // WARN: DO NOT use `select` to execute range scans concurrently // it can consume too much memory if there're too many ranges. - let stream = Self::execute_range( - table.clone(), - old_epoch, - new_epoch, - version_id, - chunk_size, - histogram, - Arc::new(schema.clone()), - ordered, - ); - #[for_await] - for chunk in stream { - let chunk = chunk?; - yield chunk; + for range in scan_ranges { + let stream = Self::execute_range( + table.clone(), + old_epoch, + new_epoch, + version_id, + chunk_size, + histogram, + Arc::new(schema.clone()), + ordered, + range, + ); + #[for_await] + for chunk in stream { + let chunk = chunk?; + yield chunk; + } } } @@ -216,13 +230,18 @@ impl LogRowSeqScanExecutor { histogram: Option>, schema: Arc, ordered: bool, + scan_range: ScanRange, ) { + let pk_prefix = scan_range.pk_prefix.clone(); + let range_bounds = scan_range.convert_to_range_bounds(&table); // Range Scan. let iter = table .batch_iter_log_with_pk_bounds( old_epoch, HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id), ordered, + range_bounds, + pk_prefix, ) .await? .flat_map(|r| { diff --git a/src/batch/executors/src/executor/row_seq_scan.rs b/src/batch/executors/src/executor/row_seq_scan.rs index a9efb465df187..e095b218d25a3 100644 --- a/src/batch/executors/src/executor/row_seq_scan.rs +++ b/src/batch/executors/src/executor/row_seq_scan.rs @@ -11,23 +11,19 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
-use std::ops::{Bound, Deref}; +use std::ops::Deref; use std::sync::Arc; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; -use itertools::Itertools; use prometheus::Histogram; use risingwave_common::array::DataChunk; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnId, Schema}; use risingwave_common::hash::VnodeCountCompat; use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::DataType; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::{scan_range, PbScanRange}; use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::plan_common::as_of::AsOfType; use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc}; @@ -35,9 +31,11 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::{dispatch_state_store, StateStore}; +use super::ScanRange; use crate::error::{BatchError, Result}; use crate::executor::{ - BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, + build_scan_ranges_from_pb, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, + ExecutorBuilder, }; use crate::monitor::BatchMetrics; @@ -57,17 +55,6 @@ pub struct RowSeqScanExecutor { limit: Option, as_of: Option, } - -/// Range for batch scan. -#[derive(Debug)] -pub struct ScanRange { - /// The prefix of the primary key. - pub pk_prefix: OwnedRow, - - /// The range bounds of the next column. - pub next_col_bounds: (Bound, Bound), -} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct AsOf { pub timestamp: i64, @@ -98,71 +85,6 @@ impl From<&AsOf> for PbAsOf { } } -impl ScanRange { - /// Create a scan range from the prost representation. - pub fn new(scan_range: PbScanRange, pk_types: Vec) -> Result { - let mut index = 0; - let pk_prefix = OwnedRow::new( - scan_range - .eq_conds - .iter() - .map(|v| { - let ty = pk_types.get(index).unwrap(); - index += 1; - deserialize_datum(v.as_slice(), ty) - }) - .try_collect()?, - ); - if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() { - return Ok(Self { - pk_prefix, - ..Self::full() - }); - } - - let build_bound = |bound: &scan_range::Bound, mut index| -> Result> { - let next_col_bounds = OwnedRow::new( - bound - .value - .iter() - .map(|v| { - let ty = pk_types.get(index).unwrap(); - index += 1; - deserialize_datum(v.as_slice(), ty) - }) - .try_collect()?, - ); - if bound.inclusive { - Ok(Bound::Included(next_col_bounds)) - } else { - Ok(Bound::Excluded(next_col_bounds)) - } - }; - - let next_col_bounds: (Bound, Bound) = match ( - scan_range.lower_bound.as_ref(), - scan_range.upper_bound.as_ref(), - ) { - (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?), - (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?), - (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded), - (None, None) => unreachable!(), - }; - Ok(Self { - pk_prefix, - next_col_bounds, - }) - } - - /// Create a scan range for full table scan. 
- pub fn full() -> Self { - Self { - pk_prefix: OwnedRow::default(), - next_col_bounds: (Bound::Unbounded, Bound::Unbounded), - } - } -} - impl RowSeqScanExecutor { pub fn new( table: StorageTable, @@ -219,31 +141,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { None => Some(Bitmap::ones(table_desc.vnode_count()).into()), }; - let scan_ranges = { - let scan_ranges = &seq_scan_node.scan_ranges; - if scan_ranges.is_empty() { - vec![ScanRange::full()] - } else { - scan_ranges - .iter() - .map(|scan_range| { - let pk_types = table_desc - .pk - .iter() - .map(|order| { - DataType::from( - table_desc.columns[order.column_index as usize] - .column_type - .as_ref() - .unwrap(), - ) - }) - .collect_vec(); - ScanRange::new(scan_range.clone(), pk_types) - }) - .try_collect()? - } - }; + let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?; let ordered = seq_scan_node.ordered; @@ -429,55 +327,15 @@ impl RowSeqScanExecutor { limit: Option, histogram: Option>, ) { - let ScanRange { - pk_prefix, - next_col_bounds, - } = scan_range; - - // The len of a valid pk_prefix should be less than or equal pk's num. - let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()]; - let (start_bound, end_bound) = if order_type.is_ascending() { - (next_col_bounds.0, next_col_bounds.1) - } else { - (next_col_bounds.1, next_col_bounds.0) - }; - - let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded); - let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded); - - let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| { - match bound { - Bound::Unbounded => { - if other_bound_is_bounded && order_type_nulls { - // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics. - Bound::Excluded(OwnedRow::new(vec![None])) - } else { - // Both start and end are unbounded, so we need to select all rows. - Bound::Unbounded - } - } - Bound::Included(x) => Bound::Included(x), - Bound::Excluded(x) => Bound::Excluded(x), - } - }; - let start_bound = build_bound( - end_bound_is_bounded, - start_bound, - order_type.nulls_are_first(), - ); - let end_bound = build_bound( - start_bound_is_bounded, - end_bound, - order_type.nulls_are_last(), - ); - + let pk_prefix = scan_range.pk_prefix.clone(); + let range_bounds = scan_range.convert_to_range_bounds(&table); // Range Scan. assert!(pk_prefix.len() < table.pk_indices().len()); let iter = table .batch_chunk_iter_with_pk_bounds( epoch.into(), &pk_prefix, - (start_bound, end_bound), + range_bounds, ordered, chunk_size, PrefetchOptions::new(limit.is_none(), true), diff --git a/src/batch/executors/src/executor/utils.rs b/src/batch/executors/src/executor/utils.rs index 4f724ec5416c8..13e04a780ac51 100644 --- a/src/batch/executors/src/executor/utils.rs +++ b/src/batch/executors/src/executor/utils.rs @@ -12,11 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use core::ops::{Bound, RangeBounds}; + use futures::stream::BoxStream; use futures::StreamExt; use futures_async_stream::try_stream; +use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; +use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::DataType; +use risingwave_common::util::value_encoding::deserialize_datum; +use risingwave_pb::batch_plan::{scan_range, PbScanRange}; +use risingwave_pb::plan_common::StorageTableDesc; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::StateStore; use crate::error::{BatchError, Result}; use crate::executor::{BoxedDataChunkStream, Executor}; @@ -124,3 +134,152 @@ impl Executor for WrapStreamExecutor { self.stream } } + +/// Range for batch scan. +pub struct ScanRange { + /// The prefix of the primary key. + pub pk_prefix: OwnedRow, + + /// The range bounds of the next column. + pub next_col_bounds: (Bound, Bound), +} +impl ScanRange { + /// Create a scan range from the prost representation. + pub fn new(scan_range: PbScanRange, pk_types: Vec) -> Result { + let mut index = 0; + let pk_prefix = OwnedRow::new( + scan_range + .eq_conds + .iter() + .map(|v| { + let ty = pk_types.get(index).unwrap(); + index += 1; + deserialize_datum(v.as_slice(), ty) + }) + .try_collect()?, + ); + if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() { + return Ok(Self { + pk_prefix, + ..Self::full() + }); + } + + let build_bound = |bound: &scan_range::Bound, mut index| -> Result> { + let next_col_bounds = OwnedRow::new( + bound + .value + .iter() + .map(|v| { + let ty = pk_types.get(index).unwrap(); + index += 1; + deserialize_datum(v.as_slice(), ty) + }) + .try_collect()?, + ); + if bound.inclusive { + Ok(Bound::Included(next_col_bounds)) + } else { + Ok(Bound::Excluded(next_col_bounds)) + } + }; + + let next_col_bounds: (Bound, Bound) = match ( + scan_range.lower_bound.as_ref(), + scan_range.upper_bound.as_ref(), + ) { + (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?), + (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?), + (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded), + (None, None) => unreachable!(), + }; + Ok(Self { + pk_prefix, + next_col_bounds, + }) + } + + /// Create a scan range for full table scan. + pub fn full() -> Self { + Self { + pk_prefix: OwnedRow::default(), + next_col_bounds: (Bound::Unbounded, Bound::Unbounded), + } + } + + pub fn convert_to_range_bounds( + self, + table: &StorageTable, + ) -> impl RangeBounds { + let ScanRange { + pk_prefix, + next_col_bounds, + } = self; + + // The len of a valid pk_prefix should be less than or equal pk's num. + let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()]; + let (start_bound, end_bound) = if order_type.is_ascending() { + (next_col_bounds.0, next_col_bounds.1) + } else { + (next_col_bounds.1, next_col_bounds.0) + }; + + let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded); + let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded); + + let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| { + match bound { + Bound::Unbounded => { + if other_bound_is_bounded && order_type_nulls { + // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics. + Bound::Excluded(OwnedRow::new(vec![None])) + } else { + // Both start and end are unbounded, so we need to select all rows. 
+ Bound::Unbounded + } + } + Bound::Included(x) => Bound::Included(x), + Bound::Excluded(x) => Bound::Excluded(x), + } + }; + let start_bound = build_bound( + end_bound_is_bounded, + start_bound, + order_type.nulls_are_first(), + ); + let end_bound = build_bound( + start_bound_is_bounded, + end_bound, + order_type.nulls_are_last(), + ); + (start_bound, end_bound) + } +} + +pub fn build_scan_ranges_from_pb( + scan_ranges: &Vec, + table_desc: &StorageTableDesc, +) -> Result> { + if scan_ranges.is_empty() { + Ok(vec![ScanRange::full()]) + } else { + Ok(scan_ranges + .iter() + .map(|scan_range| { + let pk_types = table_desc + .pk + .iter() + .map(|order| { + DataType::from( + table_desc.columns[order.column_index as usize] + .column_type + .as_ref() + .unwrap(), + ) + }) + .collect_vec(); + ScanRange::new(scan_range.clone(), pk_types) + }) + .try_collect()?) + } +} diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index da81e8ff94ddb..ded0549d11a77 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -38,8 +38,8 @@ use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_sqlparser::ast::{ - CompatibleFormatEncode, Expr, FormatEncodeOptions, Ident, ObjectName, OrderByExpr, Query, - Select, SelectItem, SetExpr, TableFactor, TableWithJoins, + CompatibleFormatEncode, FormatEncodeOptions, ObjectName, Query, Select, SelectItem, SetExpr, + TableFactor, TableWithJoins, }; use thiserror_ext::AsReport; @@ -240,22 +240,6 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> Query { } } -pub fn gen_query_from_table_name_order_by(from_name: ObjectName, pk_names: Vec) -> Query { - let mut query = gen_query_from_table_name(from_name); - query.order_by = pk_names - .into_iter() - .map(|pk| { - let expr = Expr::Identifier(Ident::with_quote_unchecked('"', pk)); - OrderByExpr { - expr, - asc: None, - nulls_first: None, - } - }) - .collect(); - query -} - pub fn convert_unix_millis_to_logstore_u64(unix_millis: u64) -> u64 { Epoch::from_unix_millis(unix_millis).0 } diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs index 47dd68ceb52ef..e42247a9ca585 100644 --- a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs @@ -13,12 +13,13 @@ // limitations under the License. 
use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::util::scan_range::ScanRange; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::LogRowSeqScanNode; use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch}; use super::batch::prelude::*; -use super::utils::{childless_record, Distill}; +use super::utils::{childless_record, scan_ranges_as_strs, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb}; use crate::catalog::ColumnId; use crate::error::Result; @@ -31,19 +32,28 @@ use crate::scheduler::SchedulerResult; pub struct BatchLogSeqScan { pub base: PlanBase, core: generic::LogScan, + scan_ranges: Vec, } impl BatchLogSeqScan { - fn new_inner(core: generic::LogScan, dist: Distribution) -> Self { - let order = Order::new(core.table_desc.pk.clone()); + fn new_inner(core: generic::LogScan, dist: Distribution, scan_ranges: Vec) -> Self { + let order = if scan_ranges.len() > 1 { + Order::any() + } else { + core.get_out_column_index_order() + }; let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order); - Self { base, core } + Self { + base, + core, + scan_ranges, + } } - pub fn new(core: generic::LogScan) -> Self { + pub fn new(core: generic::LogScan, scan_ranges: Vec) -> Self { // Use `Single` by default, will be updated later with `clone_with_dist`. - Self::new_inner(core, Distribution::Single) + Self::new_inner(core, Distribution::Single, scan_ranges) } fn clone_with_dist(&self) -> Self { @@ -62,6 +72,7 @@ impl BatchLogSeqScan { } } }, + self.scan_ranges.clone(), ) } @@ -91,6 +102,17 @@ impl Distill for BatchLogSeqScan { vec.push(("old_epoch", Pretty::from(self.core.old_epoch.to_string()))); vec.push(("new_epoch", Pretty::from(self.core.new_epoch.to_string()))); vec.push(("version_id", Pretty::from(self.core.version_id.to_string()))); + if !self.scan_ranges.is_empty() { + let order_names = match verbose { + true => self.core.order_names_with_table_prefix(), + false => self.core.order_names(), + }; + let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges); + vec.push(( + "scan_ranges", + Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()), + )); + } childless_record("BatchLogSeqScan", vec) } @@ -131,6 +153,7 @@ impl TryToBatchPb for BatchLogSeqScan { }), // It's currently true. 
ordered: !self.order().is_any(), + scan_ranges: self.scan_ranges.iter().map(|r| r.to_protobuf()).collect(), })) } } @@ -144,7 +167,7 @@ impl ToLocalBatch for BatchLogSeqScan { } else { Distribution::SomeShard }; - Ok(Self::new_inner(self.core.clone(), dist).into()) + Ok(Self::new_inner(self.core.clone(), dist, self.scan_ranges.clone()).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs index ecc8489f974c7..136cc8908f02c 100644 --- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -17,15 +17,17 @@ use std::rc::Rc; use educe::Educe; use fixedbitset::FixedBitSet; -use itertools::Itertools; use pretty_xmlish::Pretty; -use risingwave_common::catalog::{Field, Schema, TableDesc}; +use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc}; use risingwave_common::types::DataType; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_hummock_sdk::HummockVersionId; use crate::catalog::ColumnId; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::Order; +use crate::utils::ColIndexMappingRewriteExt; const OP_NAME: &str = "op"; const OP_TYPE: DataType = DataType::Varchar; @@ -34,9 +36,7 @@ const OP_TYPE: DataType = DataType::Varchar; #[educe(PartialEq, Eq, Hash)] pub struct LogScan { pub table_name: String, - /// Include `output_col_idx_with_out_hidden` and `op_column` - pub output_col_idx_with_out_hidden: Vec, - /// Include `output_col_idx_with_out_hidden` and `op_column` and hidden pk + /// Include `output_col_idx` and `op_column` pub output_col_idx: Vec, /// Descriptor of the table pub table_desc: Rc, @@ -85,16 +85,6 @@ impl LogScan { out_column_names } - pub(crate) fn column_names_without_hidden(&self) -> Vec { - let mut out_column_names: Vec<_> = self - .output_col_idx_with_out_hidden - .iter() - .map(|&i| self.table_desc.columns[i].name.clone()) - .collect(); - out_column_names.push(OP_NAME.to_owned()); - out_column_names - } - pub fn distribution_key(&self) -> Option> { let tb_idx_to_op_idx = self .output_col_idx @@ -112,7 +102,6 @@ impl LogScan { /// Create a logical scan node for log table scan pub(crate) fn new( table_name: String, - output_col_idx_with_out_hidden: Vec, output_col_idx: Vec, table_desc: Rc, ctx: OptimizerContextRef, @@ -122,7 +111,6 @@ impl LogScan { ) -> Self { Self { table_name, - output_col_idx_with_out_hidden, output_col_idx, table_desc, chunk_size: None, @@ -162,18 +150,7 @@ impl LogScan { } pub(crate) fn out_fields(&self) -> FixedBitSet { - let mut out_fields_vec = self - .output_col_idx - .iter() - .enumerate() - .filter_map(|(index, idx)| { - if self.output_col_idx_with_out_hidden.contains(idx) { - Some(index) - } else { - None - } - }) - .collect_vec(); + let mut out_fields_vec = self.output_col_idx.clone(); // add op column out_fields_vec.push(self.output_col_idx.len()); FixedBitSet::from_iter(out_fields_vec) @@ -182,4 +159,51 @@ impl LogScan { pub(crate) fn ctx(&self) -> OptimizerContextRef { self.ctx.clone() } + + pub fn get_table_columns(&self) -> &[ColumnDesc] { + &self.table_desc.columns + } + + pub(crate) fn order_names(&self) -> Vec { + self.table_desc + .order_column_indices() + .iter() + .map(|&i| self.get_table_columns()[i].name.clone()) + .collect() + } + + pub(crate) fn order_names_with_table_prefix(&self) -> Vec { + self.table_desc + .order_column_indices() 
+ .iter() + .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name)) + .collect() + } + + /// Return indices of fields the output is ordered by and + /// corresponding direction + pub fn get_out_column_index_order(&self) -> Order { + let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping(); + let order = Order::new( + self.table_desc + .pk + .iter() + .map(|order| { + let idx = id_to_tb_idx + .get(&self.table_desc.columns[order.column_index].column_id) + .unwrap(); + ColumnOrder::new(*idx, order.order_type) + }) + .collect(), + ); + self.i2o_col_mapping().rewrite_provided_order(&order) + } + + /// get the Mapping of columnIndex from internal column index to output column index + pub fn i2o_col_mapping(&self) -> ColIndexMapping { + ColIndexMapping::with_remaining_columns( + &self.output_col_idx, + self.get_table_columns().len(), + ) + } } diff --git a/src/frontend/src/optimizer/plan_node/generic/table_scan.rs b/src/frontend/src/optimizer/plan_node/generic/table_scan.rs index 7ef5fd8ca61e6..0abebffb844d3 100644 --- a/src/frontend/src/optimizer/plan_node/generic/table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/table_scan.rs @@ -126,6 +126,11 @@ impl TableScan { .collect() } + pub(crate) fn out_fields(&self) -> FixedBitSet { + let out_fields_vec = self.output_col_idx.clone(); + FixedBitSet::from_iter(out_fields_vec) + } + pub(crate) fn order_names(&self) -> Vec { self.table_desc .order_column_indices() diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 4f011687c7b2b..e095d476e5ba3 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -23,38 +23,37 @@ use std::time::Instant; use anyhow::anyhow; use bytes::Bytes; use futures::StreamExt; +use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::StatementType; use pgwire::types::{Format, Row}; -use risingwave_common::catalog::Field; +use risingwave_common::catalog::{ColumnCatalog, Field}; use risingwave_common::error::BoxedError; use risingwave_common::session_config::QueryMode; -use risingwave_common::types::DataType; -use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue}; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::HummockVersionId; -use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; +use risingwave_sqlparser::ast::ObjectName; use super::SessionImpl; use crate::catalog::subscription_catalog::SubscriptionCatalog; use crate::catalog::TableId; -use crate::error::{ErrorCode, Result, RwError}; +use crate::error::{ErrorCode, Result}; +use crate::expr::{ExprType, FunctionCall, InputRef, Literal}; use crate::handler::declare_cursor::create_chunk_stream_for_cursor; -use crate::handler::query::{ - gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchQueryPlanResult, -}; +use crate::handler::query::{gen_batch_plan_fragmenter, BatchQueryPlanResult}; use crate::handler::util::{ - convert_logstore_u64_to_unix_millis, gen_query_from_table_name_order_by, pg_value_format, - to_pg_field, DataChunkToRowSetAdapter, StaticSessionData, + convert_logstore_u64_to_unix_millis, pg_value_format, to_pg_field, DataChunkToRowSetAdapter, + StaticSessionData, }; use crate::handler::HandlerArgs; use crate::monitor::{CursorMetrics, PeriodicCursorMetrics}; -use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; -use 
crate::optimizer::property::{Order, RequiredDist}; +use crate::optimizer::plan_node::{generic, BatchFilter, BatchLogSeqScan, BatchSeqScan}; +use crate::optimizer::property::{Cardinality, Order, RequiredDist}; use crate::optimizer::PlanRoot; -use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; -use crate::{ - Binder, OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog, -}; +use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot}; +use crate::utils::Condition; +use crate::{Binder, OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog}; pub enum CursorDataChunkStream { LocalDataChunk(Option), @@ -132,7 +131,7 @@ impl Cursor { pub fn get_fields(&mut self) -> Vec { match self { - Cursor::Subscription(cursor) => cursor.fields.clone(), + Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields().clone(), Cursor::Query(cursor) => cursor.fields.clone(), } } @@ -256,6 +255,124 @@ impl Display for State { } } +struct FieldsManager { + columns_catalog: Vec, + // All row fields, including hidden pk, op, rw_timestamp and all non-hidden columns in the upstream table. + row_fields: Vec, + // Row output column indices based on `row_fields`. + row_output_col_indices: Vec, + // Row pk indices based on `row_fields`. + row_pk_indices: Vec, + // Stream chunk row indices based on `row_fields`. + stream_chunk_row_indices: Vec, + // The op index based on `row_fields`. + op_index: usize, +} + +impl FieldsManager { + // pub const OP_FIELD: Field = Field::with_name(DataType::Varchar, "op".to_owned()); + // pub const RW_TIMESTAMP_FIELD: Field = Field::with_name(DataType::Int64, "rw_timestamp".to_owned()); + + pub fn new(catalog: &TableCatalog) -> Self { + let mut row_fields = Vec::new(); + let mut row_output_col_indices = Vec::new(); + let mut row_pk_indices = Vec::new(); + let mut stream_chunk_row_indices = Vec::new(); + let mut output_idx = 0_usize; + let pk_set: HashSet = catalog + .pk + .iter() + .map(|col_order| col_order.column_index) + .collect(); + + for (index, v) in catalog.columns.iter().enumerate() { + if pk_set.contains(&index) { + row_pk_indices.push(output_idx); + stream_chunk_row_indices.push(output_idx); + row_fields.push(Field::with_name(v.data_type().clone(), v.name())); + if !v.is_hidden { + row_output_col_indices.push(output_idx); + } + output_idx += 1; + } else if !v.is_hidden { + row_output_col_indices.push(output_idx); + stream_chunk_row_indices.push(output_idx); + row_fields.push(Field::with_name(v.data_type().clone(), v.name())); + output_idx += 1; + } + } + + row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned())); + row_output_col_indices.push(output_idx); + let op_index = output_idx; + output_idx += 1; + row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned())); + row_output_col_indices.push(output_idx); + Self { + columns_catalog: catalog.columns.clone(), + row_fields, + row_output_col_indices, + row_pk_indices, + stream_chunk_row_indices, + op_index, + } + } + + pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool { + if self.columns_catalog.ne(&catalog.columns) { + *self = Self::new(catalog); + true + } else { + false + } + } + + pub fn process_output_desc_row(&self, mut rows: Vec) -> (Vec, Option) { + let last_row = rows.last_mut().map(|row| { + let mut row = row.clone(); + row.project(&self.row_pk_indices) + }); + let rows = rows + .iter_mut() + .map(|row| row.project(&self.row_output_col_indices)) + .collect(); + (rows, last_row) + } + + pub 
fn get_output_fields(&self) -> Vec { + self.row_output_col_indices + .iter() + .map(|&idx| self.row_fields[idx].clone()) + .collect() + } + + // In the beginning (declare cur), we will give it an empty formats, + // this formats is not a real, when we fetch, We fill it with the formats returned from the pg client. + pub fn get_row_stream_fields_and_formats( + &self, + formats: &Vec, + from_snapshot: bool, + ) -> (Vec, Vec) { + let mut fields = Vec::new(); + let need_format = !(formats.is_empty() || formats.len() == 1); + let mut new_formats = formats.clone(); + let stream_chunk_row_indices_iter = if from_snapshot { + self.stream_chunk_row_indices.iter().chain(None) + } else { + self.stream_chunk_row_indices + .iter() + .chain(Some(&self.op_index)) + }; + for index in stream_chunk_row_indices_iter { + fields.push(self.row_fields[*index].clone()); + if need_format && !self.row_output_col_indices.contains(index) { + new_formats.insert(*index, Format::Text); + } + } + (fields, new_formats) + } +} + pub struct SubscriptionCursor { cursor_name: String, subscription: Arc, @@ -264,9 +381,10 @@ pub struct SubscriptionCursor { state: State, // fields will be set in the table's catalog when the cursor is created, // and will be reset each time it is created chunk_stream, this is to avoid changes in the catalog due to alter. - fields: Vec, + fields_manager: FieldsManager, cursor_metrics: Arc, last_fetch: Instant, + seek_pk_row: Option, } impl SubscriptionCursor { @@ -278,40 +396,46 @@ impl SubscriptionCursor { handler_args: &HandlerArgs, cursor_metrics: Arc, ) -> Result { - let (state, fields) = if let Some(start_timestamp) = start_timestamp { + let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp { let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?; - let fields = table_catalog - .columns - .iter() - .filter(|c| !c.is_hidden) - .map(|c| Field::with_name(c.data_type().clone(), c.name())) - .collect(); - let fields = Self::build_desc(fields, true); ( State::InitLogStoreQuery { seek_timestamp: start_timestamp, expected_timestamp: None, }, - fields, + FieldsManager::new(&table_catalog), ) } else { // The query stream needs to initiated on cursor creation to make sure // future fetch on the cursor starts from the snapshot when the cursor is declared. // // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? - let (chunk_stream, fields, init_query_timer) = - Self::initiate_query(None, &dependent_table_id, handler_args.clone()).await?; - let pinned_epoch = handler_args - .session - .env - .hummock_snapshot_manager - .acquire() - .version() - .state_table_info - .info() - .get(&dependent_table_id) - .ok_or_else(|| anyhow!("dependent_table_id {dependent_table_id} not exists"))? - .committed_epoch; + let (chunk_stream, init_query_timer, table_catalog) = + Self::initiate_query(None, &dependent_table_id, handler_args.clone(), None).await?; + let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else( + || ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_owned()), + )? { + ReadSnapshot::FrontendPinned { snapshot, .. } => { + snapshot + .version() + .state_table_info + .info() + .get(&dependent_table_id) + .ok_or_else(|| { + anyhow!("dependent_table_id {dependent_table_id} not exists") + })? + .committed_epoch + } + ReadSnapshot::Other(_) => { + return Err(ErrorCode::InternalError("Fetch Cursor can't start from specified query epoch. 
You may run `set query_epoch = 0;` to reset it.".to_owned()).into()); } + ReadSnapshot::ReadUncommitted => { + return Err(ErrorCode::InternalError( + "Fetch Cursor doesn't support read uncommitted".to_owned(), + ) + .into()); + } + }; let start_timestamp = pinned_epoch; ( @@ -323,7 +447,7 @@ impl SubscriptionCursor { expected_timestamp: None, init_query_timer, }, - fields, + FieldsManager::new(&table_catalog), ) }; @@ -335,9 +459,10 @@ impl SubscriptionCursor { dependent_table_id, cursor_need_drop_time, state, - fields, + fields_manager, cursor_metrics, last_fetch: Instant::now(), + seek_pk_row: None, }) } @@ -363,18 +488,22 @@ impl SubscriptionCursor { &self.subscription, ) { Ok((Some(rw_timestamp), expected_timestamp)) => { - let (mut chunk_stream, fields, init_query_timer) = + let (mut chunk_stream, init_query_timer, catalog) = Self::initiate_query( Some(rw_timestamp), &self.dependent_table_id, handler_args.clone(), + None, ) .await?; - Self::init_row_stream( - &mut chunk_stream, - formats, - &from_snapshot, + let table_schema_changed = + self.fields_manager.try_refill_fields(&catalog); + let (fields, formats) = self + .fields_manager + .get_row_stream_fields_and_formats(formats, from_snapshot); + chunk_stream.init_row_stream( + &fields, + &formats, handler_args.session.clone(), ); @@ -392,8 +521,7 @@ impl SubscriptionCursor { expected_timestamp, init_query_timer, }; - if self.fields.ne(&fields) { - self.fields = fields; + if table_schema_changed { return Ok(None); } } @@ -423,21 +551,15 @@ impl SubscriptionCursor { if let Some(row) = remaining_rows.pop_front() { // 1. Fetch the next row - let new_row = row.take(); if from_snapshot { - return Ok(Some(Row::new(Self::build_row( - new_row, - None, - formats, - &session_data, - )?))); + return Ok(Some(Self::build_row(row.0, None, formats, &session_data)?)); } else { - return Ok(Some(Row::new(Self::build_row( - new_row, + return Ok(Some(Self::build_row( + row.0, Some(rw_timestamp), formats, &session_data, - )?))); + )?)); } } else { self.cursor_metrics @@ -494,13 +616,10 @@ impl SubscriptionCursor { .. 
} = &mut self.state { - Self::init_row_stream( - chunk_stream, - formats, - from_snapshot, - &self.fields, - session.clone(), - ); + let (fields, fotmats) = self + .fields_manager + .get_row_stream_fields_and_formats(formats, *from_snapshot); + chunk_stream.init_row_stream(&fields, &fotmats, session.clone()); } while cur < count { let fetch_cursor_timer = Instant::now(); @@ -546,9 +665,18 @@ impl SubscriptionCursor { } } self.last_fetch = Instant::now(); - let desc = self.fields.iter().map(to_pg_field).collect(); + let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans); + if let Some(seek_pk_row) = seek_pk_row { + self.seek_pk_row = Some(seek_pk_row); + } + let desc = self + .fields_manager + .get_output_fields() + .iter() + .map(to_pg_field) + .collect(); - Ok((ans, desc)) + Ok((rows, desc)) } fn get_next_rw_timestamp( @@ -589,6 +717,7 @@ impl SubscriptionCursor { Some(0), &self.dependent_table_id, handler_args, + self.seek_pk_row.clone(), ), State::Fetch { from_snapshot, @@ -600,12 +729,14 @@ impl SubscriptionCursor { None, &self.dependent_table_id, handler_args, + self.seek_pk_row.clone(), ) } else { Self::init_batch_plan_for_subscription_cursor( Some(rw_timestamp), &self.dependent_table_id, handler_args, + self.seek_pk_row.clone(), ) } } @@ -620,81 +751,53 @@ impl SubscriptionCursor { rw_timestamp: Option, dependent_table_id: &TableId, handler_args: HandlerArgs, + seek_pk_row: Option, ) -> Result { let session = handler_args.clone().session; let table_catalog = session.get_table_by_id(dependent_table_id)?; - let pks = table_catalog.pk(); let context = OptimizerContext::from_handler_args(handler_args.clone()); - if let Some(rw_timestamp) = rw_timestamp { - let version_id = { - let version = session.env.hummock_snapshot_manager.acquire(); - let version = version.version(); - if !version - .state_table_info - .info() - .contains_key(dependent_table_id) - { - return Err(anyhow!("table id {dependent_table_id} has been dropped").into()); - } - version.id - }; - Self::create_batch_plan_for_cursor( - &table_catalog, - &session, - context.into(), - rw_timestamp, - rw_timestamp, - version_id, - pks, - ) - } else { - let subscription_from_table_name = - ObjectName(vec![Ident::from(table_catalog.name.as_ref())]); - let pk_names = pks - .iter() - .map(|f| { - Ok::( - table_catalog - .columns - .get(f.column_index) - .ok_or_else(|| { - anyhow!( - "columns not find in table schema, index is {:?}", - f.column_index - ) - })? 
- .name() - .to_owned(), - ) - }) - .collect::>>()?; - let query_stmt = Statement::Query(Box::new(gen_query_from_table_name_order_by( - subscription_from_table_name, - pk_names, - ))); - gen_batch_plan_by_statement(&session, context.into(), query_stmt) - } + let version_id = { + let version = session.env.hummock_snapshot_manager.acquire(); + let version = version.version(); + if !version + .state_table_info + .info() + .contains_key(dependent_table_id) + { + return Err(anyhow!("table id {dependent_table_id} has been dropped").into()); + } + version.id + }; + Self::create_batch_plan_for_cursor( + table_catalog, + &session, + context.into(), + rw_timestamp, + rw_timestamp, + version_id, + seek_pk_row, + ) } async fn initiate_query( rw_timestamp: Option, dependent_table_id: &TableId, handler_args: HandlerArgs, - ) -> Result<(CursorDataChunkStream, Vec, Instant)> { + seek_pk_row: Option, + ) -> Result<(CursorDataChunkStream, Instant, Arc)> { let init_query_timer = Instant::now(); + let session = handler_args.clone().session; + let table_catalog = session.get_table_by_id(dependent_table_id)?; let plan_result = Self::init_batch_plan_for_subscription_cursor( rw_timestamp, dependent_table_id, handler_args.clone(), + seek_pk_row, )?; let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?; - let (chunk_stream, fields) = + let (chunk_stream, _) = create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?; - Ok(( - chunk_stream, - Self::build_desc(fields, rw_timestamp.is_none()), - init_query_timer, - )) + Ok((chunk_stream, init_query_timer, table_catalog)) } async fn try_refill_remaining_rows( @@ -714,7 +817,7 @@ impl SubscriptionCursor { rw_timestamp: Option, formats: &Vec, session_data: &StaticSessionData, - ) -> Result>> { + ) -> Result { let row_len = row.len(); let new_row = if let Some(rw_timestamp) = rw_timestamp { let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text); @@ -737,7 +840,7 @@ impl SubscriptionCursor { vec![Some(op), None] }; row.extend(new_row); - Ok(row) + Ok(Row(row)) } pub fn build_desc(mut descs: Vec, from_snapshot: bool) -> Vec { @@ -749,13 +852,13 @@ impl SubscriptionCursor { } pub fn create_batch_plan_for_cursor( - table_catalog: &TableCatalog, + table_catalog: Arc, session: &SessionImpl, context: OptimizerContextRef, - old_epoch: u64, - new_epoch: u64, + old_epoch: Option, + new_epoch: Option, version_id: HummockVersionId, - pks: &[ColumnOrder], + seek_pk_rows: Option, ) -> Result { // pk + all column without hidden let output_col_idx = table_catalog @@ -770,33 +873,105 @@ impl SubscriptionCursor { } }) .collect::>(); - let output_col_idx_with_out_hidden = output_col_idx + let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64; + let pks = table_catalog.pk(); + let pks = pks .iter() - .filter(|index| !table_catalog.columns[**index].is_hidden) - .cloned() - .collect::>(); - let core = generic::LogScan::new( - table_catalog.name.clone(), - output_col_idx_with_out_hidden, - output_col_idx, - Rc::new(table_catalog.table_desc()), - context, - old_epoch, - new_epoch, - version_id, - ); + .map(|f| { + let pk = table_catalog.columns.get(f.column_index).unwrap(); + (pk.data_type(), f.column_index) + }) + .collect_vec(); + let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows { + let mut pk_rows = vec![]; + let mut values = vec![]; + for (seek_pk, (data_type, column_index)) in + seek_pk_rows.0.into_iter().zip_eq_fast(pks.into_iter()) + { + if let Some(seek_pk) = 
seek_pk { + pk_rows.push(InputRef { + index: column_index, + data_type: data_type.clone(), + }); + let value_string = String::from_utf8(seek_pk.clone().into()).unwrap(); + let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap(); + values.push((Some(value_data), data_type.clone())); + } + } + if pk_rows.is_empty() { + (vec![], None) + } else { + let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip(); + let right_data = ScalarImpl::Struct(StructValue::new(right_data)); + let right_type = DataType::Struct(StructType::unnamed(right_types)); + let left = FunctionCall::new_unchecked( + ExprType::Row, + pk_rows.into_iter().map(|pk| pk.into()).collect(), + right_type.clone(), + ); + let right = Literal::new(Some(right_data), right_type); + let (scan, predicate) = Condition { + conjunctions: vec![FunctionCall::new( + ExprType::GreaterThan, + vec![left.into(), right.into()], + )? + .into()], + } + .split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?; + (scan, Some(predicate)) + } + } else { + (vec![], None) + }; - let batch_log_seq_scan = BatchLogSeqScan::new(core); + let (seq_scan, out_fields, out_names) = if old_epoch.is_some() && new_epoch.is_some() { + let core = generic::LogScan::new( + table_catalog.name.clone(), + output_col_idx, + Rc::new(table_catalog.table_desc()), + context, + old_epoch.unwrap(), + new_epoch.unwrap(), + version_id, + ); + let batch_log_seq_scan = BatchLogSeqScan::new(core, scan); + let out_fields = batch_log_seq_scan.core().out_fields(); + let out_names = batch_log_seq_scan.core().column_names(); + (batch_log_seq_scan.into(), out_fields, out_names) + } else { + assert!(old_epoch.is_none() && new_epoch.is_none()); + let core = generic::TableScan::new( + table_catalog.name.clone(), + output_col_idx, + table_catalog.clone(), + vec![], + context, + Condition { + conjunctions: vec![], + }, + None, + Cardinality::default(), + ); + let table_scan = BatchSeqScan::new(core, scan, None); + let out_fields = table_scan.core().out_fields(); + let out_names = table_scan.core().column_names(); + (table_scan.into(), out_fields, out_names) + }; - let out_fields = batch_log_seq_scan.core().out_fields(); - let out_names = batch_log_seq_scan.core().column_names_without_hidden(); + let plan = if let Some(predicate) = predicate + && !predicate.always_true() + { + BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into() + } else { + seq_scan + }; // order by pk, so don't need to sort - let order = Order::new(pks.to_vec()); + let order = Order::new(table_catalog.pk().to_vec()); // Here we just need a plan_root to call the method, only out_fields and out_names will be used let plan_root = PlanRoot::new_with_batch_plan( - PlanRef::from(batch_log_seq_scan.clone()), + plan, RequiredDist::single(), order, out_fields, @@ -821,26 +996,6 @@ impl SubscriptionCursor { }) } - // In the beginning (declare cur), we will give it an empty formats, - // this formats is not a real, when we fetch, We fill it with the formats returned from the pg client. 
- pub fn init_row_stream( - chunk_stream: &mut CursorDataChunkStream, - formats: &Vec, - from_snapshot: &bool, - fields: &Vec, - session: Arc, - ) { - let mut formats = formats.clone(); - let mut fields = fields.clone(); - formats.pop(); - fields.pop(); - if *from_snapshot { - formats.pop(); - fields.pop(); - } - chunk_stream.init_row_stream(&fields, &formats, session); - } - pub fn idle_duration(&self) -> Duration { self.last_fetch.elapsed() } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 2e8511af1c071..10c64019a3c56 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -927,19 +927,10 @@ impl StorageTableInner { &self, start_epoch: u64, end_epoch: HummockReadEpoch, - start_pk: Option<&OwnedRow>, + encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>), vnode: VirtualNode, ) -> StorageResult>> { - let start_bound = if let Some(start_pk) = start_pk { - let mut bytes = BytesMut::new(); - self.pk_serializer.serialize(start_pk, &mut bytes); - let bytes = bytes.freeze(); - Included(bytes) - } else { - Unbounded - }; - let table_key_range = - prefixed_range_with_vnode::<&Bytes>((start_bound.as_ref(), Unbounded), vnode); + let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode); let read_options = ReadLogOptions { table_id: self.table_id, }; @@ -965,8 +956,21 @@ impl StorageTableInner { start_pk: Option<&OwnedRow>, vnode: VirtualNode, ) -> StorageResult>> { + let start_bound = if let Some(start_pk) = start_pk { + let mut bytes = BytesMut::new(); + self.pk_serializer.serialize(start_pk, &mut bytes); + let bytes = bytes.freeze(); + Included(bytes) + } else { + Unbounded + }; let stream = self - .batch_iter_log_inner::<()>(start_epoch, end_epoch, start_pk, vnode) + .batch_iter_log_inner::<()>( + start_epoch, + end_epoch, + (start_bound.as_ref(), Unbounded), + vnode, + ) .await?; Ok(stream.map_ok(|(_, row)| row)) } @@ -976,11 +980,29 @@ impl StorageTableInner { start_epoch: u64, end_epoch: HummockReadEpoch, ordered: bool, + range_bounds: impl RangeBounds, + pk_prefix: impl Row, ) -> StorageResult> + Send + 'static> { + let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true); + let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false); let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec(); build_vnode_stream( - |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode), - |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode), + |vnode| { + self.batch_iter_log_inner( + start_epoch, + end_epoch, + (start_key.as_ref(), end_key.as_ref()), + vnode, + ) + }, + |vnode| { + self.batch_iter_log_inner( + start_epoch, + end_epoch, + (start_key.as_ref(), end_key.as_ref()), + vnode, + ) + }, &vnodes, ordered, ) diff --git a/src/utils/pgwire/src/types.rs b/src/utils/pgwire/src/types.rs index e8d7dc52101aa..e4df4699fda27 100644 --- a/src/utils/pgwire/src/types.rs +++ b/src/utils/pgwire/src/types.rs @@ -23,7 +23,7 @@ use crate::error::{PsqlError, PsqlResult}; /// A row of data returned from the database by a query. #[derive(Debug, Clone)] // NOTE: Since we only support simple query protocol, the values are represented as strings. -pub struct Row(Vec>); +pub struct Row(pub Vec>); impl Row { /// Create a row from values. 
@@ -49,6 +49,14 @@ impl Row { pub fn take(self) -> Vec<Option<Bytes>> { self.0 } + + pub fn project(&mut self, indices: &[usize]) -> Row { + let mut new_row = Vec::with_capacity(indices.len()); + for i in indices { + new_row.push(self.0[*i].take()); + } + Row(new_row) + } } impl Index<usize> for Row {