From 8f4e56affc0c467cfd37d49bcc4cb242b4914f7a Mon Sep 17 00:00:00 2001 From: Jinpeng Zhang Date: Mon, 1 Jan 2024 21:41:01 -0800 Subject: [PATCH] sink-to-mysql(cdc) simplify conflict detector, prevent workload skew issue (#10376) close pingcap/tiflow#10341 --- pkg/causality/conflict_detector.go | 33 +++--- pkg/causality/internal/node.go | 157 ++++++--------------------- pkg/causality/internal/node_test.go | 17 ++- pkg/causality/internal/slots_test.go | 2 +- 4 files changed, 61 insertions(+), 148 deletions(-) diff --git a/pkg/causality/conflict_detector.go b/pkg/causality/conflict_detector.go index 0ad209c5106..9d58731bf19 100644 --- a/pkg/causality/conflict_detector.go +++ b/pkg/causality/conflict_detector.go @@ -36,11 +36,10 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct { // nextWorkerID is used to dispatch transactions round-robin. nextWorkerID atomic.Int64 - // Used to run a background goroutine to GC or notify nodes. - notifiedNodes *chann.DrainableChann[func()] - garbageNodes *chann.DrainableChann[txnFinishedEvent] - wg sync.WaitGroup - closeCh chan struct{} + // Used to run a background goroutine to GC. + garbageNodes *chann.DrainableChann[txnFinishedEvent] + wg sync.WaitGroup + closeCh chan struct{} } type txnFinishedEvent struct { @@ -54,12 +53,11 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent]( numSlots uint64, ) *ConflictDetector[Worker, Txn] { ret := &ConflictDetector[Worker, Txn]{ - workers: workers, - slots: internal.NewSlots[*internal.Node](numSlots), - numSlots: numSlots, - notifiedNodes: chann.NewDrainableChann[func()](), - garbageNodes: chann.NewDrainableChann[txnFinishedEvent](), - closeCh: make(chan struct{}), + workers: workers, + slots: internal.NewSlots[*internal.Node](numSlots), + numSlots: numSlots, + garbageNodes: chann.NewDrainableChann[txnFinishedEvent](), + closeCh: make(chan struct{}), } ret.wg.Add(1) @@ -78,11 +76,12 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent]( func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) { sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots) node := internal.NewNode() - node.OnResolved = func(workerID int64) { + // SendToWorker is called after all dependencies are removed. + node.SendToWorker = func(workerID int64) { // This callback is called after the transaction is executed. postTxnExecuted := func() { // After this transaction is executed, we can remove the node from the graph, - // and resolve related dependencies for these transacitons which depend on this + // and remove related dependencies for these transacitons which depend on this // executed transaction. node.Remove() @@ -90,11 +89,10 @@ func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) { // occupied related slots. d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash} } - // Send this txn to related worker as soon as all dependencies are resolved. + // Send this txn to related worker as soon as all dependencies are removed. d.sendToWorker(txn, postTxnExecuted, workerID) } node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) } - node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback } d.slots.Add(node, sortedKeysHash) } @@ -106,17 +104,12 @@ func (d *ConflictDetector[Worker, Txn]) Close() { func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() { defer func() { - d.notifiedNodes.CloseAndDrain() d.garbageNodes.CloseAndDrain() }() for { select { case <-d.closeCh: return - case notifyCallback := <-d.notifiedNodes.Out(): - if notifyCallback != nil { - notifyCallback() - } case event := <-d.garbageNodes.Out(): if event.node != nil { d.slots.Free(event.node, event.sortedKeysHash) diff --git a/pkg/causality/internal/node.go b/pkg/causality/internal/node.go index cfcd5fc2ff8..34d7574ea78 100644 --- a/pkg/causality/internal/node.go +++ b/pkg/causality/internal/node.go @@ -14,6 +14,7 @@ package internal import ( + "fmt" "sync" stdatomic "sync/atomic" @@ -48,18 +49,14 @@ type Node struct { // Immutable fields. id int64 - // Called when all dependencies are resolved. - OnResolved func(id workerID) - // Set the id generator to get a random ID. + // SendToWorker is used to send the node to a worker. + SendToWorker func(id workerID) + // RandWorkerID is used to select a worker randomly. RandWorkerID func() workerID - // Set the callback that the node is notified. - OnNotified func(callback func()) // Following fields are used for notifying a node's dependers lock-free. - totalDependencies int32 - resolvedDependencies int32 - removedDependencies int32 - resolvedList []int64 + totalDependencies int32 + removedDependencies int32 // Following fields are protected by `mu`. mu sync.Mutex @@ -85,12 +82,10 @@ type Node struct { func NewNode() (ret *Node) { defer func() { ret.id = genNextNodeID() - ret.OnResolved = nil + ret.SendToWorker = nil ret.RandWorkerID = nil ret.totalDependencies = 0 - ret.resolvedDependencies = 0 ret.removedDependencies = 0 - ret.resolvedList = nil ret.assignedTo = unassigned ret.removed = false }() @@ -106,23 +101,9 @@ func (n *Node) NodeID() int64 { // DependOn implements interface internal.SlotNode. func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) { - resolvedDependencies, removedDependencies := int32(0), int32(0) - depend := func(target *Node) { - if target == nil { - // For a given Node, every dependency corresponds to a target. - // If target is nil it means the dependency doesn't conflict - // with any other nodes. However it's still necessary to track - // it because Node.tryResolve needs to counting the number of - // resolved dependencies. - resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1) - stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny) - removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1) - return - } - if target.id == n.id { - panic("you cannot depend on yourself") + panic("cannot depend on yourself") } // The target node might be removed or modified in other places, for example @@ -130,18 +111,10 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) target.mu.Lock() defer target.mu.Unlock() - if target.assignedTo != unassigned { - // The target has already been assigned to a worker. - // In this case, record the worker ID in `resolvedList`, and this node - // probably can be sent to the same worker and executed sequentially. - resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1) - stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo) - } - - // Add the node to the target's dependers if the target has not been removed. + // Add the node to the target's dependers if the target is not removed. if target.removed { // The target has already been removed. - removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1) + stdatomic.AddInt32(&n.removedDependencies, 1) } else if _, exist := target.getOrCreateDependers().ReplaceOrInsert(n); exist { // Should never depend on a target redundantly. panic("should never exist") @@ -153,24 +126,19 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) // ?: why gen new ID here? n.id = genNextNodeID() - // `totalDependcies` and `resolvedList` must be initialized before depending on any targets. + // `totalDependcies` must be initialized before depending on any targets. n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt) - n.resolvedList = make([]int64, 0, n.totalDependencies) - for i := 0; i < int(n.totalDependencies); i++ { - n.resolvedList = append(n.resolvedList, unassigned) - } + n.removedDependencies = int32(noDependencyKeyCnt) for _, node := range dependencyNodes { depend(node) } - for i := 0; i < noDependencyKeyCnt; i++ { - depend(nil) - } - n.maybeResolve(resolvedDependencies, removedDependencies) + n.maybeReadyToRun() } // Remove implements interface internal.SlotNode. +// Remove will be called after related transaction is executed. func (n *Node) Remove() { n.mu.Lock() defer n.mu.Unlock() @@ -179,8 +147,8 @@ func (n *Node) Remove() { if n.dependers != nil { // `mu` must be holded during accessing dependers. n.dependers.Ascend(func(node *Node) bool { - removedDependencies := stdatomic.AddInt32(&node.removedDependencies, 1) - node.maybeResolve(0, removedDependencies) + stdatomic.AddInt32(&node.removedDependencies, 1) + node.maybeReadyToRun() return true }) n.dependers.Clear(true) @@ -197,7 +165,7 @@ func (n *Node) Free() { } n.id = invalidNodeID - n.OnResolved = nil + n.SendToWorker = nil n.RandWorkerID = nil // TODO: reuse node if necessary. Currently it's impossible if async-notify is used. @@ -212,96 +180,41 @@ func (n *Node) assignTo(workerID int64) bool { defer n.mu.Unlock() if n.assignedTo != unassigned { - // Already resolved by some other guys. + // Already handled by some other guys. return false } n.assignedTo = workerID - if n.OnResolved != nil { - n.OnResolved(workerID) - n.OnResolved = nil - } - - if n.dependers != nil { - // `mu` must be holded during accessing dependers. - n.dependers.Ascend(func(node *Node) bool { - resolvedDependencies := stdatomic.AddInt32(&node.resolvedDependencies, 1) - stdatomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo) - node.maybeResolve(resolvedDependencies, 0) - return true - }) + if n.SendToWorker != nil { + n.SendToWorker(workerID) + n.SendToWorker = nil } return true } -func (n *Node) maybeResolve(resolvedDependencies, removedDependencies int32) { - if workerNum, ok := n.tryResolve(resolvedDependencies, removedDependencies); ok { - if workerNum < 0 { - panic("Node.tryResolve must return a valid worker ID") - } - if n.OnNotified != nil { - // Notify the conflict detector background worker to assign the node to the worker asynchronously. - n.OnNotified(func() { n.assignTo(workerNum) }) - } else { - // Assign the node to the worker directly. - n.assignTo(workerNum) - } - } -} - -// tryResolve try to find a worker to assign the node to. -// Returns (_, false) if there is a conflict, -// returns (rand, true) if there is no conflict, -// returns (N, true) if only worker N can be used. -func (n *Node) tryResolve(resolvedDependencies, removedDependencies int32) (int64, bool) { - assignedTo, resolved := n.doResolve(resolvedDependencies, removedDependencies) - if resolved && assignedTo == assignedToAny { - assignedTo = n.RandWorkerID() +func (n *Node) maybeReadyToRun() { + if ok := n.checkReadiness(); ok { + // Assign the node to the worker directly. + n.assignTo(n.RandWorkerID()) } - return assignedTo, resolved } -func (n *Node) doResolve(resolvedDependencies, removedDependencies int32) (int64, bool) { +// checkReadiness check if all dependencies have been removed. +// Returns false if there are some conflicts, returns true if all dependencies +// are removed. +func (n *Node) checkReadiness() bool { if n.totalDependencies == 0 { // No conflicts, can select any workers. - return assignedToAny, true - } - - if resolvedDependencies == n.totalDependencies { - firstDep := stdatomic.LoadInt64(&n.resolvedList[0]) - hasDiffDep := false - for i := 1; i < int(n.totalDependencies); i++ { - curr := stdatomic.LoadInt64(&n.resolvedList[i]) - // // Todo: simplify assign to logic, only resolve dependencies nodes after - // // corresponding transactions are executed. - // // - // // In DependOn, depend(nil) set resolvedList[i] to assignedToAny - // // for these no dependecy keys. - // if curr == assignedToAny { - // continue - // } - if firstDep != curr { - hasDiffDep = true - break - } - } - if !hasDiffDep { - // If all dependency nodes are assigned to the same worker, we can assign - // this node to the same worker directly, and they will execute sequentially. - // On the other hand, if dependency nodes are assigned to different workers, - // This node has to wait all dependency txn executed and all depencecy nodes - // are removed. - return firstDep, true - } + return true } - // All dependcies are removed, so assign the node to any worker is fine. - if removedDependencies == n.totalDependencies { - return assignedToAny, true + removedDependencies := stdatomic.LoadInt32(&n.removedDependencies) + if removedDependencies > n.totalDependencies { + panic(fmt.Sprintf("removedDependencies %d > totalDependencies %d which is not expected", + removedDependencies, n.totalDependencies)) } - - return unassigned, false + return removedDependencies == n.totalDependencies } func (n *Node) getOrCreateDependers() *btree.BTreeG[*Node] { diff --git a/pkg/causality/internal/node_test.go b/pkg/causality/internal/node_test.go index 776f5359550..b1aa05a0eb3 100644 --- a/pkg/causality/internal/node_test.go +++ b/pkg/causality/internal/node_test.go @@ -62,14 +62,18 @@ func TestNodeDependOn(t *testing.T) { func TestNodeSingleDependency(t *testing.T) { t.Parallel() - // Node B depends on A, without any other resolved dependencies. + // Node B depends on A, without any other removed dependencies. nodeA := NewNode() nodeB := NewNode() nodeB.RandWorkerID = func() workerID { return 100 } nodeB.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 0) require.True(t, nodeA.assignTo(1)) require.Equal(t, workerID(1), nodeA.assignedWorkerID()) - require.Equal(t, workerID(1), nodeB.assignedWorkerID()) + // Node B should be unassigned before Node A is removed. + require.Equal(t, unassigned, nodeB.assignedWorkerID()) + nodeA.Remove() + // Node B should be assigned to random worker after Node A is removed. + require.Equal(t, workerID(100), nodeB.assignedWorkerID()) // Node D depends on C, with some other resolved dependencies. nodeC := NewNode() @@ -124,11 +128,14 @@ func TestNodeResolveImmediately(t *testing.T) { nodeD := NewNode() nodeD.RandWorkerID = func() workerID { return workerID(100) } nodeD.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 0) - require.Equal(t, workerID(1), nodeD.assignedWorkerID()) - - // Node E depends on B and C and some other resolved dependencies. + // NodeD should be unassigned before Node B and C are removed. + require.Equal(t, unassigned, nodeD.assignedWorkerID()) nodeB.Remove() nodeC.Remove() + // NodeD should be assigned to random worker after Node B and C are removed. + require.Equal(t, workerID(100), nodeD.assignedWorkerID()) + + // Node E depends on B and C and some other resolved dependencies. nodeE := NewNode() nodeE.RandWorkerID = func() workerID { return workerID(100) } nodeE.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 999) diff --git a/pkg/causality/internal/slots_test.go b/pkg/causality/internal/slots_test.go index 42983909b25..101a14e5162 100644 --- a/pkg/causality/internal/slots_test.go +++ b/pkg/causality/internal/slots_test.go @@ -63,7 +63,7 @@ func TestSlotsConcurrentOps(t *testing.T) { freeNodeChan <- newNode() } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() // test concurrent add and remove won't panic