From 764fdb45cb05550009d1627d2cd05ca55b035ac0 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 20 May 2024 21:01:22 +0800 Subject: [PATCH] sink(ticdc): limit the number of transactions cached in a mysql worker (#10892) (#10941) close pingcap/tiflow#10896 --- cdc/sink/dmlsink/txn/txn_dml_sink.go | 16 +-- cdc/sink/dmlsink/txn/worker.go | 43 ++----- pkg/causality/conflict_detector.go | 106 +++++++++------- pkg/causality/internal/node.go | 177 ++++++++++++++------------- pkg/causality/internal/node_test.go | 95 +++++++------- pkg/causality/internal/slots.go | 8 +- pkg/causality/internal/slots_test.go | 16 +-- pkg/causality/tests/driver.go | 12 +- pkg/causality/tests/worker.go | 29 ++--- pkg/causality/txn_cache.go | 142 +++++++++++++++++++++ pkg/causality/txn_cache_test.go | 85 +++++++++++++ pkg/causality/worker.go | 29 ----- 12 files changed, 486 insertions(+), 272 deletions(-) create mode 100644 pkg/causality/txn_cache.go create mode 100644 pkg/causality/txn_cache_test.go delete mode 100644 pkg/causality/worker.go diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index b8f78059f5d..964d11b6eca 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -43,7 +43,7 @@ var _ dmlsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil) type dmlSink struct { alive struct { sync.RWMutex - conflictDetector *causality.ConflictDetector[*worker, *txnEvent] + conflictDetector *causality.ConflictDetector[*txnEvent] isDead bool } @@ -107,15 +107,20 @@ func newSink(ctx context.Context, dead: make(chan struct{}), } + sink.alive.conflictDetector = causality.NewConflictDetector[*txnEvent](conflictDetectorSlots, causality.TxnCacheOption{ + Count: len(backends), + Size: 1024, + BlockStrategy: causality.BlockStrategyWaitEmpty, + }) + g, ctx1 := errgroup.WithContext(ctx) for i, backend := range backends { w := newWorker(ctx1, changefeedID, i, backend, len(backends)) - g.Go(func() error { return w.runLoop() }) + txnCh := sink.alive.conflictDetector.GetOutChByCacheID(int64(i)) + g.Go(func() error { return w.runLoop(txnCh) }) sink.workers = append(sink.workers, w) } - sink.alive.conflictDetector = causality.NewConflictDetector[*worker, *txnEvent](sink.workers, conflictDetectorSlots) - sink.wg.Add(1) go func() { defer sink.wg.Done() @@ -165,9 +170,6 @@ func (s *dmlSink) Close() { } s.wg.Wait() - for _, w := range s.workers { - w.close() - } if s.statistics != nil { s.statistics.Close() } diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index 6fae294ddf3..b4f071bb567 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -22,23 +22,17 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics/txn" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" - "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/causality" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) -type txnWithNotifier struct { - *txnEvent - postTxnExecuted func() -} - type worker struct { ctx context.Context changefeed string workerCount int ID int - txnCh *chann.DrainableChann[txnWithNotifier] backend backend // Metrics. @@ -58,13 +52,13 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID, ID int, backend backend, workerCount int, ) *worker { wid := fmt.Sprintf("%d", ID) + return &worker{ ctx: ctx, changefeed: fmt.Sprintf("%s.%s", changefeedID.Namespace, changefeedID.ID), workerCount: workerCount, ID: ID, - txnCh: chann.NewAutoDrainChann[txnWithNotifier](chann.Cap(-1 /*unbounded*/)), backend: backend, metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID), @@ -79,21 +73,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID, } } -// Add adds a txnEvent to the worker. -// The worker will call postTxnExecuted() after the txn executed. -// The postTxnExecuted will remove the txn related Node in the conflict detector's -// dependency graph and resolve related dependencies for these transacitons -// which depend on this executed txn. -func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) { - w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted} -} - -func (w *worker) close() { - w.txnCh.CloseAndDrain() -} - // Run a loop. -func (w *worker) runLoop() error { +func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error { defer func() { if err := w.backend.Close(); err != nil { log.Info("Transaction dmlSink backend close fail", @@ -121,9 +102,9 @@ func (w *worker) runLoop() error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) return nil - case txn := <-w.txnCh.Out(): - if txn.txnEvent != nil { - needFlush = w.onEvent(txn) + case txn := <-txnCh: + if txn.TxnEvent != nil { + needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted) } case <-ticker.C: needFlush = true @@ -149,23 +130,23 @@ func (w *worker) runLoop() error { // onEvent is called when a new event is received. // It returns true if the event is sent to backend. -func (w *worker) onEvent(txn txnWithNotifier) bool { +func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool { w.hasPending = true - if txn.txnEvent.GetTableSinkState() != state.TableSinkSinking { + if txn.GetTableSinkState() != state.TableSinkSinking { // The table where the event comes from is in stopping, so it's safe // to drop the event directly. - txn.txnEvent.Callback() + txn.Callback() // Still necessary to append the callbacks into the pending list. - w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted) + w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted) return false } w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds()) w.metricQueueDuration.Observe(time.Since(txn.start).Seconds()) w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows))) - w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted) - return w.backend.OnTxnEvent(txn.txnEvent.TxnCallbackableEvent) + w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted) + return w.backend.OnTxnEvent(txn.TxnCallbackableEvent) } // doFlush flushes the backend. diff --git a/pkg/causality/conflict_detector.go b/pkg/causality/conflict_detector.go index f1f7495e77c..79f14944027 100644 --- a/pkg/causality/conflict_detector.go +++ b/pkg/causality/conflict_detector.go @@ -16,17 +16,21 @@ package causality import ( "sync" + "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/causality/internal" "github.com/pingcap/tiflow/pkg/chann" "go.uber.org/atomic" + "go.uber.org/zap" ) // ConflictDetector implements a logic that dispatches transaction -// to different workers in a way that transactions modifying the same -// keys are never executed concurrently and have their original orders -// preserved. -type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct { - workers []Worker +// to different worker cache channels in a way that transactions +// modifying the same keys are never executed concurrently and +// have their original orders preserved. Transactions in different +// channels can be executed concurrently. +type ConflictDetector[Txn txnEvent] struct { + // resolvedTxnCaches are used to cache resolved transactions. + resolvedTxnCaches []txnCache[Txn] // slots are used to find all unfinished transactions // conflicting with an incoming transactions. @@ -38,28 +42,25 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct { // Used to run a background goroutine to GC or notify nodes. notifiedNodes *chann.DrainableChann[func()] - garbageNodes *chann.DrainableChann[txnFinishedEvent] + garbageNodes *chann.DrainableChann[*internal.Node] wg sync.WaitGroup closeCh chan struct{} } -type txnFinishedEvent struct { - node *internal.Node - sortedKeysHash []uint64 -} - // NewConflictDetector creates a new ConflictDetector. -func NewConflictDetector[Worker worker[Txn], Txn txnEvent]( - workers []Worker, - numSlots uint64, -) *ConflictDetector[Worker, Txn] { - ret := &ConflictDetector[Worker, Txn]{ - workers: workers, - slots: internal.NewSlots[*internal.Node](numSlots), - numSlots: numSlots, - notifiedNodes: chann.NewAutoDrainChann[func()](), - garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](), - closeCh: make(chan struct{}), +func NewConflictDetector[Txn txnEvent]( + numSlots uint64, opt TxnCacheOption, +) *ConflictDetector[Txn] { + ret := &ConflictDetector[Txn]{ + resolvedTxnCaches: make([]txnCache[Txn], opt.Count), + slots: internal.NewSlots[*internal.Node](numSlots), + numSlots: numSlots, + notifiedNodes: chann.NewAutoDrainChann[func()](), + garbageNodes: chann.NewAutoDrainChann[*internal.Node](), + closeCh: make(chan struct{}), + } + for i := 0; i < opt.Count; i++ { + ret.resolvedTxnCaches[i] = newTxnCache[Txn](opt) } ret.wg.Add(1) @@ -75,12 +76,12 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent]( // // NOTE: if multiple threads access this concurrently, // Txn.GenSortedDedupKeysHash must be sorted by the slot index. -func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) { +func (d *ConflictDetector[Txn]) Add(txn Txn) { sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots) - node := internal.NewNode() - node.OnResolved = func(workerID int64) { - // This callback is called after the transaction is executed. - postTxnExecuted := func() { + node := internal.NewNode(sortedKeysHash) + txnWithNotifier := TxnWithNotifier[Txn]{ + TxnEvent: txn, + PostTxnExecuted: func() { // After this transaction is executed, we can remove the node from the graph, // and resolve related dependencies for these transacitons which depend on this // executed transaction. @@ -88,23 +89,25 @@ func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) { // Send this node to garbageNodes to GC it from the slots if this node is still // occupied related slots. - d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash} - } - // Send this txn to related worker as soon as all dependencies are resolved. - d.sendToWorker(txn, postTxnExecuted, workerID) + d.garbageNodes.In() <- node + }, + } + node.TrySendToTxnCache = func(cacheID int64) bool { + // Try sending this txn to related cache as soon as all dependencies are resolved. + return d.sendToCache(txnWithNotifier, cacheID) } - node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) } + node.RandCacheID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.resolvedTxnCaches)) } node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback } - d.slots.Add(node, sortedKeysHash) + d.slots.Add(node) } // Close closes the ConflictDetector. -func (d *ConflictDetector[Worker, Txn]) Close() { +func (d *ConflictDetector[Txn]) Close() { close(d.closeCh) d.wg.Wait() } -func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() { +func (d *ConflictDetector[Txn]) runBackgroundTasks() { defer func() { d.notifiedNodes.CloseAndDrain() d.garbageNodes.CloseAndDrain() @@ -117,20 +120,33 @@ func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() { if notifyCallback != nil { notifyCallback() } - case event := <-d.garbageNodes.Out(): - if event.node != nil { - d.slots.Free(event.node, event.sortedKeysHash) + case node := <-d.garbageNodes.Out(): + if node != nil { + d.slots.Free(node) } } } } -// sendToWorker should not call txn.Callback if it returns an error. -func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, postTxnExecuted func(), workerID int64) { - if workerID < 0 { - panic("must assign with a valid workerID") +// sendToCache should not call txn.Callback if it returns an error. +func (d *ConflictDetector[Txn]) sendToCache(txn TxnWithNotifier[Txn], id int64) bool { + if id < 0 { + log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id)) + } + + // Note OnConflictResolved must be called before add to cache. Otherwise, there will + // be a data race since the txn may be read before the OnConflictResolved is called. + txn.TxnEvent.OnConflictResolved() + cache := d.resolvedTxnCaches[id] + ok := cache.add(txn) + return ok +} + +// GetOutChByCacheID returns the output channel by cacheID. +// Note txns in single cache should be executed sequentially. +func (d *ConflictDetector[Txn]) GetOutChByCacheID(id int64) <-chan TxnWithNotifier[Txn] { + if id < 0 { + log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id)) } - txn.OnConflictResolved() - worker := d.workers[workerID] - worker.Add(txn, postTxnExecuted) + return d.resolvedTxnCaches[id].out() } diff --git a/pkg/causality/internal/node.go b/pkg/causality/internal/node.go index 75324918a5b..a9ec62e6055 100644 --- a/pkg/causality/internal/node.go +++ b/pkg/causality/internal/node.go @@ -15,24 +15,26 @@ package internal import ( "sync" - stdatomic "sync/atomic" + "sync/atomic" "github.com/google/btree" - "go.uber.org/atomic" + "github.com/pingcap/log" + "go.uber.org/zap" ) type ( - workerID = int64 + cacheID = int64 ) const ( - unassigned = workerID(-2) - assignedToAny = workerID(-1) + unassigned = cacheID(-2) + assignedToAny = cacheID(-1) + invalidNodeID = int64(-1) ) var ( - nextNodeID = atomic.NewInt64(0) + nextNodeID = atomic.Int64{} // btreeFreeList is a shared free list used by all // btrees in order to lessen the burden of GC. @@ -46,12 +48,13 @@ var ( // in conflict detection. type Node struct { // Immutable fields. - id int64 + id int64 + sortedDedupKeysHash []uint64 // Called when all dependencies are resolved. - OnResolved func(id workerID) + TrySendToTxnCache func(id cacheID) bool // Set the id generator to get a random ID. - RandWorkerID func() workerID + RandCacheID func() cacheID // Set the callback that the node is notified. OnNotified func(callback func()) @@ -64,7 +67,7 @@ type Node struct { // Following fields are protected by `mu`. mu sync.Mutex - assignedTo workerID + assignedTo cacheID removed bool // dependers is an ordered set for all nodes that @@ -82,11 +85,12 @@ type Node struct { } // NewNode creates a new node. -func NewNode() (ret *Node) { +func NewNode(sortedDedupKeysHash []uint64) (ret *Node) { defer func() { ret.id = genNextNodeID() - ret.OnResolved = nil - ret.RandWorkerID = nil + ret.sortedDedupKeysHash = sortedDedupKeysHash + ret.TrySendToTxnCache = nil + ret.RandCacheID = nil ret.totalDependencies = 0 ret.resolvedDependencies = 0 ret.removedDependencies = 0 @@ -104,9 +108,14 @@ func (n *Node) NodeID() int64 { return n.id } +// Hashes implements interface internal.SlotNode. +func (n *Node) Hashes() []uint64 { + return n.sortedDedupKeysHash +} + // DependOn implements interface internal.SlotNode. func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) { - resolvedDependencies, removedDependencies := int32(0), int32(0) + resolvedDependencies := int32(0) depend := func(target *Node) { if target == nil { @@ -115,14 +124,14 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) // with any other nodes. However it's still necessary to track // it because Node.tryResolve needs to counting the number of // resolved dependencies. - resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1) - stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny) - removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1) + resolvedDependencies = atomic.AddInt32(&n.resolvedDependencies, 1) + atomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny) + atomic.AddInt32(&n.removedDependencies, 1) return } if target.id == n.id { - panic("you cannot depend on yourself") + log.Panic("node cannot depend on itself") } // The target node might be removed or modified in other places, for example @@ -131,20 +140,20 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) defer target.mu.Unlock() if target.assignedTo != unassigned { - // The target has already been assigned to a worker. - // In this case, record the worker ID in `resolvedList`, and this node - // probably can be sent to the same worker and executed sequentially. - resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1) - stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo) + // The target has already been assigned to a cache. + // In this case, record the cache ID in `resolvedList`, and this node + // probably can be sent to the same cache and executed sequentially. + resolvedDependencies = atomic.AddInt32(&n.resolvedDependencies, 1) + atomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo) } // Add the node to the target's dependers if the target has not been removed. if target.removed { // The target has already been removed. - removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1) + atomic.AddInt32(&n.removedDependencies, 1) } else if _, exist := target.getOrCreateDependers().ReplaceOrInsert(n); exist { // Should never depend on a target redundantly. - panic("should never exist") + log.Panic("should never exist") } } @@ -153,7 +162,7 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) // ?: why gen new ID here? n.id = genNextNodeID() - // `totalDependcies` and `resolvedList` must be initialized before depending on any targets. + // `totalDependencies` and `resolvedList` must be initialized before depending on any targets. n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt) n.resolvedList = make([]int64, 0, n.totalDependencies) for i := 0; i < int(n.totalDependencies); i++ { @@ -167,7 +176,7 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) depend(nil) } - n.maybeResolve(resolvedDependencies, removedDependencies) + n.maybeResolve() } // Remove implements interface internal.SlotNode. @@ -179,8 +188,8 @@ func (n *Node) Remove() { if n.dependers != nil { // `mu` must be holded during accessing dependers. n.dependers.Ascend(func(node *Node) bool { - removedDependencies := stdatomic.AddInt32(&node.removedDependencies, 1) - node.maybeResolve(0, removedDependencies) + atomic.AddInt32(&node.removedDependencies, 1) + node.OnNotified(node.maybeResolve) return true }) n.dependers.Clear(true) @@ -195,12 +204,11 @@ func (n *Node) Free() { n.mu.Lock() defer n.mu.Unlock() if n.id == invalidNodeID { - panic("double free") + log.Panic("double free") } n.id = invalidNodeID - n.OnResolved = nil - n.RandWorkerID = nil + n.TrySendToTxnCache = nil // TODO: reuse node if necessary. Currently it's impossible if async-notify is used. // The reason is a node can step functions `assignTo`, `Remove`, `Free`, then `assignTo`. @@ -208,28 +216,31 @@ func (n *Node) Free() { // or not. } -// assignTo assigns a node to a worker. Returns `true` on success. -func (n *Node) assignTo(workerID int64) bool { +// tryAssignTo assigns a node to a cache. Returns `true` on success. +func (n *Node) tryAssignTo(cacheID int64) bool { n.mu.Lock() defer n.mu.Unlock() if n.assignedTo != unassigned { // Already resolved by some other guys. - return false + return true } - n.assignedTo = workerID - if n.OnResolved != nil { - n.OnResolved(workerID) - n.OnResolved = nil + if n.TrySendToTxnCache != nil { + ok := n.TrySendToTxnCache(cacheID) + if !ok { + return false + } + n.TrySendToTxnCache = nil } + n.assignedTo = cacheID if n.dependers != nil { // `mu` must be holded during accessing dependers. n.dependers.Ascend(func(node *Node) bool { - resolvedDependencies := stdatomic.AddInt32(&node.resolvedDependencies, 1) - stdatomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo) - node.maybeResolve(resolvedDependencies, 0) + resolvedDependencies := atomic.AddInt32(&node.resolvedDependencies, 1) + atomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo) + node.OnNotified(node.maybeResolve) return true }) } @@ -237,72 +248,70 @@ func (n *Node) assignTo(workerID int64) bool { return true } -func (n *Node) maybeResolve(resolvedDependencies, removedDependencies int32) { - if workerNum, ok := n.tryResolve(resolvedDependencies, removedDependencies); ok { - if workerNum < 0 { - panic("Node.tryResolve must return a valid worker ID") +func (n *Node) maybeResolve() { + if cacheID, ok := n.tryResolve(); ok { + if cacheID == unassigned { + log.Panic("invalid cache ID", zap.Uint64("cacheID", uint64(cacheID))) } - if n.OnNotified != nil { - // Notify the conflict detector background worker to assign the node to the worker asynchronously. - n.OnNotified(func() { n.assignTo(workerNum) }) - } else { - // Assign the node to the worker directly. - n.assignTo(workerNum) + + if cacheID >= 0 { + n.tryAssignTo(cacheID) + return + } + + cacheID := n.RandCacheID() + if !n.tryAssignTo(cacheID) { + // If the cache is full, we need to try to assign to another cache. + n.OnNotified(n.maybeResolve) } } } -// tryResolve try to find a worker to assign the node to. +// tryResolve try to find a cache to assign the node to. // Returns (_, false) if there is a conflict, // returns (rand, true) if there is no conflict, -// returns (N, true) if only worker N can be used. -func (n *Node) tryResolve(resolvedDependencies, removedDependencies int32) (int64, bool) { - assignedTo, resolved := n.doResolve(resolvedDependencies, removedDependencies) - if resolved && assignedTo == assignedToAny { - assignedTo = n.RandWorkerID() +// returns (N, true) if only cache N can be used. +func (n *Node) tryResolve() (int64, bool) { + if n.totalDependencies == 0 { + // No conflicts, can select any caches. + return assignedToAny, true } - return assignedTo, resolved -} -func (n *Node) doResolve(resolvedDependencies, removedDependencies int32) (int64, bool) { - if n.totalDependencies == 0 { - // No conflicts, can select any workers. + removedDependencies := atomic.LoadInt32(&n.removedDependencies) + if removedDependencies == n.totalDependencies { + // All dependcies are removed, so assign the node to any cache is fine. return assignedToAny, true } + resolvedDependencies := atomic.LoadInt32(&n.resolvedDependencies) if resolvedDependencies == n.totalDependencies { - firstDep := stdatomic.LoadInt64(&n.resolvedList[0]) + firstDep := atomic.LoadInt64(&n.resolvedList[0]) hasDiffDep := false for i := 1; i < int(n.totalDependencies); i++ { - curr := stdatomic.LoadInt64(&n.resolvedList[i]) - // // Todo: simplify assign to logic, only resolve dependencies nodes after - // // corresponding transactions are executed. - // // - // // In DependOn, depend(nil) set resolvedList[i] to assignedToAny - // // for these no dependecy keys. - // if curr == assignedToAny { - // continue - // } + curr := atomic.LoadInt64(&n.resolvedList[i]) + // Todo: simplify assign to logic, only resolve dependencies nodes after + // corresponding transactions are executed. + // + // In DependOn, depend(nil) set resolvedList[i] to assignedToAny + // for these no dependecy keys. + if curr == assignedToAny { + continue + } if firstDep != curr { hasDiffDep = true break } } - if !hasDiffDep { - // If all dependency nodes are assigned to the same worker, we can assign - // this node to the same worker directly, and they will execute sequentially. - // On the other hand, if dependency nodes are assigned to different workers, + if !hasDiffDep && firstDep != unassigned { + // If all dependency nodes are assigned to the same cache, we can assign + // this node to the same cache directly, and they will execute sequentially. + // On the other hand, if dependency nodes are assigned to different caches, // This node has to wait all dependency txn executed and all depencecy nodes // are removed. return firstDep, true } } - // All dependcies are removed, so assign the node to any worker is fine. - if removedDependencies == n.totalDependencies { - return assignedToAny, true - } - return unassigned, false } @@ -327,9 +336,9 @@ func (n *Node) dependerCount() int { return n.dependers.Len() } -// assignedWorkerID returns the worker ID that the node has been assigned to. +// assignedWorkerID returns the cache ID that the node has been assigned to. // NOTE: assignedWorkerID is used for unit tests only. -func (n *Node) assignedWorkerID() workerID { +func (n *Node) assignedWorkerID() cacheID { n.mu.Lock() defer n.mu.Unlock() diff --git a/pkg/causality/internal/node_test.go b/pkg/causality/internal/node_test.go index 776f5359550..85dfe8b7b27 100644 --- a/pkg/causality/internal/node_test.go +++ b/pkg/causality/internal/node_test.go @@ -21,15 +21,24 @@ import ( var _ SlotNode[*Node] = &Node{} // Asserts that *Node implements SlotNode[*Node]. +func newNodeForTest() *Node { + node := NewNode(nil) + node.OnNotified = func(callback func()) { + // run the callback immediately + callback() + } + return node +} + func TestNodeFree(t *testing.T) { // This case should not be run parallel to // others, for fear that the use-after-free - // will race with NewNode() in other cases. + // will race with newNodeForTest() in other cases. - nodeA := NewNode() + nodeA := newNodeForTest() nodeA.Free() - nodeA = NewNode() + nodeA = newNodeForTest() nodeA.Free() // Double freeing should panic. @@ -41,8 +50,8 @@ func TestNodeFree(t *testing.T) { func TestNodeEquals(t *testing.T) { t.Parallel() - nodeA := NewNode() - nodeB := NewNode() + nodeA := newNodeForTest() + nodeB := newNodeForTest() require.False(t, nodeA.NodeID() == nodeB.NodeID()) require.True(t, nodeA.NodeID() == nodeA.NodeID()) } @@ -51,8 +60,8 @@ func TestNodeDependOn(t *testing.T) { t.Parallel() // Construct a dependency graph: A --> B - nodeA := NewNode() - nodeB := NewNode() + nodeA := newNodeForTest() + nodeB := newNodeForTest() nodeA.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB}, 999) require.Equal(t, nodeA.dependerCount(), 0) @@ -63,23 +72,23 @@ func TestNodeSingleDependency(t *testing.T) { t.Parallel() // Node B depends on A, without any other resolved dependencies. - nodeA := NewNode() - nodeB := NewNode() - nodeB.RandWorkerID = func() workerID { return 100 } + nodeA := newNodeForTest() + nodeB := newNodeForTest() + nodeB.RandCacheID = func() cacheID { return 100 } nodeB.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 0) - require.True(t, nodeA.assignTo(1)) - require.Equal(t, workerID(1), nodeA.assignedWorkerID()) - require.Equal(t, workerID(1), nodeB.assignedWorkerID()) + require.True(t, nodeA.tryAssignTo(1)) + require.Equal(t, cacheID(1), nodeA.assignedWorkerID()) + require.Equal(t, cacheID(1), nodeB.assignedWorkerID()) // Node D depends on C, with some other resolved dependencies. - nodeC := NewNode() - nodeD := NewNode() - nodeD.RandWorkerID = func() workerID { return 100 } + nodeC := newNodeForTest() + nodeD := newNodeForTest() + nodeD.RandCacheID = func() cacheID { return 100 } nodeD.DependOn(map[int64]*Node{nodeA.NodeID(): nodeC}, 999) - require.True(t, nodeC.assignTo(2)) - require.Equal(t, workerID(2), nodeC.assignedWorkerID()) + require.True(t, nodeC.tryAssignTo(2)) + require.Equal(t, cacheID(2), nodeC.assignedWorkerID()) nodeC.Remove() - require.Equal(t, workerID(100), nodeD.assignedWorkerID()) + require.Equal(t, cacheID(100), nodeD.assignedWorkerID()) } func TestNodeMultipleDependencies(t *testing.T) { @@ -90,15 +99,15 @@ func TestNodeMultipleDependencies(t *testing.T) { // C─┤ // └────►B - nodeA := NewNode() - nodeB := NewNode() - nodeC := NewNode() + nodeA := newNodeForTest() + nodeB := newNodeForTest() + nodeC := newNodeForTest() nodeC.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA, nodeB.NodeID(): nodeB}, 999) - nodeC.RandWorkerID = func() workerID { return 100 } + nodeC.RandCacheID = func() cacheID { return 100 } - require.True(t, nodeA.assignTo(1)) - require.True(t, nodeB.assignTo(2)) + require.True(t, nodeA.tryAssignTo(1)) + require.True(t, nodeB.tryAssignTo(2)) require.Equal(t, unassigned, nodeC.assignedWorkerID()) @@ -111,34 +120,34 @@ func TestNodeResolveImmediately(t *testing.T) { t.Parallel() // Node A depends on 0 unresolved dependencies and some resolved dependencies. - nodeA := NewNode() - nodeA.RandWorkerID = func() workerID { return workerID(100) } + nodeA := newNodeForTest() + nodeA.RandCacheID = func() cacheID { return cacheID(100) } nodeA.DependOn(nil, 999) - require.Equal(t, workerID(100), nodeA.assignedWorkerID()) + require.Equal(t, cacheID(100), nodeA.assignedWorkerID()) // Node D depends on B and C, all of them are assigned to 1. - nodeB := NewNode() - require.True(t, nodeB.assignTo(1)) - nodeC := NewNode() - require.True(t, nodeC.assignTo(1)) - nodeD := NewNode() - nodeD.RandWorkerID = func() workerID { return workerID(100) } + nodeB := newNodeForTest() + require.True(t, nodeB.tryAssignTo(1)) + nodeC := newNodeForTest() + require.True(t, nodeC.tryAssignTo(1)) + nodeD := newNodeForTest() + nodeD.RandCacheID = func() cacheID { return cacheID(100) } nodeD.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 0) - require.Equal(t, workerID(1), nodeD.assignedWorkerID()) + require.Equal(t, cacheID(1), nodeD.assignedWorkerID()) // Node E depends on B and C and some other resolved dependencies. nodeB.Remove() nodeC.Remove() - nodeE := NewNode() - nodeE.RandWorkerID = func() workerID { return workerID(100) } + nodeE := newNodeForTest() + nodeE.RandCacheID = func() cacheID { return cacheID(100) } nodeE.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 999) - require.Equal(t, workerID(100), nodeE.assignedWorkerID()) + require.Equal(t, cacheID(100), nodeE.assignedWorkerID()) } func TestNodeDependOnSelf(t *testing.T) { t.Parallel() - nodeA := NewNode() + nodeA := newNodeForTest() require.Panics(t, func() { nodeA.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 999) }) @@ -147,7 +156,9 @@ func TestNodeDependOnSelf(t *testing.T) { func TestNodeDoubleAssigning(t *testing.T) { t.Parallel() - nodeA := NewNode() - require.True(t, nodeA.assignTo(1)) - require.False(t, nodeA.assignTo(2)) + // nodeA := newNodeForTest() + // require.True(t, nodeA.tryAssignTo(1)) + // require.False(t, nodeA.tryAssignTo(2)) + + require.True(t, -1 == assignedToAny) } diff --git a/pkg/causality/internal/slots.go b/pkg/causality/internal/slots.go index d6d131e02c8..23ec22bdcf2 100644 --- a/pkg/causality/internal/slots.go +++ b/pkg/causality/internal/slots.go @@ -27,6 +27,8 @@ type slot[E SlotNode[E]] struct { type SlotNode[T any] interface { // NodeID tells the node's ID. NodeID() int64 + // Hashs returns the sorted and deduped hashes of the node. + Hashes() []uint64 // Construct a dependency on `others`. DependOn(dependencyNodes map[int64]T, noDependencyKeyCnt int) // Remove the node itself and notify all dependers. @@ -56,7 +58,8 @@ func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] { } // Add adds an elem to the slots and calls DependOn for elem. -func (s *Slots[E]) Add(elem E, hashes []uint64) { +func (s *Slots[E]) Add(elem E) { + hashes := elem.Hashes() dependencyNodes := make(map[int64]E, len(hashes)) noDependecyCnt := 0 @@ -101,7 +104,8 @@ func (s *Slots[E]) Add(elem E, hashes []uint64) { } // Free removes an element from the Slots. -func (s *Slots[E]) Free(elem E, hashes []uint64) { +func (s *Slots[E]) Free(elem E) { + hashes := elem.Hashes() for _, hash := range hashes { slotIdx := getSlot(hash, s.numSlots) s.slots[slotIdx].mu.Lock() diff --git a/pkg/causality/internal/slots_test.go b/pkg/causality/internal/slots_test.go index 42983909b25..71c41deac0f 100644 --- a/pkg/causality/internal/slots_test.go +++ b/pkg/causality/internal/slots_test.go @@ -30,14 +30,14 @@ func TestSlotsTrivial(t *testing.T) { nodes := make([]*Node, 0, 1000) for i := 0; i < count; i++ { - node := NewNode() - node.RandWorkerID = func() workerID { return 100 } - slots.Add(node, []uint64{1, 2, 3, 4, 5}) + node := NewNode([]uint64{1, 2, 3, 4, 5}) + node.RandCacheID = func() cacheID { return 100 } + slots.Add(node) nodes = append(nodes, node) } for i := 0; i < count; i++ { - slots.Free(nodes[i], []uint64{1, 2, 3, 4, 5}) + slots.Free(nodes[i]) } require.Equal(t, 0, len(slots.slots[1].nodes)) @@ -55,8 +55,8 @@ func TestSlotsConcurrentOps(t *testing.T) { freeNodeChan := make(chan *Node, N) inuseNodeChan := make(chan *Node, N) newNode := func() *Node { - node := NewNode() - node.RandWorkerID = func() workerID { return 100 } + node := NewNode([]uint64{1, 9, 17, 25, 33}) + node.RandCacheID = func() cacheID { return 100 } return node } for i := 0; i < N; i++ { @@ -77,7 +77,7 @@ func TestSlotsConcurrentOps(t *testing.T) { return case node := <-freeNodeChan: // keys belong to the same slot after hash, since slot num is 8 - slots.Add(node, []uint64{1, 9, 17, 25, 33}) + slots.Add(node) inuseNodeChan <- node } } @@ -92,7 +92,7 @@ func TestSlotsConcurrentOps(t *testing.T) { return case node := <-inuseNodeChan: // keys belong to the same slot after hash, since slot num is 8 - slots.Free(node, []uint64{1, 9, 17, 25, 33}) + slots.Free(node) freeNodeChan <- newNode() } } diff --git a/pkg/causality/tests/driver.go b/pkg/causality/tests/driver.go index e4e9f637888..bac3aa50c4d 100644 --- a/pkg/causality/tests/driver.go +++ b/pkg/causality/tests/driver.go @@ -26,7 +26,7 @@ import ( type conflictTestDriver struct { workers []*workerForTest - conflictDetector *causality.ConflictDetector[*workerForTest, *txnForTest] + conflictDetector *causality.ConflictDetector[*txnForTest] generator workloadGenerator pendingCount atomic.Int64 @@ -36,11 +36,17 @@ type conflictTestDriver struct { func newConflictTestDriver( numWorkers int, numSlots int, workload workloadGenerator, ) *conflictTestDriver { + detector := causality.NewConflictDetector[*txnForTest](uint64(numSlots), causality.TxnCacheOption{ + Count: numWorkers, + Size: 1024, + BlockStrategy: causality.BlockStrategyWaitAvailable, + }) + workers := make([]*workerForTest, 0, numWorkers) for i := 0; i < numWorkers; i++ { - workers = append(workers, newWorkerForTest()) + id := int64(i) + workers = append(workers, newWorkerForTest(detector.GetOutChByCacheID(id))) } - detector := causality.NewConflictDetector[*workerForTest, *txnForTest](workers, uint64(numSlots)) return &conflictTestDriver{ workers: workers, conflictDetector: detector, diff --git a/pkg/causality/tests/worker.go b/pkg/causality/tests/worker.go index f78f406ba44..50f50319262 100644 --- a/pkg/causality/tests/worker.go +++ b/pkg/causality/tests/worker.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/engine/pkg/containers" + "github.com/pingcap/tiflow/pkg/causality" ) type txnForTest struct { @@ -39,7 +40,6 @@ func (t *txnForTest) Finish(err error) { type txnWithUnlock struct { *txnForTest - unlock func() } type workerForTest struct { @@ -49,7 +49,7 @@ type workerForTest struct { execFunc func(*txnForTest) error } -func newWorkerForTest() *workerForTest { +func newWorkerForTest(txnCh <-chan causality.TxnWithNotifier[*txnForTest]) *workerForTest { ret := &workerForTest{ txnQueue: containers.NewSliceQueue[txnWithUnlock](), closeCh: make(chan struct{}), @@ -58,46 +58,33 @@ func newWorkerForTest() *workerForTest { ret.wg.Add(1) go func() { defer ret.wg.Done() - ret.run() + ret.run(txnCh) }() return ret } -func (w *workerForTest) Add(txn *txnForTest, unlock func()) { - w.txnQueue.Push(txnWithUnlock{txnForTest: txn, unlock: unlock}) -} - func (w *workerForTest) Close() { close(w.closeCh) w.wg.Wait() } -func (w *workerForTest) run() { -outer: +func (w *workerForTest) run(txnCh <-chan causality.TxnWithNotifier[*txnForTest]) { for { select { case <-w.closeCh: return - case <-w.txnQueue.C: - } - - for { - txn, ok := w.txnQueue.Pop() - if !ok { - continue outer - } - + case txn := <-txnCh: var err error if w.execFunc != nil { - err = errors.Trace(w.execFunc(txn.txnForTest)) + err = errors.Trace(w.execFunc(txn.TxnEvent)) } - txn.unlock() + txn.PostTxnExecuted() // Finish must be called after unlock, // because the conflictTestDriver needs to make sure // that all conflicts have been resolved before exiting. - txn.Finish(err) + txn.TxnEvent.Finish(err) } } } diff --git a/pkg/causality/txn_cache.go b/pkg/causality/txn_cache.go new file mode 100644 index 00000000000..be7ec1ab4d6 --- /dev/null +++ b/pkg/causality/txn_cache.go @@ -0,0 +1,142 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package causality + +import ( + "sync/atomic" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +type txnEvent interface { + // OnConflictResolved is called when the event leaves ConflictDetector. + OnConflictResolved() + + // Hashes are in range [0, math.MaxUint64) and must be deduped. + // + // NOTE: if the conflict detector is accessed by multiple threads concurrently, + // GenSortedDedupKeysHash must also be sorted based on `key % numSlots`. + GenSortedDedupKeysHash(numSlots uint64) []uint64 +} + +// TxnWithNotifier is a wrapper of txnEvent with a PostTxnExecuted. +type TxnWithNotifier[Txn txnEvent] struct { + TxnEvent Txn + // The PostTxnExecuted will remove the txn related Node in the conflict detector's + // dependency graph and resolve related dependencies for these transacitons + // which depend on this executed txn. + // + // NOTE: the PostTxnExecuted() must be called after the txn executed. + PostTxnExecuted func() +} + +// TxnCacheOption is the option for creating a cache for resolved txns. +type TxnCacheOption struct { + // Count controls the number of caches, txns in different caches could be executed concurrently. + Count int + // Size controls the max number of txns a cache can hold. + Size int + // BlockStrategy controls the strategy when the cache is full. + BlockStrategy BlockStrategy +} + +// In current implementation, the conflict detector will push txn to the txnCache. +type txnCache[Txn txnEvent] interface { + // add adds a event to the Cache. + add(txn TxnWithNotifier[Txn]) bool + // out returns a channel to receive events which are ready to be executed. + out() <-chan TxnWithNotifier[Txn] +} + +func newTxnCache[Txn txnEvent](opt TxnCacheOption) txnCache[Txn] { + log.Info("create new worker cache in conflict detector", + zap.Int("cacheCount", opt.Count), + zap.Int("cacheSize", opt.Size), zap.String("BlockStrategy", string(opt.BlockStrategy))) + if opt.Size <= 0 { + log.Panic("WorkerOption.CacheSize should be greater than 0, please report a bug") + } + + switch opt.BlockStrategy { + case BlockStrategyWaitAvailable: + return &boundedTxnCache[Txn]{ch: make(chan TxnWithNotifier[Txn], opt.Size)} + case BlockStrategyWaitEmpty: + return &boundedTxnCacheWithBlock[Txn]{ch: make(chan TxnWithNotifier[Txn], opt.Size)} + default: + return nil + } +} + +// boundedTxnCache is a cache which has a limit on the number of txns it can hold. +// +//nolint:unused +type boundedTxnCache[Txn txnEvent] struct { + ch chan TxnWithNotifier[Txn] +} + +//nolint:unused +func (w *boundedTxnCache[Txn]) add(txn TxnWithNotifier[Txn]) bool { + select { + case w.ch <- txn: + return true + default: + return false + } +} + +//nolint:unused +func (w *boundedTxnCache[Txn]) out() <-chan TxnWithNotifier[Txn] { + return w.ch +} + +// boundedTxnCacheWithBlock is a special boundedWorker. Once the cache +// is full, it will block until all cached txns are consumed. +type boundedTxnCacheWithBlock[Txn txnEvent] struct { + ch chan TxnWithNotifier[Txn] + //nolint:unused + isBlocked atomic.Bool +} + +//nolint:unused +func (w *boundedTxnCacheWithBlock[Txn]) add(txn TxnWithNotifier[Txn]) bool { + if w.isBlocked.Load() && len(w.ch) <= 0 { + w.isBlocked.Store(false) + } + + if !w.isBlocked.Load() { + select { + case w.ch <- txn: + return true + default: + w.isBlocked.CompareAndSwap(false, true) + } + } + return false +} + +//nolint:unused +func (w *boundedTxnCacheWithBlock[Txn]) out() <-chan TxnWithNotifier[Txn] { + return w.ch +} + +// BlockStrategy is the strategy to handle the situation when the cache is full. +type BlockStrategy string + +const ( + // BlockStrategyWaitAvailable means the cache will block until there is an available slot. + BlockStrategyWaitAvailable BlockStrategy = "waitAvailable" + // BlockStrategyWaitEmpty means the cache will block until all cached txns are consumed. + BlockStrategyWaitEmpty = "waitAll" + // TODO: maybe we can implement a strategy that can automatically adapt to different scenarios +) diff --git a/pkg/causality/txn_cache_test.go b/pkg/causality/txn_cache_test.go new file mode 100644 index 00000000000..3ddfe4e518e --- /dev/null +++ b/pkg/causality/txn_cache_test.go @@ -0,0 +1,85 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package causality + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +type mockTxnEvent struct{} + +func (m mockTxnEvent) OnConflictResolved() { +} + +func (m mockTxnEvent) GenSortedDedupKeysHash(numSlots uint64) []uint64 { + return nil +} + +func TestBoundedWorker(t *testing.T) { + t.Parallel() + + size := 100 + // Create a worker with 1 worker and 1 cache size. + worker := newTxnCache[txnEvent](TxnCacheOption{ + Count: 1, + Size: size, + BlockStrategy: BlockStrategyWaitAvailable, + }) + for i := 0; i < size; i++ { + // Add 10 events to the worker. + ok := worker.add(TxnWithNotifier[txnEvent]{ + TxnEvent: mockTxnEvent{}, + PostTxnExecuted: func() {}, + }) + require.True(t, ok) + } + e := TxnWithNotifier[txnEvent]{ + TxnEvent: mockTxnEvent{}, + PostTxnExecuted: func() {}, + } + require.False(t, worker.add(e)) + e = <-worker.out() + require.NotNil(t, e) + require.True(t, worker.add(e)) +} + +func TestBoundedWorkerWithBlock(t *testing.T) { + t.Parallel() + + size := 100 + // Create a worker with 1 worker and 1 cache size. + worker := newTxnCache[txnEvent](TxnCacheOption{ + Count: 1, + Size: size, + BlockStrategy: BlockStrategyWaitEmpty, + }) + for i := 0; i < size; i++ { + // Add 10 events to the worker. + ok := worker.add(TxnWithNotifier[txnEvent]{ + TxnEvent: mockTxnEvent{}, + PostTxnExecuted: func() {}, + }) + require.True(t, ok) + } + e := TxnWithNotifier[txnEvent]{ + TxnEvent: mockTxnEvent{}, + PostTxnExecuted: func() {}, + } + require.False(t, worker.add(e)) + e = <-worker.out() + require.NotNil(t, e) + require.False(t, worker.add(e)) +} diff --git a/pkg/causality/worker.go b/pkg/causality/worker.go deleted file mode 100644 index 738ba889183..00000000000 --- a/pkg/causality/worker.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package causality - -type txnEvent interface { - // OnConflictResolved is called when the event leaves ConflictDetector. - OnConflictResolved() - - // Hashes are in range [0, math.MaxUint64) and must be deduped. - // - // NOTE: if the conflict detector is accessed by multiple threads concurrently, - // GenSortedDedupKeysHash must also be sorted based on `key % numSlots`. - GenSortedDedupKeysHash(numSlots uint64) []uint64 -} - -type worker[Txn txnEvent] interface { - Add(txn Txn, unlock func()) -}