diff --git a/apis/types/peer_create_request.go b/apis/types/peer_create_request.go
index 28d206e34..4f9d5a52b 100644
--- a/apis/types/peer_create_request.go
+++ b/apis/types/peer_create_request.go
@@ -39,6 +39,9 @@ type PeerCreateRequest struct {
 	// version number of dfget binary.
 	Version string `json:"version,omitempty"`
+
+	// PeerID carries the peer's ID so that it stays the same between supernodes for HA;
+	// every peer's peerID should be kept consistent.
+	PeerID string `json:"peerID,omitempty"`
 }
 
 // Validate validates this peer create request
diff --git a/apis/types/piece_pull_request.go b/apis/types/piece_pull_request.go
index 94879047d..ce72c24dd 100644
--- a/apis/types/piece_pull_request.go
+++ b/apis/types/piece_pull_request.go
@@ -37,6 +37,9 @@ type PiecePullRequest struct {
 	//
 	// Enum: [FAILED SUCCESS INVALID SEMISUC]
 	PieceResult string `json:"pieceResult,omitempty"`
+
+	// DstCid is the client ID (cid) used for HA.
+	// NOTE(review): unlike the sibling fields, this JSON tag has no omitempty — confirm that is intended.
+	DstCid string `json:"dstCid"`
 }
 
 // Validate validates this piece pull request
diff --git a/apis/types/piece_update_request.go b/apis/types/piece_update_request.go
index 9620cf289..794ed2864 100644
--- a/apis/types/piece_update_request.go
+++ b/apis/types/piece_update_request.go
@@ -31,6 +31,15 @@ type PieceUpdateRequest struct {
 	//
 	// Enum: [FAILED SUCCESS INVALID SEMISUC]
 	PieceStatus string `json:"pieceStatus,omitempty"`
+
+	// DstCid is the dst peer's cid, used for HA.
+	DstCid string `json:"dstCid,omitempty"`
+
+	// SendCopy tells whether to send the request to another supernode for HA.
+	SendCopy bool `json:"sendCopy,omitempty"`
+
+	// SendCopyPeerID tells which supernode the request copy is sent to for HA.
+	SendCopyPeerID string `json:"sendCopyPeerID,omitempty"`
 }
 
 // Validate validates this piece update request
