From 2804129003abcee8d3f68e8334e8c3d6f38202b8 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 15 Apr 2022 10:45:25 +0800 Subject: [PATCH 1/7] chore: optimize sync pieces Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 2 +- .../peer/peertask_piecetask_synchronizer.go | 62 +++++++++++-------- client/daemon/rpcserver/rpcserver.go | 1 + client/daemon/rpcserver/subscriber.go | 37 +++++------ 4 files changed, 55 insertions(+), 47 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 89851420382..89e57944872 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -851,7 +851,7 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *base.PiecePacket) { } // update content length - if piecePacket.ContentLength > -1 { + if piecePacket.ContentLength > -1 && pt.GetContentLength() == -1 { metadataChanged = true pt.SetContentLength(piecePacket.ContentLength) pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(piecePacket.ContentLength)) diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index f1132cf0be9..84b77c4cf62 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -103,11 +103,16 @@ func (s *pieceTaskSyncManager) cleanStaleWorker(destPeers []*scheduler.PeerPacke // cancel old workers if len(s.workers) != len(peers) { + var peersToRemove []string for p, worker := range s.workers { if !peers[p] { worker.close() + peersToRemove = append(peersToRemove, p) } } + for _, p := range peersToRemove { + delete(s.workers, p) + } } } @@ -118,10 +123,13 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( request.DstPid = dstPeer.PeerId if worker, ok := s.workers[dstPeer.PeerId]; ok { - // clean error worker - if worker.error.Load() != nil { - delete(s.workers, dstPeer.PeerId) + // worker is okay, keep it go on + if worker.error.Load() == nil { + s.peerTaskConductor.Infof("reuse PieceTaskSynchronizer %s", dstPeer.PeerId) + return nil } + // clean error worker + delete(s.workers, dstPeer.PeerId) } request.DstPid = dstPeer.PeerId @@ -216,12 +224,13 @@ func (s *pieceTaskSyncManager) acquire(request *base.PieceTaskRequest) { } func (s *pieceTaskSyncManager) cancel() { - s.RLock() + s.ctxCancel() + s.Lock() for _, p := range s.workers { p.close() } - s.RUnlock() - s.ctxCancel() + s.workers = map[string]*pieceTaskSynchronizer{} + s.Unlock() } func (s *pieceTaskSynchronizer) close() { @@ -242,10 +251,11 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *base.PiecePack if finished { s.peerTaskConductor.Done() } + return } for _, piece := range piecePacket.PieceInfos { - s.Infof("got piece %d from %s/%s, digest: %s, start: %d, size: %d, dest peer: %s", - piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize, s.dstPeer.PeerId) + s.Infof("got piece %d from %s/%s, digest: %s, start: %d, size: %d", + piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize) // FIXME when set total piece but no total digest, fetch again s.peerTaskConductor.requestedPiecesLock.Lock() if !s.peerTaskConductor.requestedPieces.IsSet(piece.PieceNum) { @@ -272,40 +282,40 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *base.PiecePack } func (s *pieceTaskSynchronizer) receive(piecePacket *base.PiecePacket) { - var ( - err error - ) - s.dispatchPieceRequest(piecePacket) + var err error for { + s.dispatchPieceRequest(piecePacket) piecePacket, err = s.client.Recv() - if err == io.EOF { - s.Debugf("synchronizer receives io.EOF") - return - } if err != nil { - if s.canceled(err) { - s.Debugf("synchronizer receives canceled") - return - } - s.error.Store(err) - s.reportError() - s.Errorf("synchronizer receives with error: %s", err) - return + break } + } - s.dispatchPieceRequest(piecePacket) + if err == io.EOF { + s.Debugf("synchronizer receives io.EOF") + } else if s.canceled(err) { + s.error.Store(err) + s.Debugf("synchronizer receives canceled") + } else { + s.error.Store(err) + s.reportError() + s.Errorf("synchronizer receives with error: %s", err) } } func (s *pieceTaskSynchronizer) acquire(request *base.PieceTaskRequest) { + if s.error.Load() != nil { + s.Debugf("synchronizer already error %s, skip acquire more pieces", s.error.Load()) + return + } err := s.client.Send(request) if err != nil { + s.error.Store(err) if s.canceled(err) { s.Debugf("synchronizer sends canceled") return } s.Errorf("synchronizer sends with error: %s", err) - s.error.Store(err) s.reportError() return } diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 4f9987118ef..474216f44e5 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -155,6 +155,7 @@ func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) e } skipPieceCount := request.StartNum var sentMap = make(map[int32]struct{}) + // TODO if not found, try to send to peer task conductor, then download it first total, sent, err := s.sendFirstPieceTasks(request, sync, sentMap) if err != nil { return err diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index a2fbbf2138c..9a24510f564 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -94,7 +94,7 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() { for { request, err := s.sync.Recv() if err == io.EOF { - s.Infof("SyncPieceTasks done, exit receiving") + s.Infof("remote SyncPieceTasks done, exit receiving") return } if err != nil { @@ -145,7 +145,7 @@ loop: for { select { case <-s.done: - s.Infof("SyncPieceTasks done, exit sending, local task is running") + s.Infof("remote SyncPieceTasks done, exit sending, local task is running") return nil case info := <-s.PieceInfoChannel: // not desired piece @@ -157,15 +157,7 @@ loop: total, _, err := s.sendExistPieces(uint32(info.Num)) if err != nil { - if stat, ok := status.FromError(err); !ok { - // not grpc error - s.Errorf("sent exist pieces error: %s", err) - } else if stat.Code() == codes.Canceled { - err = nil - s.Debugf("SyncPieceTasks canceled, exit sending") - } else { - s.Warnf("SyncPieceTasks send error code %d/%s", stat.Code(), stat.Message()) - } + err = s.saveError(err) s.Unlock() return err } @@ -191,15 +183,7 @@ loop: } total, _, err := s.sendExistPieces(nextPieceNum) if err != nil { - if stat, ok := status.FromError(err); !ok { - // not grpc error - s.Errorf("sent exist pieces error: %s", err) - } else if stat.Code() == codes.Canceled { - err = nil - s.Debugf("SyncPieceTasks canceled, exit sending") - } else { - s.Warnf("SyncPieceTasks send error code %d/%s", stat.Code(), stat.Message()) - } + err = s.saveError(err) s.Unlock() return err } @@ -223,6 +207,19 @@ loop: } } +func (s *subscriber) saveError(err error) error { + if stat, ok := status.FromError(err); !ok { + // not grpc error + s.Errorf("sent exist pieces error: %s", err) + } else if stat.Code() == codes.Canceled { + err = nil + s.Debugf("SyncPieceTasks canceled, exit sending") + } else { + s.Warnf("SyncPieceTasks send error code %d/%s", stat.Code(), stat.Message()) + } + return err +} + func (s *subscriber) searchNextPieceNum(cur uint32) (nextPieceNum uint32) { for i := int32(cur); ; i++ { if _, ok := s.sentMap[i]; !ok { From 38d994bafb623ba491eb4ce80f10fe8b693133ba Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 15 Apr 2022 11:51:40 +0800 Subject: [PATCH 2/7] fix: pieceTaskSynchronizer sync/atomic error Signed-off-by: Jim Ma --- .../daemon/peer/peertask_piecetask_synchronizer.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 84b77c4cf62..f36f5ac92d6 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -50,6 +50,10 @@ type pieceTaskSynchronizer struct { pieceRequestCh chan *DownloadPieceRequest } +type pieceTaskSynchronizerError struct { + err error +} + // FIXME for compatibility, sync will be called after the dfclient.GetPieceTasks deprecated and the pieceTaskPoller removed func (s *pieceTaskSyncManager) sync(pp *scheduler.PeerPacket, request *base.PieceTaskRequest) error { var ( @@ -235,7 +239,7 @@ func (s *pieceTaskSyncManager) cancel() { func (s *pieceTaskSynchronizer) close() { if err := s.client.CloseSend(); err != nil { - s.error.Store(err) + s.error.Store(&pieceTaskSynchronizerError{err}) s.Debugf("close send error: %s, dest peer: %s", err, s.dstPeer.PeerId) } } @@ -294,10 +298,10 @@ func (s *pieceTaskSynchronizer) receive(piecePacket *base.PiecePacket) { if err == io.EOF { s.Debugf("synchronizer receives io.EOF") } else if s.canceled(err) { - s.error.Store(err) + s.error.Store(&pieceTaskSynchronizerError{err}) s.Debugf("synchronizer receives canceled") } else { - s.error.Store(err) + s.error.Store(&pieceTaskSynchronizerError{err}) s.reportError() s.Errorf("synchronizer receives with error: %s", err) } @@ -305,12 +309,12 @@ func (s *pieceTaskSynchronizer) receive(piecePacket *base.PiecePacket) { func (s *pieceTaskSynchronizer) acquire(request *base.PieceTaskRequest) { if s.error.Load() != nil { - s.Debugf("synchronizer already error %s, skip acquire more pieces", s.error.Load()) + s.Debugf("synchronizer already error %s, skip acquire more pieces", s.error.Load().(*pieceTaskSynchronizerError).err) return } err := s.client.Send(request) if err != nil { - s.error.Store(err) + s.error.Store(&pieceTaskSynchronizerError{err}) if s.canceled(err) { s.Debugf("synchronizer sends canceled") return From bccda9ce0006b26a123ca546ed43e14e90372dac Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 15 Apr 2022 12:21:39 +0800 Subject: [PATCH 3/7] feat: cancel piece download worker when back source Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 47 ++++++++++++++++++------ 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 89e57944872..81e37e9e4e5 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -65,6 +65,10 @@ type peerTaskConductor struct { // ctx is with span info for tracing // we use successCh and failCh mark task success or fail ctx context.Context + // piece download uses this context + pieceDownloadCtx context.Context + // when back source, cancel all piece download action + pieceDownloadCancel context.CancelFunc // host info about current host host *scheduler.PeerHost @@ -230,6 +234,8 @@ func (ptm *peerTaskManager) newPeerTaskConductor( peerTaskConductor: ptc, } + ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx) + return ptc } @@ -391,7 +397,26 @@ func (pt *peerTaskConductor) cancel(code base.Code, reason string) { }) } +func (pt *peerTaskConductor) markBackSource() { + pt.needBackSource.Store(true) + // when close peerPacketReady, pullPiecesFromPeers will invoke backSource + close(pt.peerPacketReady) +} + +// only use when schedule timeout +func (pt *peerTaskConductor) forceBackSource() { + pt.needBackSource.Store(true) + pt.backSource() +} + func (pt *peerTaskConductor) backSource() { + // cancel all piece download + pt.pieceDownloadCancel() + // cancel all sync pieces + if pt.pieceTaskSyncManager != nil { + pt.pieceTaskSyncManager.cancel() + } + ctx, span := tracer.Start(pt.ctx, config.SpanBackSource) pt.contentLength.Store(-1) err := pt.pieceManager.DownloadSource(ctx, pt, pt.request) @@ -570,9 +595,7 @@ loop: pt.Debugf("receive peerPacket %v", peerPacket) if peerPacket.Code != base.Code_Success { if peerPacket.Code == base.Code_SchedNeedBackSource { - pt.needBackSource.Store(true) - pt.pieceTaskSyncManager.cancel() - close(pt.peerPacketReady) + pt.markBackSource() pt.Infof("receive back source code") return } @@ -664,8 +687,7 @@ func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) { ) de, ok := err.(*dferrors.DfError) if ok && de.Code == base.Code_SchedNeedBackSource { - pt.needBackSource.Store(true) - close(pt.peerPacketReady) + pt.markBackSource() pt.Infof("receive back source code") return } else if ok && de.Code != base.Code_SchedNeedBackSource { @@ -900,8 +922,7 @@ func (pt *peerTaskConductor) waitFirstPeerPacket() (done bool, backSource bool) } pt.Warnf("start download from source due to %s", reasonScheduleTimeout) pt.span.AddEvent("back source due to schedule timeout") - pt.needBackSource.Store(true) - pt.backSource() + pt.forceBackSource() return false, true } } @@ -1039,7 +1060,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec pt.runningPiecesLock.Unlock() }() - ctx, span := tracer.Start(pt.ctx, fmt.Sprintf(config.SpanDownloadPiece, request.piece.PieceNum)) + ctx, span := tracer.Start(pt.pieceDownloadCtx, fmt.Sprintf(config.SpanDownloadPiece, request.piece.PieceNum)) span.SetAttributes(config.AttributePiece.Int(int(request.piece.PieceNum))) span.SetAttributes(config.AttributePieceWorker.Int(int(workerID))) @@ -1057,6 +1078,13 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec // result is always not nil, pieceManager will report begin and end time result, err := pt.pieceManager.DownloadPiece(ctx, request) if err != nil { + pt.ReportPieceResult(request, result, err) + span.SetAttributes(config.AttributePieceSuccess.Bool(false)) + span.End() + if pt.needBackSource.Load() { + pt.Infof("switch to back source, skip send failed piece") + return + } pt.pieceTaskSyncManager.acquire( &base.PieceTaskRequest{ TaskId: pt.taskID, @@ -1074,9 +1102,6 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec pt.failedPieceCh <- request.piece.PieceNum }() } - pt.ReportPieceResult(request, result, err) - span.SetAttributes(config.AttributePieceSuccess.Bool(false)) - span.End() return } // broadcast success piece From b908ca195c68e329453e94fbc643e5160a696848 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 15 Apr 2022 12:22:33 +0800 Subject: [PATCH 4/7] chore: optimize acquire logic for failed piece Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 10 +++++++- .../peer/peertask_piecetask_synchronizer.go | 23 +++++++++++-------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 81e37e9e4e5..dad6d935a0c 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -1033,6 +1033,9 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo } pt.readyPiecesLock.RUnlock() pt.downloadPiece(id, request) + case <-pt.pieceDownloadCtx.Done(): + pt.Infof("piece download cancelled, peer download worker #%d exit", id) + return case <-pt.successCh: pt.Infof("peer task success, peer download worker #%d exit", id) return @@ -1085,13 +1088,18 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec pt.Infof("switch to back source, skip send failed piece") return } - pt.pieceTaskSyncManager.acquire( + attempt, success := pt.pieceTaskSyncManager.acquire( &base.PieceTaskRequest{ TaskId: pt.taskID, SrcPid: pt.peerID, StartNum: uint32(request.piece.PieceNum), Limit: 1, }) + pt.Infof("send failed piece to remote, attempt: %d, success: %s", attempt, success) + // when send to remote peer ok, skip send to failedPieceCh + if success > 0 { + return + } // Deprecated // send to fail chan and retry // try to send directly first, if failed channel is busy, create a new goroutine to do this diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index f36f5ac92d6..e1d6f06dc44 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -219,12 +219,16 @@ func (s *pieceTaskSyncManager) reportError(destPeer *scheduler.PeerPacket_DestPe } // acquire send the target piece to other peers -func (s *pieceTaskSyncManager) acquire(request *base.PieceTaskRequest) { +func (s *pieceTaskSyncManager) acquire(request *base.PieceTaskRequest) (attempt int, success int) { s.RLock() for _, p := range s.workers { - p.acquire(request) + attempt++ + if p.acquire(request) == nil { + success++ + } } s.RUnlock() + return } func (s *pieceTaskSyncManager) cancel() { @@ -307,22 +311,23 @@ func (s *pieceTaskSynchronizer) receive(piecePacket *base.PiecePacket) { } } -func (s *pieceTaskSynchronizer) acquire(request *base.PieceTaskRequest) { +func (s *pieceTaskSynchronizer) acquire(request *base.PieceTaskRequest) error { if s.error.Load() != nil { - s.Debugf("synchronizer already error %s, skip acquire more pieces", s.error.Load().(*pieceTaskSynchronizerError).err) - return + err := s.error.Load().(*pieceTaskSynchronizerError).err + s.Debugf("synchronizer already error %s, skip acquire more pieces", err) + return err } err := s.client.Send(request) if err != nil { s.error.Store(&pieceTaskSynchronizerError{err}) if s.canceled(err) { s.Debugf("synchronizer sends canceled") - return + } else { + s.Errorf("synchronizer sends with error: %s", err) + s.reportError() } - s.Errorf("synchronizer sends with error: %s", err) - s.reportError() - return } + return err } func (s *pieceTaskSynchronizer) reportError() { From f8188738f3ed2d1c08b3c5b2e2250b9047ee9999 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 15 Apr 2022 14:03:43 +0800 Subject: [PATCH 5/7] chore: skip send failed to legacy peers when no legacy peers Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index dad6d935a0c..eb0e8608e46 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -107,7 +107,8 @@ type peerTaskConductor struct { // peerPacketStream stands schedulerclient.PeerPacketStream from scheduler peerPacketStream schedulerclient.PeerPacketStream // peerPacket is the latest available peers from peerPacketCh - peerPacket atomic.Value // *scheduler.PeerPacket + peerPacket atomic.Value // *scheduler.PeerPacket + legacyPeerCount *atomic.Int64 // peerPacketReady will receive a ready signal for peerPacket ready peerPacketReady chan bool // pieceTaskPoller pulls piece task from other peers @@ -209,6 +210,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor( taskID: taskID, successCh: make(chan struct{}), failCh: make(chan struct{}), + legacyPeerCount: atomic.NewInt64(0), span: span, readyPieces: NewBitmap(), runningPieces: NewBitmap(), @@ -640,7 +642,9 @@ loop: continue } - pt.Debugf("connect to %d legacy peers", len(peerPacket.StealPeers)) + legacyPeerCount := int64(len(peerPacket.StealPeers)) + pt.Debugf("connect to %d legacy peers", legacyPeerCount) + pt.legacyPeerCount.Store(legacyPeerCount) pt.peerPacket.Store(peerPacket) // legacy mode: send peerPacketReady @@ -1090,14 +1094,17 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec } attempt, success := pt.pieceTaskSyncManager.acquire( &base.PieceTaskRequest{ + Limit: 1, TaskId: pt.taskID, SrcPid: pt.peerID, StartNum: uint32(request.piece.PieceNum), - Limit: 1, }) - pt.Infof("send failed piece to remote, attempt: %d, success: %s", attempt, success) - // when send to remote peer ok, skip send to failedPieceCh - if success > 0 { + pt.Infof("send failed piece %d to remote, attempt: %d, success: %d", + request.piece.PieceNum, attempt, success) + + // when there is no legacy peers, skip send to failedPieceCh for legacy peers in background + if pt.legacyPeerCount.Load() == 0 { + pt.Infof("there is no legacy peers, skip send to failedPieceCh for legacy peers") return } // Deprecated @@ -1105,9 +1112,12 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec // try to send directly first, if failed channel is busy, create a new goroutine to do this select { case pt.failedPieceCh <- request.piece.PieceNum: + pt.Infof("success to send failed piece %d to failedPieceCh", request.piece.PieceNum) default: + pt.Infof("start to send failed piece %d to failedPieceCh in background", request.piece.PieceNum) go func() { pt.failedPieceCh <- request.piece.PieceNum + pt.Infof("success to send failed piece %d to failedPieceCh in background", request.piece.PieceNum) }() } return From adb9e4fb8a268045846b25bf27ec07e06aaa7d5a Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 15 Apr 2022 15:35:01 +0800 Subject: [PATCH 6/7] chore: when sync pieces, send not ready pieces num instead of requested pieces num Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 47 ++++++++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index eb0e8608e46..45789332bfb 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -147,7 +147,7 @@ type peerTaskConductor struct { // requestedPieces stands all pieces requested from peers requestedPieces *Bitmap // lock used by piece download worker - requestedPiecesLock sync.Mutex + requestedPiecesLock sync.RWMutex // lock used by send piece result sendPieceResultLock sync.Mutex // limiter will be used when enable per peer task rate limit @@ -546,7 +546,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() { func (pt *peerTaskConductor) receivePeerPacket(pieceRequestCh chan *DownloadPieceRequest) { var ( - lastPieceNum int32 = 0 + lastNotReadyPiece int32 = 0 peerPacket *scheduler.PeerPacket err error firstPacketReceived bool @@ -634,11 +634,13 @@ loop: firstPeerSpan.End() } // updateSynchronizer will update legacy peers to peerPacket.StealPeers only - pt.updateSynchronizer(lastPieceNum, peerPacket) + lastNotReadyPiece = pt.updateSynchronizer(lastNotReadyPiece, peerPacket) if !firstPacketReceived { // trigger legacy get piece once to avoid first schedule timeout firstPacketReceived = true } else if len(peerPacket.StealPeers) == 0 { + pt.Debugf("no legacy peers, skip to send peerPacketReady") + pt.legacyPeerCount.Store(0) continue } @@ -662,11 +664,11 @@ loop: } // updateSynchronizer will convert peers to synchronizer, if failed, will update failed peers to scheduler.PeerPacket -func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *scheduler.PeerPacket) { - num, ok := pt.getNextPieceNum(lastNum) +func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *scheduler.PeerPacket) int32 { + num, ok := pt.getNextNotReadyPieceNum(lastNum) if !ok { - pt.Infof("peer task completed") - return + pt.Infof("all pieces is ready, peer task completed, skip to synchronize") + return num } var peers = []*scheduler.PeerPacket_DestPeer{p.MainPeer} peers = append(peers, p.StealPeers...) @@ -675,6 +677,7 @@ func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *scheduler.Peer p.MainPeer = nil p.StealPeers = legacyPeers + return num } func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) { @@ -840,7 +843,7 @@ loop: // 3. dispatch piece request to all workers pt.dispatchPieceRequest(pieceRequestCh, piecePacket) - // 4. get next piece + // 4. get next not request piece if num, ok = pt.getNextPieceNum(num); ok { // get next piece success limit = config.DefaultPieceChanSize @@ -1056,6 +1059,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec if pt.runningPieces.IsSet(request.piece.PieceNum) { pt.runningPiecesLock.Unlock() pt.Log().Debugf("piece %d is downloading, skip", request.piece.PieceNum) + // TODO save to queue for failed pieces return } pt.runningPieces.Set(request.piece.PieceNum) @@ -1165,12 +1169,16 @@ func (pt *peerTaskConductor) isCompleted() bool { return pt.completedLength.Load() == pt.contentLength.Load() } +// for legacy peers only func (pt *peerTaskConductor) getNextPieceNum(cur int32) (int32, bool) { if pt.isCompleted() { return -1, false } i := cur // try to find next not requested piece + pt.requestedPiecesLock.Unlock() + defer pt.requestedPiecesLock.RUnlock() + for ; pt.requestedPieces.IsSet(i); i++ { } totalPiece := pt.GetTotalPieces() @@ -1185,6 +1193,29 @@ func (pt *peerTaskConductor) getNextPieceNum(cur int32) (int32, bool) { return i, true } +func (pt *peerTaskConductor) getNextNotReadyPieceNum(cur int32) (int32, bool) { + if pt.isCompleted() { + return 0, false + } + i := cur + // try to find next not ready piece + pt.readyPiecesLock.RLock() + defer pt.readyPiecesLock.RUnlock() + + for ; pt.readyPieces.IsSet(i); i++ { + } + totalPiece := pt.GetTotalPieces() + if totalPiece > 0 && i >= totalPiece { + // double check, re-search + for i = int32(0); pt.readyPieces.IsSet(i); i++ { + } + if totalPiece > 0 && i >= totalPiece { + return 0, false + } + } + return i, true +} + func (pt *peerTaskConductor) recoverFromPanic() { if r := recover(); r != nil { pt.Errorf("recovered from panic %q. Call stack:\n%v", r, string(debug.Stack())) From 8e06695c48090512fdfca562bfbdd6d4234d268f Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 15 Apr 2022 17:50:49 +0800 Subject: [PATCH 7/7] fix: Unlock of unlocked RWMutex Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 45789332bfb..bf057b6c813 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -1176,7 +1176,7 @@ func (pt *peerTaskConductor) getNextPieceNum(cur int32) (int32, bool) { } i := cur // try to find next not requested piece - pt.requestedPiecesLock.Unlock() + pt.requestedPiecesLock.RLock() defer pt.requestedPiecesLock.RUnlock() for ; pt.requestedPieces.IsSet(i); i++ {