This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

refactor: update some code and output logs for supernode
Signed-off-by: Starnop <[email protected]>
starnop committed Aug 30, 2019
1 parent ba585e2 commit 655c205
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions supernode/daemon/mgr/scheduler/manager.go
@@ -59,18 +59,18 @@ func (sm *Manager) Schedule(ctx context.Context, taskID, clientID, peerID string
 		return nil, err
 	}
 	if len(pieceAvailable) == 0 {
-		return nil, errors.Wrapf(errortypes.ErrPeerWait, "taskID: %s", taskID)
+		return nil, errors.Wrapf(errortypes.ErrPeerWait, "taskID(%s) clientID(%s)", taskID, clientID)
 	}
-	logrus.Debugf("scheduler get available pieces %v for taskID(%s)", pieceAvailable, taskID)
+	logrus.Debugf("scheduler get available pieces %v for taskID(%s) clientID(%s)", pieceAvailable, taskID, clientID)
 
 	// get running pieces
 	pieceRunning, err := sm.progressMgr.GetPieceProgressByCID(ctx, taskID, clientID, "running")
 	if err != nil {
 		return nil, err
 	}
-	logrus.Debugf("scheduler get running pieces %v for taskID(%s)", pieceRunning, taskID)
+	logrus.Debugf("scheduler get running pieces %v for taskID(%s) clientID(%s)", pieceRunning, taskID, clientID)
 	runningCount := len(pieceRunning)
-	if runningCount >= config.PeerDownLimit {
+	if runningCount >= sm.cfg.PeerDownLimit {
 		return nil, errors.Wrapf(errortypes.PeerContinue, "taskID: %s,clientID: %s", taskID, clientID)
 	}

@@ -79,7 +79,7 @@ func (sm *Manager) Schedule(ctx context.Context, taskID, clientID, peerID string
 	if err != nil {
 		return nil, err
 	}
-	logrus.Debugf("scheduler get pieces %v with prioritize for taskID(%s)", pieceNums, taskID)
+	logrus.Debugf("scheduler get pieces %v with prioritize for taskID(%s) clientID(%s)", pieceNums, taskID, clientID)
 
 	return sm.getPieceResults(ctx, taskID, clientID, peerID, pieceNums, runningCount)
 }
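The hunks above also replace the package-level config.PeerDownLimit with the manager's injected configuration, sm.cfg.PeerDownLimit. A minimal sketch of that pattern follows; the Config and Manager definitions are not part of this diff, so the field names and error value here are assumptions rather than Dragonfly's actual types.

```go
package main

import (
	"errors"
	"fmt"
)

// Config stands in for the supernode configuration; the field name mirrors
// the sm.cfg.PeerDownLimit access in the diff but is otherwise assumed.
type Config struct {
	PeerDownLimit int
}

// Manager carries its own configuration instead of reading package-level values.
type Manager struct {
	cfg *Config
}

var errPeerContinue = errors.New("client should continue with its running pieces")

// schedule mirrors the shape of the check above: once a client already has
// cfg.PeerDownLimit pieces running, no new pieces are handed out.
func (sm *Manager) schedule(runningCount int) error {
	if runningCount >= sm.cfg.PeerDownLimit {
		return errPeerContinue
	}
	return nil
}

func main() {
	sm := &Manager{cfg: &Config{PeerDownLimit: 4}}
	fmt.Println(sm.schedule(3)) // <nil>
	fmt.Println(sm.schedule(4)) // client should continue with its running pieces
}
```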
@@ -148,7 +148,7 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID
 		return nil, err
 	}
 	if srcPeerState.ClientErrorCount.Get() > config.FailCountLimit {
-		logrus.Warnf("peerID: %s got errors for %d times which reaches error limit: %d for taskID(%s)",
+		logrus.Warnf("scheduler: peerID: %s got errors for %d times which reaches error limit: %d for taskID(%s)",
 			peerID, srcPeerState.ClientErrorCount.Get(), config.FailCountLimit, taskID)
 		useSupernode = true
 	}
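The unchanged lines in this hunk show the fallback that the reworded warning describes: once the requesting peer has accumulated more than config.FailCountLimit client-side errors, the scheduler sets useSupernode and serves the piece from the supernode instead of another peer. A small illustrative sketch, with a made-up constant standing in for config.FailCountLimit:

```go
package main

import "fmt"

// failCountLimit stands in for config.FailCountLimit; the real value lives in
// the supernode config package and is not shown in this diff.
const failCountLimit = 5

// pickSource mirrors the useSupernode decision: too many client errors means
// the piece is served by the supernode rather than another peer.
func pickSource(clientErrorCount int) string {
	if clientErrorCount > failCountLimit {
		return "supernode"
	}
	return "peer"
}

func main() {
	fmt.Println(pickSource(2)) // peer
	fmt.Println(pickSource(9)) // supernode
}
```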
@@ -161,6 +161,7 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID
 		} else {
 			// get peerIDs by pieceNum
 			peerIDs, err := sm.progressMgr.GetPeerIDsByPieceNum(ctx, taskID, pieceNums[i])
+			logrus.Debugf("scheduler: success to get peerIDs(%v) pieceNum(%d) taskID(%s), clientID(%s)", peerIDs, pieceNums[i], taskID, clientID)
 			if err != nil {
 				return nil, errors.Wrapf(errortypes.ErrUnknowError, "failed to get peerIDs for pieceNum: %d of taskID: %s", pieceNums[i], taskID)
 			}
@@ -172,7 +173,7 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID
 			}
 
 			if err := sm.progressMgr.UpdateClientProgress(ctx, taskID, clientID, dstPID, pieceNums[i], config.PieceRUNNING); err != nil {
-				logrus.Warnf("failed to update client progress running for pieceNum(%d) taskID(%s) clientID(%s) dstPID(%s)", pieceNums[i], taskID, clientID, dstPID)
+				logrus.Warnf("scheduler: failed to update client progress running for pieceNum(%d) taskID(%s) clientID(%s) dstPID(%s)", pieceNums[i], taskID, clientID, dstPID)
 				continue
 			}

@@ -183,7 +184,7 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID
 		})
 
 		runningCount++
-		if runningCount >= config.PeerDownLimit {
+		if runningCount >= sm.cfg.PeerDownLimit {
 			break
 		}
 	}
@@ -203,34 +204,40 @@ func (sm *Manager) tryGetPID(ctx context.Context, taskID string, pieceNum int, p
 		// if failed to get peerState, and then it should not be needed.
 		peerState, err := sm.progressMgr.GetPeerStateByPeerID(ctx, peerIDs[i])
 		if err != nil {
+			logrus.Warnf("scheduler: failed to GetPeerStateByPeerID taskID(%s) peerID(%s): %v", taskID, peerIDs[i], err)
 			sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
 			continue
 		}
 
 		// if the service has been down, and then it should not be needed.
 		if peerState.ServiceDownTime > 0 {
+			logrus.Warnf("scheduler: the peer(%s) has been offline and will delete it from piece state", peerIDs[i])
 			sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
 			continue
 		}
 
 		// if service has failed for EliminationLimit times, and then it should not be needed.
-		if peerState.ServiceErrorCount != nil && peerState.ServiceErrorCount.Get() >= config.EliminationLimit {
-			sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
-			continue
+		if peerState.ServiceErrorCount != nil {
+			serviceErrorCount := peerState.ServiceErrorCount.Get()
+			if int(serviceErrorCount) >= sm.cfg.EliminationLimit {
+				logrus.Warnf("scheduler: the peer(%s) has been eliminated to because of too many errors(%d) occurred as a peer server", peerIDs[i], serviceErrorCount)
+				sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
+				continue
+			}
 		}
 
 		// if the v is in the blackList, try the next one.
 		blackInfo, err := sm.progressMgr.GetBlackInfoByPeerID(ctx, peerIDs[i])
 		if err != nil && !errortypes.IsDataNotFound(err) {
-			logrus.Errorf("failed to get blackInfo for peerID %s: %v", peerIDs[i], err)
+			logrus.Warnf("scheduler: failed to get blackInfo for peerID %s: %v", peerIDs[i], err)
 			continue
 		}
 		if blackInfo != nil && isExistInMap(blackInfo, peerIDs[i]) {
 			continue
 		}
 
 		if peerState.ProducerLoad != nil {
-			if peerState.ProducerLoad.Add(1) <= config.PeerUpLimit {
+			if peerState.ProducerLoad.Add(1) <= int32(sm.cfg.PeerUpLimit) {
 				return peerIDs[i]
 			}
 			peerState.ProducerLoad.Add(-1)
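The unchanged ProducerLoad lines in this hunk show why the comparison is written against the result of Add(1): the scheduler optimistically claims an upload slot on the candidate peer and releases it with Add(-1) if the new load exceeds the (now configurable) PeerUpLimit. Below is a self-contained sketch of that reserve-then-roll-back pattern using sync/atomic rather than Dragonfly's own counter type, so the type and field names are placeholders.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// peerState is a stand-in for the scheduler's per-peer state; only the load
// counter matters for this sketch.
type peerState struct {
	producerLoad int32 // uploads this peer is currently serving
}

// tryReserve claims an upload slot and reports whether the claim held.
func tryReserve(ps *peerState, peerUpLimit int32) bool {
	if atomic.AddInt32(&ps.producerLoad, 1) <= peerUpLimit {
		return true
	}
	atomic.AddInt32(&ps.producerLoad, -1) // over the limit: roll the claim back
	return false
}

func main() {
	ps := &peerState{}
	for i := 0; i < 4; i++ {
		fmt.Println(tryReserve(ps, 2)) // true, true, false, false
	}
}
```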
@@ -241,7 +248,7 @@ func (sm *Manager) tryGetPID(ctx context.Context, taskID string, pieceNum int, p
 
 func (sm *Manager) deletePeerIDByPieceNum(ctx context.Context, taskID string, pieceNum int, peerID string) {
 	if err := sm.progressMgr.DeletePeerIDByPieceNum(ctx, taskID, pieceNum, peerID); err != nil {
-		logrus.Warnf("failed to delete the peerID %s for pieceNum %d of taskID: %s", peerID, pieceNum, taskID)
+		logrus.Warnf("scheduler: failed to delete the peerID %s for pieceNum %d of taskID: %s", peerID, pieceNum, taskID)
 	}
 }
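All of the touched log calls gain a "scheduler:" prefix so the component can be picked out of the supernode logs. An alternative worth noting, sketched below rather than taken from this commit, is to attach the component as a structured logrus field instead of a message prefix; the values passed here are placeholders.

```go
package main

import (
	log "github.com/sirupsen/logrus"
)

func main() {
	// WithField returns an *Entry whose output carries component=scheduler,
	// which log processors can filter on without parsing message prefixes.
	schedLog := log.WithField("component", "scheduler")
	schedLog.Warnf("failed to delete the peerID %s for pieceNum %d of taskID: %s",
		"peer-000", 7, "task-000")
}
```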
