Skip to content

Commit

Permalink
sink(ticdc): limit the number of transactions cached in a mysql worker (
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Apr 26, 2024
1 parent bff94a2 commit 445a471
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 277 deletions.
16 changes: 9 additions & 7 deletions cdc/sink/dmlsink/txn/txn_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ dmlsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil)
type dmlSink struct {
alive struct {
sync.RWMutex
conflictDetector *causality.ConflictDetector[*worker, *txnEvent]
conflictDetector *causality.ConflictDetector[*txnEvent]
isDead bool
}

Expand Down Expand Up @@ -107,15 +107,20 @@ func newSink(ctx context.Context,
dead: make(chan struct{}),
}

sink.alive.conflictDetector = causality.NewConflictDetector[*txnEvent](conflictDetectorSlots, causality.TxnCacheOption{
Count: len(backends),
Size: 1024,
BlockStrategy: causality.BlockStrategyWaitEmpty,
})

g, ctx1 := errgroup.WithContext(ctx)
for i, backend := range backends {
w := newWorker(ctx1, changefeedID, i, backend, len(backends))
g.Go(func() error { return w.run() })
txnCh := sink.alive.conflictDetector.GetOutChByCacheID(int64(i))
g.Go(func() error { return w.runLoop(txnCh) })
sink.workers = append(sink.workers, w)
}

sink.alive.conflictDetector = causality.NewConflictDetector[*worker, *txnEvent](sink.workers, conflictDetectorSlots)

sink.wg.Add(1)
go func() {
defer sink.wg.Done()
Expand Down Expand Up @@ -165,9 +170,6 @@ func (s *dmlSink) Close() {
}
s.wg.Wait()

for _, w := range s.workers {
w.close()
}
if s.statistics != nil {
s.statistics.Close()
}
Expand Down
53 changes: 17 additions & 36 deletions cdc/sink/dmlsink/txn/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,17 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics/txn"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/causality"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

type txnWithNotifier struct {
*txnEvent
postTxnExecuted func()
}

type worker struct {
ctx context.Context
changefeed string
workerCount int

ID int
txnCh *chann.DrainableChann[txnWithNotifier]
backend backend

// Metrics.
Expand All @@ -58,13 +52,13 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
ID int, backend backend, workerCount int,
) *worker {
wid := fmt.Sprintf("%d", ID)

return &worker{
ctx: ctx,
changefeed: fmt.Sprintf("%s.%s", changefeedID.Namespace, changefeedID.ID),
workerCount: workerCount,

ID: ID,
txnCh: chann.NewAutoDrainChann[txnWithNotifier](chann.Cap(-1 /*unbounded*/)),
backend: backend,

metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
Expand All @@ -79,21 +73,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
}
}

// Add adds a txnEvent to the worker.
// The worker will call postTxnExecuted() after the txn executed.
// The postTxnExecuted will remove the txn related Node in the conflict detector's
// dependency graph and resolve related dependencies for these transactions
// which depend on this executed txn.
func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) {
w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted}
}

func (w *worker) close() {
w.txnCh.CloseAndDrain()
}

