Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
active supernode send dfget request to standby supernode
Browse files Browse the repository at this point in the history
Signed-off-by: yunfeiyangbuaa <[email protected]>
  • Loading branch information
yunfeiyanggzq committed Jul 31, 2019
1 parent eeb9bbb commit 0a51815
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
8 changes: 7 additions & 1 deletion supernode/daemon/mgr/ha/etcd_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (etcd *EtcdMgr) Close() error {

// SendStandbySupernodesInfo sends standby supernode's info to etcd.and if the standby supernode if off,the etcd will will notice that.
func (etcd *EtcdMgr) SendStandbySupernodesInfo(key string, value string, timeout int64) error {
var respchan <-chan *clientv3.LeaseKeepAliveResponse
kv := clientv3.NewKV(etcd.client)
lease := clientv3.NewLease(etcd.client)
leaseResp, e := lease.Grant(context.TODO(), timeout)
Expand All @@ -172,10 +173,15 @@ func (etcd *EtcdMgr) SendStandbySupernodesInfo(key string, value string, timeout
return e
}
etcd.standbyLeaseResp = leaseResp
if _, e := lease.KeepAlive(context.TODO(), leaseResp.ID); e != nil {
if respchan, e = lease.KeepAlive(context.TODO(), leaseResp.ID); e != nil {
logrus.Errorf("failed to send heart beat to etcd to renew the lease %v", e)
return e
}
go func() {
for {
<-respchan
}
}()
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions supernode/daemon/mgr/ha/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func (ha *Manager) SendGetCopy(params string) error {
e := ha.copyAPI.Get(urlCopy, "")
if e != nil {
logrus.Errorf("failed to send get copy,err: %v", e)
return e
}
}
return nil
Expand All @@ -151,7 +150,6 @@ func (ha *Manager) SendPostCopy(req interface{}, path string) error {
_, _, e := ha.copyAPI.Post(url, req, 5*time.Second)
if e != nil {
logrus.Errorf("failed to send post copy,err: %v", e)
return e
}
}
return nil
Expand Down
18 changes: 17 additions & 1 deletion supernode/server/0.3_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/dragonflyoss/Dragonfly/apis/types"
Expand Down Expand Up @@ -52,7 +53,11 @@ var resultMap = map[string]string{

func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
reader := req.Body
fmt.Println("url:", req.URL)
request := &types.TaskRegisterRequest{}
if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive {
s.HaMgr.SendPostCopy(request, "/peer/registry")
}
if err := json.NewDecoder(reader).Decode(request); err != nil {
return errors.Wrap(errTypes.ErrInvalidValue, err.Error())
}
Expand Down Expand Up @@ -106,10 +111,13 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http
}

func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive {
s.HaMgr.SendGetCopy(req.URL.String())
}
params := req.URL.Query()
taskID := params.Get("taskId")
srcCID := params.Get("srcCid")

fmt.Println("url:", req.URL)
request := &types.PiecePullRequest{
DfgetTaskStatus: statusMap[params.Get("status")],
PieceRange: params.Get("range"),
Expand Down Expand Up @@ -178,7 +186,11 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req
}

func (s *Server) reportPiece(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive {
s.HaMgr.SendGetCopy(req.URL.String())
}
params := req.URL.Query()
fmt.Println("url:", req.URL)
taskID := params.Get("taskId")
srcCID := params.Get("cid")
dstCID := params.Get("dstCid")
Expand Down Expand Up @@ -206,6 +218,10 @@ func (s *Server) reportPiece(ctx context.Context, rw http.ResponseWriter, req *h
}

func (s *Server) reportServiceDown(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
if s.HaMgr.GetSupernodeStatus() == constants.SupernodeUseHaActive {
s.HaMgr.SendGetCopy(req.URL.String())
}
fmt.Println("url:", req.URL)
params := req.URL.Query()
taskID := params.Get("taskId")
cID := params.Get("cid")
Expand Down

0 comments on commit 0a51815

Please sign in to comment.