Skip to content

Commit

Permalink
sink(ticdc): use slice in txnsWithTheSameCommitTs to support splittin…
Browse files Browse the repository at this point in the history
…g transaction (#5203)

ref #5231
  • Loading branch information
CharlesCheung96 authored Apr 26, 2022
1 parent a7bcd92 commit dec3e20
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 155 deletions.
23 changes: 23 additions & 0 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,26 @@ func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64) *Polymorphi
func (e *PolymorphicEvent) RegionID() uint64 {
return e.RawKV.RegionID
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
if i.RawKV.OpType == OpTypeResolved {
return false
} else if j.RawKV.OpType == OpTypeResolved {
return true
}

if i.StartTs > j.StartTs {
return false
} else if i.StartTs < j.StartTs {
return true
}

if i.RawKV.OpType == OpTypeDelete && j.RawKV.OpType != OpTypeDelete {
return true
}
}
return i.CRTs < j.CRTs
}
22 changes: 12 additions & 10 deletions cdc/sink/mysql/txn_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type txnsWithTheSameCommitTs struct {
txns map[model.Ts]*model.SingleTableTxn
txns []*model.SingleTableTxn
commitTs model.Ts
}

Expand All @@ -34,18 +34,22 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) {
zap.Uint64("commitTs of txn", t.commitTs),
zap.Any("row", row))
}
if t.txns == nil {
t.txns = make(map[model.Ts]*model.SingleTableTxn)
}
txn, exist := t.txns[row.StartTs]
if !exist {

var txn *model.SingleTableTxn
if len(t.txns) == 0 || t.txns[len(t.txns)-1].StartTs < row.StartTs {
txn = &model.SingleTableTxn{
StartTs: row.StartTs,
CommitTs: row.CommitTs,
Table: row.Table,
ReplicaID: row.ReplicaID,
}
t.txns[row.StartTs] = txn
t.txns = append(t.txns, txn)
} else if t.txns[len(t.txns)-1].StartTs == row.StartTs {
txn = t.txns[len(t.txns)-1]
} else {
log.Panic("Row changed event received by the sink module should be ordered",
zap.Any("previousTxn", t.txns[len(t.txns)-1]),
zap.Any("currentRow", row))
}
txn.Append(row)
}
Expand Down Expand Up @@ -149,9 +153,7 @@ func splitResolvedTxn(
}
resolvedTxns := make([]*model.SingleTableTxn, 0, txnsLength)
for _, txns := range resolvedTxnsWithTheSameCommitTs {
for _, txn := range txns.txns {
resolvedTxns = append(resolvedTxns, txn)
}
resolvedTxns = append(resolvedTxns, txns.txns...)
}
resolvedRowsMap[tableID] = resolvedTxns
flushedResolvedTsMap[tableID] = resolvedTs
Expand Down
Loading

0 comments on commit dec3e20

Please sign in to comment.