Skip to content

Commit

Permalink
This is an automated cherry-pick of #10376
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
zhangjinpeng87 authored and ti-chi-bot committed Mar 14, 2024
1 parent 70447e5 commit 47e3ea8
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 142 deletions.
30 changes: 16 additions & 14 deletions pkg/causality/conflict_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
// nextWorkerID is used to dispatch transactions round-robin.
nextWorkerID atomic.Int64

// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
// Used to run a background goroutine to GC.
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
}

type txnFinishedEvent struct {
Expand All @@ -54,12 +53,20 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
numSlots uint64,
) *ConflictDetector[Worker, Txn] {
ret := &ConflictDetector[Worker, Txn]{
<<<<<<< HEAD

Check failure on line 56 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expecting expression

Check failure on line 56 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expecting expression
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),

Check failure on line 58 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ] in composite literal; possibly missing comma or }

Check failure on line 58 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ] in composite literal; possibly missing comma or }
numSlots: numSlots,
notifiedNodes: chann.NewDrainableChann[func()](),
garbageNodes: chann.NewDrainableChann[txnFinishedEvent](),
closeCh: make(chan struct{}),

Check failure on line 62 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ) after top level declaration

Check failure on line 62 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ) after top level declaration
=======
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
>>>>>>> 9e0cacfc75 (sink-to-mysql(cdc) simplify conflict detector, prevent workload skew issue (#10376))

Check failure on line 69 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 69 in pkg/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
}

ret.wg.Add(1)
Expand All @@ -78,23 +85,23 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
node.OnResolved = func(workerID int64) {
// SendToWorker is called after all dependencies are removed.
node.SendToWorker = func(workerID int64) {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
// After this transaction is executed, we can remove the node from the graph,
// and resolve related dependencies for these transactions which depend on this
// and remove related dependencies for these transactions which depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if this node is still
// occupied related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
// Send this txn to related worker as soon as all dependencies are resolved.
// Send this txn to related worker as soon as all dependencies are removed.
d.sendToWorker(txn, postTxnExecuted, workerID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, sortedKeysHash)
}

Expand All @@ -106,17 +113,12 @@ func (d *ConflictDetector[Worker, Txn]) Close() {

func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
}()
for {
select {
case <-d.closeCh:
return
case notifyCallback := <-d.notifiedNodes.Out():
if notifyCallback != nil {
notifyCallback()
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.sortedKeysHash)
Expand Down
157 changes: 35 additions & 122 deletions pkg/causality/internal/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package internal

import (
"fmt"
"sync"
stdatomic "sync/atomic"

Expand Down Expand Up @@ -48,18 +49,14 @@ type Node struct {
// Immutable fields.
id int64

// Called when all dependencies are resolved.
OnResolved func(id workerID)
// Set the id generator to get a random ID.
// SendToWorker is used to send the node to a worker.
SendToWorker func(id workerID)
// RandWorkerID is used to select a worker randomly.
RandWorkerID func() workerID
// Set the callback that the node is notified.
OnNotified func(callback func())

// Following fields are used for notifying a node's dependers lock-free.
totalDependencies int32
resolvedDependencies int32
removedDependencies int32
resolvedList []int64
totalDependencies int32
removedDependencies int32

// Following fields are protected by `mu`.
mu sync.Mutex
Expand All @@ -85,12 +82,10 @@ type Node struct {
func NewNode() (ret *Node) {
defer func() {
ret.id = genNextNodeID()
ret.OnResolved = nil
ret.SendToWorker = nil
ret.RandWorkerID = nil
ret.totalDependencies = 0
ret.resolvedDependencies = 0
ret.removedDependencies = 0
ret.resolvedList = nil
ret.assignedTo = unassigned
ret.removed = false
}()
Expand All @@ -106,42 +101,20 @@ func (n *Node) NodeID() int64 {

// DependOn implements interface internal.SlotNode.
func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) {
resolvedDependencies, removedDependencies := int32(0), int32(0)

depend := func(target *Node) {
if target == nil {
// For a given Node, every dependency corresponds to a target.
// If target is nil it means the dependency doesn't conflict
// with any other nodes. However it's still necessary to track
// it because Node.tryResolve needs to counting the number of
// resolved dependencies.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny)
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
return
}

if target.id == n.id {
panic("you cannot depend on yourself")
panic("cannot depend on yourself")
}

// The target node might be removed or modified in other places, for example
// after its corresponding transaction has been executed.
target.mu.Lock()
defer target.mu.Unlock()

if target.assignedTo != unassigned {
// The target has already been assigned to a worker.
// In this case, record the worker ID in `resolvedList`, and this node
// probably can be sent to the same worker and executed sequentially.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo)
}

// Add the node to the target's dependers if the target has not been removed.
// Add the node to the target's dependers if the target is not removed.
if target.removed {
// The target has already been removed.
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
stdatomic.AddInt32(&n.removedDependencies, 1)
} else if _, exist := target.getOrCreateDependers().ReplaceOrInsert(n); exist {
// Should never depend on a target redundantly.
panic("should never exist")
Expand All @@ -153,24 +126,19 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int)
// ?: why gen new ID here?
n.id = genNextNodeID()

// `totalDependencies` and `resolvedList` must be initialized before depending on any targets.
// `totalDependencies` must be initialized before depending on any targets.
n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt)
n.resolvedList = make([]int64, 0, n.totalDependencies)
for i := 0; i < int(n.totalDependencies); i++ {
n.resolvedList = append(n.resolvedList, unassigned)
}
n.removedDependencies = int32(noDependencyKeyCnt)

for _, node := range dependencyNodes {
depend(node)
}
for i := 0; i < noDependencyKeyCnt; i++ {
depend(nil)
}

n.maybeResolve(resolvedDependencies, removedDependencies)
n.maybeReadyToRun()
}

// Remove implements interface internal.SlotNode.
// Remove will be called after related transaction is executed.
func (n *Node) Remove() {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -179,8 +147,8 @@ func (n *Node) Remove() {
if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
removedDependencies := stdatomic.AddInt32(&node.removedDependencies, 1)
node.maybeResolve(0, removedDependencies)
stdatomic.AddInt32(&node.removedDependencies, 1)
node.maybeReadyToRun()
return true
})
n.dependers.Clear(true)
Expand All @@ -197,7 +165,7 @@ func (n *Node) Free() {
}

n.id = invalidNodeID
n.OnResolved = nil
n.SendToWorker = nil
n.RandWorkerID = nil

// TODO: reuse node if necessary. Currently it's impossible if async-notify is used.
Expand All @@ -212,96 +180,41 @@ func (n *Node) assignTo(workerID int64) bool {
defer n.mu.Unlock()

if n.assignedTo != unassigned {
// Already resolved by some other guys.
// Already handled by some other guys.
return false
}

n.assignedTo = workerID
if n.OnResolved != nil {
n.OnResolved(workerID)
n.OnResolved = nil
}

if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
resolvedDependencies := stdatomic.AddInt32(&node.resolvedDependencies, 1)
stdatomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo)
node.maybeResolve(resolvedDependencies, 0)
return true
})
if n.SendToWorker != nil {
n.SendToWorker(workerID)
n.SendToWorker = nil
}

return true
}

func (n *Node) maybeResolve(resolvedDependencies, removedDependencies int32) {
if workerNum, ok := n.tryResolve(resolvedDependencies, removedDependencies); ok {
if workerNum < 0 {
panic("Node.tryResolve must return a valid worker ID")
}
if n.OnNotified != nil {
// Notify the conflict detector background worker to assign the node to the worker asynchronously.
n.OnNotified(func() { n.assignTo(workerNum) })
} else {
// Assign the node to the worker directly.
n.assignTo(workerNum)
}
}
}

// tryResolve try to find a worker to assign the node to.
// Returns (_, false) if there is a conflict,
// returns (rand, true) if there is no conflict,
// returns (N, true) if only worker N can be used.
func (n *Node) tryResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
assignedTo, resolved := n.doResolve(resolvedDependencies, removedDependencies)
if resolved && assignedTo == assignedToAny {
assignedTo = n.RandWorkerID()
func (n *Node) maybeReadyToRun() {
if ok := n.checkReadiness(); ok {
// Assign the node to the worker directly.
n.assignTo(n.RandWorkerID())
}
return assignedTo, resolved
}

func (n *Node) doResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
// checkReadiness checks whether all dependencies have been removed.
// Returns false if there are some conflicts, returns true if all dependencies
// are removed.
func (n *Node) checkReadiness() bool {
if n.totalDependencies == 0 {
// No conflicts, can select any workers.
return assignedToAny, true
}

if resolvedDependencies == n.totalDependencies {
firstDep := stdatomic.LoadInt64(&n.resolvedList[0])
hasDiffDep := false
for i := 1; i < int(n.totalDependencies); i++ {
curr := stdatomic.LoadInt64(&n.resolvedList[i])
// // Todo: simplify assign to logic, only resolve dependencies nodes after
// // corresponding transactions are executed.
// //
// // In DependOn, depend(nil) set resolvedList[i] to assignedToAny
// // for these no-dependency keys.
// if curr == assignedToAny {
// continue
// }
if firstDep != curr {
hasDiffDep = true
break
}
}
if !hasDiffDep {
// If all dependency nodes are assigned to the same worker, we can assign
// this node to the same worker directly, and they will execute sequentially.
// On the other hand, if dependency nodes are assigned to different workers,
// this node has to wait until all dependency txns are executed and all dependency nodes
// are removed.
return firstDep, true
}
return true
}

// All dependencies are removed, so assigning the node to any worker is fine.
if removedDependencies == n.totalDependencies {
return assignedToAny, true
removedDependencies := stdatomic.LoadInt32(&n.removedDependencies)
if removedDependencies > n.totalDependencies {
panic(fmt.Sprintf("removedDependencies %d > totalDependencies %d which is not expected",
removedDependencies, n.totalDependencies))
}

return unassigned, false
return removedDependencies == n.totalDependencies
}

func (n *Node) getOrCreateDependers() *btree.BTreeG[*Node] {
Expand Down
17 changes: 12 additions & 5 deletions pkg/causality/internal/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,18 @@ func TestNodeDependOn(t *testing.T) {
func TestNodeSingleDependency(t *testing.T) {
t.Parallel()

// Node B depends on A, without any other resolved dependencies.
// Node B depends on A, without any other removed dependencies.
nodeA := NewNode()
nodeB := NewNode()
nodeB.RandWorkerID = func() workerID { return 100 }
nodeB.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 0)
require.True(t, nodeA.assignTo(1))
require.Equal(t, workerID(1), nodeA.assignedWorkerID())
require.Equal(t, workerID(1), nodeB.assignedWorkerID())
// Node B should be unassigned before Node A is removed.
require.Equal(t, unassigned, nodeB.assignedWorkerID())
nodeA.Remove()
// Node B should be assigned to random worker after Node A is removed.
require.Equal(t, workerID(100), nodeB.assignedWorkerID())

// Node D depends on C, with some other resolved dependencies.
nodeC := NewNode()
Expand Down Expand Up @@ -124,11 +128,14 @@ func TestNodeResolveImmediately(t *testing.T) {
nodeD := NewNode()
nodeD.RandWorkerID = func() workerID { return workerID(100) }
nodeD.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 0)
require.Equal(t, workerID(1), nodeD.assignedWorkerID())

// Node E depends on B and C and some other resolved dependencies.
// NodeD should be unassigned before Node B and C are removed.
require.Equal(t, unassigned, nodeD.assignedWorkerID())
nodeB.Remove()
nodeC.Remove()
// NodeD should be assigned to random worker after Node B and C are removed.
require.Equal(t, workerID(100), nodeD.assignedWorkerID())

// Node E depends on B and C and some other resolved dependencies.
nodeE := NewNode()
nodeE.RandWorkerID = func() workerID { return workerID(100) }
nodeE.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 999)
Expand Down
2 changes: 1 addition & 1 deletion pkg/causality/internal/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestSlotsConcurrentOps(t *testing.T) {
freeNodeChan <- newNode()
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

// test concurrent add and remove won't panic
Expand Down

0 comments on commit 47e3ea8

Please sign in to comment.