Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: limit the number which download from supernode for one task
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Aug 19, 2019
1 parent 7d6bdc2 commit 7f73758
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 10 deletions.
2 changes: 1 addition & 1 deletion supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type BaseProperties struct {
// PeerDownLimit is the download limit of a peer. When a peer starts to download a file/image,
// it will download file/image in the form of pieces. PeerDownLimit mean that a peer can only
// stand starting PeerDownLimit concurrent downloading tasks.
// default: 4
// default: 5
PeerDownLimit int `yaml:"peerDownLimit"`

// When dfget node starts to play a role of peer, it will provide services for other peers
Expand Down
15 changes: 15 additions & 0 deletions supernode/daemon/mgr/mock/mock_progress_mgr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions supernode/daemon/mgr/progress/progress_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

// DeleteTaskID deletes the super progress with specified taskID.
func (pm *Manager) DeleteTaskID(ctx context.Context, taskID string) (err error) {
pm.superLoad.remove(taskID)
return pm.superProgress.remove(taskID)
}

Expand Down
23 changes: 15 additions & 8 deletions supernode/daemon/mgr/progress/progress_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package progress
import (
"context"
"fmt"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
Expand Down Expand Up @@ -51,39 +50,47 @@ var _ mgr.ProgressMgr = &Manager{}
// Manager is an implementation of the interface of ProgressMgr.
type Manager struct {
// superProgress maintains the super progress.
// key:taskID,value:*superState
// key:taskID string, value:superState *superState
superProgress *stateSyncMap

// clientProgress maintains the client progress.
// key:CID,value:*clientState
// key:CID string, value:clientState *clientState
clientProgress *stateSyncMap

// peerProgress maintains the peer progress.
// key:PeerID,value:*peerState
// key:PeerID string, value:peerState *peerState
peerProgress *stateSyncMap

// pieceProgress maintains the information about
// which peers the piece currently exists on
// key:pieceNum@taskID,value:*pieceState
// key:pieceNum@taskID string, value:pieceState *pieceState
pieceProgress *stateSyncMap

// clientBlackInfo maintains the blacklist of the PID.
// key:srcPID,value:map[dstPID]*Atomic
// key:srcPID string, value:dstPIDMap map[dstPID]*Atomic
clientBlackInfo *syncmap.SyncMap

// superLoad maintains the load num downloaded from the supernode for each task.
// key:taskID string, value:superLoadState *superLoadState
superLoad *stateSyncMap

cfg *config.Config
}

// NewManager returns a new Manager.
func NewManager(cfg *config.Config) (*Manager, error) {
return &Manager{
manager := &Manager{
cfg: cfg,
superProgress: newStateSyncMap(),
clientProgress: newStateSyncMap(),
peerProgress: newStateSyncMap(),
pieceProgress: newStateSyncMap(),
clientBlackInfo: syncmap.NewSyncMap(),
}, nil
superLoad: newStateSyncMap(),
}

manager.startRenewSuperLoad()
return manager, nil
}

// InitProgress inits the correlation information between peers and pieces, etc.
Expand Down
17 changes: 17 additions & 0 deletions supernode/daemon/mgr/progress/progress_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package progress

import (
"time"

"github.com/dragonflyoss/Dragonfly/pkg/atomiccount"
"github.com/dragonflyoss/Dragonfly/pkg/syncmap"

Expand Down Expand Up @@ -61,6 +63,14 @@ type peerState struct {
serviceDownTime int64
}

type superLoadState struct {
// superLoad maintains the load num downloaded from the supernode for each task.
loadValue *atomiccount.AtomicInt

// loadModTime will record the time when the load be modified.
loadModTime time.Time
}

func newSuperState() *superState {
return &superState{
pieceBitSet: &bitset.BitSet{},
Expand All @@ -81,3 +91,10 @@ func newPeerState() *peerState {
serviceErrorCount: atomiccount.NewAtomicInt(0),
}
}

func newSuperLoadState() *superLoadState {
return &superLoadState{
loadValue: atomiccount.NewAtomicInt(0),
loadModTime: time.Now(),
}
}
16 changes: 15 additions & 1 deletion supernode/daemon/mgr/progress/state_sync_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pkg/errors"
)

// stateSyncMap is a thread-safe map.
// stateSyncMap is a thread-safe map for progress state.
type stateSyncMap struct {
*syncmap.SyncMap
}
Expand Down Expand Up @@ -102,6 +102,20 @@ func (mmap *stateSyncMap) getAsPieceState(key string) (*pieceState, error) {
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
}

// getAsSuperLoadState returns result as *superLoadState.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *stateSyncMap) getAsSuperLoadState(key string) (*superLoadState, error) {
v, err := mmap.get(key)
if err != nil {
return nil, errors.Wrapf(err, "key: %s", key)
}

if value, ok := v.(*superLoadState); ok {
return value, nil
}
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
}

// remove deletes the key-value pair from the mmap.
// The ErrEmptyValue error will be returned if the key is empty.
// And the ErrDataNotFound error will be returned if the key cannot be found.
Expand Down
75 changes: 75 additions & 0 deletions supernode/daemon/mgr/progress/superload_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package progress

import (
"context"
"time"

"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
)

const (
// renewInterval is the interval time to check the superload.
renewInterval = 2 * time.Second

// renewDelayTime if the superload has not been changed after renewDelayTime, it should be renewed.
renewDelayTime = 30 * time.Second
)

// UpdateSuperLoad returns whether the superLoad will exceed the limit after add the delta.
// If not, the delta will be updated.
func (pm *Manager) UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (updated bool, err error) {
loadState, err := pm.superLoad.getAsSuperLoadState(taskID)
if err != nil {
if !errortypes.IsDataNotFound(err) {
return false, err
}

// init the value for taskID when not found.
pm.superLoad.add(taskID, newSuperLoadState())
}

if loadState.loadValue.Add(delta) > limit && limit > 0 {
loadState.loadValue.Add(-delta)
return false, nil
}
loadState.loadModTime = time.Now()

return true, nil
}

func (pm *Manager) startRenewSuperLoad() {
ticker := time.NewTicker(renewInterval)

for range ticker.C {
pm.renewSuperLoad()
}
}

func (pm *Manager) renewSuperLoad() {
rangeFunc := func(key, value interface{}) bool {
if v, ok := value.(*superLoadState); ok {
if time.Since(v.loadModTime) > renewDelayTime {
v.loadValue.Set(0)
}
}
return true
}

pm.superLoad.Range(rangeFunc)
}
5 changes: 5 additions & 0 deletions supernode/daemon/mgr/progress_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ type ProgressMgr interface {
// GetBlackInfoByPeerID gets black info with specified peerID.
GetBlackInfoByPeerID(ctx context.Context, peerID string) (dstPIDMap *syncmap.SyncMap, err error)

// UpdateSuperLoad update the superLoad with delta.
//
// The value will be rolled back if it exceeds the limit after updated and returns false.
UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (updated bool, err error)

// DeleteTaskID deletes the super progress with specified taskID.
DeleteTaskID(ctx context.Context, taskID string) (err error)

Expand Down
12 changes: 12 additions & 0 deletions supernode/daemon/mgr/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID
continue
}

// We limit the number of simultaneous connections that supernode can accept for each task.
if sm.cfg.IsSuperPID(dstPID) {
updated, err := sm.progressMgr.UpdateSuperLoad(ctx, taskID, 1, int32(sm.cfg.PeerDownLimit))
if err != nil {
logrus.Warnf("failed to update super load taskID(%s) clientID(%s): %v", taskID, clientID, err)
continue
}
if !updated {
continue
}
}

if err := sm.progressMgr.UpdateClientProgress(ctx, taskID, clientID, dstPID, pieceNums[i], config.PieceRUNNING); err != nil {
logrus.Warnf("failed to update client progress running for pieceNum(%d) taskID(%s) clientID(%s) dstPID(%s)", pieceNums[i], taskID, clientID, dstPID)
continue
Expand Down
9 changes: 9 additions & 0 deletions supernode/daemon/mgr/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,15 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str
pieceRange, taskID, pieceUpdateRequest.ClientID)
}

// when a peer success to download a piece from supernode,
// and the load of supernode for the taskID should be decremented by one.
if tm.cfg.IsSuperPID(pieceUpdateRequest.DstPID) {
_, err := tm.progressMgr.UpdateSuperLoad(ctx, taskID, -1, -1)
if err != nil {
logrus.Warnf("failed to update superLoad taskID(%s) clientID(%s): %v", taskID, pieceUpdateRequest.ClientID, err)
}
}

// get dfgetTask according to the CID
srcDfgetTask, err := tm.dfgetTaskMgr.Get(ctx, pieceUpdateRequest.ClientID, taskID)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions supernode/daemon/mgr/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas
tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peer.IP.String()).Observe(timeutils.SinceInMilliseconds(startTime))
logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID)

if len(pieceResult) == 0 {
return false, nil, errortypes.ErrPeerWait
}
var pieceInfos []*types.PieceInfo
for _, v := range pieceResult {
logrus.Debugf("get scheduler result item: %+v with taskID(%s) and clientID(%s)", v, task.ID, clientID)
Expand Down

0 comments on commit 7f73758

Please sign in to comment.