Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
implement ha
Browse files Browse the repository at this point in the history
Signed-off-by: yunfeiyangbuaa <[email protected]>
  • Loading branch information
yunfeiyanggzq committed Aug 1, 2019
1 parent e587b38 commit f43bd2d
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 21 deletions.
15 changes: 5 additions & 10 deletions dfget/core/downloader/p2p_downloader/p2p_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
time.Sleep(sleepTime)
}

fmt.Println("pull task resp:",res)
// FIXME: try to abstract the judgement to make it more readable.
if res != nil && !(res.Code != constants.CodePeerContinue &&
res.Code != constants.CodePeerFinish &&
Expand All @@ -261,32 +260,28 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
util.Printer.Println("migrated to node:" + item.SuperNode)
} else {
item.SuperNode = p2p.findHaActiveSupernode()
fmt.Println("zhege")
}
return p2p.pullPieceTask(item)
}

//Find active supernode by ping supernodes
func (p2p *P2PDownloader) findHaActiveSupernode() string {
fmt.Println("nodes",p2p.cfg.Node)
fmt.Println("nodes", p2p.cfg.Node)
nodes, nLen := p2p.cfg.Node, len(p2p.cfg.Node)
time.Sleep(time.Millisecond * 500)
for i := 0; i < nLen; i++ {
time.Sleep(time.Millisecond * 100)
url := fmt.Sprintf("%s://%s%s","http", nodes[i],"/_ping")
if e :=p2p.API.Get(url,"");e!=nil{
fmt.Println("find new supernode!!!!!!!!!!!!!!",nodes[i])
url := fmt.Sprintf("%s://%s%s", "http", nodes[i], "/_ping")
if e := p2p.API.Get(url, ""); e != nil {
fmt.Println("find new supernode", nodes[i])
return nodes[i]
}
continue
}
fmt.Println("not find new supernpde,return the first nodes !!!!!!!!!!!!!!")
fmt.Println("can't find any active supernode,return the first supernode")
return nodes[0]
}




// getPullRate get download rate limit dynamically.
func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueData) {
if time.Since(p2p.pullRateTime).Seconds() < 3 {
Expand Down
4 changes: 1 addition & 3 deletions dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package regist

import (
"fmt"
"os"
"time"

Expand Down Expand Up @@ -89,8 +88,7 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errors.DfE
}

result := NewRegisterResult(nodes[i], s.cfg.Node, s.cfg.URL,
resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize,resp.Data.UseHa)
fmt.Println("register result:",result)
resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.UseHa)
logrus.Infof("do register result:%s and cost:%.3fs", resp,
time.Since(start).Seconds())
return result, nil
Expand Down
2 changes: 1 addition & 1 deletion dfget/core/regist/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) {

func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) {
result := NewRegisterResult("node", []string{"1"}, "url", "taskID",
10, 1)
10, 1, false)
c.Assert(result.Node, check.Equals, "node")
c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"})
c.Assert(result.URL, check.Equals, "url")
Expand Down
2 changes: 1 addition & 1 deletion supernode/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (d *Daemon) Run() error {

case constants.SupernodeUsehakill:
fmt.Println("the supernode uses ha, kill itself")
d.server.HaMgr.StopSendHeartBeat("standby")
d.server.HaMgr.StopSendHeartBeat("standby")
if d.server.ServerPort != server.NoServerPortOpen {
d.server.Close()
}
Expand Down
10 changes: 4 additions & 6 deletions supernode/server/0.3_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,16 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http
reader := req.Body
fmt.Println("url:", req.URL)
request := &types.TaskRegisterRequest{}
if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive {
s.HaMgr.SendPostCopy(request, "/peer/registry")
}
if err := json.NewDecoder(reader).Decode(request); err != nil {
return errors.Wrap(errTypes.ErrInvalidValue, err.Error())
}

if err := request.Validate(strfmt.NewFormats()); err != nil {
return errors.Wrap(errTypes.ErrInvalidValue, err.Error())
}

if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive {
s.HaMgr.SendPostCopy(request, "/peer/registry")
}
peerCreateRequest := &types.PeerCreateRequest{
IP: request.IP,
HostName: strfmt.Hostname(request.HostName),
Expand Down Expand Up @@ -120,7 +119,6 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req
params := req.URL.Query()
taskID := params.Get("taskId")
srcCID := params.Get("srcCid")
fmt.Println("url:", req.URL)
request := &types.PiecePullRequest{
DfgetTaskStatus: statusMap[params.Get("status")],
PieceRange: params.Get("range"),
Expand Down Expand Up @@ -182,7 +180,7 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req
Path: v.Path,
})
}
fmt.Println("处理了",datas)
fmt.Println("url:", req.URL)
return EncodeResponse(rw, http.StatusOK, &types.ResultInfo{
Code: constants.CodePeerContinue,
Data: datas,
Expand Down

0 comments on commit f43bd2d

Please sign in to comment.