From bc03cb1ab56b4a8a5868df7b7fde2d7cc81d8be3 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 1 Jul 2022 12:52:51 +0800 Subject: [PATCH] address comments --- cdc/sink/flowcontrol/flow_control.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 039d33418f1..0f1e0ce5050 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -101,7 +101,7 @@ func (c *TableFlowController) Consume( commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) blockingCallBack := func() (err error) { - if commitTs != lastCommitTs || c.splitTxn { + if commitTs > lastCommitTs || c.splitTxn { // Call `callback` in two condition: // 1. commitTs > lastCommitTs, handle new txn and send a normal resolved ts // 2. commitTs == lastCommitTs && splitTxn = true, split the same txn and