diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 4bb3476d6aa..9544be0a778 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -308,20 +308,35 @@ func (p *Peer) ReplaceParent(parent *Peer) { p.StoreParent(parent) } -// TreeTotalNodeCount represents tree's total node count -func (p *Peer) TreeTotalNodeCount() int { - count := 1 - p.Children.Range(func(_, value interface{}) bool { - node, ok := value.(*Peer) +// Depth represents depth of tree +func (p *Peer) Depth() int { + p.mu.RLock() + defer p.mu.RUnlock() + + node := p + var depth int + for node != nil { + depth++ + + if node.Host.IsCDN { + break + } + + parent, ok := node.LoadParent() if !ok { - return true + break + } + + // Prevent traversal tree from infinite loop + if p == parent { + p.Log.Info("tree structure produces an infinite loop") + break } - count += node.TreeTotalNodeCount() - return true - }) + node = parent + } - return count + return depth } // IsDescendant determines whether it is ancestor of peer diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index c4ca16fd2a2..263448736e6 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -467,30 +467,43 @@ func TestPeer_ReplaceParent(t *testing.T) { } } -func TestPeer_TreeTotalNodeCount(t *testing.T) { +func TestPeer_Depth(t *testing.T) { tests := []struct { - name string - childID string - expect func(t *testing.T, peer *Peer, mockChildPeer *Peer) + name string + expect func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) }{ { - name: "get tree total node count", - childID: idgen.PeerID("127.0.0.1"), - expect: func(t *testing.T, peer *Peer, mockChildPeer *Peer) { + name: "there is only one node in the tree", + expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) { assert := assert.New(t) - peer.StoreChild(mockChildPeer) - assert.Equal(peer.TreeTotalNodeCount(), 2) - mockChildPeer.ID = idgen.PeerID("0.0.0.0") - peer.StoreChild(mockChildPeer) - assert.Equal(peer.TreeTotalNodeCount(), 3) + assert.Equal(peer.Depth(), 1) }, }, { - name: "tree is empty", - childID: idgen.PeerID("127.0.0.1"), - expect: func(t *testing.T, peer *Peer, mockChildPeer *Peer) { + name: "more than one node in the tree", + expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) { + peer.StoreParent(parent) + assert := assert.New(t) - assert.Equal(peer.TreeTotalNodeCount(), 1) + assert.Equal(peer.Depth(), 2) + }, + }, + { + name: "node parent is cdn", + expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) { + peer.StoreParent(cdnParent) + + assert := assert.New(t) + assert.Equal(peer.Depth(), 2) + }, + }, + { + name: "node parent is itself", + expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) { + peer.StoreParent(peer) + + assert := assert.New(t) + assert.Equal(peer.Depth(), 1) }, }, } @@ -498,11 +511,13 @@ func TestPeer_TreeTotalNodeCount(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { mockHost := NewHost(mockRawHost) + mockCDNHost := NewHost(mockRawCDNHost, WithIsCDN(true)) mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) - mockChildPeer := NewPeer(tc.childID, mockTask, mockHost) peer := NewPeer(mockPeerID, mockTask, mockHost) + parent := NewPeer(idgen.PeerID("127.0.0.2"), mockTask, mockHost) + cdnParent := NewPeer(mockCDNPeerID, mockTask, mockCDNHost) - tc.expect(t, peer, mockChildPeer) + tc.expect(t, peer, parent, cdnParent) }) } } diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index d28b679172b..91f8deca927 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -20,6 +20,7 @@ package scheduler import ( "context" + "fmt" "sort" "time" @@ -37,6 +38,9 @@ const ( // Default number of available parents after filtering defaultFilterParentCount = 5 + + // Default tree depth limit + defaultDepthLimit = 10 ) type Scheduler interface { @@ -250,6 +254,15 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) [] return true } + if parent.Depth() > defaultDepthLimit { + peer.Log.Infof("exceeds the %d depth limit of the tree", defaultDepthLimit) + return true + } + + fmt.Println("111111111111") + fmt.Println(parent.Depth()) + fmt.Println("111111111111") + if parent.IsDescendant(peer) { peer.Log.Infof("parent %s is not selected because it is descendant", parent.ID) return true diff --git a/scheduler/scheduler/scheduler_test.go b/scheduler/scheduler/scheduler_test.go index f9b6f847324..e6c7fb50f3b 100644 --- a/scheduler/scheduler/scheduler_test.go +++ b/scheduler/scheduler/scheduler_test.go @@ -829,6 +829,23 @@ func TestScheduler_FindParent(t *testing.T) { assert.Contains([]string{mockPeers[0].ID, mockPeers[1].ID}, parent.ID) }, }, + { + name: "exceeds the depth limit of the tree", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet, md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateRunning) + mockPeers[0].StoreParent(mockPeers[1]) + mockPeers[1].StoreParent(mockPeers[2]) + mockPeers[2].StoreParent(mockPeers[3]) + peer.Task.StorePeer(mockPeers[0]) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1) + }, + expect: func(t *testing.T, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, } for _, tc := range tests { @@ -842,6 +859,8 @@ func TestScheduler_FindParent(t *testing.T) { mockPeers := []*resource.Peer{ resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost), resource.NewPeer(idgen.PeerID("127.0.0.2"), mockTask, mockHost), + resource.NewPeer(idgen.PeerID("127.0.0.3"), mockTask, mockHost), + resource.NewPeer(idgen.PeerID("127.0.0.4"), mockTask, mockHost), } blocklist := set.NewSafeSet()