Skip to content

Commit

Permalink
sinkv2(cdc): fix sink worker for splitTxn is false (#7988)
Browse files Browse the repository at this point in the history
close #7990
  • Loading branch information
hicqu authored Dec 31, 2022
1 parent 1d6d79f commit 73bb333
Showing 1 changed file with 19 additions and 23 deletions.
42 changes: 19 additions & 23 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
lastTxnCommitTs := uint64(0) // Can be used in `advanceTableSink`
currTxnCommitTs := uint64(0) // Can be used in `advanceTableSinkWithBatchID`.
events := make([]*model.RowChangedEvent, 0, 1024)

// batchID is used to advance table sink with a given CommitTs, even if not all
// transactions with the same CommitTs are collected, regardless of whether splitTxn
// is enabled or not. We split transactions with the same CommitTs even if splitTxn
// is false, and it won't break transaction atomicity to downstreams.
batchID := uint64(1)

if w.eventCache != nil {
Expand Down Expand Up @@ -142,31 +147,33 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
zap.Uint64("currTxnCommitTs", currTxnCommitTs),
zap.Uint64("lastTxnCommitTs", lastTxnCommitTs),
zap.Bool("isLastTime", isLastTime))
if w.splitTxn {
if currTxnCommitTs == 0 {
return
}
if currTxnCommitTs == lastPos.CommitTs && lastPos.IsCommitFence() {
if currTxnCommitTs == lastPos.CommitTs {
if lastPos.IsCommitFence() {
// All transactions before currTxnCommitTs are resolved.
err = w.advanceTableSink(task, currTxnCommitTs, committedTxnSize+pendingTxnSize)
} else {
// This will advance some complete transactions before currTxnCommitTs,
// and one partail transaction with `batchID`.
// This means all events of the currenet transaction have been fetched, but we can't
// ensure whether there are more transaction with the same CommitTs or not.
err = w.advanceTableSinkWithBatchID(task, currTxnCommitTs, committedTxnSize+pendingTxnSize, batchID)
batchID += 1
}
committedTxnSize = 0
pendingTxnSize = 0
} else {
if lastTxnCommitTs == 0 {
return
}
} else if w.splitTxn && currTxnCommitTs > 0 {
// This branch will advance some complete transactions before currTxnCommitTs,
// and one partail transaction with `batchID`.
err = w.advanceTableSinkWithBatchID(task, currTxnCommitTs, committedTxnSize+pendingTxnSize, batchID)
batchID += 1
committedTxnSize = 0
pendingTxnSize = 0
} else if !w.splitTxn && lastTxnCommitTs > 0 {
err = w.advanceTableSink(task, lastTxnCommitTs, committedTxnSize)
committedTxnSize = 0
// It's the last time we call `doEmitAndAdvance`, but `pendingTxnSize`
// hasn't been recorded yet. To avoid losing it, record it manually.
if isLastTime && pendingTxnSize > 0 {
w.memQuota.record(task.tableID, model.NewResolvedTs(currTxnCommitTs), pendingTxnSize)
pendingTxnSize = 0
}
}
return
Expand Down Expand Up @@ -296,8 +303,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
lastPos = upperBound
currTxnCommitTs = upperBound.CommitTs
lastTxnCommitTs = upperBound.CommitTs
committedTxnSize += pendingTxnSize
pendingTxnSize = 0
return maybeEmitAndAdvance(true, true)
}
allEventCount += 1
Expand All @@ -312,11 +317,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
pendingTxnSize = 0
batchID = 1
}
if pos.IsCommitFence() {
lastTxnCommitTs = currTxnCommitTs
committedTxnSize += pendingTxnSize
pendingTxnSize = 0
}

// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
Expand All @@ -329,11 +329,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
events = append(events, x...)
allEventSize += size
usedMem += size
if pos.IsCommitFence() {
committedTxnSize += size
} else {
pendingTxnSize += size
}
pendingTxnSize += size
}

if err := maybeEmitAndAdvance(false, pos.Valid()); err != nil {
Expand Down

0 comments on commit 73bb333

Please sign in to comment.