Skip to content

Commit

Permalink
fix all topn ut
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 19, 2024
1 parent 578c81c commit 92bd5e0
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 392 deletions.
160 changes: 66 additions & 94 deletions src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ where
mod tests {
use std::sync::atomic::AtomicU64;

use assert_matches::assert_matches;
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use risingwave_common::catalog::Field;
use risingwave_common::hash::SerializedKey;
Expand Down Expand Up @@ -373,7 +372,7 @@ mod tests {
)
.await;
let schema = source.schema().clone();
let top_n_executor = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
source,
ActorContext::for_test(0),
schema,
Expand All @@ -385,73 +384,64 @@ mod tests {
Arc::new(AtomicU64::new(0)),
)
.unwrap();
let mut top_n_executor = top_n_executor.boxed().execute();
let mut top_n = top_n.boxed().execute();

// consume the init barrier
top_n_executor.next().await.unwrap().unwrap();
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
+ 10 9 1
+ 8 8 2
+ 7 8 2
+ 9 1 1
+ 10 1 1
",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
- 10 9 1
- 8 8 2
- 10 1 1
+ 8 1 3
",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
- 7 8 2
- 8 1 3
- 9 1 1
",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
+ 5 1 1
+ 2 1 1
",
),
)
.sort_rows(),
);
}

Expand All @@ -469,7 +459,7 @@ mod tests {
)
.await;
let schema = source.schema().clone();
let top_n_executor = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
source,
ActorContext::for_test(0),
schema,
Expand All @@ -481,66 +471,57 @@ mod tests {
Arc::new(AtomicU64::new(0)),
)
.unwrap();
let mut top_n_executor = top_n_executor.boxed().execute();
let mut top_n = top_n.boxed().execute();

// consume the init barrier
top_n_executor.next().await.unwrap().unwrap();
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
+ 8 8 2
+ 10 1 1
+ 8 1 3
",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
- 8 8 2
- 10 1 1
",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
- 8 1 3",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
+ 5 1 1
+ 3 1 2
",
),
)
.sort_rows(),
);
}

Expand All @@ -558,7 +539,7 @@ mod tests {
)
.await;
let schema = source.schema().clone();
let top_n_executor = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
source,
ActorContext::for_test(0),
schema,
Expand All @@ -570,71 +551,62 @@ mod tests {
Arc::new(AtomicU64::new(0)),
)
.unwrap();
let mut top_n_executor = top_n_executor.boxed().execute();
let mut top_n = top_n.boxed().execute();

// consume the init barrier
top_n_executor.next().await.unwrap().unwrap();
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
+ 10 9 1
+ 8 8 2
+ 7 8 2
+ 9 1 1
+ 10 1 1
+ 8 1 3",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
- 10 9 1
- 8 8 2
- 10 1 1",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
- 7 8 2
- 8 1 3
- 9 1 1",
),
)
.sort_rows(),
);

// barrier
assert_matches!(
top_n_executor.next().await.unwrap().unwrap(),
Message::Barrier(_)
);
let res = top_n_executor.next().await.unwrap().unwrap();
top_n.expect_barrier().await;
assert_eq!(
res.as_chunk().unwrap(),
&StreamChunk::from_pretty(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
+ 5 1 1
+ 2 1 1
+ 3 1 2
+ 4 1 3",
),
)
.sort_rows(),
);
}

Expand Down
Loading

0 comments on commit 92bd5e0

Please sign in to comment.