Skip to content

Commit

Permalink
fix: sync pieces hang
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma committed Apr 2, 2022
1 parent bdca38b commit b0c7ca8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
1 change: 1 addition & 0 deletions client/daemon/peer/peertask_piecetask_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (s *pieceTaskSyncManager) newMultiPieceTaskSynchronizer(
}
err := s.newPieceTaskSynchronizer(s.ctx, peer, request)
if err == nil {
s.peerTaskConductor.Infof("connected to peer: %s", peer.PeerId)
continue
}
legacyPeers = append(legacyPeers, peer)
Expand Down
9 changes: 7 additions & 2 deletions client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ func (s *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque

// sendExistPieces will send as much as possible pieces
func (s *server) sendExistPieces(request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, sent int, err error) {
return sendExistPieces(sync.Context(), s.GetPieceTasks, request, sync, sentMap)
return sendExistPieces(sync.Context(), s.GetPieceTasks, request, sync, sentMap, true)
}

// sendFirstPieceTasks will send as much as possible pieces, even if no available pieces
func (s *server) sendFirstPieceTasks(request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, sent int, err error) {
return sendExistPieces(sync.Context(), s.GetPieceTasks, request, sync, sentMap, false)
}

func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) error {
Expand All @@ -147,7 +152,7 @@ func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) e
}
skipPieceCount := request.StartNum
var sentMap = make(map[int32]struct{})
total, sent, err := s.sendExistPieces(request, sync, sentMap)
total, sent, err := s.sendFirstPieceTasks(request, sync, sentMap)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions client/daemon/rpcserver/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func sendExistPieces(
get func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error),
request *base.PieceTaskRequest,
sync dfdaemon.Daemon_SyncPieceTasksServer,
sendMap map[int32]struct{}) (total int32, sent int, err error) {
sendMap map[int32]struct{},
skipSendZeroPiece bool) (total int32, sent int, err error) {
if request.Limit <= 0 {
request.Limit = 16
}
Expand All @@ -64,7 +65,7 @@ func sendExistPieces(
if err != nil {
return -1, -1, err
}
if len(pp.PieceInfos) == 0 {
if len(pp.PieceInfos) == 0 && skipSendZeroPiece {
return pp.TotalPiece, sent, nil
}
if err = sync.Send(pp); err != nil {
Expand All @@ -85,7 +86,7 @@ func sendExistPieces(
// sendExistPieces will send as much as possible pieces
func (s *subscriber) sendExistPieces(startNum uint32) (total int32, sent int, err error) {
s.request.StartNum = startNum
return sendExistPieces(s.sync.Context(), s.getPieces, s.request, s.sync, s.sentMap)
return sendExistPieces(s.sync.Context(), s.getPieces, s.request, s.sync, s.sentMap, true)
}

func (s *subscriber) receiveRemainingPieceTaskRequests() {
Expand Down

0 comments on commit b0c7ca8

Please sign in to comment.