diff --git a/src/compute/tests/table_v2_materialize.rs b/src/compute/tests/integration_tests.rs similarity index 78% rename from src/compute/tests/table_v2_materialize.rs rename to src/compute/tests/integration_tests.rs index 3bbe7b952de81..141c47152e8a6 100644 --- a/src/compute/tests/table_v2_materialize.rs +++ b/src/compute/tests/integration_tests.rs @@ -14,6 +14,7 @@ #![feature(generators)] #![feature(proc_macro_hygiene, stmt_expr_attributes)] + use std::sync::Arc; use futures::stream::StreamExt; @@ -24,20 +25,20 @@ use risingwave_batch::executor::{ BoxedDataChunkStream, BoxedExecutor, DeleteExecutor, Executor as BatchExecutor, InsertExecutor, RowSeqScanExecutor, }; -use risingwave_common::array::{Array, DataChunk, F64Array, I64Array}; +use risingwave_common::array::{Array, DataChunk, F64Array, I64Array, Row}; use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; use risingwave_common::column_nonnull; use risingwave_common::error::{Result, RwError}; use risingwave_common::test_prelude::DataChunkTestExt; -use risingwave_common::types::IntoOrdered; +use risingwave_common::types::{DataType, IntoOrdered}; use risingwave_common::util::sort_util::{OrderPair, OrderType}; use risingwave_pb::data::data_type::TypeName; -use risingwave_pb::data::DataType; use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc; use risingwave_source::{MemSourceManager, SourceManager}; use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::monitor::StateStoreMetrics; use risingwave_storage::table::cell_based_table::CellBasedTable; +use risingwave_storage::table::state_table::StateTable; use risingwave_storage::Keyspace; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::executor::{ @@ -86,6 +87,8 @@ impl SingleChunkExecutor { /// insertion, deletion, and materialization. #[tokio::test] async fn test_table_v2_materialize() -> Result<()> { + use risingwave_pb::data::DataType; + let memory_state_store = MemoryStateStore::new(); let source_manager = Arc::new(MemSourceManager::default()); let source_table_id = TableId::default(); @@ -350,3 +353,116 @@ async fn test_table_v2_materialize() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_row_seq_scan() -> Result<()> { + // In this test we test if the memtable can be correctly scanned for K-V pair insertions. + let memory_state_store = MemoryStateStore::new(); + let keyspace = Keyspace::executor_root(memory_state_store.clone(), 0x42); + + let schema = Schema::new(vec![ + Field::unnamed(DataType::Int32), // pk + Field::unnamed(DataType::Int32), + Field::unnamed(DataType::Int64), + ]); + let _column_ids = vec![ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)]; + + let column_descs = vec![ + ColumnDesc::unnamed(ColumnId::from(0), schema[0].data_type.clone()), + ColumnDesc::unnamed(ColumnId::from(1), schema[1].data_type.clone()), + ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()), + ]; + + let mut state = StateTable::new( + keyspace.clone(), + column_descs.clone(), + vec![OrderType::Ascending], + None, + vec![0_usize], + ); + let table = CellBasedTable::new_adhoc( + keyspace, + column_descs, + Arc::new(StateStoreMetrics::unused()), + ); + + let epoch: u64 = 0; + + state + .insert( + &Row(vec![Some(1_i32.into())]), + Row(vec![ + Some(1_i32.into()), + Some(4_i32.into()), + Some(7_i64.into()), + ]), + ) + .unwrap(); + state + .insert( + &Row(vec![Some(2_i32.into())]), + Row(vec![ + Some(2_i32.into()), + Some(5_i32.into()), + Some(8_i64.into()), + ]), + ) + .unwrap(); + state.commit(epoch).await.unwrap(); + + let executor = Box::new(RowSeqScanExecutor::new( + table.schema().clone(), + table.iter(u64::MAX).await.unwrap(), + 1, + true, + "RowSeqScanExecutor2".to_string(), + Arc::new(BatchMetrics::unused()), + )); + + assert_eq!(executor.schema().fields().len(), 3); + + let mut stream = executor.execute(); + let res_chunk = stream.next().await.unwrap().unwrap(); + + assert_eq!(res_chunk.dimension(), 3); + assert_eq!( + res_chunk + .column_at(0) + .array() + .as_int32() + .iter() + .collect::>(), + vec![Some(1)] + ); + assert_eq!( + res_chunk + .column_at(1) + .array() + .as_int32() + .iter() + .collect::>(), + vec![Some(4)] + ); + + let res_chunk2 = stream.next().await.unwrap().unwrap(); + assert_eq!(res_chunk2.dimension(), 3); + assert_eq!( + res_chunk2 + .column_at(0) + .array() + .as_int32() + .iter() + .collect::>(), + vec![Some(2)] + ); + assert_eq!( + res_chunk2 + .column_at(1) + .array() + .as_int32() + .iter() + .collect::>(), + vec![Some(5)] + ); + Ok(()) +} diff --git a/src/compute/tests/row_seq_scan.rs b/src/compute/tests/row_seq_scan.rs deleted file mode 100644 index 6be0f4bfa5c77..0000000000000 --- a/src/compute/tests/row_seq_scan.rs +++ /dev/null @@ -1,141 +0,0 @@ -use std::sync::Arc; - -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -use futures::StreamExt; -use risingwave_batch::executor::monitor::BatchMetrics; -use risingwave_batch::executor::{Executor, RowSeqScanExecutor}; -use risingwave_common::array::{Array, Row}; -use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; -use risingwave_common::error::Result; -use risingwave_common::types::DataType; -use risingwave_common::util::sort_util::OrderType; -use risingwave_storage::memory::MemoryStateStore; -use risingwave_storage::monitor::StateStoreMetrics; -use risingwave_storage::table::cell_based_table::CellBasedTable; -use risingwave_storage::table::state_table::StateTable; -use risingwave_storage::Keyspace; - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn test_row_seq_scan() -> Result<()> { - // In this test we test if the memtable can be correctly scanned for K-V pair insertions. - let memory_state_store = MemoryStateStore::new(); - let keyspace = Keyspace::executor_root(memory_state_store.clone(), 0x42); - - let schema = Schema::new(vec![ - Field::unnamed(DataType::Int32), // pk - Field::unnamed(DataType::Int32), - Field::unnamed(DataType::Int64), - ]); - let _column_ids = vec![ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)]; - - let column_descs = vec![ - ColumnDesc::unnamed(ColumnId::from(0), schema[0].data_type.clone()), - ColumnDesc::unnamed(ColumnId::from(1), schema[1].data_type.clone()), - ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()), - ]; - - let mut state = StateTable::new( - keyspace.clone(), - column_descs.clone(), - vec![OrderType::Ascending], - None, - vec![0_usize], - ); - let table = CellBasedTable::new_adhoc( - keyspace, - column_descs, - Arc::new(StateStoreMetrics::unused()), - ); - - let epoch: u64 = 0; - - state - .insert( - &Row(vec![Some(1_i32.into())]), - Row(vec![ - Some(1_i32.into()), - Some(4_i32.into()), - Some(7_i64.into()), - ]), - ) - .unwrap(); - state - .insert( - &Row(vec![Some(2_i32.into())]), - Row(vec![ - Some(2_i32.into()), - Some(5_i32.into()), - Some(8_i64.into()), - ]), - ) - .unwrap(); - state.commit(epoch).await.unwrap(); - - let executor = Box::new(RowSeqScanExecutor::new( - table.schema().clone(), - table.iter(u64::MAX).await.unwrap(), - 1, - true, - "RowSeqScanExecutor2".to_string(), - Arc::new(BatchMetrics::unused()), - )); - - assert_eq!(executor.schema().fields().len(), 3); - - let mut stream = executor.execute(); - let res_chunk = stream.next().await.unwrap().unwrap(); - - assert_eq!(res_chunk.dimension(), 3); - assert_eq!( - res_chunk - .column_at(0) - .array() - .as_int32() - .iter() - .collect::>(), - vec![Some(1)] - ); - assert_eq!( - res_chunk - .column_at(1) - .array() - .as_int32() - .iter() - .collect::>(), - vec![Some(4)] - ); - - let res_chunk2 = stream.next().await.unwrap().unwrap(); - assert_eq!(res_chunk2.dimension(), 3); - assert_eq!( - res_chunk2 - .column_at(0) - .array() - .as_int32() - .iter() - .collect::>(), - vec![Some(2)] - ); - assert_eq!( - res_chunk2 - .column_at(1) - .array() - .as_int32() - .iter() - .collect::>(), - vec![Some(5)] - ); - Ok(()) -} diff --git a/src/frontend/test_runner/Cargo.toml b/src/frontend/test_runner/Cargo.toml index e0d040bb6e433..40c3a7ac60b3f 100644 --- a/src/frontend/test_runner/Cargo.toml +++ b/src/frontend/test_runner/Cargo.toml @@ -9,7 +9,6 @@ anyhow = "1" console = "0.15" futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.10" -libtest-mimic = "0.4" madsim = "=0.2.0-alpha.3" risingwave_frontend = { path = ".." } risingwave_sqlparser = { path = "../../sqlparser" } @@ -29,6 +28,7 @@ walkdir = "2" workspace-hack = { version = "0.1", path = "../../workspace-hack" } [dev-dependencies] +libtest-mimic = "0.4" tempfile = "3" [build-dependencies] diff --git a/src/frontend/test_runner/src/lib.rs b/src/frontend/test_runner/src/lib.rs index 87fe84753a950..2a265690504ad 100644 --- a/src/frontend/test_runner/src/lib.rs +++ b/src/frontend/test_runner/src/lib.rs @@ -18,7 +18,7 @@ mod resolve_id; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::sync::Arc; use anyhow::{anyhow, Result}; @@ -87,7 +87,7 @@ pub struct TestCase { pub create_source: Option, /// Provide config map to frontend - pub with_config_map: Option>, + pub with_config_map: Option>, } #[serde_with::skip_serializing_none]