From f43bd2d976ee8b6900e9a83913b65cb8bb873249 Mon Sep 17 00:00:00 2001 From: yunfeiyangbuaa Date: Thu, 1 Aug 2019 14:29:34 +0800 Subject: [PATCH] implement ha Signed-off-by: yunfeiyangbuaa --- .../downloader/p2p_downloader/p2p_downloader.go | 15 +++++---------- dfget/core/regist/register.go | 4 +--- dfget/core/regist/register_test.go | 2 +- supernode/daemon/daemon.go | 2 +- supernode/server/0.3_bridge.go | 10 ++++------ 5 files changed, 12 insertions(+), 21 deletions(-) diff --git a/dfget/core/downloader/p2p_downloader/p2p_downloader.go b/dfget/core/downloader/p2p_downloader/p2p_downloader.go index a77510c8f..a65a8b972 100644 --- a/dfget/core/downloader/p2p_downloader/p2p_downloader.go +++ b/dfget/core/downloader/p2p_downloader/p2p_downloader.go @@ -239,7 +239,6 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) ( time.Sleep(sleepTime) } - fmt.Println("pull task resp:",res) // FIXME: try to abstract the judgement to make it more readable. if res != nil && !(res.Code != constants.CodePeerContinue && res.Code != constants.CodePeerFinish && @@ -261,32 +260,28 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) ( util.Printer.Println("migrated to node:" + item.SuperNode) } else { item.SuperNode = p2p.findHaActiveSupernode() - fmt.Println("zhege") } return p2p.pullPieceTask(item) } //Find active supernode by ping supernodes func (p2p *P2PDownloader) findHaActiveSupernode() string { - fmt.Println("nodes",p2p.cfg.Node) + fmt.Println("nodes", p2p.cfg.Node) nodes, nLen := p2p.cfg.Node, len(p2p.cfg.Node) time.Sleep(time.Millisecond * 500) for i := 0; i < nLen; i++ { time.Sleep(time.Millisecond * 100) - url := fmt.Sprintf("%s://%s%s","http", nodes[i],"/_ping") - if e :=p2p.API.Get(url,"");e!=nil{ - fmt.Println("find new supernode!!!!!!!!!!!!!!",nodes[i]) + url := fmt.Sprintf("%s://%s%s", "http", nodes[i], "/_ping") + if e := p2p.API.Get(url, ""); e != nil { + fmt.Println("find new supernode", nodes[i]) return nodes[i] } continue } - fmt.Println("not find new supernpde,return the first nodes !!!!!!!!!!!!!!") + fmt.Println("can't find any active supernode,return the first supernode") return nodes[0] } - - - // getPullRate get download rate limit dynamically. func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueData) { if time.Since(p2p.pullRateTime).Seconds() < 3 { diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index f4174f8e9..f8a84df83 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -17,7 +17,6 @@ package regist import ( - "fmt" "os" "time" @@ -89,8 +88,7 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errors.DfE } result := NewRegisterResult(nodes[i], s.cfg.Node, s.cfg.URL, - resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize,resp.Data.UseHa) - fmt.Println("register result:",result) + resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.UseHa) logrus.Infof("do register result:%s and cost:%.3fs", resp, time.Since(start).Seconds()) return result, nil diff --git a/dfget/core/regist/register_test.go b/dfget/core/regist/register_test.go index 7b3acfa72..84486c9af 100644 --- a/dfget/core/regist/register_test.go +++ b/dfget/core/regist/register_test.go @@ -57,7 +57,7 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) { func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) { result := NewRegisterResult("node", []string{"1"}, "url", "taskID", - 10, 1) + 10, 1, false) c.Assert(result.Node, check.Equals, "node") c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"}) c.Assert(result.URL, check.Equals, "url") diff --git a/supernode/daemon/daemon.go b/supernode/daemon/daemon.go index 11dbafa66..8ed84fc00 100644 --- a/supernode/daemon/daemon.go +++ b/supernode/daemon/daemon.go @@ -124,7 +124,7 @@ func (d *Daemon) Run() error { case constants.SupernodeUsehakill: fmt.Println("the supernode uses ha, kill itself") - d.server.HaMgr.StopSendHeartBeat("standby") + d.server.HaMgr.StopSendHeartBeat("standby") if d.server.ServerPort != server.NoServerPortOpen { d.server.Close() } diff --git a/supernode/server/0.3_bridge.go b/supernode/server/0.3_bridge.go index 3fbaf85d1..0c12fb836 100644 --- a/supernode/server/0.3_bridge.go +++ b/supernode/server/0.3_bridge.go @@ -56,9 +56,6 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http reader := req.Body fmt.Println("url:", req.URL) request := &types.TaskRegisterRequest{} - if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive { - s.HaMgr.SendPostCopy(request, "/peer/registry") - } if err := json.NewDecoder(reader).Decode(request); err != nil { return errors.Wrap(errTypes.ErrInvalidValue, err.Error()) } @@ -66,7 +63,9 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http if err := request.Validate(strfmt.NewFormats()); err != nil { return errors.Wrap(errTypes.ErrInvalidValue, err.Error()) } - + if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive { + s.HaMgr.SendPostCopy(request, "/peer/registry") + } peerCreateRequest := &types.PeerCreateRequest{ IP: request.IP, HostName: strfmt.Hostname(request.HostName), @@ -120,7 +119,6 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req params := req.URL.Query() taskID := params.Get("taskId") srcCID := params.Get("srcCid") - fmt.Println("url:", req.URL) request := &types.PiecePullRequest{ DfgetTaskStatus: statusMap[params.Get("status")], PieceRange: params.Get("range"), @@ -182,7 +180,7 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req Path: v.Path, }) } - fmt.Println("处理了",datas) + fmt.Println("url:", req.URL) return EncodeResponse(rw, http.StatusOK, &types.ResultInfo{ Code: constants.CodePeerContinue, Data: datas,