Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
add a cli config
Browse files Browse the repository at this point in the history
Signed-off-by: yunfeiyangbuaa <[email protected]>
  • Loading branch information
yunfeiyanggzq committed Jul 31, 2019
1 parent 7bfa08f commit f0e31cd
Show file tree
Hide file tree
Showing 17 changed files with 817 additions and 26 deletions.
9 changes: 9 additions & 0 deletions cmd/supernode/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ func setupFlags(cmd *cobra.Command, opt *Options) {

flagSet.StringVar(&opt.AdvertiseIP, "advertise-ip", "",
"the supernode ip that we advertise to other peer in the p2p-network")

flagSet.BoolVarP(&opt.UseHA, "use-ha", "H", opt.UseHA,
"set whether to use supernode HA")

flagSet.StringSliceVar(&opt.HAConfig, "etcd-address", opt.HAConfig,
"if you use supernode HA,you should set the etcd address to implement ha")

flagSet.IntVar(&opt.HAStandbyPort, "standby-port", opt.HAStandbyPort,
"if you use supernode HA,you should set the standby port to implement ha")
}

// runSuperNode prepares configs, setups essential details and runs supernode daemon.
Expand Down
16 changes: 16 additions & 0 deletions common/constants/dfget_super_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ const (
CodeNeedAuth = 608
CodeWaitAuth = 609
CodeSourceError = 610
CodeGetPieceReport = 611
CodeGetPeerDown = 612
)

/* the code of task result that dfget will report to supernode */
Expand All @@ -95,3 +97,17 @@ const (
ClientErrorFileNotExist = "FILE_NOT_EXIST"
ClientErrorFileMd5NotMatch = "FILE_MD5_NOT_MATCH"
)

/*the code od supernode ha status*/
const (
// SupernodeUseHaFalse means supernode don't use ha
SupernodeUseHaFalse = 900
// SupernodeUseHaInit means supernode use ha and the status is init
SupernodeUseHaInit = 901
// SupernodeUseHaStandby means supernode use ha and the status is standby
SupernodeUseHaStandby = 902
// SupernodeUseHaActive means supernode use ha and the status is active
SupernodeUseHaActive = 903
// SupernodeUsehakill means supernode use ha and this supernode give up iys active status
SupernodeUsehakill = 904
)
40 changes: 33 additions & 7 deletions dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"fmt"
"time"

"github.com/dragonflyoss/Dragonfly/common/constants"
"github.com/dragonflyoss/Dragonfly/common/util"
"github.com/dragonflyoss/Dragonfly/dfget/types"

"github.com/sirupsen/logrus"
)

/* the url paths of supernode APIs*/
Expand Down Expand Up @@ -50,6 +53,8 @@ type SupernodeAPI interface {
ReportPiece(node string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error)
ServiceDown(node string, taskID string, cid string) (resp *types.BaseResponse, e error)
ReportClientError(node string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error)
Get(url string, resp interface{}) error
Post(url string, body interface{}, timeout time.Duration) (code int, res []byte, e error)
}

type supernodeAPI struct {
Expand Down Expand Up @@ -77,7 +82,9 @@ func (api *supernodeAPI) Register(node string, req *types.RegisterRequest) (
return nil, fmt.Errorf("%d:%s", code, body)
}
resp = new(types.RegisterResponse)
e = json.Unmarshal(body, resp)
if e = json.Unmarshal(body, resp); e != nil {
return nil, e
}
return resp, e
}

Expand All @@ -90,7 +97,9 @@ func (api *supernodeAPI) PullPieceTask(node string, req *types.PullPieceTaskRequ
api.Scheme, node, peerPullPieceTaskPath, util.ParseQuery(req))

resp = new(types.PullPieceTaskResponse)
e = api.get(url, resp)
if e = api.Get(url, resp); e != nil {
return nil, e
}
return
}

Expand All @@ -100,9 +109,14 @@ func (api *supernodeAPI) ReportPiece(node string, req *types.ReportPieceRequest)

url := fmt.Sprintf("%s://%s%s?%s",
api.Scheme, node, peerReportPiecePath, util.ParseQuery(req))

resp = new(types.BaseResponse)
e = api.get(url, resp)
if e = api.Get(url, resp); e != nil {
logrus.Errorf("failed to report piece{taskid:%s,range:%s},err: %v", req.TaskID, req.PieceRange, e)
return nil, e
}
if resp != nil && resp.Code != constants.CodeGetPieceReport {
logrus.Errorf("failed to report piece{taskid:%s,range:%s} to supernode: api response code is %d not equal to %d", req.TaskID, req.PieceRange, resp.Code, constants.CodeGetPieceReport)
}
return
}

Expand All @@ -114,7 +128,13 @@ func (api *supernodeAPI) ServiceDown(node string, taskID string, cid string) (
api.Scheme, node, peerServiceDownPath, taskID, cid)

resp = new(types.BaseResponse)
e = api.get(url, resp)
if e = api.Get(url, resp); e != nil {
logrus.Errorf("failed to send service down,err: %v", e)
return nil, e
}
if resp != nil && resp.Code != constants.CodeGetPeerDown {
logrus.Errorf("failed to send service down to supernode: api response code is %d not equal to %d", resp.Code, constants.CodeGetPeerDown)
}
return
}

