Skip to content

Commit

Permalink
chore(test): prevent test compile OOM
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Jun 6, 2022
1 parent 5a5de58 commit afcb460
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#![feature(generators)]
#![feature(proc_macro_hygiene, stmt_expr_attributes)]

use std::sync::Arc;

use futures::stream::StreamExt;
Expand All @@ -24,20 +25,20 @@ use risingwave_batch::executor::{
BoxedDataChunkStream, BoxedExecutor, DeleteExecutor, Executor as BatchExecutor, InsertExecutor,
RowSeqScanExecutor,
};
use risingwave_common::array::{Array, DataChunk, F64Array, I64Array};
use risingwave_common::array::{Array, DataChunk, F64Array, I64Array, Row};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::column_nonnull;
use risingwave_common::error::{Result, RwError};
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::IntoOrdered;
use risingwave_common::types::{DataType, IntoOrdered};
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType;
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::monitor::StateStoreMetrics;
use risingwave_storage::table::cell_based_table::CellBasedTable;
use risingwave_storage::table::state_table::StateTable;
use risingwave_storage::Keyspace;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::{
Expand Down Expand Up @@ -86,6 +87,8 @@ impl SingleChunkExecutor {
/// insertion, deletion, and materialization.
#[tokio::test]
async fn test_table_v2_materialize() -> Result<()> {
use risingwave_pb::data::DataType;

let memory_state_store = MemoryStateStore::new();
let source_manager = Arc::new(MemSourceManager::default());
let source_table_id = TableId::default();
Expand Down Expand Up @@ -350,3 +353,116 @@ async fn test_table_v2_materialize() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_row_seq_scan() -> Result<()> {
// In this test we test if the memtable can be correctly scanned for K-V pair insertions.
let memory_state_store = MemoryStateStore::new();
let keyspace = Keyspace::executor_root(memory_state_store.clone(), 0x42);

let schema = Schema::new(vec![
Field::unnamed(DataType::Int32), // pk
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int64),
]);
let _column_ids = vec![ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)];

let column_descs = vec![
ColumnDesc::unnamed(ColumnId::from(0), schema[0].data_type.clone()),
ColumnDesc::unnamed(ColumnId::from(1), schema[1].data_type.clone()),
ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()),
];

let mut state = StateTable::new(
keyspace.clone(),
column_descs.clone(),
vec![OrderType::Ascending],
None,
vec![0_usize],
);
let table = CellBasedTable::new_adhoc(
keyspace,
column_descs,
Arc::new(StateStoreMetrics::unused()),
);

let epoch: u64 = 0;

state
.insert(
&Row(vec![Some(1_i32.into())]),
Row(vec![
Some(1_i32.into()),
Some(4_i32.into()),
Some(7_i64.into()),
]),
)
.unwrap();
state
.insert(
&Row(vec![Some(2_i32.into())]),
Row(vec![
Some(2_i32.into()),
Some(5_i32.into()),
Some(8_i64.into()),
]),
)
.unwrap();
state.commit(epoch).await.unwrap();

let executor = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
table.iter(u64::MAX).await.unwrap(),
1,
true,
"RowSeqScanExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
));

assert_eq!(executor.schema().fields().len(), 3);

let mut stream = executor.execute();
let res_chunk = stream.next().await.unwrap().unwrap();

assert_eq!(res_chunk.dimension(), 3);
assert_eq!(
res_chunk
.column_at(0)
.array()
.as_int32()
.iter()
.collect::<Vec<_>>(),
vec![Some(1)]
);
assert_eq!(
res_chunk
.column_at(1)
.array()
.as_int32()
.iter()
.collect::<Vec<_>>(),
vec![Some(4)]
);

let res_chunk2 = stream.next().await.unwrap().unwrap();
assert_eq!(res_chunk2.dimension(), 3);
assert_eq!(
res_chunk2
.column_at(0)
.array()
.as_int32()
.iter()
.collect::<Vec<_>>(),
vec![Some(2)]
);
assert_eq!(
res_chunk2
.column_at(1)
.array()
.as_int32()
.iter()
.collect::<Vec<_>>(),
vec![Some(5)]
);
Ok(())
}
141 changes: 0 additions & 141 deletions src/compute/tests/row_seq_scan.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/frontend/test_runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ anyhow = "1"
console = "0.15"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.10"
libtest-mimic = "0.4"
madsim = "=0.2.0-alpha.3"
risingwave_frontend = { path = ".." }
risingwave_sqlparser = { path = "../../sqlparser" }
Expand All @@ -29,6 +28,7 @@ walkdir = "2"
workspace-hack = { version = "0.1", path = "../../workspace-hack" }

[dev-dependencies]
libtest-mimic = "0.4"
tempfile = "3"

[build-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/test_runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

mod resolve_id;

use std::collections::HashMap;
use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -87,7 +87,7 @@ pub struct TestCase {
pub create_source: Option<CreateSource>,

/// Provide config map to frontend
pub with_config_map: Option<HashMap<String, String>>,
pub with_config_map: Option<BTreeMap<String, String>>,
}

#[serde_with::skip_serializing_none]
Expand Down

0 comments on commit afcb460

Please sign in to comment.