diff --git a/apis/types/task_info.go b/apis/types/task_info.go
index f88a43985..9a8418a4f 100644
--- a/apis/types/task_info.go
+++ b/apis/types/task_info.go
@@ -82,6 +82,12 @@ type TaskInfo struct {
 	// --filter parameter of dfget. The usage of it is that different rawURL may generate the same taskID.
// TaskURL string `json:"taskURL,omitempty"` + + // CDNPeerID marks which supernode the file in. + CDNPeerID string `json:"cdnPeerID,omitempty"` + + // NotifySupernodesPID record supernodes should notify after updateTaskInfo + NotifySupernodesPID []string `json:"notifySupernodesPID"` } // Validate validates this task info diff --git a/apis/types/task_register_request.go b/apis/types/task_register_request.go index 85a5def96..7bbcb860e 100644 --- a/apis/types/task_register_request.go +++ b/apis/types/task_register_request.go @@ -104,6 +104,9 @@ type TaskRegisterRequest struct { // version number of dfget binary. Version string `json:"version,omitempty"` + + // PeerID is the ID of peer for HA + PeerID string `json:"peerID,omitempty"` } // Validate validates this task register request diff --git a/go.mod b/go.mod index 18dd7feb3..c98cd9656 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/stretchr/testify v1.2.2 github.com/valyala/fasthttp v1.3.0 github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9 + go.etcd.io/etcd v3.3.13+incompatible golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect gopkg.in/gcfg.v1 v1.2.3 gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect diff --git a/go.sum b/go.sum index 583d2a606..ed1267784 100644 --- a/go.sum +++ b/go.sum @@ -16,19 +16,26 @@ github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible h1:jFneRYjIvLMLhDLCzuTuU4rSJUjRplcJQ7pD7MnhC04= github.com/coreos/etcd v3.3.10+incompatible/go.mod 
h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.7 h1:DVS0EPFHUiaJSaX2EKlaf65HUmk9PXhOl/Xa3Go242Q= github.com/cpuguy83/go-md2man v1.0.7/go.mod h1:N6JayAiVKtlHSnuTCeuLSQVs75hb8q+dYQLjr7cDsKY= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-check/check v0.0.0-20161208181325-20d25e280405 h1:0kdUKH22y+PT7ZITTEcrrHsQfGmpi4fj0XGyoDe/krQ= github.com/go-check/check v0.0.0-20161208181325-20d25e280405/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= @@ -57,8 +64,11 @@ github.com/go-openapi/validate v0.0.0-20170705144413-8a82927c942c 
h1:+cB2AzkH5an github.com/go-openapi/validate v0.0.0-20170705144413-8a82927c942c/go.mod h1:ve8xoSHgqBUifiKgaVbxLmOE0ckvH0oXfsJcnm6SIz0= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -67,20 +77,26 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gorilla/context v0.0.0-20181012153548-51ce91d2eadd h1:bB2XEQHhNsTTpqNzsq5ObUuqR7RNIdpm5Phb6AjeejE= github.com/gorilla/context v0.0.0-20181012153548-51ce91d2eadd/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.5.0 
h1:mq8bRov+5x+pZNR/uAHyUEgovR9gLgYFwDQIeuYi9TM= github.com/gorilla/mux v1.5.0/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.9.0 h1:bM6ZAFZmc/wPFaRDi0d5L7hGEZEx/2u+Tmr2evNHDiI= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= @@ -136,6 +152,7 @@ github.com/russross/blackfriday v0.0.0-20171011182219-6d1ef893fcb0 h1:hgS5QyP981 github.com/russross/blackfriday v0.0.0-20171011182219-6d1ef893fcb0/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod 
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= @@ -155,7 +172,9 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= @@ -164,11 +183,18 @@ github.com/valyala/fasthttp v1.3.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9 h1:WXBMTckrTcndPgRZBAEjqev+eN8MI9wbUQQUHlrUEV4= github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod 
h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= +go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= @@ -197,6 +223,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -204,8 +231,10 @@ 
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.21.0 h1:G+97AoqBnmZIT91cLG/EkCoK9NSelj64P8bOHHNmGn0= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/supernode/config/config.go b/supernode/config/config.go index 6c3e3f73b..a04982040 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -18,13 +18,19 @@ package config import ( "fmt" + + "net/rpc" "path/filepath" "strings" + "sync" "time" - "gopkg.in/yaml.v2" - + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/fileutils" + + "github.com/go-openapi/strfmt" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" ) // NewConfig create an instant with default values. @@ -41,6 +47,17 @@ type Config struct { Storages map[string]interface{} `yaml:"storages"` } +// SupernodeInfo store the supernode's info get from etcd. +type SupernodeInfo struct { + IP string + ListenPort int + DownloadPort int + RPCPort int + HostName strfmt.Hostname + PID string + RPCClient *rpc.Client +} + // Load loads config properties from the giving file. 
func (c *Config) Load(path string) error { return fileutils.LoadYaml(path, c) @@ -84,6 +101,33 @@ func (c *Config) IsSuperPID(peerID string) bool { return peerID == c.superNodePID } +// GetOtherSupernodeInfo gets the other supernodes information in supernode ha cluster +func (c *Config) GetOtherSupernodeInfo() []SupernodeInfo { + c.lock.RLock() + defer c.lock.RUnlock() + return *c.OtherSupernodes +} + +// SetOtherSupernodeInfo sets the other supernodes information in supernode ha cluster +func (c *Config) SetOtherSupernodeInfo(otherSupernodes []SupernodeInfo) { + c.lock.Lock() + defer c.lock.Unlock() + c.OtherSupernodes = &otherSupernodes +} + +// GetOtherSupernodeInfoByPID get other supernode's info by peerID +func (c *Config) GetOtherSupernodeInfoByPID(peerID string) (*SupernodeInfo, error) { + c.lock.RLock() + defer c.lock.RUnlock() + for _, node := range *c.OtherSupernodes { + if node.PID == peerID { + return &node, nil + break + } + } + return nil, errors.Wrapf(errortypes.ErrDataNotFound, "peerID: %s", peerID) +} + // NewBaseProperties create an instant with default values. func NewBaseProperties() *BaseProperties { home := filepath.Join(string(filepath.Separator), "home", "admin", "supernode") @@ -107,6 +151,9 @@ func NewBaseProperties() *BaseProperties { GCMetaInterval: DefaultGCMetaInterval, TaskExpireTime: DefaultTaskExpireTime, PeerGCDelay: DefaultPeerGCDelay, + UseHA: false, + HAConfig: []string{"127.0.0.1:2379"}, + HARpcPort: 9000, } } @@ -212,6 +259,29 @@ type BaseProperties struct { // superNodePID is the ID of supernode, which is the same as peer ID of dfget. superNodePID string + + // UseHA is the mark of whether the supernode use the ha model. + // ha means if the active supernode is off,the standby supernode can take over active supernode's work. + // and the whole system can work as before. + // default:false. + UseHA bool `yaml:"useHa"` + + // HAConfig is available when UseHa is true. 
+	// HAConfig configs the ip and port of the tool we use to implement ha.
+	// default: []string{"127.0.0.1:2379"}.
+	HAConfig []string `yaml:"haConfig"`
+
+	// HARpcPort is available when UseHa is true.
+	// HARpcPort configs the rpc port of the supernode (yaml key: haStandbyPort).
+	// It is the port to receive other supernodes' request copies to implement state synchronization.
+	HARpcPort int `yaml:"haStandbyPort"`
+
+	// OtherSupernodes records other supernode info in the p2p system.
+	OtherSupernodes *[]SupernodeInfo `yaml:"otherSupernode"`
+
+	// lock is the read-write lock used when reading or changing OtherSupernodes;
+	// if OtherSupernodes changes while in use, supernode may panic.
+	// NOTE(review): the field is unexported, so the yaml tag presumably has no effect — confirm.
+	lock sync.RWMutex `yaml:"lock"`
 }
 
 // TransLimit trans rateLimit from MB/s to B/s.
diff --git a/supernode/daemon/daemon.go b/supernode/daemon/daemon.go
index 13e2ca8a9..fb7e34ff2 100644
--- a/supernode/daemon/daemon.go
+++ b/supernode/daemon/daemon.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/dragonflyoss/Dragonfly/apis/types"
 	"github.com/dragonflyoss/Dragonfly/supernode/config"
+	"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/ha"
 	"github.com/dragonflyoss/Dragonfly/supernode/plugins"
 	"github.com/dragonflyoss/Dragonfly/supernode/server"
 
@@ -81,6 +82,16 @@ func (d *Daemon) RegisterSuperNode() error {
 
 // Run runs the daemon.
func (d *Daemon) Run() error { + if d.config.UseHA == true { + if err := ha.StartRPCServer(d.config, d.server.CdnMgr, d.server.DfgetTaskMgr, d.server.ProgressMgr, d.server.TaskMgr, d.server.PeerMgr); err != nil { + logrus.Errorf("failed to open rpc port,err: %v", err) + return err + } + if err := d.server.HaMgr.HADaemon(context.Background()); err != nil { + logrus.Errorf("failed to start a HA daemon progress,err: %v", err) + return err + } + } if err := d.server.Start(); err != nil { logrus.Errorf("failed to start HTTP server: %v", err) return err diff --git a/supernode/daemon/mgr/ha/etcd_tool.go b/supernode/daemon/mgr/ha/etcd_tool.go new file mode 100644 index 000000000..d02ebe923 --- /dev/null +++ b/supernode/daemon/mgr/ha/etcd_tool.go @@ -0,0 +1,152 @@ +package ha + +import ( + "context" + "fmt" + "net/rpc" + "strconv" + "strings" + "time" + + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/go-openapi/strfmt" + "github.com/sirupsen/logrus" + + "go.etcd.io/etcd/clientv3" +) + +// EtcdMgr is the struct to manager etcd. +type EtcdMgr struct { + config *config.Config + client *clientv3.Client + LeaseResp *clientv3.LeaseGrantResponse +} + +const ( + // etcdTimeOut is the etcd client's timeout second. + etcdTimeOut = 10 * time.Second + + // supernodeKeyPrefix is the key prefix of supernode info + supernodeKeyPrefix = "/standby/supernode/" +) + +// NewEtcdMgr produces a etcdmgr object. +func NewEtcdMgr(cfg *config.Config) (*EtcdMgr, error) { + config := clientv3.Config{ + Endpoints: cfg.HAConfig, + DialTimeout: etcdTimeOut, + } + // build connection to etcd. + client, err := clientv3.New(config) + if err != nil { + logrus.Errorf("failed to connect to etcd server,err %v", err) + return nil, err + } + return &EtcdMgr{ + config: cfg, + client: client, + }, err + +} + +// SendSupernodesInfo send supernode info to other supernode. 
+// It grants a lease of timeout seconds, stores the info under key bound to that
+// lease as "ip@listenPort@downloadPort@rpcPort@hostName@pID", and keeps the lease
+// alive until ctx is done. It returns the first etcd error encountered.
+func (etcd *EtcdMgr) SendSupernodesInfo(ctx context.Context, key, ip, pID string, listenPort, downloadPort, rpcPort int, hostName strfmt.Hostname, timeout int64) error {
+	kv := clientv3.NewKV(etcd.client)
+	lease := clientv3.NewLease(etcd.client)
+	// the grant error must be checked before leaseResp is used: on failure
+	// leaseResp is nil and reading leaseResp.ID would panic.
+	leaseResp, err := lease.Grant(ctx, timeout)
+	if err != nil {
+		logrus.Errorf("failed to grant a lease from etcd,err %v", err)
+		return err
+	}
+	value := fmt.Sprintf("%s@%d@%d@%d@%s@%s", ip, listenPort, downloadPort, rpcPort, hostName, pID)
+	if _, err = kv.Put(ctx, key, value, clientv3.WithLease(leaseResp.ID)); err != nil {
+		logrus.Errorf("failed to put standby supernode's info to etcd as a lease,err %v", err)
+		return err
+	}
+	etcd.LeaseResp = leaseResp
+	respchan, err := lease.KeepAlive(ctx, leaseResp.ID)
+	if err != nil {
+		logrus.Errorf("failed to send heart beat to etcd to renew the lease %v", err)
+		return err
+	}
+	// drain the keep-alive responses so the channel never fills up.
+	// range ends when etcd closes the channel; a bare receive loop would
+	// spin forever on the closed channel.
+	go func() {
+		for range respchan {
+		}
+	}()
+	return nil
+}
+
+// Close closes the tool used to implement supernode ha.
+// It returns the error reported by the etcd client, if any.
+func (etcd *EtcdMgr) Close(ctx context.Context) error {
+	if err := etcd.client.Close(); err != nil {
+		logrus.Errorf("failed to close etcd client,err %v", err)
+		return err
+	}
+	logrus.Info("success to close a etcd client")
+	return nil
+}
+
+// WatchSupernodesChange is the progress to watch the etcd,if the value of key prefix changes,supernode will be notified.
+// It loads the supernode list once, then reloads it on every event under the
+// key prefix until the watch channel is closed (e.g. ctx is done) or a reload fails.
+func (etcd *EtcdMgr) WatchSupernodesChange(ctx context.Context, key string) error {
+	//when start supernode,get supernode info
+	if _, err := etcd.getSupenrodesInfo(ctx, key); err != nil {
+		logrus.Errorf("failed to get standby supernode info,err: %v", err)
+		return err
+	}
+	watcher := clientv3.NewWatcher(etcd.client)
+	// release the watcher's resources once watching stops.
+	defer watcher.Close()
+	watchChan := watcher.Watch(ctx, key, clientv3.WithPrefix())
+
+	//after supernode start,if other supernode changes,do with it
+	for watchResp := range watchChan {
+		for _, event := range watchResp.Events {
+			// etcd event types are PUT(0) and DELETE(1); log the name instead of a raw code.
+			logrus.Infof("success to notice supernodes changes,event type(PUT:supernode add,DELETE:supernode delete) %s", event.Type)
+			if _, err := etcd.getSupenrodesInfo(ctx, key); err != nil {
+				logrus.Errorf("failed to get standby supernode info,err: %v", err)
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// getSupenrodesInfo gets supernode info from etcd, dials the rpc port of every
+// other supernode and stores the result in the config.
+// Entries that are not in the "ip@listenPort@downloadPort@rpcPort@hostName@pID"
+// layout written by SendSupernodesInfo are skipped with a warning.
+func (etcd *EtcdMgr) getSupenrodesInfo(ctx context.Context, key string) ([]config.SupernodeInfo, error) {
+	var nodes []config.SupernodeInfo
+	kv := clientv3.NewKV(etcd.client)
+	getRes, err := kv.Get(ctx, key, clientv3.WithPrefix())
+	if err != nil {
+		logrus.Errorf("failed to get other supernode's information,err %v", err)
+		return nil, err
+	}
+	for _, v := range getRes.Kvs {
+		splits := strings.Split(string(v.Value), "@")
+		// a short value would make the index accesses below panic.
+		if len(splits) < 6 {
+			logrus.Warnf("skip malformed supernode info %q of key %s", v.Value, v.Key)
+			continue
+		}
+		// if the supernode is itself,skip
+		if splits[5] == etcd.config.GetSuperPID() {
+			continue
+		}
+		lPort, lErr := strconv.Atoi(splits[1])
+		dPort, dErr := strconv.Atoi(splits[2])
+		rPort, rErr := strconv.Atoi(splits[3])
+		if lErr != nil || dErr != nil || rErr != nil {
+			logrus.Warnf("skip supernode info %q of key %s with invalid port", v.Value, v.Key)
+			continue
+		}
+		rpcAddress := fmt.Sprintf("%s:%d", splits[0], rPort)
+		conn, err := rpc.DialHTTP("tcp", rpcAddress)
+		if err != nil {
+			logrus.Errorf("failed to connect to the rpc port %s,err: %v", rpcAddress, err)
+			// do not leak the connections dialed earlier in this round.
+			closeRPCClients(nodes)
+			return nil, err
+		}
+		nodes = append(nodes, config.SupernodeInfo{
+			IP:           splits[0],
+			ListenPort:   lPort,
+			DownloadPort: dPort,
+			RPCPort:      rPort,
+			HostName:     strfmt.Hostname(splits[4]),
+			PID:          splits[5],
+			RPCClient:    conn,
+		})
+	}
+	// NOTE(review): the rpc clients of the list being replaced are not closed here,
+	// since callers may still hold them — confirm who owns their lifetime.
+	etcd.config.SetOtherSupernodeInfo(nodes)
+	return nodes, nil
+}
+
+// closeRPCClients closes the rpc client of every given supernode, ignoring close errors.
+func closeRPCClients(nodes []config.SupernodeInfo) {
+	for _, node := range nodes {
+		if node.RPCClient != nil {
+			_ = node.RPCClient.Close()
+		}
+	}
+}
diff --git a/supernode/daemon/mgr/ha/manager.go b/supernode/daemon/mgr/ha/manager.go
new file mode 100644
index 000000000..a21541dbc
--- /dev/null
+++ b/supernode/daemon/mgr/ha/manager.go
@@ -0,0 +1,101 @@
+package ha
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"os"
+	"time"
+
+	apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
+	"github.com/dragonflyoss/Dragonfly/pkg/httputils"
+	"github.com/dragonflyoss/Dragonfly/supernode/config"
+
+	"github.com/go-openapi/strfmt"
+	"github.com/sirupsen/logrus"
+)
+
+// Manager is the struct to manage supernode ha.
+type Manager struct {
+	nodeStatus int
+	// tool is the HA backend; it is nil when cfg.UseHA is false.
+	tool   Tool
+	config *config.Config
+
+	HTTPClient httputils.SimpleHTTPClient
+}
+
+// NewManager produces the Manager object.
+// The HA tool is only created when cfg.UseHA is set; otherwise it stays nil.
+func NewManager(cfg *config.Config) (*Manager, error) {
+	var (
+		toolMgr Tool
+		err     error
+	)
+	if cfg.UseHA {
+		toolMgr, err = NewEtcdMgr(cfg)
+		if err != nil {
+			logrus.Errorf("failed to init the ha tool: %v", err)
+			return nil, err
+		}
+	}
+	return &Manager{
+		config:     cfg,
+		HTTPClient: httputils.DefaultHTTPClient,
+		tool:       toolMgr,
+	}, nil
+}
+
+// HADaemon is the main progress to implement active/standby switch.
+// It publishes this supernode's info through the HA tool and starts a
+// background watcher for changes of the other supernodes.
+func (ha *Manager) HADaemon(ctx context.Context) error {
+	// the tool is only created when UseHA is enabled (see NewManager).
+	if ha.tool == nil {
+		return fmt.Errorf("ha tool is not initialized")
+	}
+	hostname, err := os.Hostname()
+	if err != nil {
+		// best effort: the info is still published, with an empty host name.
+		logrus.Warnf("failed to get hostname,err: %v", err)
+	}
+	pid := ha.config.GetSuperPID()
+	standbyAddress := fmt.Sprintf("%s%s:%d", supernodeKeyPrefix, ha.config.AdvertiseIP, ha.config.ListenPort)
+	// the last argument is the lease timeout handed to the tool.
+	if err := ha.tool.SendSupernodesInfo(ctx, standbyAddress, ha.config.AdvertiseIP, pid, ha.config.ListenPort, ha.config.DownloadPort, ha.config.HARpcPort, strfmt.Hostname(hostname), 2); err != nil {
+		logrus.Errorf("failed to send supernode info to other supernode,err %v", err)
+		return err
+	}
+	// a process to watch the standby supernode's status.
+	go func() {
+		if err := ha.tool.WatchSupernodesChange(ctx, supernodeKeyPrefix); err != nil {
+			logrus.Errorf("failed to watch supernodes change,err: %v", err)
+		}
+	}()
+	return nil
+}
+
+// CloseHaManager closes the tool use to implement supernode ha.
+// It is a no-op when no tool was created (HA disabled).
+func (ha *Manager) CloseHaManager(ctx context.Context) error {
+	if ha.tool == nil {
+		return nil
+	}
+	return ha.tool.Close(ctx)
+}
+
+// SendPostCopy sends post request to other supernode like dfget.
+// The request is posted as JSON to http://<node.IP>:<node.ListenPort><path>.
+func (ha *Manager) SendPostCopy(ctx context.Context, req interface{}, path string, node *config.SupernodeInfo) error {
+	url := fmt.Sprintf("%s://%s:%d%s", "http", node.IP, node.ListenPort, path)
+	if _, _, e := ha.post(url, req, 5*time.Second); e != nil {
+		logrus.Errorf("failed to send post copy,err: %v", e)
+		return e
+	}
+	return nil
+}
+
+// TriggerOtherSupernodeDownload randomly sends dfget register req copy to other supernode to trigger and download.
+// It returns nil without doing anything when no other supernode is known.
+func (ha *Manager) TriggerOtherSupernodeDownload(ctx context.Context, req *apiTypes.TaskRegisterRequest) error {
+	index := ha.randomSelectSupernodeTriggerCDN(ctx)
+	nodes := ha.config.GetOtherSupernodeInfo()
+	// the list may have been replaced by the watcher between selecting the
+	// index and reading the list, so the index must be re-validated.
+	if index < 0 || index >= len(nodes) {
+		return nil
+	}
+	client := nodes[index].RPCClient
+	if client == nil {
+		return fmt.Errorf("supernode %s has no rpc client", nodes[index].PID)
+	}
+	if err := client.Call("RPCManager.RPCOnlyTriggerCDNDownload", req, nil); err != nil {
+		logrus.Errorf("failed to trigger CDN download via rpc,err: %v", err)
+		return err
+	}
+	return nil
+}
+
+// randomSelectSupernodeTriggerCDN randomly selects the index of another supernode.
+// It returns -1 when no other supernode is known.
+func (ha *Manager) randomSelectSupernodeTriggerCDN(ctx context.Context) int {
+	// read the list once so the length check and the random pick agree.
+	supernodeNum := len(ha.config.GetOtherSupernodeInfo())
+	if supernodeNum == 0 {
+		return -1
+	}
+	return rand.Intn(supernodeNum)
+}
+
+// post sends a JSON post request to supernode, honoring the given timeout.
+func (ha *Manager) post(url string, body interface{}, timeout time.Duration) (code int, res []byte, e error) {
+	return ha.HTTPClient.PostJSON(url, body, timeout)
+}
diff --git a/supernode/daemon/mgr/ha/rpc_manager.go b/supernode/daemon/mgr/ha/rpc_manager.go
new file mode 100644
index 000000000..9384c324a
--- /dev/null
+++ b/supernode/daemon/mgr/ha/rpc_manager.go
@@ -0,0 +1,235 @@
+package ha
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+
+	"github.com/dragonflyoss/Dragonfly/apis/types"
+	"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
+	"github.com/dragonflyoss/Dragonfly/pkg/netutils"
+	"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
+	"github.com/dragonflyoss/Dragonfly/supernode/config"
+	"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
+
+	"github.com/go-openapi/strfmt"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+// RPCManager is the rpc service of a supernode; it forwards rpc requests to the wrapped managers.
+type RPCManager struct {
+	cfg          *config.Config
+	CdnMgr       mgr.CDNMgr
+	DfgetTaskMgr mgr.DfgetTaskMgr
+	ProgressMgr  mgr.ProgressMgr
+	TaskMgr      mgr.TaskMgr
+	PeerMgr      mgr.PeerMgr
+}
+
+// RPCReportPieceRequest is the report piece rpc request
+type RPCReportPieceRequest struct {
+	TaskID      string
+	PieceNum    int
+	Md5         string
+	PieceStatus int
+	CID         string
+	SrcPID      string
+	DstCID      string
+}
+
+// RPCUpdateTaskInfoRequest is the update task info rpc request;
+// it is also used as the reply of RPCGetTaskInfo.
+type RPCUpdateTaskInfoRequest struct {
+	CdnStatus  string
+	FileLength int64
+	RealMd5    string
+	TaskID     string
+	CDNPeerID  string
+}
+
+// RPCGetPieceRequest is the get piece rpc request
+type RPCGetPieceRequest struct {
+	DfgetTaskStatus string
+	PieceRange      string
+	PieceResult     string
+	TaskID          string
+	Cid             string
+	DstCID          string
+}
+
+// RPCGetPieceResponse is the get piece rpc response
+type RPCGetPieceResponse struct {
+	IsFinished bool
+	Data       []*types.PieceInfo
+	ErrCode    int
+	ErrMsg     string
+}
+
+// RPCServerDownRequest is the server down rpc request
+type RPCServerDownRequest struct {
+	TaskID string
+	CID    string
+}
+
+// RPCAddSupernodeWatchRequest is the supernode task info update change add notify rpc request
+type RPCAddSupernodeWatchRequest struct {
+	TaskID       string
+	SupernodePID string
+}
+
+// NewRPCMgr produces a RPCManager wrapping the given config and managers.
+func NewRPCMgr(cfg *config.Config, CdnMgr mgr.CDNMgr, DfgetTaskMgr mgr.DfgetTaskMgr,
+	ProgressMgr mgr.ProgressMgr, TaskMgr mgr.TaskMgr, PeerMgr mgr.PeerMgr) *RPCManager {
+	return &RPCManager{
+		cfg:          cfg,
+		CdnMgr:       CdnMgr,
+		DfgetTaskMgr: DfgetTaskMgr,
+		ProgressMgr:  ProgressMgr,
+		TaskMgr:      TaskMgr,
+		PeerMgr:      PeerMgr,
+	}
+}
+
+// StartRPCServer registers a RPCManager with net/rpc and serves it over HTTP
+// on cfg.AdvertiseIP:cfg.HARpcPort in a background goroutine.
+//
+// The listener is opened before anything is registered, so a failure to bind
+// the address leaves the default rpc server and http mux untouched. It returns
+// the listen or registration error, if any; errors of the serving goroutine
+// are only logged.
+func StartRPCServer(cfg *config.Config, CdnMgr mgr.CDNMgr, DfgetTaskMgr mgr.DfgetTaskMgr, ProgressMgr mgr.ProgressMgr,
+	TaskMgr mgr.TaskMgr, PeerMgr mgr.PeerMgr) error {
+	rpcAddress := fmt.Sprintf("%s:%d", cfg.AdvertiseIP, cfg.HARpcPort)
+	lis, err := net.Listen("tcp", rpcAddress)
+	if err != nil {
+		logrus.Errorf("failed to start a rpc server,err %v", err)
+		return err
+	}
+	if err := rpc.Register(NewRPCMgr(cfg, CdnMgr, DfgetTaskMgr, ProgressMgr, TaskMgr, PeerMgr)); err != nil {
+		logrus.Errorf("failed to register rpc manager,err: %v", err)
+		// nothing will ever be served on this listener, release the port
+		lis.Close()
+		return err
+	}
+	rpc.HandleHTTP()
+	go func() {
+		if err := http.Serve(lis, nil); err != nil {
+			logrus.Errorf("rpc server stopped,err: %v", err)
+		}
+	}()
+	return nil
+}
+
+// RPCUpdateProgress updates the piece progress reported through another supernode.
+// The destination peer ID is resolved from req.DstCID and req.TaskID; res is not written.
+func (m *RPCManager) RPCUpdateProgress(req RPCReportPieceRequest, res *bool) error {
+	dstDfgetTask, err := m.DfgetTaskMgr.Get(context.TODO(), req.DstCID, req.TaskID)
+	if err != nil {
+		logrus.Errorf("failed to get dfgetTask by cid(%s) and taskID(%s) from rpc request,err: %v", req.DstCID, req.TaskID, err)
+		return err
+	}
+	return m.ProgressMgr.UpdateProgress(context.TODO(), req.TaskID, req.CID, req.SrcPID, dstDfgetTask.PeerID, req.PieceNum, req.PieceStatus)
+}
+
+// RPCGetTaskInfo copies the CDN status, file length, real md5, CDN peer ID and
+// ID of the task identified by task into resp.
+func (m *RPCManager) RPCGetTaskInfo(task string, resp *RPCUpdateTaskInfoRequest) error {
+	taskInfo, err := m.TaskMgr.Get(context.TODO(), task)
+	if err != nil {
+		logrus.Errorf("failed to get Task by task(%s) from rpc request,err: %v", task, err)
+		return err
+	}
+	resp.CdnStatus = taskInfo.CdnStatus
+	resp.FileLength = taskInfo.FileLength
+	resp.RealMd5 = taskInfo.RealMd5
+	resp.CDNPeerID = taskInfo.CDNPeerID
+	resp.TaskID = taskInfo.ID
+	return nil
+}
+
+// RPCGetPiece asks the local task manager for pieces on behalf of another supernode.
+//
+// A errortypes.DfError returned by the task manager is reported through
+// resp.ErrCode and resp.ErrMsg with a nil return value; any other error is
+// returned to the rpc caller. On success resp.Data and resp.IsFinished are set.
+func (m *RPCManager) RPCGetPiece(req RPCGetPieceRequest, resp *RPCGetPieceResponse) error {
+	piecePullRequest := &types.PiecePullRequest{
+		DfgetTaskStatus: req.DfgetTaskStatus,
+		PieceRange:      req.PieceRange,
+		PieceResult:     req.PieceResult,
+		DstCid:          req.DstCID,
+	}
+	if !stringutils.IsEmptyStr(req.DstCID) {
+		// The destination peer belongs to the destination client, so the
+		// lookup must use DstCID and not the requesting client's Cid.
+		dstDfgetTask, err := m.DfgetTaskMgr.Get(context.TODO(), req.DstCID, req.TaskID)
+		if err != nil {
+			// best effort: the request is still served without a DstPID
+			logrus.Warnf("failed to get dfget task by dstCID(%s) and taskID(%s) from rpc request, and the srcCID is %s, err: %v",
+				req.DstCID, req.TaskID, req.Cid, err)
+		} else {
+			piecePullRequest.DstPID = dstDfgetTask.PeerID
+		}
+	}
+	isFinished, data, err := m.TaskMgr.GetPieces(context.TODO(), req.TaskID, req.Cid, piecePullRequest)
+	if err != nil {
+		e, ok := errors.Cause(err).(errortypes.DfError)
+		if !ok {
+			// not representable as code/message, do not swallow it
+			return err
+		}
+		resp.ErrCode = e.Code
+		resp.ErrMsg = e.Msg
+		return nil
+	}
+	// data that is not a []*types.PieceInfo results in a nil resp.Data
+	pieceInfos, _ := data.([]*types.PieceInfo)
+	resp.Data = pieceInfos
+	resp.IsFinished = isFinished
+	return nil
+}
+
+// RPCDfgetServerDown reports that the peer server of the dfget task identified
+// by request.CID and request.TaskID is down. resp is not written.
+func (m *RPCManager) RPCDfgetServerDown(request RPCServerDownRequest, resp *bool) error {
+	dfgetTask, err := m.DfgetTaskMgr.Get(context.TODO(), request.CID, request.TaskID)
+	if err != nil {
+		logrus.Errorf("failed to get dfgetTask by cid(%s) and taskID(%s) from rpc request,err: %v", request.CID, request.TaskID, err)
+		return err
+	}
+	return m.ProgressMgr.UpdatePeerServiceDown(context.TODO(), dfgetTask.PeerID)
+}
+
+// RPCOnlyTriggerCDNDownload validates the register request received from
+// another supernode and triggers a CDN download for it through
+// TaskMgr.OnlyTriggerDownload. The task create request is built with an empty
+// PeerID. resp is not written.
+func (m *RPCManager) RPCOnlyTriggerCDNDownload(req types.TaskRegisterRequest, resp *bool) error {
+	if err := req.Validate(strfmt.NewFormats()); err != nil {
+		return errors.Wrap(errortypes.ErrInvalidValue, err.Error())
+	}
+	taskCreateRequest := &types.TaskCreateRequest{
+		CID:         req.CID,
+		CallSystem:  req.CallSystem,
+		Dfdaemon:    req.Dfdaemon,
+		Headers:     netutils.ConvertHeaders(req.Headers),
+		Identifier:  req.Identifier,
+		Md5:         req.Md5,
+		Path:        req.Path,
+		PeerID:      "",
+		RawURL:      req.RawURL,
+		TaskURL:     req.TaskURL,
+		SupernodeIP: req.SuperNodeIP,
+	}
+	if err := m.TaskMgr.OnlyTriggerDownload(context.TODO(), taskCreateRequest, &req); err != nil {
+		logrus.Errorf("failed to trigger CDN download by rpc,err: %v", err)
+		return err
+	}
+	return nil
+}
+
+// RPCUpdateTaskInfo updates the CDN status, file length and real md5 of the
+// task identified by req.TaskID. resp is not written.
+// NOTE(review): req.CDNPeerID is not copied into the update - confirm that is intended.
+func (m *RPCManager) RPCUpdateTaskInfo(req RPCUpdateTaskInfoRequest, resp *bool) error {
+	updateTask := &types.TaskInfo{
+		CdnStatus:  req.CdnStatus,
+		FileLength: req.FileLength,
+		RealMd5:    req.RealMd5,
+	}
+
+	if err := m.TaskMgr.Update(context.TODO(), req.TaskID, updateTask); err != nil {
+		logrus.Errorf("failed to update task %v via rpc,err: %v", updateTask, err)
+		return err
+	}
+	logrus.Debugf("success to update task cdn via rpc %+v", updateTask)
+	return nil
+}
+
+// RPCAddSupernodeWatch appends req.SupernodePID to the NotifySupernodesPID
+// list of the task identified by req.TaskID. resp is not written.
+// NOTE(review): the append is done on the value returned by TaskMgr.Get and is
+// not written back explicitly - presumably Get returns the stored task; confirm.
+func (m *RPCManager) RPCAddSupernodeWatch(req RPCAddSupernodeWatchRequest, resp *bool) error {
+	task, err := m.TaskMgr.Get(context.TODO(), req.TaskID)
+	if err != nil {
+		logrus.Errorf("failed to get Task by task(%s) from rpc request,err: %v", req.TaskID, err)
+		return err
+	}
+	task.NotifySupernodesPID = append(task.NotifySupernodesPID, req.SupernodePID)
+	return nil
+}
diff --git a/supernode/daemon/mgr/ha/tool.go b/supernode/daemon/mgr/ha/tool.go
new file mode 100644
index 000000000..680dcf66d
--- /dev/null
+++ b/supernode/daemon/mgr/ha/tool.go
@@ -0,0 +1,19 @@
+package ha
+
+import (
+	"context"
+
+	"github.com/go-openapi/strfmt"
+)
+
+// Tool is the interface a backend (etcd, zookeeper or a custom implementation) provides to manage the supernode cluster.
+type Tool interface {
+	// WatchSupernodesChange watches the other supernodes under key, so this supernode is notified when they change.
+	WatchSupernodesChange(ctx context.Context, key string) error
+
+	// Close closes the tool.
+	Close(ctx context.Context) error
+
+	// SendSupernodesInfo sends the info of this supernode to the other supernodes.
+	SendSupernodesInfo(ctx context.Context, key, ip, pID string, listenPort, downloadPort, rpcPort int, hostName strfmt.Hostname, timeout int64) error
+}
diff --git a/supernode/daemon/mgr/ha_mgr.go b/supernode/daemon/mgr/ha_mgr.go
new file mode 100644
index 000000000..e5feca054
--- /dev/null
+++ b/supernode/daemon/mgr/ha_mgr.go
@@ -0,0 +1,24 @@
+package mgr
+
+import (
+	"context"
+
+	"github.com/dragonflyoss/Dragonfly/apis/types"
+	"github.com/dragonflyoss/Dragonfly/supernode/config"
+)
+
+// HaMgr is the interface to implement supernode HA.
+type HaMgr interface {
+
+	// CloseHaManager closes the tool used to implement supernode HA.
+	CloseHaManager(ctx context.Context) error
+
+	// HADaemon is the etcd daemon process that manages the supernode cluster.
+	HADaemon(ctx context.Context) error
+
+	// SendPostCopy sends a copy of the post request req to path on the supernode node, like dfget does.
+	SendPostCopy(ctx context.Context, req interface{}, path string, node *config.SupernodeInfo) error
+
+	// TriggerOtherSupernodeDownload makes the other supernodes only trigger the download of the file.
+	TriggerOtherSupernodeDownload(ctx context.Context, req *types.TaskRegisterRequest) error
+}
diff --git a/supernode/daemon/mgr/mock/mock_ha_mgr.go b/supernode/daemon/mgr/mock/mock_ha_mgr.go
new file mode 100644
index 000000000..e924313e3
--- /dev/null
+++ b/supernode/daemon/mgr/mock/mock_ha_mgr.go
@@ -0,0 +1,94 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: ha_mgr.go
+
+// Package mock is a generated GoMock package.
+package mock + +import ( + context "context" + reflect "reflect" + + types "github.com/dragonflyoss/Dragonfly/apis/types" + config "github.com/dragonflyoss/Dragonfly/supernode/config" + + gomock "github.com/golang/mock/gomock" +) + +// MockHaMgr is a mock of HaMgr interface +type MockHaMgr struct { + ctrl *gomock.Controller + recorder *MockHaMgrMockRecorder +} + +// MockHaMgrMockRecorder is the mock recorder for MockHaMgr +type MockHaMgrMockRecorder struct { + mock *MockHaMgr +} + +// NewMockHaMgr creates a new mock instance +func NewMockHaMgr(ctrl *gomock.Controller) *MockHaMgr { + mock := &MockHaMgr{ctrl: ctrl} + mock.recorder = &MockHaMgrMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockHaMgr) EXPECT() *MockHaMgrMockRecorder { + return m.recorder +} + +// CloseHaManager mocks base method +func (m *MockHaMgr) CloseHaManager(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseHaManager", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseHaManager indicates an expected call of CloseHaManager +func (mr *MockHaMgrMockRecorder) CloseHaManager(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseHaManager", reflect.TypeOf((*MockHaMgr)(nil).CloseHaManager), ctx) +} + +// HADaemon mocks base method +func (m *MockHaMgr) HADaemon(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HADaemon", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// HADaemon indicates an expected call of HADaemon +func (mr *MockHaMgrMockRecorder) HADaemon(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HADaemon", reflect.TypeOf((*MockHaMgr)(nil).HADaemon), ctx) +} + +// SendPostCopy mocks base method +func (m *MockHaMgr) SendPostCopy(ctx context.Context, req interface{}, path string, node *config.SupernodeInfo) error { + 
m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendPostCopy", ctx, req, path, node) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendPostCopy indicates an expected call of SendPostCopy +func (mr *MockHaMgrMockRecorder) SendPostCopy(ctx, req, path, node interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPostCopy", reflect.TypeOf((*MockHaMgr)(nil).SendPostCopy), ctx, req, path, node) +} + +// TriggerOtherSupernodeDownload mocks base method +func (m *MockHaMgr) TriggerOtherSupernodeDownload(ctx context.Context, req *types.TaskRegisterRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TriggerOtherSupernodeDownload", ctx, req) + ret0, _ := ret[0].(error) + return ret0 +} + +// TriggerOtherSupernodeDownload indicates an expected call of TriggerOtherSupernodeDownload +func (mr *MockHaMgrMockRecorder) TriggerOtherSupernodeDownload(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerOtherSupernodeDownload", reflect.TypeOf((*MockHaMgr)(nil).TriggerOtherSupernodeDownload), ctx, req) +} diff --git a/supernode/daemon/mgr/peer/manager.go b/supernode/daemon/mgr/peer/manager.go index c0fadbe27..58c3b275d 100644 --- a/supernode/daemon/mgr/peer/manager.go +++ b/supernode/daemon/mgr/peer/manager.go @@ -65,6 +65,7 @@ func NewManager(register prometheus.Registerer) (*Manager, error) { // Register a peer and generate a unique ID as returned. 
func (pm *Manager) Register(ctx context.Context, peerCreateRequest *types.PeerCreateRequest) (peerCreateResponse *types.PeerCreateResponse, err error) { + var id string if peerCreateRequest == nil { return nil, errors.Wrap(errortypes.ErrEmptyValue, "peer create request") } @@ -74,7 +75,12 @@ func (pm *Manager) Register(ctx context.Context, peerCreateRequest *types.PeerCr return nil, errors.Wrapf(errortypes.ErrInvalidValue, "peer IP: %s", ipString) } - id := generatePeerID(peerCreateRequest) + if peerCreateRequest.PeerID == "" { + id = generatePeerID(peerCreateRequest) + } else { + id = peerCreateRequest.PeerID + } + peerInfo := &types.PeerInfo{ ID: id, IP: peerCreateRequest.IP, diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 859ca9646..889cdcc81 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -27,6 +27,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/syncmap" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/ha" dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" "github.com/dragonflyoss/Dragonfly/supernode/httpclient" "github.com/dragonflyoss/Dragonfly/supernode/util" @@ -83,6 +84,7 @@ type Manager struct { progressMgr mgr.ProgressMgr cdnMgr mgr.CDNMgr schedulerMgr mgr.SchedulerMgr + haMgr mgr.HaMgr OriginClient httpclient.OriginHTTPClient metrics *metrics } @@ -90,7 +92,7 @@ type Manager struct { // NewManager returns a new Manager Object. 
func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetTaskMgr, progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr, - originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) { + originClient httpclient.OriginHTTPClient, register prometheus.Registerer, haMgr mgr.HaMgr) (*Manager, error) { return &Manager{ cfg: cfg, taskStore: dutil.NewStore(), @@ -102,12 +104,13 @@ func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetT accessTimeMap: syncmap.NewSyncMap(), taskURLUnReachableStore: syncmap.NewSyncMap(), OriginClient: originClient, + haMgr: haMgr, metrics: newMetrics(register), }, nil } // Register will not only register a task. -func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) (taskCreateResponse *types.TaskCreateResponse, err error) { +func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest, httpReq *types.TaskRegisterRequest) (taskCreateResponse *types.TaskCreateResponse, err error) { // Step1: validate params if err := validateParams(req); err != nil { return nil, err @@ -157,7 +160,7 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( // TODO: defer rollback init Progress // Step5: trigger CDN - if err := tm.triggerCdnSyncAction(ctx, task); err != nil { + if err := tm.triggerCdnSyncAction(ctx, task, false, httpReq); err != nil { return nil, errors.Wrapf(errortypes.ErrSystemError, "failed to trigger cdn: %v", err) } @@ -245,6 +248,11 @@ func (tm *Manager) GetPieces(ctx context.Context, taskID, clientID string, req * } logrus.Debugf("success to get task: %+v", task) + local, _ := tm.IsDownloadLocal(ctx, task.ID) + if tm.cfg.UseHA && !local { + return tm.getPiecesFromOtherSupernode(ctx, taskID, task, clientID, req, dfgetTaskStatus) + } + // update accessTime for taskID if err := tm.accessTimeMap.Add(task.ID, time.Now()); err != nil { logrus.Warnf("failed to update 
accessTime for taskID(%s): %v", task.ID, err) @@ -284,6 +292,53 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str return errors.Wrapf(errortypes.ErrInvalidValue, "result: %s", pieceUpdateRequest.PieceStatus) } + if tm.cfg.UseHA && pieceUpdateRequest.SendCopy { + req := ha.RPCReportPieceRequest{ + TaskID: taskID, + CID: pieceUpdateRequest.ClientID, + SrcPID: srcDfgetTask.PeerID, + DstCID: pieceUpdateRequest.DstCid, + PieceNum: pieceNum, + PieceStatus: pieceStatus, + } + node, err := tm.cfg.GetOtherSupernodeInfoByPID(pieceUpdateRequest.SendCopyPeerID) + if err != nil { + logrus.Errorf("failed to get other supernode info by peerID %s,err: %v", pieceUpdateRequest.SendCopyPeerID, err) + return err + } + err = node.RPCClient.Call("RPCManager.RPCUpdateProgress", req, nil) + if err != nil { + logrus.Errorf("failed send report request %v to other supernode %s,err: %v", req, node.PID, err) + } + + } + return tm.progressMgr.UpdateProgress(ctx, taskID, pieceUpdateRequest.ClientID, srcDfgetTask.PeerID, pieceUpdateRequest.DstPID, pieceNum, pieceStatus) } + +// OnlyTriggerDownload only trigger a cdn download. 
+func (tm *Manager) OnlyTriggerDownload(ctx context.Context, req *types.TaskCreateRequest, httpREQ *types.TaskRegisterRequest) error { + failAccessInterval := tm.cfg.FailAccessInterval * time.Minute + task, err := tm.addOrUpdateTask(ctx, req, failAccessInterval) + if err != nil { + logrus.Infof("failed to add or update task with req %+v: %v", req, err) + return err + } + if err := tm.triggerCdnSyncAction(ctx, task, true, httpREQ); err != nil { + return errors.Wrapf(errortypes.ErrSystemError, "failed to trigger cdn: %v", err) + } + return nil +} + +// IsDownloadLocal judges wether the task download from this supernode for HA +func (tm *Manager) IsDownloadLocal(ctx context.Context, taskID string) (bool, string) { + task, err := tm.getTask(taskID) + if err != nil { + return false, "" + } + if task.CDNPeerID == "" || task.CDNPeerID == tm.cfg.GetSuperPID() { + return true, tm.cfg.GetSuperPID() + } + return false, task.CDNPeerID +} diff --git a/supernode/daemon/mgr/task/manager_test.go b/supernode/daemon/mgr/task/manager_test.go index 59c2b604d..3e481a622 100644 --- a/supernode/daemon/mgr/task/manager_test.go +++ b/supernode/daemon/mgr/task/manager_test.go @@ -48,6 +48,7 @@ type TaskMgrTestSuite struct { mockPeerMgr *mock.MockPeerMgr mockProgressMgr *mock.MockProgressMgr mockSchedulerMgr *mock.MockSchedulerMgr + mockHaMgr *mock.MockHaMgr mockOriginClient *cMock.MockOriginHTTPClient taskManager *Manager @@ -61,6 +62,7 @@ func (s *TaskMgrTestSuite) SetUpSuite(c *check.C) { s.mockDfgetTaskMgr = mock.NewMockDfgetTaskMgr(s.mockCtl) s.mockProgressMgr = mock.NewMockProgressMgr(s.mockCtl) s.mockSchedulerMgr = mock.NewMockSchedulerMgr(s.mockCtl) + s.mockHaMgr = mock.NewMockHaMgr(s.mockCtl) s.mockOriginClient = cMock.NewMockOriginHTTPClient(s.mockCtl) s.mockCDNMgr.EXPECT().TriggerCDN(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() @@ -69,7 +71,7 @@ func (s *TaskMgrTestSuite) SetUpSuite(c *check.C) { s.mockOriginClient.EXPECT().GetContentLength(gomock.Any(), 
gomock.Any()).Return(int64(1000), 200, nil) cfg := config.NewConfig() s.taskManager, _ = NewManager(cfg, s.mockPeerMgr, s.mockDfgetTaskMgr, - s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient, prometheus.NewRegistry()) + s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient, prometheus.NewRegistry(), s.mockHaMgr) } func (s *TaskMgrTestSuite) TearDownSuite(c *check.C) { @@ -87,7 +89,8 @@ func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) { RawURL: "http://aa.bb.com", PeerID: "fooPeerID", } - resp, err := s.taskManager.Register(context.Background(), req) + httpReq := &types.TaskRegisterRequest{} + resp, err := s.taskManager.Register(context.Background(), req, httpReq) c.Check(err, check.IsNil) c.Assert(1, check.Equals, int(prom_testutil.ToFloat64(tasksRegisterCount.WithLabelValues()))) @@ -118,7 +121,8 @@ func (s *TaskMgrTestSuite) TestUpdateTaskInfo(c *check.C) { RawURL: "http://aa.bb.com", PeerID: "fooPeerID", } - resp, err := s.taskManager.Register(context.Background(), req) + httpReq := &types.TaskRegisterRequest{} + resp, err := s.taskManager.Register(context.Background(), req, httpReq) c.Check(err, check.IsNil) // return error when taskInfo equals nil diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index cfb000bea..ab18c3eff 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -30,6 +30,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/ha" "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/pkg/errors" @@ -71,6 +72,13 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq if v, err := tm.taskStore.Get(taskID); err == nil { task = v.(*types.TaskInfo) + if tm.cfg.UseHA { + local, _ := 
tm.IsDownloadLocal(ctx, taskID) + if _, err := tm.cfg.GetOtherSupernodeInfoByPID(task.CDNPeerID); err != nil && !local { + tm.taskStore.Delete(taskID) + task = newTask + } + } if !equalsTask(task, newTask) { return nil, errors.Wrapf(errortypes.ErrTaskIDDuplicate, "%s", taskID) } @@ -209,12 +217,34 @@ func (tm *Manager) addDfgetTask(ctx context.Context, req *types.TaskCreateReques return dfgetTask, nil } -func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.TaskInfo) error { +func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.TaskInfo, onlyTriggerDownload bool, httpReq *types.TaskRegisterRequest) error { if !isFrozen(task.CdnStatus) { logrus.Infof("CDN(%s) is running or has been downloaded successfully for taskID: %s", task.CdnStatus, task.ID) + if tm.cfg.UseHA { + // means the supernode download this task from other supernode + local, _ := tm.IsDownloadLocal(ctx, task.ID) + if !local { + return tm.sendRegisterCopyToOtherSupernode(ctx, task, httpReq) + } + } return nil } + if tm.cfg.UseHA && onlyTriggerDownload == false { + // ask cdn resource from other supernodes + tm.addCdnResourceFromOtherSupernodes(ctx, task, onlyTriggerDownload, httpReq) + } + + if tm.cfg.UseHA && !isFrozen(task.CdnStatus) { + logrus.Infof("CDN(%s) is running or has been downloaded successfully for taskID for other supernode: %s", task.CdnStatus, task.CDNPeerID) + return nil + } + + if tm.cfg.UseHA && onlyTriggerDownload == false { + //trigger other supernode download + tm.haMgr.TriggerOtherSupernodeDownload(ctx, httpReq) + } + if isWait(task.CdnStatus) { if err := tm.initCdnNode(ctx, task); err != nil { logrus.Errorf("failed to init cdn node for taskID %s: %v", task.ID, err) @@ -534,3 +564,128 @@ func (tm *Manager) getHTTPFileLength(taskID, url string, headers map[string]stri return fileLength, nil } + +// addCdnResourceFromOtherSupernodes try to find a existing downloading file from other supernode +func (tm *Manager) 
addCdnResourceFromOtherSupernodes(ctx context.Context, task *types.TaskInfo, onlyTriggerDownload bool, httpREQ *types.TaskRegisterRequest) { + for _, node := range tm.cfg.GetOtherSupernodeInfo() { + var ( + resp ha.RPCUpdateTaskInfoRequest + srcNode *config.SupernodeInfo + err error + ) + //judge whether other supernode download successfully + if err = node.RPCClient.Call("RPCManager.RPCGetTaskInfo", task.ID, &resp); err != nil { + logrus.Debugf("failed to find cdn resource from supernode %s", node.PID) + continue + } + if resp.CdnStatus != types.DfGetTaskStatusFAILED { + srcNode, err = tm.cfg.GetOtherSupernodeInfoByPID(resp.CDNPeerID) + if err != nil { + continue + } + if err := tm.haMgr.SendPostCopy(context.TODO(), httpREQ, "/peer/registry", srcNode); err != nil { + //if task is + logrus.Errorf("failed to send register copy to supernode %s,err: %v", srcNode.PID, err) + continue + } + } else { + continue + } + if resp.CdnStatus == types.TaskInfoCdnStatusSUCCESS { + if err := tm.updateTask(task.ID, &types.TaskInfo{ + CdnStatus: resp.CdnStatus, + CDNPeerID: resp.CDNPeerID, + Md5: resp.RealMd5, + ID: task.ID, + FileLength: resp.FileLength, + }); err != nil { + logrus.Errorf("failed to update task %s, err: %v", task.ID, err) + continue + } + break + } + if err := srcNode.RPCClient.Call("RPCManager.RPCGetTaskInfo", task.ID, &resp); err != nil { + logrus.Errorf("failed to get task %s info from other supernode %s", task.ID, srcNode.PID) + continue + } + if resp.CdnStatus == types.DfGetTaskStatusRUNNING || resp.CdnStatus == types.TaskInfoCdnStatusWAITING { + if err := tm.updateTask(task.ID, &types.TaskInfo{ + CdnStatus: resp.CdnStatus, + CDNPeerID: resp.CDNPeerID, + }); err != nil { + logrus.Errorf("failed to update task %s, err: %v", task.ID, err) + continue + } + req := ha.RPCAddSupernodeWatchRequest{ + SupernodePID: tm.cfg.GetSuperPID(), + TaskID: task.ID, + } + if err := srcNode.RPCClient.Call("RPCManager.RPCAddSupernodeWatch", req, nil); err != nil { + continue + } + 
} + break + } +} + +// sendRegisterCopyToOtherSupernode sends a register request to other supernode +func (tm *Manager) sendRegisterCopyToOtherSupernode(ctx context.Context, task *types.TaskInfo, httpREQ *types.TaskRegisterRequest) error { + local, _ := tm.IsDownloadLocal(ctx, task.ID) + if tm.cfg.UseHA && !local { + node, err := tm.cfg.GetOtherSupernodeInfoByPID(task.CDNPeerID) + if err != nil { + return err + } + if err := tm.haMgr.SendPostCopy(context.TODO(), httpREQ, "/peer/registry", node); err != nil { + return err + } + } + return nil +} + +// getPiecesFromOtherSupernode gets schedule from other supernode +func (tm *Manager) getPiecesFromOtherSupernode(ctx context.Context, taskID string, task *types.TaskInfo, clientID string, + req *types.PiecePullRequest, dfgetTaskStatus string) (bool, interface{}, error) { + var RPCResponse ha.RPCGetPieceResponse + if dfgetTaskStatus == types.DfGetTaskStatusWAITING { + if err := tm.dfgetTaskMgr.UpdateStatus(ctx, clientID, task.ID, types.DfGetTaskStatusRUNNING); err != nil { + return false, nil, err + } + } else if dfgetTaskStatus == types.DfGetTaskStatusRUNNING { + dfgetTask, _ := tm.dfgetTaskMgr.Get(ctx, clientID, taskID) + pieceNum := util.CalculatePieceNum(req.PieceRange) + if pieceNum == -1 { + return false, nil, errors.Wrapf(errortypes.ErrInvalidValue, "pieceRange: %s", req.PieceRange) + } + pieceStatus, success := convertToPeerPieceStatus(req.PieceResult, req.DfgetTaskStatus) + if !success { + return false, nil, errors.Wrapf(errortypes.ErrInvalidValue, "failed to convert result: %s and status %s to pieceStatus", req.PieceResult, req.DfgetTaskStatus) + } + if err := tm.progressMgr.UpdateProgress(ctx, task.ID, clientID, dfgetTask.PeerID, req.DstPID, pieceNum, pieceStatus); err != nil { + return false, nil, errors.Wrap(err, "failed to update progress") + } + } else { + return true, nil, tm.processTaskFinish(ctx, taskID, clientID, dfgetTaskStatus) + } + RPCReq := ha.RPCGetPieceRequest{ + DfgetTaskStatus: 
req.DfgetTaskStatus, + PieceResult: req.PieceResult, + PieceRange: req.PieceRange, + TaskID: taskID, + Cid: clientID, + } + node, err := tm.cfg.GetOtherSupernodeInfoByPID(task.CDNPeerID) + if err != nil { + return false, nil, err + } + err = node.RPCClient.Call("RPCManager.RPCGetPiece", RPCReq, &RPCResponse) + if RPCResponse.ErrMsg != "" { + return false, nil, errors.Wrapf(errortypes.DfError{RPCResponse.ErrCode, RPCResponse.ErrMsg}, + "failed to send pull task request %v to other supernode %s,err: %v", RPCReq, node.PID, err) + } + return RPCResponse.IsFinished, RPCResponse.Data, err +} + +func (tm *Manager) isDownloadFromOtherSupernode(cdnPeerID string) bool { + return cdnPeerID != "" && cdnPeerID != tm.cfg.GetSuperPID() +} diff --git a/supernode/daemon/mgr/task/manager_util_test.go b/supernode/daemon/mgr/task/manager_util_test.go index 8de07404d..8abf734bc 100644 --- a/supernode/daemon/mgr/task/manager_util_test.go +++ b/supernode/daemon/mgr/task/manager_util_test.go @@ -41,6 +41,7 @@ type TaskUtilTestSuite struct { mockPeerMgr *mock.MockPeerMgr mockProgressMgr *mock.MockProgressMgr mockSchedulerMgr *mock.MockSchedulerMgr + mockHaMgr *mock.MockHaMgr mockOriginClient *cMock.MockOriginHTTPClient taskManager *Manager @@ -55,8 +56,9 @@ func (s *TaskUtilTestSuite) SetUpSuite(c *check.C) { s.mockProgressMgr = mock.NewMockProgressMgr(s.mockCtl) s.mockSchedulerMgr = mock.NewMockSchedulerMgr(s.mockCtl) s.mockOriginClient = cMock.NewMockOriginHTTPClient(s.mockCtl) + s.mockHaMgr = mock.NewMockHaMgr(s.mockCtl) s.taskManager, _ = NewManager(config.NewConfig(), s.mockPeerMgr, s.mockDfgetTaskMgr, - s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient, prometheus.NewRegistry()) + s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient, prometheus.NewRegistry(), s.mockHaMgr) s.mockOriginClient.EXPECT().GetContentLength(gomock.Any(), gomock.Any()).Return(int64(1000), 200, nil) } @@ -173,7 +175,8 @@ func (s *TaskUtilTestSuite) 
TestTriggerCdnSyncAction(c *check.C) { } for _, tc := range cases { - err = s.taskManager.triggerCdnSyncAction(context.Background(), tc.task) + httpReq := &types.TaskRegisterRequest{} + err = s.taskManager.triggerCdnSyncAction(context.Background(), tc.task, false, httpReq) c.Assert(err, check.Equals, tc.err) if !tc.skip { c.Assert(tc.total, check.Equals, diff --git a/supernode/daemon/mgr/task_mgr.go b/supernode/daemon/mgr/task_mgr.go index 515289d0c..071025575 100644 --- a/supernode/daemon/mgr/task_mgr.go +++ b/supernode/daemon/mgr/task_mgr.go @@ -38,7 +38,7 @@ type TaskMgr interface { // Register a task represents that someone wants to download a file. // Supernode will get the task file meta and return taskID. // NOTE: If supernode cannot find the task file, the CDN download will be triggered. - Register(ctx context.Context, taskCreateRequest *types.TaskCreateRequest) (taskCreateResponse *types.TaskCreateResponse, err error) + Register(ctx context.Context, req *types.TaskCreateRequest, httpReq *types.TaskRegisterRequest) (taskCreateResponse *types.TaskCreateResponse, err error) // Get the task Info with specified taskID. Get(ctx context.Context, taskID string) (*types.TaskInfo, error) @@ -71,4 +71,10 @@ type TaskMgr interface { // We use a sting called pieceRange to identify a piece. // A pieceRange separated by a dash, like this: 0-45565, etc. 
UpdatePieceStatus(ctx context.Context, taskID, pieceRange string, pieceUpdateRequest *types.PieceUpdateRequest) error + + // OnlyTriggerDownload triggers a cdn download for HA + OnlyTriggerDownload(ctx context.Context, req *types.TaskCreateRequest, httpREQ *types.TaskRegisterRequest) error + + // IsDownloadLocal judges wether the task download from this supernode for HA + IsDownloadLocal(ctx context.Context, taskID string) (isLocal bool, cdnPeerID string) } diff --git a/supernode/server/0.3_bridge.go b/supernode/server/0.3_bridge.go index bc3724a03..7740f5c23 100644 --- a/supernode/server/0.3_bridge.go +++ b/supernode/server/0.3_bridge.go @@ -26,6 +26,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/ha" sutil "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/go-openapi/strfmt" @@ -83,6 +84,7 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http HostName: strfmt.Hostname(request.HostName), Port: request.Port, Version: request.Version, + PeerID: request.PeerID, } peerCreateResponse, err := s.PeerMgr.Register(ctx, peerCreateRequest) if err != nil { @@ -106,7 +108,11 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http SupernodeIP: request.SuperNodeIP, } s.OriginClient.RegisterTLSConfig(taskCreateRequest.RawURL, request.Insecure, request.RootCAs) - resp, err := s.TaskMgr.Register(ctx, taskCreateRequest) + if s.Config.UseHA { + // record the PeerID and for sending request copy to other supernode + request.PeerID = peerID + } + resp, err := s.TaskMgr.Register(ctx, taskCreateRequest, request) if err != nil { logrus.Errorf("failed to register task %+v: %v", taskCreateRequest, err) return err @@ -127,22 +133,27 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req params := req.URL.Query() taskID := 
params.Get("taskId") srcCID := params.Get("srcCid") - + // try to get dstPID + dstCID := params.Get("dstCid") request := &types.PiecePullRequest{ DfgetTaskStatus: statusMap[params.Get("status")], PieceRange: params.Get("range"), PieceResult: resultMap[params.Get("result")], + DstCid: dstCID, } - // try to get dstPID - dstCID := params.Get("dstCid") - if !stringutils.IsEmptyStr(dstCID) { - dstDfgetTask, err := s.DfgetTaskMgr.Get(ctx, dstCID, taskID) - if err != nil { - logrus.Warnf("failed to get dfget task by dstCID(%s) and taskID(%s), and the srcCID is %s, err: %v", - dstCID, taskID, srcCID, err) - } else { - request.DstPID = dstDfgetTask.PeerID + local, _ := s.TaskMgr.IsDownloadLocal(ctx, taskID) + if s.Config.UseHA && !local { + request.DstPID = s.Config.GetSuperPID() + } else { + if !stringutils.IsEmptyStr(dstCID) { + dstDfgetTask, err := s.DfgetTaskMgr.Get(ctx, dstCID, taskID) + if err != nil { + logrus.Warnf("failed to get dfget task by dstCID(%s) and taskID(%s), and the srcCID is %s, err: %v", + dstCID, taskID, srcCID, err) + } else { + request.DstPID = dstDfgetTask.PeerID + } } } @@ -204,15 +215,24 @@ func (s *Server) reportPiece(ctx context.Context, rw http.ResponseWriter, req *h dstCID := params.Get("dstCid") pieceRange := params.Get("pieceRange") - dstDfgetTask, err := s.DfgetTaskMgr.Get(ctx, dstCID, taskID) - if err != nil { - return err - } - request := &types.PieceUpdateRequest{ ClientID: srcCID, - DstPID: dstDfgetTask.PeerID, PieceStatus: types.PieceUpdateRequestPieceStatusSUCCESS, + DstCid: dstCID, + } + + local, cdnPID := s.TaskMgr.IsDownloadLocal(ctx, taskID) + if s.Config.UseHA && !local { + request.SendCopy = true + request.SendCopyPeerID = cdnPID + request.DstPID = s.Config.GetSuperPID() + } else { + dstDfgetTask, err := s.DfgetTaskMgr.Get(ctx, dstCID, taskID) + if err != nil { + return err + } + request.SendCopy = false + request.DstPID = dstDfgetTask.PeerID } if err := s.TaskMgr.UpdatePieceStatus(ctx, taskID, pieceRange, request); err != nil 
{ @@ -230,6 +250,24 @@ func (s *Server) reportServiceDown(ctx context.Context, rw http.ResponseWriter, taskID := params.Get("taskId") cID := params.Get("cid") + local, cdnPID := s.TaskMgr.IsDownloadLocal(ctx, taskID) + if s.Config.UseHA && !local { + node, err := s.Config.GetOtherSupernodeInfoByPID(cdnPID) + if err != nil { + logrus.Errorf("failed to get supernode info by peerID %s,err: %v", cdnPID, err) + return err + } + request := ha.RPCServerDownRequest{ + TaskID: taskID, + CID: cID, + } + err = node.RPCClient.Call("RPCManager.RPCDfgetServerDown", request, nil) + if err != nil { + logrus.Errorf("failed to send server down request to supernode %s,err: %v", node.PID, err) + return err + } + } + // get peerID according to the CID and taskID dfgetTask, err := s.DfgetTaskMgr.Get(ctx, cID, taskID) if err != nil { diff --git a/supernode/server/server.go b/supernode/server/server.go index 7a313f5a0..d1ce37b2a 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -28,6 +28,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/cdn" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/dfgettask" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/gc" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/ha" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/progress" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/scheduler" @@ -50,6 +51,8 @@ type Server struct { GCMgr mgr.GCMgr OriginClient httpclient.OriginHTTPClient + HaMgr mgr.HaMgr + CdnMgr mgr.CDNMgr } // New creates a brand new server instance. 
@@ -92,8 +95,13 @@ func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) { return nil, err } + haMgr, err := ha.NewManager(cfg) + if err != nil { + return nil, err + } + taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr, - schedulerMgr, originClient, register) + schedulerMgr, originClient, register, haMgr) if err != nil { return nil, err } @@ -110,6 +118,7 @@ func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) { DfgetTaskMgr: dfgetTaskMgr, ProgressMgr: progressMgr, GCMgr: GCMgr, + HaMgr: haMgr, OriginClient: originClient, }, nil }