Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: limit the number which download from supernode for one task
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Aug 15, 2019
1 parent 4439903 commit b3b0e6e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 6 deletions.
2 changes: 1 addition & 1 deletion supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type BaseProperties struct {
// PeerDownLimit is the download limit of a peer. When a peer starts to download a file/image,
// it will download file/image in the form of pieces. PeerDownLimit mean that a peer can only
// stand starting PeerDownLimit concurrent downloading tasks.
// default: 4
// default: 5
PeerDownLimit int `yaml:"peerDownLimit"`

// When dfget node starts to play a role of peer, it will provide services for other peers
Expand Down
32 changes: 27 additions & 5 deletions supernode/daemon/mgr/progress/progress_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/atomiccount"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/pkg/syncmap"
Expand Down Expand Up @@ -49,26 +50,30 @@ var _ mgr.ProgressMgr = &Manager{}
// Manager is an implementation of the interface of ProgressMgr.
type Manager struct {
// superProgress maintains the super progress.
// key:taskID,value:*superState
// key:taskID string, value:superState *superState
superProgress *stateSyncMap

// clientProgress maintains the client progress.
// key:CID,value:*clientState
// key:CID string, value:clientState *clientState
clientProgress *stateSyncMap

// peerProgress maintains the peer progress.
// key:PeerID,value:*peerState
// key:PeerID string, value:peerState *peerState
peerProgress *stateSyncMap

// pieceProgress maintains the information about
// which peers the piece currently exists on
// key:pieceNum@taskID,value:*pieceState
// key:pieceNum@taskID string, value:pieceState *pieceState
pieceProgress *stateSyncMap

// clientBlackInfo maintains the blacklist of the PID.
// key:srcPID,value:map[dstPID]*Atomic
// key:srcPID string, value:dstPIDMap map[dstPID]*Atomic
clientBlackInfo *syncmap.SyncMap

// superLoad maintains the load num downloaded from the supernode for each task.
// key:taskID string, value:load *AtomicInt
superLoad *syncmap.SyncMap

cfg *config.Config
}

Expand All @@ -81,6 +86,7 @@ func NewManager(cfg *config.Config) (*Manager, error) {
peerProgress: newStateSyncMap(),
pieceProgress: newStateSyncMap(),
clientBlackInfo: syncmap.NewSyncMap(),
superLoad: syncmap.NewSyncMap(),
}, nil
}

Expand Down Expand Up @@ -298,6 +304,22 @@ func (pm *Manager) GetBlackInfoByPeerID(ctx context.Context, peerID string) (dst
return pm.clientBlackInfo.GetAsMap(peerID)
}

// GetSuperLoad returns the superLoad with specified taskID.
func (pm *Manager) GetSuperLoad(ctx context.Context, taskID string) (*atomiccount.AtomicInt, error) {
load, err := pm.superLoad.GetAsAtomicInt(taskID)
if err != nil {
if !errortypes.IsDataNotFound(err) {
return nil, err
}

// init the value for taskID when not found.
load = atomiccount.NewAtomicInt(0)
pm.superLoad.Add(taskID, load)
}

return load, nil
}

// getSuccessfulPieces gets pieces that the piece has been downloaded successful.
func getSuccessfulPieces(clientBitset, cdnBitset *bitset.BitSet) ([]int, error) {
successPieces := make([]int, 0)
Expand Down
3 changes: 3 additions & 0 deletions supernode/daemon/mgr/progress_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,7 @@ type ProgressMgr interface {

// GetBlackInfoByPeerID gets black info with specified peerID.
GetBlackInfoByPeerID(ctx context.Context, peerID string) (dstPIDMap *syncmap.SyncMap, err error)

// GetSuperLoad returns the superLoad with specified taskID.
GetSuperLoad(ctx context.Context, taskID string) (superLoad *atomiccount.AtomicInt, err error)
}
13 changes: 13 additions & 0 deletions supernode/daemon/mgr/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,19 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID
continue
}

// We limit the number of simultaneous connections that supernode can accept for each task.
if sm.cfg.IsSuperPID(dstPID) {
superLoad, err := sm.progressMgr.GetSuperLoad(ctx, taskID)
if err != nil {
logrus.Warnf("failed to get super load taskID(%s) clientID(%s): %v", taskID, clientID, superLoad)
}
if superLoad.Get() >= int32(sm.cfg.PeerDownLimit) {
continue
}

superLoad.Add(1)
}

if err := sm.progressMgr.UpdateClientProgress(ctx, taskID, clientID, dstPID, pieceNums[i], config.PieceRUNNING); err != nil {
logrus.Warnf("failed to update client progress running for pieceNum(%d) taskID(%s) clientID(%s) dstPID(%s)", pieceNums[i], taskID, clientID, dstPID)
continue
Expand Down
10 changes: 10 additions & 0 deletions supernode/daemon/mgr/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str
pieceRange, taskID, pieceUpdateRequest.ClientID)
}

// when a peer success to download a piece from supernode,
// and the load of supernode for the taskID should be decremented by one.
if tm.cfg.IsSuperPID(pieceUpdateRequest.DstPID) {
superLoad, err := tm.progressMgr.GetSuperLoad(ctx, taskID)
if err != nil {
logrus.Warnf("failed to get superLoad taskID(%s) clientID(%s): %v", taskID, pieceUpdateRequest.ClientID, err)
}
superLoad.Add(-1)
}

// get dfgetTask according to the CID
srcDfgetTask, err := tm.dfgetTaskMgr.Get(ctx, pieceUpdateRequest.ClientID, taskID)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions supernode/daemon/mgr/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas
}
logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID)

if len(pieceResult) == 0 {
return false, nil, errortypes.ErrPeerWait
}
var pieceInfos []*types.PieceInfo
for _, v := range pieceResult {
logrus.Debugf("get scheduler result item: %+v with taskID(%s) and clientID(%s)", v, task.ID, clientID)
Expand Down

0 comments on commit b3b0e6e

Please sign in to comment.