From 7d44095b7bde3a3a6973d48da25cb4edc95453cb Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Wed, 23 Mar 2022 11:15:07 +0800 Subject: [PATCH] feat: implement grpc client side sync pieces (#1167) * feat: implement grpc client side sync pieces Signed-off-by: Jim Ma * fix: client unit test Signed-off-by: Jim Ma * chore: report sync piece error Signed-off-by: Jim Ma * chore: optimize pieceTaskSyncManager cancel Signed-off-by: Jim Ma --- client/daemon/peer/peertask_bitmap.go | 5 + client/daemon/peer/peertask_conductor.go | 337 +++++--- client/daemon/peer/peertask_manager_test.go | 239 +++--- .../daemon/peer/peertask_piecetask_poller.go | 25 +- .../peer/peertask_piecetask_synchronizer.go | 311 +++++++ ...peertask_stream_backsource_partial_test.go | 6 + client/daemon/rpcserver/subscriber.go | 46 +- .../mock/daemongrpc/daemon_server_grpc.go | 761 ++++++++++++++++++ 8 files changed, 1493 insertions(+), 237 deletions(-) create mode 100644 client/daemon/test/mock/daemongrpc/daemon_server_grpc.go diff --git a/client/daemon/peer/peertask_bitmap.go b/client/daemon/peer/peertask_bitmap.go index d79278a3516..cd4374a4124 100644 --- a/client/daemon/peer/peertask_bitmap.go +++ b/client/daemon/peer/peertask_bitmap.go @@ -56,6 +56,11 @@ func (b *Bitmap) Set(i int32) { b.bits[i/8] |= 1 << uint(7-i%8) } +func (b *Bitmap) Clean(i int32) { + b.settled.Dec() + b.bits[i/8] ^= 1 << uint(7-i%8) +} + func (b *Bitmap) Settled() int32 { return b.settled.Load() } diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 10ee48499f4..09c5741eb4a 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -62,7 +62,7 @@ var _ Task = (*peerTaskConductor)(nil) type peerTaskConductor struct { *logger.SugaredLoggerOnWith // ctx is with span info for tracing - // we did not use cancel with ctx, use successCh and failCh instead + // we use successCh and failCh mark task success or fail ctx context.Context // host info about current host @@ -87,8 +87,8 @@ type peerTaskConductor struct { // peer task meta info peerID string taskID string - totalPiece int32 - digest string + totalPiece *atomic.Int32 + digest *atomic.String contentLength *atomic.Int64 completedLength *atomic.Int64 usedTraffic *atomic.Uint64 @@ -105,10 +105,11 @@ type peerTaskConductor struct { peerPacket atomic.Value // *scheduler.PeerPacket // peerPacketReady will receive a ready signal for peerPacket ready peerPacketReady chan bool - // pieceParallelCount stands the piece parallel count from peerPacket - pieceParallelCount *atomic.Int32 // pieceTaskPoller pulls piece task from other peers + // Deprecated: pieceTaskPoller is deprecated, use pieceTaskSyncManager pieceTaskPoller *pieceTaskPoller + // pieceTaskSyncManager syncs piece task from other peers + pieceTaskSyncManager *pieceTaskSyncManager // same actions must be done only once, like close done channel and so on statusOnce sync.Once @@ -129,12 +130,18 @@ type peerTaskConductor struct { // failedReason will be set when peer task failed failedCode base.Code - // readyPieces stands all pieces download status + // readyPieces stands all downloaded pieces readyPieces *Bitmap + // lock used by piece result manage, when update readyPieces, lock first + readyPiecesLock sync.RWMutex + // runningPieces stands all downloading pieces + runningPieces *Bitmap + // lock used by piece download worker + runningPiecesLock sync.Mutex // requestedPieces stands all pieces requested from peers requestedPieces *Bitmap - // lock used by piece 
result manage, when update readyPieces, lock first - lock sync.RWMutex + // lock used by piece download worker + requestedPiecesLock sync.Mutex // limiter will be used when enable per peer task rate limit limiter *rate.Limiter @@ -197,13 +204,14 @@ func (ptm *peerTaskManager) newPeerTaskConductor( failCh: make(chan struct{}), span: span, readyPieces: NewBitmap(), + runningPieces: NewBitmap(), requestedPieces: NewBitmap(), failedPieceCh: make(chan int32, config.DefaultPieceChanSize), failedReason: failedReasonNotSet, failedCode: base.Code_UnknownError, contentLength: atomic.NewInt64(-1), - pieceParallelCount: atomic.NewInt32(0), - totalPiece: -1, + totalPiece: atomic.NewInt32(-1), + digest: atomic.NewString(""), schedulerOption: ptm.schedulerOption, limiter: rate.NewLimiter(limit, int(limit)), completedLength: atomic.NewInt64(0), @@ -350,19 +358,19 @@ func (pt *peerTaskConductor) GetTraffic() uint64 { } func (pt *peerTaskConductor) GetTotalPieces() int32 { - return pt.totalPiece + return pt.totalPiece.Load() } func (pt *peerTaskConductor) SetTotalPieces(i int32) { - pt.totalPiece = i + pt.totalPiece.Store(i) } func (pt *peerTaskConductor) SetPieceMd5Sign(md5 string) { - pt.digest = md5 + pt.digest.Store(md5) } func (pt *peerTaskConductor) GetPieceMd5Sign() string { - return pt.digest + return pt.digest.Load() } func (pt *peerTaskConductor) Context() context.Context { @@ -409,15 +417,33 @@ func (pt *peerTaskConductor) pullPieces() { case base.SizeScope_TINY: pt.storeTinyPeerTask() case base.SizeScope_SMALL: - go pt.pullSinglePiece() + pt.pullSinglePiece() case base.SizeScope_NORMAL: - go pt.receivePeerPacket() - go pt.pullPiecesFromPeers() + pt.pullPiecesWithP2P() default: pt.cancel(base.Code_ClientError, fmt.Sprintf("unknown size scope: %d", pt.sizeScope)) } } +func (pt *peerTaskConductor) pullPiecesWithP2P() { + var ( + // keep same size with pt.failedPieceCh for avoiding dead lock + pieceBufferSize = uint32(config.DefaultPieceChanSize) + pieceRequestCh = make(chan *DownloadPieceRequest, pieceBufferSize) + ) + ctx, cancel := context.WithCancel(pt.ctx) + + pt.pieceTaskSyncManager = &pieceTaskSyncManager{ + ctx: ctx, + ctxCancel: cancel, + peerTaskConductor: pt, + pieceRequestCh: pieceRequestCh, + workers: map[string]*pieceTaskSynchronizer{}, + } + go pt.pullPiecesFromPeers(pieceRequestCh) + pt.receivePeerPacket(pieceRequestCh) +} + func (pt *peerTaskConductor) storeTinyPeerTask() { l := int64(len(pt.tinyData.Content)) pt.SetContentLength(l) @@ -485,17 +511,18 @@ func (pt *peerTaskConductor) storeTinyPeerTask() { pt.PublishPieceInfo(0, uint32(l)) } -func (pt *peerTaskConductor) receivePeerPacket() { +func (pt *peerTaskConductor) receivePeerPacket(pieceRequestCh chan *DownloadPieceRequest) { var ( - peerPacket *scheduler.PeerPacket - err error - firstSpanDone bool + lastPieceNum int32 = 0 + peerPacket *scheduler.PeerPacket + err error + firstPacketReceived bool ) // only record first schedule result // other schedule result will record as an event in peer task span _, firstPeerSpan := tracer.Start(pt.ctx, config.SpanFirstSchedule) defer func() { - if !firstSpanDone { + if !firstPacketReceived { firstPeerSpan.End() } if pt.needBackSource.Load() { @@ -528,7 +555,7 @@ loop: } if err != nil { pt.confirmReceivePeerPacketError(err) - if !firstSpanDone { + if !firstPacketReceived { firstPeerSpan.RecordError(err) } break loop @@ -538,6 +565,7 @@ loop: if peerPacket.Code != base.Code_Success { if peerPacket.Code == base.Code_SchedNeedBackSource { pt.needBackSource.Store(true) + 
pt.pieceTaskSyncManager.cancel() close(pt.peerPacketReady) pt.Infof("receive back source code") return @@ -546,7 +574,7 @@ loop: if pt.isExitPeerPacketCode(peerPacket) { pt.Errorf(pt.failedReason) pt.cancel(pt.failedCode, pt.failedReason) - if !firstSpanDone { + if !firstPacketReceived { firstPeerSpan.RecordError(fmt.Errorf(pt.failedReason)) } pt.span.AddEvent("receive exit peer packet", @@ -568,16 +596,25 @@ loop: peerPacket.MainPeer.PeerId, peerPacket.ParallelCount) pt.span.AddEvent("receive new peer packet", trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))) - if !firstSpanDone { - firstSpanDone = true + + if !firstPacketReceived { + pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestCh) firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)) firstPeerSpan.End() } + // updateSynchronizer will update legacy peers to peerPacket.StealPeers only + pt.updateSynchronizer(lastPieceNum, peerPacket) + if !firstPacketReceived { + // trigger legacy get piece once to avoid first schedule timeout + firstPacketReceived = true + } else if len(peerPacket.StealPeers) == 0 { + continue + } + pt.Debugf("connect to %d legacy peers", len(peerPacket.StealPeers)) pt.peerPacket.Store(peerPacket) - pt.pieceParallelCount.Store(peerPacket.ParallelCount) - // send peerPacketReady + // legacy mode: send peerPacketReady select { case pt.peerPacketReady <- true: case <-pt.successCh: @@ -591,6 +628,22 @@ loop: } } +// updateSynchronizer will convert peers to synchronizer, if failed, will update failed peers to scheduler.PeerPacket +func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *scheduler.PeerPacket) { + num, ok := pt.getNextPieceNum(lastNum) + if !ok { + pt.Infof("peer task completed") + return + } + var peers = []*scheduler.PeerPacket_DestPeer{p.MainPeer} + peers = append(peers, p.StealPeers...) 
+ + legacyPeers := pt.pieceTaskSyncManager.newMultiPieceTaskSynchronizer(peers, num) + + p.MainPeer = nil + p.StealPeers = legacyPeers +} + func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) { select { case <-pt.successCh: @@ -683,12 +736,12 @@ func (pt *peerTaskConductor) pullSinglePiece() { pt.Warnf("single piece download failed, switch to download from other peers") pt.ReportPieceResult(request, result, err) - go pt.receivePeerPacket() - pt.pullPiecesFromPeers() + pt.pullPiecesWithP2P() } } -func (pt *peerTaskConductor) pullPiecesFromPeers() { +// Deprecated +func (pt *peerTaskConductor) pullPiecesFromPeers(pieceRequestCh chan *DownloadPieceRequest) { if ok, backSource := pt.waitFirstPeerPacket(); !ok { if backSource { return @@ -697,15 +750,21 @@ func (pt *peerTaskConductor) pullPiecesFromPeers() { return } var ( - num int32 - ok bool - limit uint32 - initialized bool - pieceRequestCh chan *DownloadPieceRequest - // keep same size with pt.failedPieceCh for avoiding dead-lock - pieceBufferSize = uint32(config.DefaultPieceChanSize) + num int32 + ok bool + limit uint32 ) - limit = pieceBufferSize + + // ensure first peer packet is not nil + peerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket) + if len(peerPacket.StealPeers) == 0 { + num, ok = pt.waitAvailablePeerPacket() + if !ok { + return + } + } + + limit = config.DefaultPieceChanSize loop: for { // 1, check whether catch exit signal or get a failed piece @@ -743,33 +802,7 @@ loop: continue loop } - if !initialized { - initialized = true - if pieceRequestCh, ok = pt.init(piecePacket, pieceBufferSize); !ok { - break loop - } - } - - // update total piece - if piecePacket.TotalPiece > pt.totalPiece { - pt.totalPiece = piecePacket.TotalPiece - _ = pt.UpdateStorage() - pt.Debugf("update total piece count: %d", pt.totalPiece) - } - - // update digest - if len(piecePacket.PieceMd5Sign) > 0 && len(pt.digest) == 0 { - pt.digest = piecePacket.PieceMd5Sign - _ = pt.UpdateStorage() - pt.Debugf("update digest: %s", pt.digest) - } - - // update content length - if piecePacket.ContentLength > -1 { - pt.SetContentLength(piecePacket.ContentLength) - _ = pt.UpdateStorage() - pt.Debugf("update content length: %d", pt.GetContentLength()) - } + pt.updateMetadata(piecePacket) // 3. dispatch piece request to all workers pt.dispatchPieceRequest(pieceRequestCh, piecePacket) @@ -777,7 +810,7 @@ loop: // 4. 
get next piece if num, ok = pt.getNextPieceNum(num); ok { // get next piece success - limit = pieceBufferSize + limit = config.DefaultPieceChanSize continue } @@ -793,18 +826,45 @@ loop: } } -func (pt *peerTaskConductor) init(piecePacket *base.PiecePacket, pieceBufferSize uint32) (chan *DownloadPieceRequest, bool) { - pt.contentLength.Store(piecePacket.ContentLength) +func (pt *peerTaskConductor) updateMetadata(piecePacket *base.PiecePacket) { + // update total piece + var metadataChanged bool + if piecePacket.TotalPiece > pt.GetTotalPieces() { + metadataChanged = true + pt.SetTotalPieces(piecePacket.TotalPiece) + pt.Debugf("update total piece count: %d", piecePacket.TotalPiece) + } + + // update digest + if len(piecePacket.PieceMd5Sign) > 0 && len(pt.GetPieceMd5Sign()) == 0 { + metadataChanged = true + pt.SetPieceMd5Sign(piecePacket.PieceMd5Sign) + pt.Debugf("update digest: %s", piecePacket.PieceMd5Sign) + } + + // update content length if piecePacket.ContentLength > -1 { + metadataChanged = true + pt.SetContentLength(piecePacket.ContentLength) pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(piecePacket.ContentLength)) + pt.Debugf("update content length: %d", piecePacket.ContentLength) + } + + if metadataChanged { + err := pt.UpdateStorage() + if err != nil { + pt.Errorf("update storage error: %s", err) + } } +} - pc := pt.peerPacket.Load().(*scheduler.PeerPacket).ParallelCount - pieceRequestCh := make(chan *DownloadPieceRequest, pieceBufferSize) - for i := int32(0); i < pc; i++ { +func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestCh chan *DownloadPieceRequest) { + if count < 1 { + count = 4 + } + for i := int32(0); i < count; i++ { go pt.downloadPieceWorker(i, pieceRequestCh) } - return pieceRequestCh, true } func (pt *peerTaskConductor) waitFirstPeerPacket() (done bool, backSource bool) { @@ -838,6 +898,7 @@ func (pt *peerTaskConductor) waitFirstPeerPacket() (done bool, backSource bool) } } +// Deprecated func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) { // only <-pt.peerPacketReady continue loop, others break select { @@ -874,6 +935,7 @@ func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) { return -1, false } +// Deprecated func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) { pieceCount := len(piecePacket.PieceInfos) pt.Debugf("dispatch piece request, piece count: %d", pieceCount) @@ -888,9 +950,11 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP pt.Infof("get piece %d from %s/%s, digest: %s, start: %d, size: %d", piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize) // FIXME when set total piece but no total digest, fetch again + pt.requestedPiecesLock.Lock() if !pt.requestedPieces.IsSet(piece.PieceNum) { pt.requestedPieces.Set(piece.PieceNum) } + pt.requestedPiecesLock.Unlock() req := &DownloadPieceRequest{ storage: pt.GetStorage(), piece: piece, @@ -932,46 +996,14 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo for { select { case request := <-requests: - pt.lock.RLock() + pt.readyPiecesLock.RLock() if pt.readyPieces.IsSet(request.piece.PieceNum) { - pt.lock.RUnlock() + pt.readyPiecesLock.RUnlock() pt.Log().Debugf("piece %d is already downloaded, skip", request.piece.PieceNum) continue } - pt.lock.RUnlock() - - ctx, span := tracer.Start(pt.ctx, fmt.Sprintf(config.SpanDownloadPiece, request.piece.PieceNum)) - 
span.SetAttributes(config.AttributePiece.Int(int(request.piece.PieceNum))) - span.SetAttributes(config.AttributePieceWorker.Int(int(id))) - - // wait limit - if pt.limiter != nil && !pt.waitLimit(ctx, request) { - span.SetAttributes(config.AttributePieceSuccess.Bool(false)) - span.End() - return - } - - pt.Debugf("peer download worker #%d receive piece task, "+ - "dest peer id: %s, piece num: %d, range start: %d, range size: %d", - id, request.DstPid, request.piece.PieceNum, request.piece.RangeStart, request.piece.RangeSize) - // download piece - // result is always not nil, pieceManager will report begin and end time - result, err := pt.pieceManager.DownloadPiece(ctx, request) - if err != nil { - // send to fail chan and retry - pt.failedPieceCh <- request.piece.PieceNum - pt.ReportPieceResult(request, result, err) - span.SetAttributes(config.AttributePieceSuccess.Bool(false)) - span.End() - continue - } else { - // broadcast success piece - pt.reportSuccessResult(request, result) - pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) - } - - span.SetAttributes(config.AttributePieceSuccess.Bool(true)) - span.End() + pt.readyPiecesLock.RUnlock() + pt.downloadPiece(id, request) case <-pt.successCh: pt.Infof("peer task success, peer download worker #%d exit", id) return @@ -982,6 +1014,70 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo } } +func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPieceRequest) { + // only downloading piece in one worker at same time + pt.runningPiecesLock.Lock() + if pt.runningPieces.IsSet(request.piece.PieceNum) { + pt.runningPiecesLock.Unlock() + pt.Log().Debugf("piece %d is downloading, skip", request.piece.PieceNum) + return + } + pt.runningPieces.Set(request.piece.PieceNum) + pt.runningPiecesLock.Unlock() + + defer func() { + pt.runningPiecesLock.Lock() + pt.runningPieces.Clean(request.piece.PieceNum) + pt.runningPiecesLock.Unlock() + }() + + ctx, span := tracer.Start(pt.ctx, fmt.Sprintf(config.SpanDownloadPiece, request.piece.PieceNum)) + span.SetAttributes(config.AttributePiece.Int(int(request.piece.PieceNum))) + span.SetAttributes(config.AttributePieceWorker.Int(int(workerID))) + + // wait limit + if pt.limiter != nil && !pt.waitLimit(ctx, request) { + span.SetAttributes(config.AttributePieceSuccess.Bool(false)) + span.End() + return + } + + pt.Debugf("peer download worker #%d receive piece task, "+ + "dest peer id: %s, piece num: %d, range start: %d, range size: %d", + workerID, request.DstPid, request.piece.PieceNum, request.piece.RangeStart, request.piece.RangeSize) + // download piece + // result is always not nil, pieceManager will report begin and end time + result, err := pt.pieceManager.DownloadPiece(ctx, request) + if err != nil { + pt.pieceTaskSyncManager.acquire( + &base.PieceTaskRequest{ + TaskId: pt.taskID, + SrcPid: pt.peerID, + StartNum: uint32(request.piece.PieceNum), + Limit: 1, + }) + // send to fail chan and retry + // try to send directly first, if failed channel is busy, create a new goroutine to do this + select { + case pt.failedPieceCh <- request.piece.PieceNum: + default: + go func() { + pt.failedPieceCh <- request.piece.PieceNum + }() + } + pt.ReportPieceResult(request, result, err) + span.SetAttributes(config.AttributePieceSuccess.Bool(false)) + span.End() + return + } + // broadcast success piece + pt.reportSuccessResult(request, result) + pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) + + 
span.SetAttributes(config.AttributePieceSuccess.Bool(true)) + span.End() +} + func (pt *peerTaskConductor) waitLimit(ctx context.Context, request *DownloadPieceRequest) bool { _, waitSpan := tracer.Start(ctx, config.SpanWaitPieceLimit) err := pt.limiter.WaitN(pt.ctx, int(request.piece.RangeSize)) @@ -1025,11 +1121,12 @@ func (pt *peerTaskConductor) getNextPieceNum(cur int32) (int32, bool) { // try to find next not requested piece for ; pt.requestedPieces.IsSet(i); i++ { } - if pt.totalPiece > 0 && i >= pt.totalPiece { + totalPiece := pt.GetTotalPieces() + if totalPiece > 0 && i >= totalPiece { // double check, re-search not success or not requested pieces for i = int32(0); pt.requestedPieces.IsSet(i); i++ { } - if pt.totalPiece > 0 && i >= pt.totalPiece { + if totalPiece > 0 && i >= totalPiece { return -1, false } } @@ -1168,6 +1265,9 @@ func (pt *peerTaskConductor) done() { defer func() { pt.broker.Stop() pt.span.End() + if pt.pieceTaskSyncManager != nil { + pt.pieceTaskSyncManager.cancel() + } }() var ( cost = time.Since(pt.startTime).Milliseconds() @@ -1229,7 +1329,7 @@ func (pt *peerTaskConductor) done() { Url: pt.request.Url, ContentLength: pt.GetContentLength(), Traffic: pt.GetTraffic(), - TotalPieceCount: pt.totalPiece, + TotalPieceCount: pt.GetTotalPieces(), Cost: uint32(cost), Success: success, Code: code, @@ -1252,6 +1352,9 @@ func (pt *peerTaskConductor) fail() { close(pt.failCh) pt.broker.Stop() pt.span.End() + if pt.pieceTaskSyncManager != nil { + pt.pieceTaskSyncManager.cancel() + } }() pt.peerTaskManager.PeerTaskDone(pt.taskID) var end = time.Now() @@ -1276,7 +1379,7 @@ func (pt *peerTaskConductor) fail() { Url: pt.request.Url, ContentLength: pt.GetContentLength(), Traffic: pt.GetTraffic(), - TotalPieceCount: pt.totalPiece, + TotalPieceCount: pt.GetTotalPieces(), Cost: uint32(end.Sub(pt.startTime).Milliseconds()), Success: false, Code: pt.failedCode, @@ -1302,7 +1405,7 @@ func (pt *peerTaskConductor) Validate() error { TaskID: pt.taskID, }, MetadataOnly: true, - TotalPieces: pt.totalPiece, + TotalPieces: pt.GetTotalPieces(), }) if err != nil { pt.Errorf("store metadata error: %s", err) @@ -1328,16 +1431,16 @@ func (pt *peerTaskConductor) Validate() error { func (pt *peerTaskConductor) PublishPieceInfo(pieceNum int32, size uint32) { // mark piece ready - pt.lock.Lock() + pt.readyPiecesLock.Lock() if pt.readyPieces.IsSet(pieceNum) { - pt.lock.Unlock() + pt.readyPiecesLock.Unlock() pt.Warnf("piece %d is already reported, skipped", pieceNum) return } // mark piece processed pt.readyPieces.Set(pieceNum) pt.completedLength.Add(int64(size)) - pt.lock.Unlock() + pt.readyPiecesLock.Unlock() finished := pt.isCompleted() if finished { diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 7cb8892fb18..6eb2def623c 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -36,8 +36,11 @@ import ( "github.com/phayes/freeport" testifyassert "github.com/stretchr/testify/assert" testifyrequire "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" "golang.org/x/time/rate" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" @@ -51,6 +54,7 @@ import ( "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" 
"d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" @@ -60,6 +64,11 @@ import ( "d7y.io/dragonfly/v2/pkg/util/digestutils" ) +func TestMain(m *testing.M) { + logger.SetLevel(zapcore.DebugLevel) + os.Exit(m.Run()) +} + type componentsOption struct { taskID string contentLength int64 @@ -71,8 +80,10 @@ type componentsOption struct { backSource bool scope base.SizeScope content []byte + getPieceTasks bool } +//go:generate mockgen -package mock_server_grpc -source ../../../pkg/rpc/dfdaemon/dfdaemon_grpc.pb.go -destination ../test/mock/daemongrpc/daemon_server_grpc.go func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) ( schedulerclient.SchedulerClient, storage.Manager) { port := int32(freeport.GetPort()) @@ -86,37 +97,72 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio pieces[i] = digestutils.Md5Reader(io.LimitReader(r, int64(opt.pieceSize))) } totalDigests := digestutils.Sha256(pieces...) - daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes(). - DoAndReturn(func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error) { - var tasks []*base.PieceInfo - for i := uint32(0); i < request.Limit; i++ { - start := opt.pieceSize * (request.StartNum + i) - if int64(start)+1 > opt.contentLength { + genPiecePacket := func(request *base.PieceTaskRequest) *base.PiecePacket { + var tasks []*base.PieceInfo + for i := uint32(0); i < request.Limit; i++ { + start := opt.pieceSize * (request.StartNum + i) + if int64(start)+1 > opt.contentLength { + break + } + size := opt.pieceSize + if int64(start+opt.pieceSize) > opt.contentLength { + size = uint32(opt.contentLength) - start + } + tasks = append(tasks, + &base.PieceInfo{ + PieceNum: int32(request.StartNum + i), + RangeStart: uint64(start), + RangeSize: size, + PieceMd5: pieces[request.StartNum+i], + PieceOffset: 0, + PieceStyle: 0, + }) + } + return &base.PiecePacket{ + TaskId: request.TaskId, + DstPid: "peer-x", + PieceInfos: tasks, + ContentLength: opt.contentLength, + TotalPiece: int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize))), + PieceMd5Sign: totalDigests, + } + } + if opt.getPieceTasks { + daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error) { + return genPiecePacket(request), nil + }) + daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemon.Daemon_SyncPieceTasksServer) error { + return status.Error(codes.Unimplemented, "TODO") + }) + } else { + daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes(). 
+ DoAndReturn(func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error) { + return nil, status.Error(codes.Unimplemented, "TODO") + }) + daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemon.Daemon_SyncPieceTasksServer) error { + request, err := s.Recv() + if err != nil { + return err + } + if err = s.Send(genPiecePacket(request)); err != nil { + return err + } + for { + request, err = s.Recv() + if err == io.EOF { break } - size := opt.pieceSize - if int64(start+opt.pieceSize) > opt.contentLength { - size = uint32(opt.contentLength) - start + if err != nil { + return err + } + if err = s.Send(genPiecePacket(request)); err != nil { + return err } - tasks = append(tasks, - &base.PieceInfo{ - PieceNum: int32(request.StartNum + i), - RangeStart: uint64(start), - RangeSize: size, - PieceMd5: pieces[request.StartNum+i], - PieceOffset: 0, - PieceStyle: 0, - }) } - return &base.PiecePacket{ - TaskId: request.TaskId, - DstPid: "peer-x", - PieceInfos: tasks, - ContentLength: opt.contentLength, - TotalPiece: int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize))), - PieceMd5Sign: totalDigests, - }, nil + return nil }) + } ln, _ := rpc.Listen(dfnet.NetAddr{ Type: "tcp", Addr: fmt.Sprintf("0.0.0.0:%d", port), @@ -289,6 +335,7 @@ type testSpec struct { sizeScope base.SizeScope peerID string url string + legacyFeature bool // when urlGenerator is not nil, use urlGenerator instead url // it's useful for httptest server urlGenerator func(ts *testSpec) string @@ -324,7 +371,8 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { return downloader } - taskTypes := []int{taskTypeFile, taskTypeStream, taskTypeConductor} + taskTypes := []int{taskTypeConductor, taskTypeFile, taskTypeStream} + taskTypeNames := []string{"conductor", "file", "stream"} testCases := []testSpec{ { @@ -500,72 +548,77 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { t.Run(_tc.name, func(t *testing.T) { assert := testifyassert.New(t) require := testifyrequire.New(t) - for _, typ := range taskTypes { - // dup a new test case with the task type - logger.Infof("-------------------- test %s - type %d, started --------------------", _tc.name, typ) - tc := _tc - tc.taskType = typ - func() { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockContentLength := len(tc.taskData) - - urlMeta := &base.UrlMeta{ - Tag: "d7y-test", - } - - if tc.httpRange != nil { - urlMeta.Range = strings.TrimLeft(tc.httpRange.String(), "bytes=") - } - - if tc.urlGenerator != nil { - tc.url = tc.urlGenerator(&tc) - } - taskID := idgen.TaskID(tc.url, urlMeta) - - var ( - downloader PieceDownloader - sourceClient source.ResourceClient - ) - - if tc.mockPieceDownloader != nil { - downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize) - } - - if tc.mockHTTPSourceClient != nil { - source.UnRegister("http") - defer func() { - // reset source client + for _, legacy := range []bool{true, false} { + for _, typ := range taskTypes { + // dup a new test case with the task type + logger.Infof("-------------------- test %s - type %s, legacy feature: %v started --------------------", + _tc.name, taskTypeNames[typ], legacy) + tc := _tc + tc.taskType = typ + tc.legacyFeature = legacy + func() { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockContentLength := len(tc.taskData) + + urlMeta := &base.UrlMeta{ + Tag: "d7y-test", + } + + if tc.httpRange != nil { + urlMeta.Range = strings.TrimLeft(tc.httpRange.String(), "bytes=") + } + + if tc.urlGenerator != nil { + tc.url = 
tc.urlGenerator(&tc) + } + taskID := idgen.TaskID(tc.url, urlMeta) + + var ( + downloader PieceDownloader + sourceClient source.ResourceClient + ) + + if tc.mockPieceDownloader != nil { + downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize) + } + + if tc.mockHTTPSourceClient != nil { source.UnRegister("http") - require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter)) - }() - // replace source client - sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url) - require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter)) - } - - option := componentsOption{ - taskID: taskID, - contentLength: int64(mockContentLength), - pieceSize: uint32(tc.pieceSize), - pieceParallelCount: tc.pieceParallelCount, - pieceDownloader: downloader, - sourceClient: sourceClient, - content: tc.taskData, - scope: tc.sizeScope, - peerPacketDelay: tc.peerPacketDelay, - backSource: tc.backSource, - } - // keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same - if tc.taskType == taskTypeConductor { - option.peerPacketDelay = []time.Duration{time.Second} - } - mm := setupMockManager(ctrl, &tc, option) - defer mm.CleanUp() - - tc.run(assert, require, mm, urlMeta) - }() - logger.Infof("-------------------- test %s - type %d, finished --------------------", _tc.name, typ) + defer func() { + // reset source client + source.UnRegister("http") + require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter)) + }() + // replace source client + sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url) + require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter)) + } + + option := componentsOption{ + taskID: taskID, + contentLength: int64(mockContentLength), + pieceSize: uint32(tc.pieceSize), + pieceParallelCount: tc.pieceParallelCount, + pieceDownloader: downloader, + sourceClient: sourceClient, + content: tc.taskData, + scope: tc.sizeScope, + peerPacketDelay: tc.peerPacketDelay, + backSource: tc.backSource, + getPieceTasks: tc.legacyFeature, + } + // keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same + if tc.taskType == taskTypeConductor { + option.peerPacketDelay = []time.Duration{time.Second} + } + mm := setupMockManager(ctrl, &tc, option) + defer mm.CleanUp() + + tc.run(assert, require, mm, urlMeta) + }() + logger.Infof("-------------------- test %s - type %s, finished --------------------", _tc.name, taskTypeNames[typ]) + } } }) } @@ -665,7 +718,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require * go func(ptc *peerTaskConductor) { defer wg.Done() select { - case <-time.After(10 * time.Second): + case <-time.After(5 * time.Minute): ptc.Fail() case <-ptc.successCh: return diff --git a/client/daemon/peer/peertask_piecetask_poller.go b/client/daemon/peer/peertask_piecetask_poller.go index 6dab074e9d4..62714cca0a3 100644 --- a/client/daemon/peer/peertask_piecetask_poller.go +++ b/client/daemon/peer/peertask_piecetask_poller.go @@ -47,24 +47,11 @@ prepare: retryCount++ poller.peerTaskConductor.Debugf("prepare piece tasks, retry count: %d", retryCount) peerPacket := ptc.peerPacket.Load().(*scheduler.PeerPacket) - ptc.pieceParallelCount.Store(peerPacket.ParallelCount) if poller.peerTaskConductor.needBackSource.Load() { return nil, fmt.Errorf("need back source") } - request.DstPid = peerPacket.MainPeer.PeerId - pp, err = 
poller.preparePieceTasksByPeer(peerPacket, peerPacket.MainPeer, request) - if err == nil { - return - } - if err == errPeerPacketChanged { - if poller.getPiecesMaxRetry > 0 && retryCount > poller.getPiecesMaxRetry { - err = fmt.Errorf("get pieces max retry count reached") - return - } - goto prepare - } for _, peer := range peerPacket.StealPeers { if poller.peerTaskConductor.needBackSource.Load() { return nil, fmt.Errorf("need back source") @@ -193,9 +180,9 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer( // fast way 2 to exit retry lastPeerPacket := ptc.peerPacket.Load().(*scheduler.PeerPacket) - if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId { + if curPeerPacket.StealPeers[0].PeerId != lastPeerPacket.StealPeers[0].PeerId { ptc.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getError, - curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId) + curPeerPacket.StealPeers[0].PeerId, lastPeerPacket.StealPeers[0].PeerId) peerPacketChanged = true return nil, true, nil } @@ -206,7 +193,7 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer( return piecePacket, false, nil } // need update metadata - if piecePacket.ContentLength > ptc.contentLength.Load() || piecePacket.TotalPiece > ptc.totalPiece { + if piecePacket.ContentLength > ptc.contentLength.Load() || piecePacket.TotalPiece > ptc.GetTotalPieces() { return piecePacket, false, nil } // invalid request num @@ -227,16 +214,16 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer( FinishedCount: ptc.readyPieces.Settled(), }) if sendError != nil { - ptc.cancel(base.Code_SchedError, sendError.Error()) + ptc.cancel(base.Code_ClientPieceRequestFail, sendError.Error()) span.RecordError(sendError) ptc.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", sendError) return nil, true, sendError } // fast way to exit retry lastPeerPacket := ptc.peerPacket.Load().(*scheduler.PeerPacket) - if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId { + if curPeerPacket.StealPeers[0].PeerId != lastPeerPacket.StealPeers[0].PeerId { ptc.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", - curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId) + curPeerPacket.StealPeers[0].PeerId, lastPeerPacket.StealPeers[0].PeerId) peerPacketChanged = true return nil, true, nil } diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 305c4401d4b..a0b76d16a0e 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -15,3 +15,314 @@ */ package peer + +import ( + "context" + "io" + "sync" + + "go.uber.org/atomic" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" + dfclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler" +) + +type pieceTaskSyncManager struct { + sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc + peerTaskConductor *peerTaskConductor + pieceRequestCh chan *DownloadPieceRequest + workers map[string]*pieceTaskSynchronizer +} + +type pieceTaskSynchronizer struct { + peerTaskConductor *peerTaskConductor + pieceRequestCh chan *DownloadPieceRequest + dstPeer *scheduler.PeerPacket_DestPeer + client dfdaemon.Daemon_SyncPieceTasksClient + error atomic.Value +} + +// 
FIXME for compatibility, sync will be called after the dfclient.GetPieceTasks deprecated and the pieceTaskPoller removed +func (s *pieceTaskSyncManager) sync(pp *scheduler.PeerPacket, request *base.PieceTaskRequest) error { + var ( + peers = map[string]bool{} + errors []error + ) + peers[pp.MainPeer.PeerId] = true + // TODO if the worker failed, reconnect and retry + s.Lock() + defer s.Unlock() + if _, ok := s.workers[pp.MainPeer.PeerId]; !ok { + err := s.newPieceTaskSynchronizer(s.ctx, pp.MainPeer, request) + if err != nil { + s.peerTaskConductor.Errorf("main peer SyncPieceTasks error: %s", err) + errors = append(errors, err) + } + } + for _, p := range pp.StealPeers { + peers[p.PeerId] = true + if _, ok := s.workers[p.PeerId]; !ok { + err := s.newPieceTaskSynchronizer(s.ctx, p, request) + if err != nil { + s.peerTaskConductor.Errorf("steal peer SyncPieceTasks error: %s", err) + errors = append(errors, err) + } + } + } + + // cancel old workers + if len(s.workers) != len(peers) { + for p, worker := range s.workers { + if !peers[p] { + worker.close() + } + } + } + + if len(errors) > 0 { + return errors[0] + } + return nil +} + +func (s *pieceTaskSyncManager) cleanStaleWorker(destPeers []*scheduler.PeerPacket_DestPeer) { + var ( + peers = map[string]bool{} + ) + for _, p := range destPeers { + peers[p.PeerId] = true + } + + // cancel old workers + if len(s.workers) != len(peers) { + for p, worker := range s.workers { + if !peers[p] { + worker.close() + } + } + } +} + +func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( + ctx context.Context, + dstPeer *scheduler.PeerPacket_DestPeer, + request *base.PieceTaskRequest) error { + + request.DstPid = dstPeer.PeerId + if worker, ok := s.workers[dstPeer.PeerId]; ok { + // clean error worker + if worker.error.Load() != nil { + delete(s.workers, dstPeer.PeerId) + } + } + + request.DstPid = dstPeer.PeerId + client, err := dfclient.SyncPieceTasks(ctx, dstPeer, request) + if err != nil { + return err + } + + // TODO the codes.Unimplemented is received only in client.Recv() + // when remove legacy get piece grpc, can move this check into synchronizer.receive + piecePacket, err := client.Recv() + if err != nil { + _ = client.CloseSend() + return err + } + + synchronizer := &pieceTaskSynchronizer{ + peerTaskConductor: s.peerTaskConductor, + pieceRequestCh: s.pieceRequestCh, + client: client, + dstPeer: dstPeer, + error: atomic.Value{}, + } + s.workers[dstPeer.PeerId] = synchronizer + go synchronizer.receive(piecePacket) + return nil +} + +func (s *pieceTaskSyncManager) newMultiPieceTaskSynchronizer( + destPeers []*scheduler.PeerPacket_DestPeer, + lastNum int32) (legacyPeers []*scheduler.PeerPacket_DestPeer) { + s.Lock() + defer s.Unlock() + for _, peer := range destPeers { + request := &base.PieceTaskRequest{ + TaskId: s.peerTaskConductor.taskID, + SrcPid: s.peerTaskConductor.peerID, + DstPid: "", + StartNum: uint32(lastNum), + Limit: 16, + } + err := s.newPieceTaskSynchronizer(s.ctx, peer, request) + if err == nil { + continue + } + // when err is codes.Unimplemented, fallback to legacy get piece grpc + stat, ok := status.FromError(err) + if ok && stat.Code() == codes.Unimplemented { + legacyPeers = append(legacyPeers, peer) + s.peerTaskConductor.Warnf("connect peer %s error: %s, fallback to legacy get piece grpc", peer.PeerId, err) + } else { + s.reportError(peer) + s.peerTaskConductor.Errorf("connect peer %s error: %s, not codes.Unimplemented, did not fallback to legacy", peer.PeerId, err) + } + } + s.cleanStaleWorker(destPeers) + return 
legacyPeers +} + +func compositePieceResult(peerTaskConductor *peerTaskConductor, destPeer *scheduler.PeerPacket_DestPeer) *scheduler.PieceResult { + return &scheduler.PieceResult{ + TaskId: peerTaskConductor.taskID, + SrcPid: peerTaskConductor.peerID, + DstPid: destPeer.PeerId, + PieceInfo: &base.PieceInfo{}, + Success: false, + Code: base.Code_ClientPieceRequestFail, + HostLoad: nil, + FinishedCount: peerTaskConductor.readyPieces.Settled(), + } +} + +func (s *pieceTaskSyncManager) reportError(destPeer *scheduler.PeerPacket_DestPeer) { + sendError := s.peerTaskConductor.peerPacketStream.Send(compositePieceResult(s.peerTaskConductor, destPeer)) + if sendError != nil { + s.peerTaskConductor.cancel(base.Code_SchedError, sendError.Error()) + s.peerTaskConductor.Errorf("connect peer %s failed and send piece result with error: %s", destPeer.PeerId, sendError) + } +} + +// acquire send the target piece to other peers +func (s *pieceTaskSyncManager) acquire(request *base.PieceTaskRequest) { + s.RLock() + for _, p := range s.workers { + p.acquire(request) + } + s.RUnlock() +} + +func (s *pieceTaskSyncManager) cancel() { + s.RLock() + for _, p := range s.workers { + p.close() + } + s.RUnlock() + s.ctxCancel() +} + +func (s *pieceTaskSynchronizer) close() { + if err := s.client.CloseSend(); err != nil { + s.error.Store(err) + s.peerTaskConductor.Debugf("close send error: %s, dest peer: %s", err, s.dstPeer.PeerId) + } +} + +func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *base.PiecePacket) { + s.peerTaskConductor.updateMetadata(piecePacket) + + pieceCount := len(piecePacket.PieceInfos) + s.peerTaskConductor.Debugf("dispatch piece request, piece count: %d, dest peer: %s", pieceCount, s.dstPeer.PeerId) + // fix cdn return zero piece info, but with total piece count and content length + if pieceCount == 0 { + finished := s.peerTaskConductor.isCompleted() + if finished { + s.peerTaskConductor.Done() + } + } + for _, piece := range piecePacket.PieceInfos { + s.peerTaskConductor.Infof("got piece %d from %s/%s, digest: %s, start: %d, size: %d, dest peer: %s", + piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize, s.dstPeer.PeerId) + // FIXME when set total piece but no total digest, fetch again + s.peerTaskConductor.requestedPiecesLock.Lock() + if !s.peerTaskConductor.requestedPieces.IsSet(piece.PieceNum) { + s.peerTaskConductor.requestedPieces.Set(piece.PieceNum) + } + s.peerTaskConductor.requestedPiecesLock.Unlock() + req := &DownloadPieceRequest{ + storage: s.peerTaskConductor.GetStorage(), + piece: piece, + log: s.peerTaskConductor.Log(), + TaskID: s.peerTaskConductor.GetTaskID(), + PeerID: s.peerTaskConductor.GetPeerID(), + DstPid: piecePacket.DstPid, + DstAddr: piecePacket.DstAddr, + } + select { + case s.pieceRequestCh <- req: + case <-s.peerTaskConductor.successCh: + s.peerTaskConductor.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId) + case <-s.peerTaskConductor.failCh: + s.peerTaskConductor.Warnf("peer task fail, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId) + } + } +} + +func (s *pieceTaskSynchronizer) receive(piecePacket *base.PiecePacket) { + var ( + err error + ) + s.dispatchPieceRequest(piecePacket) + for { + piecePacket, err = s.client.Recv() + if err == io.EOF { + s.peerTaskConductor.Debugf("synchronizer receives io.EOF") + return + } + if err != nil { + if s.canceled(err) { + s.peerTaskConductor.Debugf("synchronizer receives canceled") + return + } + 
s.error.Store(err) + s.reportError() + s.peerTaskConductor.Errorf("synchronizer receives with error: %s", err) + return + } + + s.dispatchPieceRequest(piecePacket) + } +} + +func (s *pieceTaskSynchronizer) acquire(request *base.PieceTaskRequest) { + err := s.client.Send(request) + if err != nil { + if s.canceled(err) { + s.peerTaskConductor.Debugf("synchronizer sends canceled") + return + } + s.peerTaskConductor.Errorf("synchronizer sends with error: %s", err) + s.error.Store(err) + s.reportError() + return + } +} + +func (s *pieceTaskSynchronizer) reportError() { + sendError := s.peerTaskConductor.peerPacketStream.Send(compositePieceResult(s.peerTaskConductor, s.dstPeer)) + if sendError != nil { + s.peerTaskConductor.cancel(base.Code_SchedError, sendError.Error()) + s.peerTaskConductor.Errorf("sync piece info failed and send piece result with error: %s", sendError) + } +} + +func (s *pieceTaskSynchronizer) canceled(err error) bool { + if err == context.Canceled { + s.peerTaskConductor.Debugf("context canceled, dst peer: %s", s.dstPeer.PeerId) + return true + } + if stat, ok := err.(interface{ GRPCStatus() *status.Status }); ok { + if stat.GRPCStatus().Code() == codes.Canceled { + s.peerTaskConductor.Debugf("grpc canceled, dst peer: %s", s.dstPeer.PeerId) + return true + } + } + return false +} diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index 80b93449d91..a1e7163754f 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -35,6 +35,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" @@ -46,6 +48,7 @@ import ( "d7y.io/dragonfly/v2/internal/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" @@ -93,6 +96,9 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, TotalPiece: pieceCount, }, nil }) + daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemon.Daemon_SyncPieceTasksServer) error { + return status.Error(codes.Unimplemented, "TODO") + }) ln, _ := rpc.Listen(dfnet.NetAddr{ Type: "tcp", Addr: fmt.Sprintf("0.0.0.0:%d", port), diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index 0e392d636b2..745ea90b0ad 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -21,6 +21,9 @@ import ( "io" "sync" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "d7y.io/dragonfly/v2/client/daemon/peer" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -53,6 +56,9 @@ func sendExistPieces( request *base.PieceTaskRequest, sync dfdaemon.Daemon_SyncPieceTasksServer, sendMap map[int32]struct{}) (total int32, sent int, err error) { + if request.Limit <= 0 { + request.Limit = 16 + } for { pp, err := get(ctx, request) if err != nil { @@ -91,8 +97,15 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() { return } if err != nil { - s.Errorf("SyncPieceTasks receive error: %s", err) - return + stat, ok := status.FromError(err) + if 
!ok { + s.Errorf("SyncPieceTasks receive error: %s", err) + return + } + if stat.Code() == codes.Canceled { + s.Debugf("SyncPieceTasks canceled, exit receiving") + return + } } s.Debugf("receive request: %#v", request) pp, err := s.getPieces(s.sync.Context(), request) @@ -141,10 +154,19 @@ loop: } s.Lock() total, _, err := s.sendExistPieces(uint32(info.Num)) + if err != nil { - s.Unlock() - s.Errorf("sent exist pieces error: %s", err) - return err + stat, ok := status.FromError(err) + if !ok { + s.Unlock() + s.Errorf("sent exist pieces error: %s", err) + return err + } + if stat.Code() == codes.Canceled { + s.Debugf("SyncPieceTasks canceled, exit sending") + s.Unlock() + return nil + } } if total > -1 && s.totalPieces == -1 { s.totalPieces = total @@ -168,9 +190,17 @@ loop: } total, _, err := s.sendExistPieces(nextPieceNum) if err != nil { - s.Unlock() - s.Errorf("sent exist pieces error: %s", err) - return err + stat, ok := status.FromError(err) + if !ok { + s.Unlock() + s.Errorf("sent exist pieces error: %s", err) + return err + } + if stat.Code() == codes.Canceled { + s.Debugf("SyncPieceTasks canceled, exit sending") + s.Unlock() + return nil + } } if total > -1 && s.totalPieces == -1 { s.totalPieces = total diff --git a/client/daemon/test/mock/daemongrpc/daemon_server_grpc.go b/client/daemon/test/mock/daemongrpc/daemon_server_grpc.go new file mode 100644 index 00000000000..6f8b834778a --- /dev/null +++ b/client/daemon/test/mock/daemongrpc/daemon_server_grpc.go @@ -0,0 +1,761 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ../../../pkg/rpc/dfdaemon/dfdaemon_grpc.pb.go + +// Package mock_server_grpc is a generated GoMock package. +package mock_server_grpc + +import ( + context "context" + reflect "reflect" + + base "d7y.io/dragonfly/v2/pkg/rpc/base" + dfdaemon "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// MockDaemonClient is a mock of DaemonClient interface. +type MockDaemonClient struct { + ctrl *gomock.Controller + recorder *MockDaemonClientMockRecorder +} + +// MockDaemonClientMockRecorder is the mock recorder for MockDaemonClient. +type MockDaemonClientMockRecorder struct { + mock *MockDaemonClient +} + +// NewMockDaemonClient creates a new mock instance. +func NewMockDaemonClient(ctrl *gomock.Controller) *MockDaemonClient { + mock := &MockDaemonClient{ctrl: ctrl} + mock.recorder = &MockDaemonClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDaemonClient) EXPECT() *MockDaemonClientMockRecorder { + return m.recorder +} + +// CheckHealth mocks base method. +func (m *MockDaemonClient) CheckHealth(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CheckHealth", varargs...) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CheckHealth indicates an expected call of CheckHealth. +func (mr *MockDaemonClientMockRecorder) CheckHealth(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) 
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockDaemonClient)(nil).CheckHealth), varargs...) +} + +// Download mocks base method. +func (m *MockDaemonClient) Download(ctx context.Context, in *dfdaemon.DownRequest, opts ...grpc.CallOption) (dfdaemon.Daemon_DownloadClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Download", varargs...) + ret0, _ := ret[0].(dfdaemon.Daemon_DownloadClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Download indicates an expected call of Download. +func (mr *MockDaemonClientMockRecorder) Download(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockDaemonClient)(nil).Download), varargs...) +} + +// GetPieceTasks mocks base method. +func (m *MockDaemonClient) GetPieceTasks(ctx context.Context, in *base.PieceTaskRequest, opts ...grpc.CallOption) (*base.PiecePacket, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetPieceTasks", varargs...) + ret0, _ := ret[0].(*base.PiecePacket) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPieceTasks indicates an expected call of GetPieceTasks. +func (mr *MockDaemonClientMockRecorder) GetPieceTasks(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockDaemonClient)(nil).GetPieceTasks), varargs...) +} + +// SyncPieceTasks mocks base method. +func (m *MockDaemonClient) SyncPieceTasks(ctx context.Context, opts ...grpc.CallOption) (dfdaemon.Daemon_SyncPieceTasksClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SyncPieceTasks", varargs...) + ret0, _ := ret[0].(dfdaemon.Daemon_SyncPieceTasksClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncPieceTasks indicates an expected call of SyncPieceTasks. +func (mr *MockDaemonClientMockRecorder) SyncPieceTasks(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockDaemonClient)(nil).SyncPieceTasks), varargs...) +} + +// MockDaemon_DownloadClient is a mock of Daemon_DownloadClient interface. +type MockDaemon_DownloadClient struct { + ctrl *gomock.Controller + recorder *MockDaemon_DownloadClientMockRecorder +} + +// MockDaemon_DownloadClientMockRecorder is the mock recorder for MockDaemon_DownloadClient. +type MockDaemon_DownloadClientMockRecorder struct { + mock *MockDaemon_DownloadClient +} + +// NewMockDaemon_DownloadClient creates a new mock instance. +func NewMockDaemon_DownloadClient(ctrl *gomock.Controller) *MockDaemon_DownloadClient { + mock := &MockDaemon_DownloadClient{ctrl: ctrl} + mock.recorder = &MockDaemon_DownloadClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. 
+func (m *MockDaemon_DownloadClient) EXPECT() *MockDaemon_DownloadClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockDaemon_DownloadClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockDaemon_DownloadClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockDaemon_DownloadClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockDaemon_DownloadClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockDaemon_DownloadClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDaemon_DownloadClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockDaemon_DownloadClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockDaemon_DownloadClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockDaemon_DownloadClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockDaemon_DownloadClient) Recv() (*dfdaemon.DownResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*dfdaemon.DownResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockDaemon_DownloadClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockDaemon_DownloadClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m_2 *MockDaemon_DownloadClient) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockDaemon_DownloadClientMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDaemon_DownloadClient)(nil).RecvMsg), m) +} + +// SendMsg mocks base method. +func (m_2 *MockDaemon_DownloadClient) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockDaemon_DownloadClientMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDaemon_DownloadClient)(nil).SendMsg), m) +} + +// Trailer mocks base method. +func (m *MockDaemon_DownloadClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. 
+func (mr *MockDaemon_DownloadClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockDaemon_DownloadClient)(nil).Trailer)) +} + +// MockDaemon_SyncPieceTasksClient is a mock of Daemon_SyncPieceTasksClient interface. +type MockDaemon_SyncPieceTasksClient struct { + ctrl *gomock.Controller + recorder *MockDaemon_SyncPieceTasksClientMockRecorder +} + +// MockDaemon_SyncPieceTasksClientMockRecorder is the mock recorder for MockDaemon_SyncPieceTasksClient. +type MockDaemon_SyncPieceTasksClientMockRecorder struct { + mock *MockDaemon_SyncPieceTasksClient +} + +// NewMockDaemon_SyncPieceTasksClient creates a new mock instance. +func NewMockDaemon_SyncPieceTasksClient(ctrl *gomock.Controller) *MockDaemon_SyncPieceTasksClient { + mock := &MockDaemon_SyncPieceTasksClient{ctrl: ctrl} + mock.recorder = &MockDaemon_SyncPieceTasksClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDaemon_SyncPieceTasksClient) EXPECT() *MockDaemon_SyncPieceTasksClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockDaemon_SyncPieceTasksClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockDaemon_SyncPieceTasksClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockDaemon_SyncPieceTasksClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockDaemon_SyncPieceTasksClient) Recv() (*base.PiecePacket, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*base.PiecePacket) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m_2 *MockDaemon_SyncPieceTasksClient) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. 
+func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).RecvMsg), m) +} + +// Send mocks base method. +func (m *MockDaemon_SyncPieceTasksClient) Send(arg0 *base.PieceTaskRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m_2 *MockDaemon_SyncPieceTasksClient) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).SendMsg), m) +} + +// Trailer mocks base method. +func (m *MockDaemon_SyncPieceTasksClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockDaemon_SyncPieceTasksClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockDaemon_SyncPieceTasksClient)(nil).Trailer)) +} + +// MockDaemonServer is a mock of DaemonServer interface. +type MockDaemonServer struct { + ctrl *gomock.Controller + recorder *MockDaemonServerMockRecorder +} + +// MockDaemonServerMockRecorder is the mock recorder for MockDaemonServer. +type MockDaemonServerMockRecorder struct { + mock *MockDaemonServer +} + +// NewMockDaemonServer creates a new mock instance. +func NewMockDaemonServer(ctrl *gomock.Controller) *MockDaemonServer { + mock := &MockDaemonServer{ctrl: ctrl} + mock.recorder = &MockDaemonServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDaemonServer) EXPECT() *MockDaemonServerMockRecorder { + return m.recorder +} + +// CheckHealth mocks base method. +func (m *MockDaemonServer) CheckHealth(arg0 context.Context, arg1 *emptypb.Empty) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckHealth", arg0, arg1) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CheckHealth indicates an expected call of CheckHealth. +func (mr *MockDaemonServerMockRecorder) CheckHealth(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockDaemonServer)(nil).CheckHealth), arg0, arg1) +} + +// Download mocks base method. +func (m *MockDaemonServer) Download(arg0 *dfdaemon.DownRequest, arg1 dfdaemon.Daemon_DownloadServer) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Download", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Download indicates an expected call of Download. 
+func (mr *MockDaemonServerMockRecorder) Download(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockDaemonServer)(nil).Download), arg0, arg1) +} + +// GetPieceTasks mocks base method. +func (m *MockDaemonServer) GetPieceTasks(arg0 context.Context, arg1 *base.PieceTaskRequest) (*base.PiecePacket, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPieceTasks", arg0, arg1) + ret0, _ := ret[0].(*base.PiecePacket) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPieceTasks indicates an expected call of GetPieceTasks. +func (mr *MockDaemonServerMockRecorder) GetPieceTasks(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockDaemonServer)(nil).GetPieceTasks), arg0, arg1) +} + +// SyncPieceTasks mocks base method. +func (m *MockDaemonServer) SyncPieceTasks(arg0 dfdaemon.Daemon_SyncPieceTasksServer) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncPieceTasks", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncPieceTasks indicates an expected call of SyncPieceTasks. +func (mr *MockDaemonServerMockRecorder) SyncPieceTasks(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockDaemonServer)(nil).SyncPieceTasks), arg0) +} + +// mustEmbedUnimplementedDaemonServer mocks base method. +func (m *MockDaemonServer) mustEmbedUnimplementedDaemonServer() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "mustEmbedUnimplementedDaemonServer") +} + +// mustEmbedUnimplementedDaemonServer indicates an expected call of mustEmbedUnimplementedDaemonServer. +func (mr *MockDaemonServerMockRecorder) mustEmbedUnimplementedDaemonServer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedDaemonServer", reflect.TypeOf((*MockDaemonServer)(nil).mustEmbedUnimplementedDaemonServer)) +} + +// MockUnsafeDaemonServer is a mock of UnsafeDaemonServer interface. +type MockUnsafeDaemonServer struct { + ctrl *gomock.Controller + recorder *MockUnsafeDaemonServerMockRecorder +} + +// MockUnsafeDaemonServerMockRecorder is the mock recorder for MockUnsafeDaemonServer. +type MockUnsafeDaemonServerMockRecorder struct { + mock *MockUnsafeDaemonServer +} + +// NewMockUnsafeDaemonServer creates a new mock instance. +func NewMockUnsafeDaemonServer(ctrl *gomock.Controller) *MockUnsafeDaemonServer { + mock := &MockUnsafeDaemonServer{ctrl: ctrl} + mock.recorder = &MockUnsafeDaemonServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockUnsafeDaemonServer) EXPECT() *MockUnsafeDaemonServerMockRecorder { + return m.recorder +} + +// mustEmbedUnimplementedDaemonServer mocks base method. +func (m *MockUnsafeDaemonServer) mustEmbedUnimplementedDaemonServer() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "mustEmbedUnimplementedDaemonServer") +} + +// mustEmbedUnimplementedDaemonServer indicates an expected call of mustEmbedUnimplementedDaemonServer. 
+func (mr *MockUnsafeDaemonServerMockRecorder) mustEmbedUnimplementedDaemonServer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedDaemonServer", reflect.TypeOf((*MockUnsafeDaemonServer)(nil).mustEmbedUnimplementedDaemonServer)) +} + +// MockDaemon_DownloadServer is a mock of Daemon_DownloadServer interface. +type MockDaemon_DownloadServer struct { + ctrl *gomock.Controller + recorder *MockDaemon_DownloadServerMockRecorder +} + +// MockDaemon_DownloadServerMockRecorder is the mock recorder for MockDaemon_DownloadServer. +type MockDaemon_DownloadServerMockRecorder struct { + mock *MockDaemon_DownloadServer +} + +// NewMockDaemon_DownloadServer creates a new mock instance. +func NewMockDaemon_DownloadServer(ctrl *gomock.Controller) *MockDaemon_DownloadServer { + mock := &MockDaemon_DownloadServer{ctrl: ctrl} + mock.recorder = &MockDaemon_DownloadServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDaemon_DownloadServer) EXPECT() *MockDaemon_DownloadServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockDaemon_DownloadServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockDaemon_DownloadServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDaemon_DownloadServer)(nil).Context)) +} + +// RecvMsg mocks base method. +func (m_2 *MockDaemon_DownloadServer) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockDaemon_DownloadServerMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDaemon_DownloadServer)(nil).RecvMsg), m) +} + +// Send mocks base method. +func (m *MockDaemon_DownloadServer) Send(arg0 *dfdaemon.DownResult) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockDaemon_DownloadServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockDaemon_DownloadServer)(nil).Send), arg0) +} + +// SendHeader mocks base method. +func (m *MockDaemon_DownloadServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *MockDaemon_DownloadServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockDaemon_DownloadServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m_2 *MockDaemon_DownloadServer) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. 
+func (mr *MockDaemon_DownloadServerMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDaemon_DownloadServer)(nil).SendMsg), m) +} + +// SetHeader mocks base method. +func (m *MockDaemon_DownloadServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *MockDaemon_DownloadServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockDaemon_DownloadServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *MockDaemon_DownloadServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. +func (mr *MockDaemon_DownloadServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockDaemon_DownloadServer)(nil).SetTrailer), arg0) +} + +// MockDaemon_SyncPieceTasksServer is a mock of Daemon_SyncPieceTasksServer interface. +type MockDaemon_SyncPieceTasksServer struct { + ctrl *gomock.Controller + recorder *MockDaemon_SyncPieceTasksServerMockRecorder +} + +// MockDaemon_SyncPieceTasksServerMockRecorder is the mock recorder for MockDaemon_SyncPieceTasksServer. +type MockDaemon_SyncPieceTasksServerMockRecorder struct { + mock *MockDaemon_SyncPieceTasksServer +} + +// NewMockDaemon_SyncPieceTasksServer creates a new mock instance. +func NewMockDaemon_SyncPieceTasksServer(ctrl *gomock.Controller) *MockDaemon_SyncPieceTasksServer { + mock := &MockDaemon_SyncPieceTasksServer{ctrl: ctrl} + mock.recorder = &MockDaemon_SyncPieceTasksServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDaemon_SyncPieceTasksServer) EXPECT() *MockDaemon_SyncPieceTasksServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockDaemon_SyncPieceTasksServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).Context)) +} + +// Recv mocks base method. +func (m *MockDaemon_SyncPieceTasksServer) Recv() (*base.PieceTaskRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*base.PieceTaskRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m_2 *MockDaemon_SyncPieceTasksServer) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. 
+func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).RecvMsg), m) +} + +// Send mocks base method. +func (m *MockDaemon_SyncPieceTasksServer) Send(arg0 *base.PiecePacket) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).Send), arg0) +} + +// SendHeader mocks base method. +func (m *MockDaemon_SyncPieceTasksServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m_2 *MockDaemon_SyncPieceTasksServer) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).SendMsg), m) +} + +// SetHeader mocks base method. +func (m *MockDaemon_SyncPieceTasksServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *MockDaemon_SyncPieceTasksServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. +func (mr *MockDaemon_SyncPieceTasksServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockDaemon_SyncPieceTasksServer)(nil).SetTrailer), arg0) +}
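
For reference, a minimal sketch of how the generated MockDaemon_SyncPieceTasksClient could be driven from a unit test with gomock. The import path of the mock package and its package name are assumed from the file location added in this patch, and the task ID value is purely illustrative; only the gomock calls and the base proto field names are taken as given.

    package daemongrpc_test

    import (
    	"testing"

    	"github.com/golang/mock/gomock"

    	// assumed import paths based on the repository layout in this patch
    	daemonmock "d7y.io/dragonfly/v2/client/daemon/test/mock/daemongrpc"
    	"d7y.io/dragonfly/v2/pkg/rpc/base"
    )

    func TestSyncPieceTasksClientMock(t *testing.T) {
    	ctrl := gomock.NewController(t)
    	defer ctrl.Finish()

    	// Stub the bidi stream client: Send accepts any request,
    	// Recv returns a single PiecePacket and must be called exactly once.
    	client := daemonmock.NewMockDaemon_SyncPieceTasksClient(ctrl)
    	client.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
    	client.EXPECT().Recv().
    		Return(&base.PiecePacket{TaskId: "test-task"}, nil).
    		Times(1)

    	// Exercise the mock the way a pieceTaskSynchronizer would:
    	// send a request, then read the synced piece packet.
    	if err := client.Send(&base.PieceTaskRequest{TaskId: "test-task"}); err != nil {
    		t.Fatalf("send piece task request: %v", err)
    	}
    	pp, err := client.Recv()
    	if err != nil || pp.TaskId != "test-task" {
    		t.Fatalf("unexpected recv result: %v, %v", pp, err)
    	}
    }

The same pattern applies to MockDaemonServer and the server-side stream mocks above: set expectations on SyncPieceTasks or Send/Recv with gomock matchers, then pass the mock wherever the real dfdaemon interface is expected.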