Skip to content

Commit

Permalink
sink(ticdc): store checkpointTs by flushedResolvedTsMap in mysql sink
Browse files Browse the repository at this point in the history
close #5107
  • Loading branch information
CharlesCheung96 committed Jun 21, 2022
1 parent 48e2761 commit 6286415
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 160 deletions.
30 changes: 18 additions & 12 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,39 +111,46 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha
func (c *UnresolvedTxnCache) Resolved(resolvedTsMap *sync.Map) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) {
c.unresolvedTxnsMu.Lock()
defer c.unresolvedTxnsMu.Unlock()
if len(c.unresolvedTxns) == 0 {
return nil, nil
}

return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns)
}

func splitResolvedTxn(
resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
) (checkpointTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
var (
ok bool
txnsLength int
txns []*txnsWithTheSameCommitTs
resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs
)

checkpointTsMap = make(map[model.TableID]uint64, len(unresolvedTxns))
resolvedTsMap.Range(func(k, v interface{}) bool {
tableID := k.(model.TableID)
resolvedTs := v.(model.Ts)
checkpointTsMap[tableID] = resolvedTs
return true
})

resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns))
flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns))
for tableID, txns := range unresolvedTxns {
v, ok := resolvedTsMap.Load(tableID)
if !ok {
for tableID, resolvedTs := range checkpointTsMap {
if txns, ok = unresolvedTxns[tableID]; !ok {
continue
}
resolvedTs := v.(uint64)
i := sort.Search(len(txns), func(i int) bool {
return txns[i].commitTs > resolvedTs
})
if i == 0 {
continue
}
var resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs
if i == len(txns) {
resolvedTxnsWithTheSameCommitTs = txns
delete(unresolvedTxns, tableID)
} else {
resolvedTxnsWithTheSameCommitTs = txns[:i]
unresolvedTxns[tableID] = txns[i:]
}
var txnsLength int
for _, txns := range resolvedTxnsWithTheSameCommitTs {
txnsLength += len(txns.txns)
}
Expand All @@ -154,7 +161,6 @@ func splitResolvedTxn(
}
}
resolvedRowsMap[tableID] = resolvedTxns
flushedResolvedTsMap[tableID] = resolvedTs
}
return
}
Loading

0 comments on commit 6286415

Please sign in to comment.