Expand All @@ -126,11 +146,11 @@ func (api *supernodeAPI) ReportClientError(node string, req *types.ClientErrorRe
api.Scheme, node, peerClientErrorPath, util.ParseQuery(req))

resp = new(types.BaseResponse)
e = api.get(url, resp)
e = api.Get(url, resp)
return
}

func (api *supernodeAPI) get(url string, resp interface{}) error {
func (api *supernodeAPI) Get(url string, resp interface{}) error {
var (
code int
body []byte
Expand All @@ -140,11 +160,17 @@ func (api *supernodeAPI) get(url string, resp interface{}) error {
return fmt.Errorf("invalid url")
}
if code, body, e = api.HTTPClient.Get(url, api.Timeout); e != nil {

return e
}
if !util.HTTPStatusOk(code) {

return fmt.Errorf("%d:%s", code, body)
}
e = json.Unmarshal(body, resp)
return e
}

func (api *supernodeAPI) Post(url string, body interface{}, timeout time.Duration) (code int, res []byte, e error) {
return api.HTTPClient.PostJSON(url, body, api.Timeout)
}
13 changes: 8 additions & 5 deletions dfget/core/api/supernode_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,12 @@ func (s *SupernodeAPITestSuite) TestSupernodeAPI_PullPieceTask(c *check.C) {

func (s *SupernodeAPITestSuite) TestSupernodeAPI_ReportPiece(c *check.C) {
ip := "127.0.0.1"

req := &types.ReportPieceRequest{
TaskID: "sssss",
PieceRange: "0-11",
}
s.mock.GetFunc = s.mock.CreateGetFunc(200, []byte(`{"Code":700}`), nil)
r, e := s.api.ReportPiece(ip, nil)
r, e := s.api.ReportPiece(ip, req)
c.Check(e, check.IsNil)
c.Check(r.Code, check.Equals, 700)
}
Expand All @@ -125,7 +128,7 @@ func (s *SupernodeAPITestSuite) TestSupernodeAPI_ReportClientError(c *check.C) {
c.Check(r.Code, check.Equals, 700)
}

func (s *SupernodeAPITestSuite) TestSupernodeAPI_get(c *check.C) {
func (s *SupernodeAPITestSuite) TestSupernodeAPI_Get(c *check.C) {
type testRes struct {
A int
}
Expand All @@ -135,7 +138,7 @@ func (s *SupernodeAPITestSuite) TestSupernodeAPI_get(c *check.C) {
s.mock.GetFunc = s.mock.CreateGetFunc(code, []byte(res), e)
msg := fmt.Sprintf("code:%d res:%s e:%v", code, res, e)
resp := new(testRes)
err := api.get("http://localhost", resp)
err := api.Get("http://localhost", resp)
return resp, err, msg
}

Expand All @@ -155,7 +158,7 @@ func (s *SupernodeAPITestSuite) TestSupernodeAPI_get(c *check.C) {
c.Assert(r.A, check.Equals, 1, check.Commentf(m))
c.Assert(e, check.IsNil, check.Commentf(m))

e = api.get("", nil)
e = api.Get("", nil)
c.Assert(e.Error(), check.Equals, "invalid url")
}

Expand Down
25 changes: 25 additions & 0 deletions dfget/core/helper/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"os"
"path"
"time"

"github.com/dragonflyoss/Dragonfly/common/constants"
"github.com/dragonflyoss/Dragonfly/common/util"
Expand Down Expand Up @@ -109,17 +110,41 @@ type ServiceDownFuncType func(ip string, taskID string, cid string) (*types.Base
// ClientErrorFuncType function type of SupernodeAPI#ReportClientError
type ClientErrorFuncType func(ip string, req *types.ClientErrorRequest) (*types.BaseResponse, error)

// GetFuncType function type of SupernodeAPI#Get
type GetFuncType func(url string, resp interface{}) error

// PostFuncType function type of SupernodeAPI#Post
type PostFuncType func(url string, body interface{}, timeout time.Duration) (code int, res []byte, e error)

// MockSupernodeAPI mock SupernodeAPI
type MockSupernodeAPI struct {
RegisterFunc RegisterFuncType
PullFunc PullFuncType
ReportFunc ReportFuncType
ServiceDownFunc ServiceDownFuncType
ClientErrorFunc ClientErrorFuncType
GetFunc GetFuncType
PostFunc PostFuncType
}

var _ api.SupernodeAPI = &MockSupernodeAPI{}

// Post implement SupernodeAPI#Post
func (m *MockSupernodeAPI) Post(url string, body interface{}, timeout time.Duration) (code int, res []byte, e error) {
if m.GetFunc != nil {
return m.PostFunc(url, body, timeout)
}
return 200, nil, nil
}

// Get implement SupernodeAPI#Get
func (m *MockSupernodeAPI) Get(url string, resp interface{}) error {
if m.GetFunc != nil {
return m.GetFunc(url, resp)
}
return nil
}

// Register implements SupernodeAPI#Register
func (m *MockSupernodeAPI) Register(ip string, req *types.RegisterRequest) (
*types.RegisterResponse, error) {
Expand Down
Binary file added docs/images/supernode_ha.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
67 changes: 67 additions & 0 deletions docs/user_guide/supernode_ha.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Supernode High Availability

This doc contains all the design and usage document of supernode HA.

## Two Goals Need to be Achieved

To implement HA,we must guarantee the following two goals to be achieved:

- Leader election: If the active supernode breaks down,the HA implement should elect another active supernode from standby supernodes.And it is important to consider how to avoid the brain split problem.
- Active and standby node synchronization: Because the supernode is stateful, we should keep the standby supernode’s status and active supernode’s status constant,otherwise the standby supernode can not take over present work after bean activated.

## Tool Introduction

We can use distributed key-value store system such as etcd,zookeeper,consul and so on to achieve leader election,which have been applied in the many HA implements like hadoop,spark.

Let’s focus on etcd because we will use it to construct our plan. etcd is a strongly consistent,distributed key-value store that provides a reliable way to store data that needs to
be accessed by a distributed system or cluster of machines. It gracefully handles leader elections during network partitions and can tolerate machine failure, even in the leader node.

## HA Design

Below is a diagram illustrating that how to implement HA.

![ha_design.png](../images/supernode_ha.png)

### Leader Election

Every supernode has four status,they are init,standby,active,kill_itself and only the ha_mgr can change the status.If supernode don't use HA,it will change from init to active directly.
If the supernode applies HA.it will setup by following steps:

- First: Every supernode will try to get a distributed lock(go.etcd.io/etcd/clientv3.Txn) in etcd.On success,it will return a unique key that exists so long as the lock is held by the caller.Supernode should keep a lease(go.etcd.io/etcd/clientv3.lease) alive by periodically sending keep-alive messages otherwise the lease will expire thus all resources (such as key-values and locks) it previously attached to will expire.The lock is held until Unlock is called on the key or the lease associate with the owner expires.Multiple candidates are requiring for the same lock while each has attached their lock request with its own lease id.The one who successfully gets the lock turns to be the leader.Once the leader loses its connection with etcd server, its lock will be revoked.As a result, another candidate who has a valid lease will get the lock and thus turn to be the leader.
- Second: Every supernode will get the result after the election.Every supernode will be informed who is active supernode.If the active supernode is itself,it will change its status from init to active,Otherwise,it will change its status from init to standby and do these two work:
- change its status from init to standby.
- create a etcd watch on the lock owned by active supernode,so that if active supernode loses the lock,it can be notified and take part in next election.
- Third:Standby supernode will create a lease on etcd and store its info(ip and standby listen port) in the form of key-value.if it beaks down,the information in etcd will disappear because of etcd lease.
- Fourth:active supernode will create a etcd watch on these keys.Active supernode will know every standby supernode's info.So standby supernode can take part in the ha system dynamically.If a standby supernode break down,active supernode will be notified immediately with the help of etcd lease and watch.

### Active and Standby Node Synchronization

We implement status synchronization as follows:

- First: Every supernode has two listen port,one for active and one for standby.If supernode is active,it use active port to provide service for dfget.If the supernode is standby,it open standby port only to receive request from active supernode.Dfget only know active supernode,which means standby supernode is unachievable for dfget.
- Second: Active supernode receives dfget's request and send all these request to standby supernodes,standby supernode will do with these request just like active supernode,which means standby supernodes's status is the same as the status of active supernode.
- Third: If Active supernode breaks down,dfget will try to ping every supernodes he knows.After standby supernode takes over work and open active port,dfget will success and go on previous work.

### Virtual IP

Dfget can use the supernodes cluster's address instead of every supernode's ip to download document.

We can use nginx to implement this.these work is in progress.

## Usage

We can use these cli commend to config supernode HA:

```
-H, --use-ha set whether to use supernode HA
--etcd-address strings if you use supernode HA,you should set the etcd address to implement ha (default [127.0.0.1:2379])
--standby-port int if you use supernode HA,you should set the standby port to implement ha (default 8003)
```

we can use these commend to deploy a supernodes HA cluster:

```sh
supernode --home-dir /home/admin/supernode --port=8002 --download-port=8001 --advertise-ip=127.0.0.1 -H --standby-port 8003 --etcd-address 127.0.0.1:2379
supernode --home-dir /home/admin/supernode --port=8004 --download-port=8001 --advertise-ip=127.0.0.2 -H --standby-port 8005 --etcd-address 127.0.0.1:2379
supernode --home-dir /home/admin/supernode --port=8006 --download-port=8001 --advertise-ip=127.0.0.3 -H --standby-port 8007 --etcd-address 127.0.0.1:2379
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/stretchr/testify v1.2.2
github.com/valyala/fasthttp v1.3.0
github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9
go.etcd.io/etcd v3.3.13+incompatible
gopkg.in/gcfg.v1 v1.2.3
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect
gopkg.in/warnings.v0 v0.1.2
Expand Down
Loading

0 comments on commit f0e31cd

Please sign in to comment.