Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink-to-mysql(cdc) simplify conflict detector, prevent workload skew issue (#10376) #10780

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions pkg/causality/conflict_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
// nextWorkerID is used to dispatch transactions round-robin.
nextWorkerID atomic.Int64

// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
// Used to run a background goroutine to GC.
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
}

type txnFinishedEvent struct {
Expand All @@ -54,12 +53,11 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
numSlots uint64,
) *ConflictDetector[Worker, Txn] {
ret := &ConflictDetector[Worker, Txn]{
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewDrainableChann[func()](),
garbageNodes: chann.NewDrainableChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
garbageNodes: chann.NewDrainableChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
}

ret.wg.Add(1)
Expand All @@ -78,23 +76,23 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
node.OnResolved = func(workerID int64) {
// SendToWorker is called after all dependencies are removed.
node.SendToWorker = func(workerID int64) {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
// After this transaction is executed, we can remove the node from the graph,
// and resolve related dependencies for these transactions which depend on this
// and remove related dependencies for these transactions which depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if this node still
// occupies related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
// Send this txn to related worker as soon as all dependencies are resolved.
// Send this txn to related worker as soon as all dependencies are removed.
d.sendToWorker(txn, postTxnExecuted, workerID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, sortedKeysHash)
}

Expand All @@ -106,17 +104,12 @@ func (d *ConflictDetector[Worker, Txn]) Close() {

func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
}()
for {
select {
case <-d.closeCh:
return
case notifyCallback := <-d.notifiedNodes.Out():
if notifyCallback != nil {
notifyCallback()
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.sortedKeysHash)
Expand Down
157 changes: 35 additions & 122 deletions pkg/causality/internal/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package internal

import (
"fmt"
"sync"
stdatomic "sync/atomic"

Expand Down Expand Up @@ -48,18 +49,14 @@ type Node struct {
// Immutable fields.
id int64

// Called when all dependencies are resolved.
OnResolved func(id workerID)
// Set the id generator to get a random ID.
// SendToWorker is used to send the node to a worker.
SendToWorker func(id workerID)
// RandWorkerID is used to select a worker randomly.
RandWorkerID func() workerID
// Set the callback that the node is notified.
OnNotified func(callback func())

// Following fields are used for notifying a node's dependers lock-free.
totalDependencies int32
resolvedDependencies int32
removedDependencies int32
resolvedList []int64
totalDependencies int32
removedDependencies int32

// Following fields are protected by `mu`.
mu sync.Mutex
Expand All @@ -85,12 +82,10 @@ type Node struct {
func NewNode() (ret *Node) {
defer func() {
ret.id = genNextNodeID()
ret.OnResolved = nil
ret.SendToWorker = nil
ret.RandWorkerID = nil
ret.totalDependencies = 0
ret.resolvedDependencies = 0
ret.removedDependencies = 0
ret.resolvedList = nil
ret.assignedTo = unassigned
ret.removed = false
}()
Expand All @@ -106,42 +101,20 @@ func (n *Node) NodeID() int64 {

// DependOn implements interface internal.SlotNode.
func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) {
resolvedDependencies, removedDependencies := int32(0), int32(0)

depend := func(target *Node) {
if target == nil {
// For a given Node, every dependency corresponds to a target.
// If target is nil it means the dependency doesn't conflict
// with any other nodes. However it's still necessary to track
// it because Node.tryResolve needs to count the number of
// resolved dependencies.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny)
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
return
}

if target.id == n.id {
panic("you cannot depend on yourself")
panic("cannot depend on yourself")
}

// The target node might be removed or modified in other places, for example
// after its corresponding transaction has been executed.
target.mu.Lock()
defer target.mu.Unlock()

if target.assignedTo != unassigned {
// The target has already been assigned to a worker.
// In this case, record the worker ID in `resolvedList`, and this node
// probably can be sent to the same worker and executed sequentially.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo)
}

// Add the node to the target's dependers if the target has not been removed.
// Add the node to the target's dependers if the target is not removed.
if target.removed {
// The target has already been removed.
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
stdatomic.AddInt32(&n.removedDependencies, 1)
} else if _, exist := target.getOrCreateDependers().ReplaceOrInsert(n); exist {
// Should never depend on a target redundantly.
panic("should never exist")
Expand All @@ -153,24 +126,19 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int)
// ?: why gen new ID here?
n.id = genNextNodeID()

// `totalDependencies` and `resolvedList` must be initialized before depending on any targets.
// `totalDependencies` must be initialized before depending on any targets.
n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt)
n.resolvedList = make([]int64, 0, n.totalDependencies)
for i := 0; i < int(n.totalDependencies); i++ {
n.resolvedList = append(n.resolvedList, unassigned)
}
n.removedDependencies = int32(noDependencyKeyCnt)

for _, node := range dependencyNodes {
depend(node)
}
for i := 0; i < noDependencyKeyCnt; i++ {
depend(nil)
}

n.maybeResolve(resolvedDependencies, removedDependencies)
n.maybeReadyToRun()
}

// Remove implements interface internal.SlotNode.
// Remove will be called after related transaction is executed.
func (n *Node) Remove() {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -179,8 +147,8 @@ func (n *Node) Remove() {
if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
removedDependencies := stdatomic.AddInt32(&node.removedDependencies, 1)
node.maybeResolve(0, removedDependencies)
stdatomic.AddInt32(&node.removedDependencies, 1)
node.maybeReadyToRun()
return true
})
n.dependers.Clear(true)
Expand All @@ -197,7 +165,7 @@ func (n *Node) Free() {
}

n.id = invalidNodeID
n.OnResolved = nil
n.SendToWorker = nil
n.RandWorkerID = nil

// TODO: reuse node if necessary. Currently it's impossible if async-notify is used.
Expand All @@ -212,96 +180,41 @@ func (n *Node) assignTo(workerID int64) bool {
defer n.mu.Unlock()

if n.assignedTo != unassigned {
// Already resolved by some other guys.
// Already handled by some other guys.
return false
}

n.assignedTo = workerID
if n.OnResolved != nil {
n.OnResolved(workerID)
n.OnResolved = nil
}

if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
resolvedDependencies := stdatomic.AddInt32(&node.resolvedDependencies, 1)
stdatomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo)
node.maybeResolve(resolvedDependencies, 0)
return true
})
if n.SendToWorker != nil {
n.SendToWorker(workerID)
n.SendToWorker = nil
}

return true
}

func (n *Node) maybeResolve(resolvedDependencies, removedDependencies int32) {
if workerNum, ok := n.tryResolve(resolvedDependencies, removedDependencies); ok {
if workerNum < 0 {
panic("Node.tryResolve must return a valid worker ID")
}
if n.OnNotified != nil {
// Notify the conflict detector background worker to assign the node to the worker asynchronously.
n.OnNotified(func() { n.assignTo(workerNum) })
} else {
// Assign the node to the worker directly.
n.assignTo(workerNum)
}
}
}

// tryResolve tries to find a worker to assign the node to.
// Returns (_, false) if there is a conflict,
// returns (rand, true) if there is no conflict,
// returns (N, true) if only worker N can be used.
func (n *Node) tryResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
assignedTo, resolved := n.doResolve(resolvedDependencies, removedDependencies)
if resolved && assignedTo == assignedToAny {
assignedTo = n.RandWorkerID()
func (n *Node) maybeReadyToRun() {
if ok := n.checkReadiness(); ok {
// Assign the node to the worker directly.
n.assignTo(n.RandWorkerID())
}
return assignedTo, resolved
}

func (n *Node) doResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
// checkReadiness checks if all dependencies have been removed.
// Returns false if there are some conflicts, returns true if all dependencies
// are removed.
func (n *Node) checkReadiness() bool {
if n.totalDependencies == 0 {
// No conflicts, can select any workers.
return assignedToAny, true
}

if resolvedDependencies == n.totalDependencies {
firstDep := stdatomic.LoadInt64(&n.resolvedList[0])
hasDiffDep := false
for i := 1; i < int(n.totalDependencies); i++ {
curr := stdatomic.LoadInt64(&n.resolvedList[i])
// // Todo: simplify assign to logic, only resolve dependencies nodes after
// // corresponding transactions are executed.
// //
// // In DependOn, depend(nil) sets resolvedList[i] to assignedToAny
// // for these no-dependency keys.
// if curr == assignedToAny {
// continue
// }
if firstDep != curr {
hasDiffDep = true
break
}
}
if !hasDiffDep {
// If all dependency nodes are assigned to the same worker, we can assign
// this node to the same worker directly, and they will execute sequentially.
// On the other hand, if dependency nodes are assigned to different workers,
// this node has to wait until all dependency txns are executed and all dependency nodes
// are removed.
return firstDep, true
}
return true
}

// All dependencies are removed, so assigning the node to any worker is fine.
if removedDependencies == n.totalDependencies {
return assignedToAny, true
removedDependencies := stdatomic.LoadInt32(&n.removedDependencies)
if removedDependencies > n.totalDependencies {
panic(fmt.Sprintf("removedDependencies %d > totalDependencies %d which is not expected",
removedDependencies, n.totalDependencies))
}

return unassigned, false
return removedDependencies == n.totalDependencies
}

func (n *Node) getOrCreateDependers() *btree.BTreeG[*Node] {
Expand Down
17 changes: 12 additions & 5 deletions pkg/causality/internal/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,18 @@ func TestNodeDependOn(t *testing.T) {
func TestNodeSingleDependency(t *testing.T) {
t.Parallel()

// Node B depends on A, without any other resolved dependencies.
// Node B depends on A, without any other removed dependencies.
nodeA := NewNode()
nodeB := NewNode()
nodeB.RandWorkerID = func() workerID { return 100 }
nodeB.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 0)
require.True(t, nodeA.assignTo(1))
require.Equal(t, workerID(1), nodeA.assignedWorkerID())
require.Equal(t, workerID(1), nodeB.assignedWorkerID())
// Node B should be unassigned before Node A is removed.
require.Equal(t, unassigned, nodeB.assignedWorkerID())
nodeA.Remove()
// Node B should be assigned to random worker after Node A is removed.
require.Equal(t, workerID(100), nodeB.assignedWorkerID())

// Node D depends on C, with some other resolved dependencies.
nodeC := NewNode()
Expand Down Expand Up @@ -124,11 +128,14 @@ func TestNodeResolveImmediately(t *testing.T) {
nodeD := NewNode()
nodeD.RandWorkerID = func() workerID { return workerID(100) }
nodeD.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 0)
require.Equal(t, workerID(1), nodeD.assignedWorkerID())

// Node E depends on B and C and some other resolved dependencies.
// NodeD should be unassigned before Node B and C are removed.
require.Equal(t, unassigned, nodeD.assignedWorkerID())
nodeB.Remove()
nodeC.Remove()
// NodeD should be assigned to random worker after Node B and C are removed.
require.Equal(t, workerID(100), nodeD.assignedWorkerID())

// Node E depends on B and C and some other resolved dependencies.
nodeE := NewNode()
nodeE.RandWorkerID = func() workerID { return workerID(100) }
nodeE.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 999)
Expand Down
2 changes: 1 addition & 1 deletion pkg/causality/internal/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestSlotsConcurrentOps(t *testing.T) {
freeNodeChan <- newNode()
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

// test concurrent add and remove won't panic
Expand Down
Loading