// Continuously get events from txnCh and call backend flush based on conditions.
func (w *worker) run() error {
// runLoop continuously gets events from txnCh and calls backend flush based on conditions.
func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
defer func() {
if err := w.backend.Close(); err != nil {
log.Info("Transaction dmlSink backend close fail",
Expand All @@ -114,18 +95,18 @@ func (w *worker) run() error {
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))
return nil
case txn := <-w.txnCh.Out():
// we get the data from txnCh.out until no more data here or reach the state that can be flushed.
// If no more data in txnCh.out, and also not reach the state that can be flushed,
case txn := <-txnCh:
// We keep reading data from txnCh until there is no more data or the flush condition is reached.
// If there is no more data in txnCh and the flush condition has not been reached yet,
// we wait for 10ms and then flush, to avoid flushing too often with only a small number of txns.
if txn.txnEvent != nil {
needFlush := w.onEvent(txn)
if txn.TxnEvent != nil {
needFlush := w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
if !needFlush {
delay := time.NewTimer(w.flushInterval)
for !needFlush {
select {
case txn := <-w.txnCh.Out():
needFlush = w.onEvent(txn)
case txn := <-txnCh:
needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
case <-delay.C:
needFlush = true
}
Expand Down Expand Up @@ -158,23 +139,23 @@ func (w *worker) run() error {

// onEvent is called when a new event is received.
// It returns true if the event is sent to backend.
func (w *worker) onEvent(txn txnWithNotifier) bool {
func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
w.hasPending = true

if txn.txnEvent.GetTableSinkState() != state.TableSinkSinking {
if txn.GetTableSinkState() != state.TableSinkSinking {
// The table where the event comes from is in stopping, so it's safe
// to drop the event directly.
txn.txnEvent.Callback()
txn.Callback()
// Still necessary to append the callbacks into the pending list.
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
return false
}

w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds())
w.metricQueueDuration.Observe(time.Since(txn.start).Seconds())
w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows)))
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
return w.backend.OnTxnEvent(txn.txnEvent.TxnCallbackableEvent)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
return w.backend.OnTxnEvent(txn.TxnCallbackableEvent)
}

// doFlush flushes the backend.
Expand Down
106 changes: 61 additions & 45 deletions pkg/causality/conflict_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ package causality
import (
"sync"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/causality/internal"
"github.com/pingcap/tiflow/pkg/chann"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// ConflictDetector implements a logic that dispatches transaction
// to different workers in a way that transactions modifying the same
// keys are never executed concurrently and have their original orders
// preserved.
type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
workers []Worker
// to different worker cache channels in a way that transactions
// modifying the same keys are never executed concurrently and
// have their original orders preserved. Transactions in different
// channels can be executed concurrently.
type ConflictDetector[Txn txnEvent] struct {
// resolvedTxnCaches are used to cache resolved transactions.
resolvedTxnCaches []txnCache[Txn]

// slots are used to find all unfinished transactions
// conflicting with an incoming transaction.
Expand All @@ -38,28 +42,25 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {

// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[txnFinishedEvent]
garbageNodes *chann.DrainableChann[*internal.Node]
wg sync.WaitGroup
closeCh chan struct{}
}

type txnFinishedEvent struct {
node *internal.Node
sortedKeysHash []uint64
}

// NewConflictDetector creates a new ConflictDetector.
func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
workers []Worker,
numSlots uint64,
) *ConflictDetector[Worker, Txn] {
ret := &ConflictDetector[Worker, Txn]{
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
func NewConflictDetector[Txn txnEvent](
numSlots uint64, opt TxnCacheOption,
) *ConflictDetector[Txn] {
ret := &ConflictDetector[Txn]{
resolvedTxnCaches: make([]txnCache[Txn], opt.Count),
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[*internal.Node](),
closeCh: make(chan struct{}),
}
for i := 0; i < opt.Count; i++ {
ret.resolvedTxnCaches[i] = newTxnCache[Txn](opt)
}

ret.wg.Add(1)
Expand All @@ -75,36 +76,38 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
//
// NOTE: if multiple threads access this concurrently,
// Txn.GenSortedDedupKeysHash must be sorted by the slot index.
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
func (d *ConflictDetector[Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
node.OnResolved = func(workerID int64) {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
node := internal.NewNode(sortedKeysHash)
txnWithNotifier := TxnWithNotifier[Txn]{
TxnEvent: txn,
PostTxnExecuted: func() {
// After this transaction is executed, we can remove the node from the graph,
// and resolve related dependencies for these transactions which depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if this node still
// occupies related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
// Send this txn to related worker as soon as all dependencies are resolved.
d.sendToWorker(txn, postTxnExecuted, workerID)
d.garbageNodes.In() <- node
},
}
node.TrySendToTxnCache = func(cacheID int64) bool {
// Try sending this txn to related cache as soon as all dependencies are resolved.
return d.sendToCache(txnWithNotifier, cacheID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.RandCacheID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.resolvedTxnCaches)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, sortedKeysHash)
d.slots.Add(node)
}

// Close closes the ConflictDetector.
func (d *ConflictDetector[Worker, Txn]) Close() {
func (d *ConflictDetector[Txn]) Close() {
close(d.closeCh)
d.wg.Wait()
}

func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
func (d *ConflictDetector[Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
Expand All @@ -117,20 +120,33 @@ func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
if notifyCallback != nil {
notifyCallback()
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.sortedKeysHash)
case node := <-d.garbageNodes.Out():
if node != nil {
d.slots.Free(node)
}
}
}
}

// sendToWorker should not call txn.Callback if it returns an error.
func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, postTxnExecuted func(), workerID int64) {
if workerID < 0 {
panic("must assign with a valid workerID")
// sendToCache should not call txn.Callback if it returns an error.
func (d *ConflictDetector[Txn]) sendToCache(txn TxnWithNotifier[Txn], id int64) bool {
if id < 0 {
log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id))
}

// Note that OnConflictResolved must be called before adding the txn to the cache. Otherwise,
// there will be a data race since the txn may be read before OnConflictResolved is called.
txn.TxnEvent.OnConflictResolved()
cache := d.resolvedTxnCaches[id]
ok := cache.add(txn)
return ok
}

// GetOutChByCacheID returns the output channel by cacheID.
// Note that txns in a single cache should be executed sequentially.
func (d *ConflictDetector[Txn]) GetOutChByCacheID(id int64) <-chan TxnWithNotifier[Txn] {
if id < 0 {
log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id))
}
txn.OnConflictResolved()
worker := d.workers[workerID]
worker.Add(txn, postTxnExecuted)
return d.resolvedTxnCaches[id].out()
}
Loading

0 comments on commit 445a471

Please sign in to comment.