limit the maximum number of cached txns in mysql worker
CharlesCheung96 committed Apr 16, 2024
1 parent 72646f6 commit 55b9eb6
Showing 10 changed files with 303 additions and 186 deletions.
15 changes: 8 additions & 7 deletions cdc/sink/dmlsink/txn/txn_dml_sink.go
@@ -43,7 +43,7 @@ var _ dmlsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil)
type dmlSink struct {
alive struct {
sync.RWMutex
conflictDetector *causality.ConflictDetector[*worker, *txnEvent]
conflictDetector *causality.ConflictDetector[*txnEvent]
isDead bool
}

@@ -107,15 +107,19 @@ func newSink(ctx context.Context,
dead: make(chan struct{}),
}

sink.alive.conflictDetector = causality.NewConflictDetector[*txnEvent](conflictDetectorSlots, causality.WorkerOption{
WorkerCount: len(backends),
CacheSize: 1024,
IsBlock: true,
})

g, ctx1 := errgroup.WithContext(ctx)
for i, backend := range backends {
w := newWorker(ctx1, changefeedID, i, backend, len(backends))
g.Go(func() error { return w.run() })
g.Go(func() error { return w.runLoop(sink.alive.conflictDetector) })
sink.workers = append(sink.workers, w)
}

sink.alive.conflictDetector = causality.NewConflictDetector[*worker, *txnEvent](sink.workers, conflictDetectorSlots)

sink.wg.Add(1)
go func() {
defer sink.wg.Done()
@@ -165,9 +169,6 @@ func (s *dmlSink) Close() {
}
s.wg.Wait()

for _, w := range s.workers {
w.close()
}
if s.statistics != nil {
s.statistics.Close()
}
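For orientation, the net effect of the txn_dml_sink.go changes is that the sink now creates the conflict detector first, sized by a WorkerOption, and each worker drains the detector instead of owning its own unbounded channel. A minimal sketch of the new wiring (error handling and metrics omitted; the meaning of the WorkerOption fields is inferred from the commit title and this diff, since the defining file is not shown here):

conflictDetector := causality.NewConflictDetector[*txnEvent](conflictDetectorSlots, causality.WorkerOption{
	WorkerCount: len(backends), // one bounded cache per MySQL worker
	CacheSize:   1024,          // cap on txns buffered per worker (the point of this commit)
	IsBlock:     true,          // presumably: wait/retry rather than drop when the cache is full
})

g, gctx := errgroup.WithContext(ctx)
for i, backend := range backends {
	w := newWorker(gctx, changefeedID, i, backend, len(backends))
	// Each worker now pulls from its own per-worker channel inside the detector.
	g.Go(func() error { return w.runLoop(conflictDetector) })
	sink.workers = append(sink.workers, w)
}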
52 changes: 17 additions & 35 deletions cdc/sink/dmlsink/txn/worker.go
@@ -22,23 +22,17 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics/txn"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/causality"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

type txnWithNotifier struct {
*txnEvent
postTxnExecuted func()
}

type worker struct {
ctx context.Context
changefeed string
workerCount int

ID int
txnCh *chann.DrainableChann[txnWithNotifier]
backend backend

// Metrics.
@@ -58,13 +52,13 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
ID int, backend backend, workerCount int,
) *worker {
wid := fmt.Sprintf("%d", ID)

return &worker{
ctx: ctx,
changefeed: fmt.Sprintf("%s.%s", changefeedID.Namespace, changefeedID.ID),
workerCount: workerCount,

ID: ID,
txnCh: chann.NewAutoDrainChann[txnWithNotifier](chann.Cap(-1 /*unbounded*/)),
backend: backend,

metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
@@ -79,21 +73,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
}
}

// Add adds a txnEvent to the worker.
// The worker will call postTxnExecuted() after the txn is executed.
// The postTxnExecuted will remove the txn-related Node in the conflict detector's
// dependency graph and resolve related dependencies for those transactions
// which depend on this executed txn.
func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) {
w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted}
}

func (w *worker) close() {
w.txnCh.CloseAndDrain()
}

// Continuously get events from txnCh and call backend flush based on conditions.
func (w *worker) run() error {
// runLoop drains txns dispatched by the conflict detector and flushes them to the backend.
func (w *worker) runLoop(conflictDetector *causality.ConflictDetector[*txnEvent]) error {
defer func() {
if err := w.backend.Close(); err != nil {
log.Info("Transaction dmlSink backend close fail",
@@ -107,25 +88,26 @@ func (w *worker) run() error {
zap.Int("workerID", w.ID))

start := time.Now()
txnCh := conflictDetector.GetOutChByWorkerID(int64(w.ID))
for {
select {
case <-w.ctx.Done():
log.Info("Transaction dmlSink worker exits as canceled",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))
return nil
case txn := <-w.txnCh.Out():
case txn := <-txnCh:
// We keep receiving txns from txnCh until there is no more data or the batch reaches a
// state that can be flushed. If txnCh is drained before reaching a flushable state,
// we wait for up to 10ms and then flush anyway, to avoid many small flushes with few txns each.
if txn.txnEvent != nil {
needFlush := w.onEvent(txn)
if txn.TxnEvent != nil {
needFlush := w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
if !needFlush {
delay := time.NewTimer(w.flushInterval)
for !needFlush {
select {
case txn := <-w.txnCh.Out():
needFlush = w.onEvent(txn)
case txn := <-txnCh:
needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
case <-delay.C:
needFlush = true
}
@@ -157,24 +139,24 @@ func (w *worker) run() error {
}

// onEvent is called when a new event is received.
// It returns true if it needs flush immediately.
func (w *worker) onEvent(txn txnWithNotifier) bool {
// It returns true if the event is sent to backend.
func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
w.hasPending = true

if txn.txnEvent.GetTableSinkState() != state.TableSinkSinking {
if txn.GetTableSinkState() != state.TableSinkSinking {
// The table where the event comes from is being stopped, so it's safe
// to drop the event directly.
txn.txnEvent.Callback()
txn.Callback()
// Still necessary to append the callbacks into the pending list.
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
return false
}

w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds())
w.metricQueueDuration.Observe(time.Since(txn.start).Seconds())
w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows)))
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
return w.backend.OnTxnEvent(txn.txnEvent.TxnCallbackableEvent)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
return w.backend.OnTxnEvent(txn.TxnCallbackableEvent)
}

// doFlush flushes the backend.
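Putting the worker.go pieces together, the new runLoop reads roughly as below. This is a simplified reconstruction from the hunks above, not the literal file: logging and metrics are trimmed, and the doFlush call at the end is an assumption based on the "// doFlush flushes the backend." comment, since that part of the function is outside the shown hunks.

func (w *worker) runLoop(conflictDetector *causality.ConflictDetector[*txnEvent]) error {
	defer func() {
		if err := w.backend.Close(); err != nil {
			log.Info("Transaction dmlSink backend close fail",
				zap.String("changefeedID", w.changefeed),
				zap.Int("workerID", w.ID), zap.Error(err))
		}
	}()

	// The worker no longer owns an unbounded chann; it drains the bounded
	// per-worker channel filled by the conflict detector.
	txnCh := conflictDetector.GetOutChByWorkerID(int64(w.ID))
	for {
		select {
		case <-w.ctx.Done():
			return nil
		case txn := <-txnCh:
			if txn.TxnEvent == nil {
				continue
			}
			needFlush := w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
			if !needFlush {
				// Batch more txns until the backend asks for a flush or
				// flushInterval expires, whichever comes first.
				delay := time.NewTimer(w.flushInterval)
				for !needFlush {
					select {
					case txn := <-txnCh:
						needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
					case <-delay.C:
						needFlush = true
					}
				}
				delay.Stop()
			}
			// Assumed helper: flush the accumulated batch to the backend.
			if err := w.doFlush(); err != nil {
				return err
			}
		}
	}
}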
80 changes: 46 additions & 34 deletions pkg/causality/conflict_detector.go
@@ -16,17 +16,19 @@ package causality
import (
"sync"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/causality/internal"
"github.com/pingcap/tiflow/pkg/chann"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// ConflictDetector implements a logic that dispatches transaction
// to different workers in a way that transactions modifying the same
// keys are never executed concurrently and have their original orders
// preserved.
type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
workers []Worker
type ConflictDetector[Txn txnEvent] struct {
workers []workerCache[Txn]

// slots are used to find all unfinished transactions
// conflicting with an incoming transactions.
@@ -38,29 +40,26 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {

// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[txnFinishedEvent]
garbageNodes *chann.DrainableChann[*internal.Node]
wg sync.WaitGroup
closeCh chan struct{}
}

type txnFinishedEvent struct {
node *internal.Node
sortedKeysHash []uint64
}

// NewConflictDetector creates a new ConflictDetector.
func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
workers []Worker,
numSlots uint64,
) *ConflictDetector[Worker, Txn] {
ret := &ConflictDetector[Worker, Txn]{
workers: workers,
func NewConflictDetector[Txn txnEvent](
numSlots uint64, opt WorkerOption,
) *ConflictDetector[Txn] {
ret := &ConflictDetector[Txn]{
workers: make([]workerCache[Txn], opt.WorkerCount),
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
garbageNodes: chann.NewAutoDrainChann[*internal.Node](),
closeCh: make(chan struct{}),
}
for i := 0; i < opt.WorkerCount; i++ {
ret.workers[i] = newWorker[Txn](opt)
}

ret.wg.Add(1)
go func() {
@@ -75,36 +74,38 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
//
// NOTE: if multiple threads access this concurrently,
// Txn.GenSortedDedupKeysHash must be sorted by the slot index.
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
func (d *ConflictDetector[Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
node.OnResolved = func(workerID int64) {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
node := internal.NewNode(sortedKeysHash)
txnWithNotifier := TxnWithNotifier[Txn]{
TxnEvent: txn,
PostTxnExecuted: func() {
// After this transaction is executed, we can remove the node from the graph,
// and resolve related dependencies for the transactions that depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if it still
// occupies related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
// Send this txn to related worker as soon as all dependencies are resolved.
d.sendToWorker(txn, postTxnExecuted, workerID)
d.garbageNodes.In() <- node
},
}
node.TrySendToWorker = func(workerID int64) bool {
// Try sending this txn to related worker as soon as all dependencies are resolved.
return d.sendToWorker(txnWithNotifier, workerID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, sortedKeysHash)
d.slots.Add(node)
}
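The NOTE above, and the sortedKeysHash that now travels inside the Node, rely on each txn reducing its changed keys to a sorted, deduplicated list of slot indexes; sorting presumably keeps concurrent Add calls touching slots in a consistent order. A self-contained toy version of that idea (this is not the tiflow implementation of GenSortedDedupKeysHash; the hash function and key format here are made up for illustration):

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// genSortedDedupKeysHash maps each changed key to one of numSlots buckets,
// deduplicates the bucket indexes, and sorts them ascending.
func genSortedDedupKeysHash(keys [][]byte, numSlots uint64) []uint64 {
	seen := make(map[uint64]struct{}, len(keys))
	slots := make([]uint64, 0, len(keys))
	for _, key := range keys {
		h := fnv.New64a()
		h.Write(key)
		slot := h.Sum64() % numSlots
		if _, dup := seen[slot]; !dup {
			seen[slot] = struct{}{}
			slots = append(slots, slot)
		}
	}
	sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] })
	return slots
}

func main() {
	keys := [][]byte{[]byte("t1:pk=1"), []byte("t1:uk=a"), []byte("t1:pk=1")}
	// Two txns whose slot lists overlap conflict and must keep their original order.
	fmt.Println(genSortedDedupKeysHash(keys, 16384))
}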

// Close closes the ConflictDetector.
func (d *ConflictDetector[Worker, Txn]) Close() {
func (d *ConflictDetector[Txn]) Close() {
close(d.closeCh)
d.wg.Wait()
}

func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
func (d *ConflictDetector[Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
Expand All @@ -117,20 +118,31 @@ func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
if notifyCallback != nil {
notifyCallback()
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.sortedKeysHash)
case node := <-d.garbageNodes.Out():
if node != nil {
d.slots.Free(node)
}
}
}
}

// sendToWorker should not call txn.Callback if it returns an error.
func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, postTxnExecuted func(), workerID int64) {
func (d *ConflictDetector[Txn]) sendToWorker(txn TxnWithNotifier[Txn], workerID int64) bool {
if workerID < 0 {
panic("must assign with a valid workerID")
log.Panic("must assign with a valid workerID", zap.Int64("workerID", workerID))
}
txn.OnConflictResolved()
worker := d.workers[workerID]
worker.Add(txn, postTxnExecuted)
ok := worker.add(txn)
if ok {
txn.TxnEvent.OnConflictResolved()
}
return ok
}

// GetOutChByWorkerID returns the output channel of the worker.
func (d *ConflictDetector[Txn]) GetOutChByWorkerID(workerID int64) <-chan TxnWithNotifier[Txn] {
if workerID < 0 {
log.Panic("must assign with a valid workerID", zap.Int64("workerID", workerID))
}
return d.workers[workerID].out()
}
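GetOutChByWorkerID is what lets the txn workers above consume directly from the detector. The workerCache type and WorkerOption live in files that are not part of this excerpt, so the following is only a plausible sketch of a bounded cache consistent with how this diff uses it (add returning false when full, out exposing a read-only channel); the real implementation, including the IsBlock behavior, may differ.

// Hypothetical bounded per-worker cache; names other than TxnWithNotifier,
// WorkerOption, add, and out are invented for this sketch.
type boundedWorkerCache[Txn txnEvent] struct {
	ch chan TxnWithNotifier[Txn]
}

func newBoundedWorkerCache[Txn txnEvent](opt WorkerOption) *boundedWorkerCache[Txn] {
	return &boundedWorkerCache[Txn]{ch: make(chan TxnWithNotifier[Txn], opt.CacheSize)}
}

// add tries to enqueue without blocking. Returning false is what limits the
// number of cached txns per worker: the node keeps the txn and retries later
// (presumably via its notification mechanism) instead of growing an unbounded queue.
func (c *boundedWorkerCache[Txn]) add(txn TxnWithNotifier[Txn]) bool {
	select {
	case c.ch <- txn:
		return true
	default:
		return false
	}
}

func (c *boundedWorkerCache[Txn]) out() <-chan TxnWithNotifier[Txn] {
	return c.ch
}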