From 9a01f67f7d6b84d57382e5b47df1fdea14249897 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 12 Nov 2024 15:14:13 +0800 Subject: [PATCH] refactor(stream): add more assertion on update op handling in hash dispatcher Signed-off-by: Bugen Zhao --- src/stream/src/executor/dispatch.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 39f561633b8dd..609fed1be038f 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -787,6 +787,10 @@ impl Dispatcher for HashDataDispatcher { } if !visible { + assert!( + last_vnode_when_update_delete.is_none(), + "invisible row between U- and U+, op = {op:?}", + ); new_ops.push(op); continue; } @@ -797,7 +801,11 @@ impl Dispatcher for HashDataDispatcher { if op == Op::UpdateDelete { last_vnode_when_update_delete = Some(vnode); } else if op == Op::UpdateInsert { - if vnode != last_vnode_when_update_delete.unwrap() { + if vnode + != last_vnode_when_update_delete + .take() + .expect("missing U- before U+") + { new_ops.push(Op::Delete); new_ops.push(Op::Insert); } else { @@ -808,6 +816,10 @@ impl Dispatcher for HashDataDispatcher { new_ops.push(op); } } + assert!( + last_vnode_when_update_delete.is_none(), + "missing U+ after U-" + ); let ops = new_ops;