Skip to content

Commit

Permalink
feat: limit tree depth
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Feb 28, 2022
1 parent ded3174 commit 7a79317
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 28 deletions.
35 changes: 25 additions & 10 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,35 @@ func (p *Peer) ReplaceParent(parent *Peer) {
p.StoreParent(parent)
}

// TreeTotalNodeCount represents tree's total node count
func (p *Peer) TreeTotalNodeCount() int {
count := 1
p.Children.Range(func(_, value interface{}) bool {
node, ok := value.(*Peer)
// Depth represents depth of tree
func (p *Peer) Depth() int {
p.mu.RLock()
defer p.mu.RUnlock()

node := p
var depth int
for node != nil {
depth++

if node.Host.IsCDN {
break
}

parent, ok := node.LoadParent()
if !ok {
return true
break
}

// Prevent traversal tree from infinite loop
if p == parent {
p.Log.Info("tree structure produces an infinite loop")
break
}

count += node.TreeTotalNodeCount()
return true
})
node = parent
}

return count
return depth
}

// IsDescendant determines whether it is ancestor of peer
Expand Down
51 changes: 33 additions & 18 deletions scheduler/resource/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,42 +467,57 @@ func TestPeer_ReplaceParent(t *testing.T) {
}
}

func TestPeer_TreeTotalNodeCount(t *testing.T) {
func TestPeer_Depth(t *testing.T) {
tests := []struct {
name string
childID string
expect func(t *testing.T, peer *Peer, mockChildPeer *Peer)
name string
expect func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer)
}{
{
name: "get tree total node count",
childID: idgen.PeerID("127.0.0.1"),
expect: func(t *testing.T, peer *Peer, mockChildPeer *Peer) {
name: "there is only one node in the tree",
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
assert := assert.New(t)
peer.StoreChild(mockChildPeer)
assert.Equal(peer.TreeTotalNodeCount(), 2)
mockChildPeer.ID = idgen.PeerID("0.0.0.0")
peer.StoreChild(mockChildPeer)
assert.Equal(peer.TreeTotalNodeCount(), 3)
assert.Equal(peer.Depth(), 1)
},
},
{
name: "tree is empty",
childID: idgen.PeerID("127.0.0.1"),
expect: func(t *testing.T, peer *Peer, mockChildPeer *Peer) {
name: "more than one node in the tree",
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
peer.StoreParent(parent)

assert := assert.New(t)
assert.Equal(peer.TreeTotalNodeCount(), 1)
assert.Equal(peer.Depth(), 2)
},
},
{
name: "node parent is cdn",
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
peer.StoreParent(cdnParent)

assert := assert.New(t)
assert.Equal(peer.Depth(), 2)
},
},
{
name: "node parent is itself",
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
peer.StoreParent(peer)

assert := assert.New(t)
assert.Equal(peer.Depth(), 1)
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockCDNHost := NewHost(mockRawCDNHost, WithIsCDN(true))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
mockChildPeer := NewPeer(tc.childID, mockTask, mockHost)
peer := NewPeer(mockPeerID, mockTask, mockHost)
parent := NewPeer(idgen.PeerID("127.0.0.2"), mockTask, mockHost)
cdnParent := NewPeer(mockCDNPeerID, mockTask, mockCDNHost)

tc.expect(t, peer, mockChildPeer)
tc.expect(t, peer, parent, cdnParent)
})
}
}
Expand Down
13 changes: 13 additions & 0 deletions scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package scheduler

import (
"context"
"fmt"
"sort"
"time"

Expand All @@ -37,6 +38,9 @@ const (

// Default number of available parents after filtering
defaultFilterParentCount = 5

// Default tree depth limit
defaultDepthLimit = 10
)

type Scheduler interface {
Expand Down Expand Up @@ -250,6 +254,15 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
return true
}

if parent.Depth() > defaultDepthLimit {
peer.Log.Infof("exceeds the %d depth limit of the tree", defaultDepthLimit)
return true
}

fmt.Println("111111111111")
fmt.Println(parent.Depth())
fmt.Println("111111111111")

if parent.IsDescendant(peer) {
peer.Log.Infof("parent %s is not selected because it is descendant", parent.ID)
return true
Expand Down
19 changes: 19 additions & 0 deletions scheduler/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,23 @@ func TestScheduler_FindParent(t *testing.T) {
assert.Contains([]string{mockPeers[0].ID, mockPeers[1].ID}, parent.ID)
},
},
{
name: "exceeds the depth limit of the tree",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
mockPeers[0].StoreParent(mockPeers[1])
mockPeers[1].StoreParent(mockPeers[2])
mockPeers[2].StoreParent(mockPeers[3])
peer.Task.StorePeer(mockPeers[0])

md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1)
},
expect: func(t *testing.T, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
assert := assert.New(t)
assert.False(ok)
},
},
}

for _, tc := range tests {
Expand All @@ -842,6 +859,8 @@ func TestScheduler_FindParent(t *testing.T) {
mockPeers := []*resource.Peer{
resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost),
resource.NewPeer(idgen.PeerID("127.0.0.2"), mockTask, mockHost),
resource.NewPeer(idgen.PeerID("127.0.0.3"), mockTask, mockHost),
resource.NewPeer(idgen.PeerID("127.0.0.4"), mockTask, mockHost),
}
blocklist := set.NewSafeSet()

Expand Down

0 comments on commit 7a79317

Please sign in to comment.