From b6ab2527e8763e3267bf4565f1959203ae14ed75 Mon Sep 17 00:00:00 2001 From: Starnop Date: Thu, 15 Aug 2019 12:21:48 +0800 Subject: [PATCH] feature: limit the number which download from supernode for one task Signed-off-by: Starnop --- supernode/config/config.go | 2 +- .../daemon/mgr/mock/mock_progress_mgr.go | 15 ++++ .../daemon/mgr/progress/progress_delete.go | 1 + .../daemon/mgr/progress/progress_manager.go | 23 ++++-- .../daemon/mgr/progress/progress_state.go | 17 +++++ .../daemon/mgr/progress/state_sync_map.go | 16 +++- .../daemon/mgr/progress/superload_manager.go | 75 +++++++++++++++++++ supernode/daemon/mgr/progress_mgr.go | 5 ++ supernode/daemon/mgr/scheduler/manager.go | 12 +++ supernode/daemon/mgr/task/manager.go | 9 +++ supernode/daemon/mgr/task/manager_util.go | 3 + 11 files changed, 168 insertions(+), 10 deletions(-) create mode 100644 supernode/daemon/mgr/progress/superload_manager.go diff --git a/supernode/config/config.go b/supernode/config/config.go index 6c3e3f73b..d4a6ff331 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -144,7 +144,7 @@ type BaseProperties struct { // PeerDownLimit is the download limit of a peer. When a peer starts to download a file/image, // it will download file/image in the form of pieces. PeerDownLimit mean that a peer can only // stand starting PeerDownLimit concurrent downloading tasks. 
- // default: 4 + // default: 5 PeerDownLimit int `yaml:"peerDownLimit"` // When dfget node starts to play a role of peer, it will provide services for other peers diff --git a/supernode/daemon/mgr/mock/mock_progress_mgr.go b/supernode/daemon/mgr/mock/mock_progress_mgr.go index 523de2261..7f69fabcb 100644 --- a/supernode/daemon/mgr/mock/mock_progress_mgr.go +++ b/supernode/daemon/mgr/mock/mock_progress_mgr.go @@ -183,6 +183,21 @@ func (mr *MockProgressMgrMockRecorder) GetBlackInfoByPeerID(ctx, peerID interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlackInfoByPeerID", reflect.TypeOf((*MockProgressMgr)(nil).GetBlackInfoByPeerID), ctx, peerID) } +// UpdateSuperLoad mocks base method +func (m *MockProgressMgr) UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateSuperLoad", ctx, taskID, delta, limit) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateSuperLoad indicates an expected call of UpdateSuperLoad +func (mr *MockProgressMgrMockRecorder) UpdateSuperLoad(ctx, taskID, delta, limit interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSuperLoad", reflect.TypeOf((*MockProgressMgr)(nil).UpdateSuperLoad), ctx, taskID, delta, limit) +} + // DeleteTaskID mocks base method func (m *MockProgressMgr) DeleteTaskID(ctx context.Context, taskID string) error { m.ctrl.T.Helper() diff --git a/supernode/daemon/mgr/progress/progress_delete.go b/supernode/daemon/mgr/progress/progress_delete.go index 44315cec5..1297eb507 100644 --- a/supernode/daemon/mgr/progress/progress_delete.go +++ b/supernode/daemon/mgr/progress/progress_delete.go @@ -8,6 +8,7 @@ import ( // DeleteTaskID deletes the super progress with specified taskID. 
func (pm *Manager) DeleteTaskID(ctx context.Context, taskID string) (err error) { + pm.superLoad.remove(taskID) return pm.superProgress.remove(taskID) } diff --git a/supernode/daemon/mgr/progress/progress_manager.go b/supernode/daemon/mgr/progress/progress_manager.go index c7cce2e88..8e7cbcdf6 100644 --- a/supernode/daemon/mgr/progress/progress_manager.go +++ b/supernode/daemon/mgr/progress/progress_manager.go @@ -19,7 +19,6 @@ package progress import ( "context" "fmt" - "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" @@ -51,39 +50,47 @@ var _ mgr.ProgressMgr = &Manager{} // Manager is an implementation of the interface of ProgressMgr. type Manager struct { // superProgress maintains the super progress. - // key:taskID,value:*superState + // key:taskID string, value:superState *superState superProgress *stateSyncMap // clientProgress maintains the client progress. - // key:CID,value:*clientState + // key:CID string, value:clientState *clientState clientProgress *stateSyncMap // peerProgress maintains the peer progress. - // key:PeerID,value:*peerState + // key:PeerID string, value:peerState *peerState peerProgress *stateSyncMap // pieceProgress maintains the information about // which peers the piece currently exists on - // key:pieceNum@taskID,value:*pieceState + // key:pieceNum@taskID string, value:pieceState *pieceState pieceProgress *stateSyncMap // clientBlackInfo maintains the blacklist of the PID. - // key:srcPID,value:map[dstPID]*Atomic + // key:srcPID string, value:dstPIDMap map[dstPID]*Atomic clientBlackInfo *syncmap.SyncMap + // superLoad maintains the load num downloaded from the supernode for each task. + // key:taskID string, value:superLoadState *superLoadState + superLoad *stateSyncMap + cfg *config.Config } // NewManager returns a new Manager. 
func NewManager(cfg *config.Config) (*Manager, error) { - return &Manager{ + manager := &Manager{ cfg: cfg, superProgress: newStateSyncMap(), clientProgress: newStateSyncMap(), peerProgress: newStateSyncMap(), pieceProgress: newStateSyncMap(), clientBlackInfo: syncmap.NewSyncMap(), - }, nil + superLoad: newStateSyncMap(), + } + // startRenewSuperLoad loops on a ticker forever, so run it in its own goroutine. + go manager.startRenewSuperLoad() + return manager, nil } // InitProgress inits the correlation information between peers and pieces, etc. diff --git a/supernode/daemon/mgr/progress/progress_state.go b/supernode/daemon/mgr/progress/progress_state.go index 4d3540ff1..2dd5f8be0 100644 --- a/supernode/daemon/mgr/progress/progress_state.go +++ b/supernode/daemon/mgr/progress/progress_state.go @@ -17,6 +17,8 @@ package progress import ( + "time" + "github.com/dragonflyoss/Dragonfly/pkg/atomiccount" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" @@ -61,6 +63,14 @@ type peerState struct { serviceDownTime int64 } +type superLoadState struct { + // loadValue maintains the load num downloaded from the supernode for the task. + loadValue *atomiccount.AtomicInt + + // loadModTime records the time when the load was last modified. + loadModTime time.Time +} + func newSuperState() *superState { return &superState{ pieceBitSet: &bitset.BitSet{}, @@ -81,3 +91,10 @@ func newPeerState() *peerState { serviceErrorCount: atomiccount.NewAtomicInt(0), } } + +func newSuperLoadState() *superLoadState { + return &superLoadState{ + loadValue: atomiccount.NewAtomicInt(0), + loadModTime: time.Now(), + } +} diff --git a/supernode/daemon/mgr/progress/state_sync_map.go b/supernode/daemon/mgr/progress/state_sync_map.go index 80ef7a8f5..cde3a47fe 100644 --- a/supernode/daemon/mgr/progress/state_sync_map.go +++ b/supernode/daemon/mgr/progress/state_sync_map.go @@ -23,7 +23,7 @@ import ( "github.com/pkg/errors" ) -// stateSyncMap is a thread-safe map. +// stateSyncMap is a thread-safe map for progress state. 
type stateSyncMap struct { *syncmap.SyncMap } @@ -102,6 +102,20 @@ func (mmap *stateSyncMap) getAsPieceState(key string) (*pieceState, error) { return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) } +// getAsSuperLoadState returns result as *superLoadState. +// The ErrConvertFailed error will be returned if the assertion fails. +func (mmap *stateSyncMap) getAsSuperLoadState(key string) (*superLoadState, error) { + v, err := mmap.get(key) + if err != nil { + return nil, errors.Wrapf(err, "key: %s", key) + } + + if value, ok := v.(*superLoadState); ok { + return value, nil + } + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) +} + // remove deletes the key-value pair from the mmap. // The ErrEmptyValue error will be returned if the key is empty. // And the ErrDataNotFound error will be returned if the key cannot be found. diff --git a/supernode/daemon/mgr/progress/superload_manager.go b/supernode/daemon/mgr/progress/superload_manager.go new file mode 100644 index 000000000..40f9dca15 --- /dev/null +++ b/supernode/daemon/mgr/progress/superload_manager.go @@ -0,0 +1,75 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package progress + +import ( + "context" + "time" + + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" +) + +const ( + // renewInterval is the interval time to check the superload. 
+ renewInterval = 2 * time.Second + + // renewDelayTime is how long a superload may stay unchanged before it is renewed. + renewDelayTime = 30 * time.Second +) + +// UpdateSuperLoad adds delta to the superLoad of taskID and reports whether the update was kept. +// The update is rolled back and false is returned if the resulting value exceeds a positive limit. +func (pm *Manager) UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (updated bool, err error) { + loadState, err := pm.superLoad.getAsSuperLoadState(taskID) + if err != nil { + if !errortypes.IsDataNotFound(err) { + return false, err + } + // init the value for taskID when not found and keep using it below. + loadState = newSuperLoadState() + pm.superLoad.add(taskID, loadState) + } + + if loadState.loadValue.Add(delta) > limit && limit > 0 { + loadState.loadValue.Add(-delta) + return false, nil + } + loadState.loadModTime = time.Now() + + return true, nil +} + +// startRenewSuperLoad blocks forever and calls renewSuperLoad every renewInterval. +func (pm *Manager) startRenewSuperLoad() { + ticker := time.NewTicker(renewInterval) + for range ticker.C { + pm.renewSuperLoad() + } +} + +// renewSuperLoad resets to zero each load not modified for more than renewDelayTime. +func (pm *Manager) renewSuperLoad() { + rangeFunc := func(key, value interface{}) bool { + if v, ok := value.(*superLoadState); ok { + if time.Since(v.loadModTime) > renewDelayTime { + v.loadValue.Set(0) + } + } + return true + } + pm.superLoad.Range(rangeFunc) +} diff --git a/supernode/daemon/mgr/progress_mgr.go b/supernode/daemon/mgr/progress_mgr.go index e9f93382c..f2d3d489f 100644 --- a/supernode/daemon/mgr/progress_mgr.go +++ b/supernode/daemon/mgr/progress_mgr.go @@ -82,6 +82,11 @@ type ProgressMgr interface { // GetBlackInfoByPeerID gets black info with specified peerID. GetBlackInfoByPeerID(ctx context.Context, peerID string) (dstPIDMap *syncmap.SyncMap, err error) + // UpdateSuperLoad updates the superLoad with delta. + // + // The value will be rolled back and false returned if it exceeds the limit after the update. 
+ UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (updated bool, err error) + // DeleteTaskID deletes the super progress with specified taskID. DeleteTaskID(ctx context.Context, taskID string) (err error) diff --git a/supernode/daemon/mgr/scheduler/manager.go b/supernode/daemon/mgr/scheduler/manager.go index 3c152a0b9..b8f1647a8 100644 --- a/supernode/daemon/mgr/scheduler/manager.go +++ b/supernode/daemon/mgr/scheduler/manager.go @@ -171,6 +171,18 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID continue } + // We limit the number of simultaneous connections that supernode can accept for each task. + if sm.cfg.IsSuperPID(dstPID) { + updated, err := sm.progressMgr.UpdateSuperLoad(ctx, taskID, 1, int32(sm.cfg.PeerDownLimit)) + if err != nil { + logrus.Warnf("failed to update super load taskID(%s) clientID(%s): %v", taskID, clientID, err) + continue + } + if !updated { + continue + } + } + if err := sm.progressMgr.UpdateClientProgress(ctx, taskID, clientID, dstPID, pieceNums[i], config.PieceRUNNING); err != nil { logrus.Warnf("failed to update client progress running for pieceNum(%d) taskID(%s) clientID(%s) dstPID(%s)", pieceNums[i], taskID, clientID, dstPID) continue diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 859ca9646..9866fd087 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -272,6 +272,15 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str pieceRange, taskID, pieceUpdateRequest.ClientID) } + // When a peer succeeds in downloading a piece from the supernode, + // the load of the supernode for the taskID should be decremented by one. 
+ if tm.cfg.IsSuperPID(pieceUpdateRequest.DstPID) { + _, err := tm.progressMgr.UpdateSuperLoad(ctx, taskID, -1, -1) + if err != nil { + logrus.Warnf("failed to update superLoad taskID(%s) clientID(%s): %v", taskID, pieceUpdateRequest.ClientID, err) + } + } + // get dfgetTask according to the CID srcDfgetTask, err := tm.dfgetTaskMgr.Get(ctx, pieceUpdateRequest.ClientID, taskID) if err != nil { diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index cfb000bea..b509fc583 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -345,6 +345,9 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peer.IP.String()).Observe(timeutils.SinceInMilliseconds(startTime)) logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID) + if len(pieceResult) == 0 { + return false, nil, errortypes.ErrPeerWait + } var pieceInfos []*types.PieceInfo for _, v := range pieceResult { logrus.Debugf("get scheduler result item: %+v with taskID(%s) and clientID(%s)", v, task.ID, clientID)