sink(ticdc): limit the number of transactions cached in a mysql worker #10892

Merged: 10 commits, Apr 22, 2024

16 changes: 9 additions & 7 deletions cdc/sink/dmlsink/txn/txn_dml_sink.go
@@ -43,7 +43,7 @@ var _ dmlsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil)
type dmlSink struct {
alive struct {
sync.RWMutex
conflictDetector *causality.ConflictDetector[*worker, *txnEvent]
conflictDetector *causality.ConflictDetector[*txnEvent]
isDead bool
}

@@ -107,15 +107,20 @@ func newSink(ctx context.Context,
dead: make(chan struct{}),
}

sink.alive.conflictDetector = causality.NewConflictDetector[*txnEvent](conflictDetectorSlots, causality.TxnCacheOption{
Count: len(backends),
Size: 1024,
BlockStrategy: causality.BlockStrategyWaitEmpty,
})

g, ctx1 := errgroup.WithContext(ctx)
for i, backend := range backends {
w := newWorker(ctx1, changefeedID, i, backend, len(backends))
g.Go(func() error { return w.run() })
txnCh := sink.alive.conflictDetector.GetOutChByCacheID(int64(i))
g.Go(func() error { return w.runLoop(txnCh) })
sink.workers = append(sink.workers, w)
}

sink.alive.conflictDetector = causality.NewConflictDetector[*worker, *txnEvent](sink.workers, conflictDetectorSlots)

sink.wg.Add(1)
go func() {
defer sink.wg.Done()
@@ -165,9 +170,6 @@ func (s *dmlSink) Close() {
}
s.wg.Wait()

for _, w := range s.workers {
w.close()
}
if s.statistics != nil {
s.statistics.Close()
}
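Note on the wiring above: the sink now sizes the conflict detector's per-worker caches explicitly (Count = len(backends), Size = 1024, BlockStrategy = causality.BlockStrategyWaitEmpty), which is what bounds the number of transactions cached per MySQL worker. The cache type itself lives in pkg/causality and is not part of this diff; the snippet below is only a rough sketch of how such a bounded, channel-backed cache could behave — boundedTxnCache and its methods are invented here for illustration, not the actual implementation.

```go
package main

import "fmt"

// txnWithNotifier mirrors causality.TxnWithNotifier: a txn plus the callback
// that releases its dependents once it has been executed.
type txnWithNotifier[T any] struct {
	txn             T
	postTxnExecuted func()
}

// boundedTxnCache is a hypothetical stand-in for the detector's txnCache: a
// buffered channel whose capacity plays the role of TxnCacheOption.Size.
type boundedTxnCache[T any] struct {
	ch chan txnWithNotifier[T]
}

func newBoundedTxnCache[T any](size int) *boundedTxnCache[T] {
	return &boundedTxnCache[T]{ch: make(chan txnWithNotifier[T], size)}
}

// add reports whether the txn was accepted. Returning false when the cache is
// full lets the caller back off until the worker drains the cache, roughly the
// behaviour suggested by BlockStrategyWaitEmpty.
func (c *boundedTxnCache[T]) add(t txnWithNotifier[T]) bool {
	select {
	case c.ch <- t:
		return true
	default:
		return false
	}
}

// out exposes the consumer side, like ConflictDetector.GetOutChByCacheID.
func (c *boundedTxnCache[T]) out() <-chan txnWithNotifier[T] { return c.ch }

func main() {
	cache := newBoundedTxnCache[string](2)
	fmt.Println(cache.add(txnWithNotifier[string]{txn: "txn-1", postTxnExecuted: func() {}})) // true
	fmt.Println(cache.add(txnWithNotifier[string]{txn: "txn-2", postTxnExecuted: func() {}})) // true
	fmt.Println(cache.add(txnWithNotifier[string]{txn: "txn-3", postTxnExecuted: func() {}})) // false: cache is full
}
```

In the actual diff the detector's node calls TrySendToTxnCache and the cache's add reports success or failure; how a failed send is retried is handled inside pkg/causality/internal, which is outside this diff.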
55 changes: 18 additions & 37 deletions cdc/sink/dmlsink/txn/worker.go
@@ -22,23 +22,17 @@
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics/txn"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/causality"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

type txnWithNotifier struct {
*txnEvent
postTxnExecuted func()
}

type worker struct {
ctx context.Context
changefeed string
workerCount int

ID int
txnCh *chann.DrainableChann[txnWithNotifier]
backend backend

// Metrics.
@@ -58,13 +52,13 @@
ID int, backend backend, workerCount int,
) *worker {
wid := fmt.Sprintf("%d", ID)

return &worker{
ctx: ctx,
changefeed: fmt.Sprintf("%s.%s", changefeedID.Namespace, changefeedID.ID),
workerCount: workerCount,

ID: ID,
txnCh: chann.NewAutoDrainChann[txnWithNotifier](chann.Cap(-1 /*unbounded*/)),
backend: backend,

metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
@@ -79,21 +73,8 @@
}
}

// Add adds a txnEvent to the worker.
// The worker will call postTxnExecuted() after the txn executed.
// The postTxnExecuted will remove the txn related Node in the conflict detector's
// dependency graph and resolve related dependencies for these transacitons
// which depend on this executed txn.
func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) {
w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted}
}

func (w *worker) close() {
w.txnCh.CloseAndDrain()
}

// Continuously get events from txnCh and call backend flush based on conditions.
func (w *worker) run() error {
// Run a loop.
func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
defer func() {
if err := w.backend.Close(); err != nil {
log.Info("Transaction dmlSink backend close fail",
@@ -114,18 +95,18 @@
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))
return nil
case txn := <-w.txnCh.Out():
// we get the data from txnCh.out until no more data here or reach the state that can be flushed.
// If no more data in txnCh.out, and also not reach the state that can be flushed,
case txn := <-txnCh:
// we get the data from txnCh until no more data here or reach the state that can be flushed.
// If no more data in txnCh, and also not reach the state that can be flushed,
// we will wait for 10ms and then do flush to avoid too much flush with small amount of txns.
if txn.txnEvent != nil {
needFlush := w.onEvent(txn)
if txn.TxnEvent != nil {
needFlush := w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
if !needFlush {
delay := time.NewTimer(w.flushInterval)
for !needFlush {
select {
case txn := <-w.txnCh.Out():
needFlush = w.onEvent(txn)
case txn := <-txnCh:
needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
case <-delay.C:
needFlush = true
}
@@ -157,24 +138,24 @@
}

// onEvent is called when a new event is received.
// It returns true if it needs flush immediately.
func (w *worker) onEvent(txn txnWithNotifier) bool {
// It returns true if the event is sent to backend.
func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
w.hasPending = true

if txn.txnEvent.GetTableSinkState() != state.TableSinkSinking {
if txn.GetTableSinkState() != state.TableSinkSinking {
// The table where the event comes from is in stopping, so it's safe
// to drop the event directly.
txn.txnEvent.Callback()
txn.Callback()
// Still necessary to append the callbacks into the pending list.
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
return false
}

w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds())
w.metricQueueDuration.Observe(time.Since(txn.start).Seconds())
w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows)))
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
return w.backend.OnTxnEvent(txn.txnEvent.TxnCallbackableEvent)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
return w.backend.OnTxnEvent(txn.TxnCallbackableEvent)
}

// doFlush flushes the backend.
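The rewritten runLoop keeps the existing micro-batching behaviour: after receiving one event it keeps pulling from the cache's output channel, and if nothing more arrives before the flush interval (10 ms per the comment) it flushes what it has, so small batches are not flushed one by one (the real loop also flushes early whenever the backend reports it needs a flush). Below is a stripped-down, self-contained sketch of that pattern — the event type, channel, and flush callback are placeholders, not the real backend API.

```go
package main

import (
	"fmt"
	"time"
)

type event struct{ id int }

// flushAfterBatch drains txnCh into batches: it starts a batch on the first
// event, keeps appending while more events arrive quickly, and flushes once
// the channel goes quiet for flushInterval (or is closed).
func flushAfterBatch(txnCh <-chan event, flushInterval time.Duration, flush func([]event)) {
	var batch []event
	for ev := range txnCh {
		batch = append(batch, ev)
		delay := time.NewTimer(flushInterval)
	collect:
		for {
			select {
			case next, ok := <-txnCh:
				if !ok {
					break collect // channel closed: flush the remainder
				}
				batch = append(batch, next)
			case <-delay.C:
				break collect // quiet for a while: flush what we have
			}
		}
		delay.Stop()
		flush(batch)
		batch = batch[:0]
	}
}

func main() {
	ch := make(chan event, 8)
	for i := 0; i < 5; i++ {
		ch <- event{id: i}
	}
	close(ch)
	flushAfterBatch(ch, 10*time.Millisecond, func(b []event) {
		fmt.Printf("flushed %d events\n", len(b))
	})
}
```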
106 changes: 61 additions & 45 deletions pkg/causality/conflict_detector.go
@@ -16,17 +16,21 @@
import (
"sync"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/causality/internal"
"github.com/pingcap/tiflow/pkg/chann"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// ConflictDetector implements a logic that dispatches transaction
// to different workers in a way that transactions modifying the same
// keys are never executed concurrently and have their original orders
// preserved.
type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
workers []Worker
// to different worker cache channels in a way that transactions
// modifying the same keys are never executed concurrently and
// have their original orders preserved. Transactions in different
// channels can be executed concurrently.
type ConflictDetector[Txn txnEvent] struct {
// resolvedTxnCaches are used to cache resolved transactions.
resolvedTxnCaches []txnCache[Txn]

// slots are used to find all unfinished transactions
// conflicting with an incoming transactions.
@@ -38,28 +42,25 @@

// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[txnFinishedEvent]
garbageNodes *chann.DrainableChann[*internal.Node]
wg sync.WaitGroup
closeCh chan struct{}
}

type txnFinishedEvent struct {
node *internal.Node
sortedKeysHash []uint64
}

// NewConflictDetector creates a new ConflictDetector.
func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
workers []Worker,
numSlots uint64,
) *ConflictDetector[Worker, Txn] {
ret := &ConflictDetector[Worker, Txn]{
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
func NewConflictDetector[Txn txnEvent](
numSlots uint64, opt TxnCacheOption,
) *ConflictDetector[Txn] {
ret := &ConflictDetector[Txn]{
resolvedTxnCaches: make([]txnCache[Txn], opt.Count),
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[*internal.Node](),
closeCh: make(chan struct{}),
}
for i := 0; i < opt.Count; i++ {
ret.resolvedTxnCaches[i] = newTxnCache[Txn](opt)
}

ret.wg.Add(1)
@@ -75,36 +76,38 @@
//
// NOTE: if multiple threads access this concurrently,
// Txn.GenSortedDedupKeysHash must be sorted by the slot index.
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
func (d *ConflictDetector[Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
node.OnResolved = func(workerID int64) {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
node := internal.NewNode(sortedKeysHash)
txnWithNotifier := TxnWithNotifier[Txn]{
TxnEvent: txn,
PostTxnExecuted: func() {
// After this transaction is executed, we can remove the node from the graph,
// and resolve related dependencies for these transacitons which depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if this node is still
// occupied related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
// Send this txn to related worker as soon as all dependencies are resolved.
d.sendToWorker(txn, postTxnExecuted, workerID)
d.garbageNodes.In() <- node
},
}
node.TrySendToTxnCache = func(cacheID int64) bool {
// Try sending this txn to related cache as soon as all dependencies are resolved.
return d.sendToCache(txnWithNotifier, cacheID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.RandCacheID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.resolvedTxnCaches)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, sortedKeysHash)
d.slots.Add(node)
}

// Close closes the ConflictDetector.
func (d *ConflictDetector[Worker, Txn]) Close() {
func (d *ConflictDetector[Txn]) Close() {
close(d.closeCh)
d.wg.Wait()
}

func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
func (d *ConflictDetector[Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
@@ -117,20 +120,33 @@
if notifyCallback != nil {
notifyCallback()
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.sortedKeysHash)
case node := <-d.garbageNodes.Out():
if node != nil {
d.slots.Free(node)
}
}
}
}

// sendToWorker should not call txn.Callback if it returns an error.
func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, postTxnExecuted func(), workerID int64) {
if workerID < 0 {
panic("must assign with a valid workerID")
// sendToCache should not call txn.Callback if it returns an error.
func (d *ConflictDetector[Txn]) sendToCache(txn TxnWithNotifier[Txn], id int64) bool {
if id < 0 {
log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id))
}

// Note OnConflictResolved must be called before add to cache. Otherwise, there will
// be a data race since the txn may be read before the OnConflictResolved is called.
txn.TxnEvent.OnConflictResolved()
cache := d.resolvedTxnCaches[id]
ok := cache.add(txn)
return ok
}

// GetOutChByCacheID returns the output channel by cacheID.
// Note txns in single cache should be executed sequentially.
func (d *ConflictDetector[Txn]) GetOutChByCacheID(id int64) <-chan TxnWithNotifier[Txn] {
if id < 0 {
log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id))
}
txn.OnConflictResolved()
worker := d.workers[workerID]
worker.Add(txn, postTxnExecuted)
return d.resolvedTxnCaches[id].out()
}
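Taken together, the conflict detector now owns one bounded cache per downstream worker, and each worker drains exactly one cache via GetOutChByCacheID, so transactions inside a single cache keep their order while different caches run in parallel. The self-contained sketch below shows only that dispatch shape; the key-based cacheFor assignment is a stand-in for illustration — the real detector routes a transaction to a cache only after its conflicts are resolved through the dependency graph (TrySendToTxnCache / RandCacheID), not by hashing keys directly.

```go
package main

import (
	"fmt"
	"sync"
)

type txn struct {
	key string
	seq int
}

func main() {
	const numCaches = 2
	caches := make([]chan txn, numCaches)
	for i := range caches {
		caches[i] = make(chan txn, 4) // bounded, playing the role of TxnCacheOption.Size
	}

	// One consumer per cache, like one worker.runLoop per GetOutChByCacheID channel:
	// events inside a cache are executed strictly in the order they were enqueued.
	var wg sync.WaitGroup
	for i, ch := range caches {
		wg.Add(1)
		go func(id int, ch <-chan txn) {
			defer wg.Done()
			for t := range ch {
				fmt.Printf("cache %d executed %s#%d\n", id, t.key, t.seq)
			}
		}(i, ch)
	}

	// Stand-in dispatch: route by key so same-key txns land in the same cache and
	// keep their original order. The real detector decides this via its dependency
	// graph once all conflicting predecessors have finished.
	cacheFor := func(key string) int { return int(key[len(key)-1]) % numCaches }
	for _, t := range []txn{{"a", 1}, {"b", 1}, {"a", 2}, {"b", 2}} {
		caches[cacheFor(t.key)] <- t
	}
	for _, ch := range caches {
		close(ch)
	}
	wg.Wait()
}
```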