From 65b0fdb80bf5084dfbd53d84f63d116aa704a84d Mon Sep 17 00:00:00 2001 From: yunfeiyangbuaa Date: Fri, 19 Jul 2019 09:39:04 +0800 Subject: [PATCH] feature:implement supernode status switch Signed-off-by: yunfeiyangbuaa --- common/constants/dfget_super_code.go | 9 ++ go.mod | 1 + go.sum | 29 ++++++ supernode/config/config.go | 13 +++ supernode/daemon/mgr/ha/etcd_tool.go | 144 +++++++++++++++++++++++++++ supernode/daemon/mgr/ha/manager.go | 129 ++++++++++++++++++++++++ supernode/daemon/mgr/ha/tool.go | 22 ++++ supernode/daemon/mgr/ha_mgr.go | 19 ++++ 8 files changed, 366 insertions(+) mode change 100644 => 100755 go.mod mode change 100644 => 100755 go.sum create mode 100644 supernode/daemon/mgr/ha/etcd_tool.go create mode 100644 supernode/daemon/mgr/ha/manager.go create mode 100644 supernode/daemon/mgr/ha/tool.go create mode 100644 supernode/daemon/mgr/ha_mgr.go diff --git a/common/constants/dfget_super_code.go b/common/constants/dfget_super_code.go index b59c3fc75..62af99e01 100644 --- a/common/constants/dfget_super_code.go +++ b/common/constants/dfget_super_code.go @@ -95,3 +95,12 @@ const ( ClientErrorFileNotExist = "FILE_NOT_EXIST" ClientErrorFileMd5NotMatch = "FILE_MD5_NOT_MATCH" ) + +/*the code od supernode ha status*/ +const ( + SupernodeUseHaFalse = 900 + SupernodeUseHaStandby = 901 + SupernodeUseHaInit = 902 + SupernodeUseHaActive = 903 + SupernodeUsehakill = 904 +) diff --git a/go.mod b/go.mod old mode 100644 new mode 100755 index 7f5675924..5f33d16d3 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/stretchr/testify v1.2.2 github.com/valyala/fasthttp v1.3.0 github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9 + go.etcd.io/etcd v3.3.13+incompatible gopkg.in/gcfg.v1 v1.2.3 gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect gopkg.in/warnings.v0 v0.1.2 diff --git a/go.sum b/go.sum old mode 100644 new mode 100755 index 405f174ed..4ee4b1354 --- a/go.sum +++ b/go.sum @@ -16,19 +16,26 @@ github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible h1:jFneRYjIvLMLhDLCzuTuU4rSJUjRplcJQ7pD7MnhC04= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.7 h1:DVS0EPFHUiaJSaX2EKlaf65HUmk9PXhOl/Xa3Go242Q= github.com/cpuguy83/go-md2man v1.0.7/go.mod h1:N6JayAiVKtlHSnuTCeuLSQVs75hb8q+dYQLjr7cDsKY= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-check/check v0.0.0-20161208181325-20d25e280405 h1:0kdUKH22y+PT7ZITTEcrrHsQfGmpi4fj0XGyoDe/krQ= github.com/go-check/check v0.0.0-20161208181325-20d25e280405/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= @@ -57,8 +64,11 @@ github.com/go-openapi/validate v0.0.0-20170705144413-8a82927c942c h1:+cB2AzkH5an github.com/go-openapi/validate v0.0.0-20170705144413-8a82927c942c/go.mod h1:ve8xoSHgqBUifiKgaVbxLmOE0ckvH0oXfsJcnm6SIz0= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -67,20 +77,26 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gorilla/context v0.0.0-20181012153548-51ce91d2eadd h1:bB2XEQHhNsTTpqNzsq5ObUuqR7RNIdpm5Phb6AjeejE= github.com/gorilla/context v0.0.0-20181012153548-51ce91d2eadd/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.5.0 h1:mq8bRov+5x+pZNR/uAHyUEgovR9gLgYFwDQIeuYi9TM= github.com/gorilla/mux v1.5.0/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.9.0 h1:bM6ZAFZmc/wPFaRDi0d5L7hGEZEx/2u+Tmr2evNHDiI= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= @@ -136,6 +152,7 @@ github.com/russross/blackfriday v0.0.0-20171011182219-6d1ef893fcb0 h1:hgS5QyP981 github.com/russross/blackfriday v0.0.0-20171011182219-6d1ef893fcb0/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= @@ -155,7 +172,9 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= @@ -164,11 +183,18 @@ github.com/valyala/fasthttp v1.3.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9 h1:WXBMTckrTcndPgRZBAEjqev+eN8MI9wbUQQUHlrUEV4= github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= +go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= @@ -195,6 +221,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -202,8 +229,10 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.21.0 h1:G+97AoqBnmZIT91cLG/EkCoK9NSelj64P8bOHHNmGn0= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/supernode/config/config.go b/supernode/config/config.go index 026be599c..7d47c2bbd 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -85,6 +85,8 @@ func NewBaseProperties() *BaseProperties { MaxBandwidth: 200, EnableProfiler: false, Debug: false, + UseHA: false, + HAConfig: []string{"127.0.0.1:2379"}, } } @@ -172,6 +174,17 @@ type BaseProperties struct { // superNodePID is the ID of supernode, which is the same as peer ID of dfget. superNodePID string + + //UserhA is the mark of whether the supernode use the ha model. + //ha means that if the active supernode is off,the standby supernode can take over active supernode's work. + //and the whole system can work as before. + //default:false. + UseHA bool `yaml:"useHa"` + + //HAConfig is available when UseHa is true. + //HAConfig configs the tool's ip and port we use to implement ha. + //default:[] int {127.0.0.1:2379}. + HAConfig []string `yaml:"haConfig"` } // TransLimit trans rateLimit from MB/s to B/s. diff --git a/supernode/daemon/mgr/ha/etcd_tool.go b/supernode/daemon/mgr/ha/etcd_tool.go new file mode 100644 index 000000000..b880774f0 --- /dev/null +++ b/supernode/daemon/mgr/ha/etcd_tool.go @@ -0,0 +1,144 @@ +package ha + +import ( + "context" + "time" + + "github.com/dragonflyoss/Dragonfly/supernode/config" + + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/clientv3" +) + +//EtcdMgr is the struct to manager etcd. +type EtcdMgr struct { + config clientv3.Config + client *clientv3.Client + leaseTTL int64 + leaseKeepAliveRsp <-chan *clientv3.LeaseKeepAliveResponse + hostIP string + leaseResp *clientv3.LeaseGrantResponse +} + +const ( + //ActiveSupernodeOFF means there is no active supernode. + ActiveSupernodeOFF = "" + //ActiveSupernodeChange means active supernode change to standby supernode because of unhealthy. + ActiveSupernodeChange = 0 + //ActiveSupernodeKeep means active supernode is health. + ActiveSupernodeKeep = 1 +) + +//NewEtcdMgr produce a etcdmgr object. +func NewEtcdMgr(cfg *config.Config) (*EtcdMgr, error) { + config := clientv3.Config{ + Endpoints: cfg.HAConfig, + DialTimeout: 10 * time.Second, + } + // build connection to etcd. + client, err := clientv3.New(config) + return &EtcdMgr{ + hostIP: cfg.AdvertiseIP, + config: config, + client: client, + }, err +} + +//TODO(yunfeiyangbuaa): handle these log,errors and exceptions in the future + +//WatchActiveChange is the progress to watch the etcd,if the value of key /lock/active changes,supernode will be notified. +func (etcd *EtcdMgr) WatchActiveChange(messageChannel chan string) { + var watchStartRevision int64 + watcher := clientv3.NewWatcher(etcd.client) + watchChan := watcher.Watch(context.TODO(), "/lock/active", clientv3.WithRev(watchStartRevision)) + for watchResp := range watchChan { + for _, event := range watchResp.Events { + switch event.Type { + case ActiveSupernodeChange: + messageChannel <- "string(event.Kv.Value)" + case ActiveSupernodeKeep: + messageChannel <- ActiveSupernodeOFF + } + } + } +} + +//ObtainActiveInfo obtain the active supernode's information from etcd. +func (etcd *EtcdMgr) ObtainActiveInfo(key string) (string, error) { + kv := clientv3.NewKV(etcd.client) + var ( + getRes *clientv3.GetResponse + err error + ) + if getRes, err = kv.Get(context.TODO(), key); err != nil { + logrus.Errorf("failed to get the active supernode's info: %v", err) + } + var value string + for _, v := range getRes.Kvs { + value = string(v.Value) + } + return value, err +} + +//ActiveResureItsStatus keep look on the lease's renew response. +func (etcd *EtcdMgr) ActiveResureItsStatus(finished chan bool) bool { + for { + select { + case keepResp := <-etcd.leaseKeepAliveRsp: + if keepResp == nil { + logrus.Info("failed to renew the etcd lease") + finished <- true + return false + } + } + } +} + +//TryBeActive try to change the supernode's status from standby to active. +func (etcd *EtcdMgr) TryBeActive(finished chan bool) (bool, string, error) { + kv := clientv3.NewKV(etcd.client) + //make a lease to obtain a lock + lease := clientv3.NewLease(etcd.client) + leaseResp, err := lease.Grant(context.TODO(), etcd.leaseTTL) + if err != nil { + logrus.Errorf("failed to create a etcd lease: %v", err) + } + keepRespChan, err := lease.KeepAlive(context.TODO(), leaseResp.ID) + etcd.leaseKeepAliveRsp = keepRespChan + if err != nil { + logrus.Errorf("failed to create etcd.leaseKeepAliveRsp: %v", err) + } + etcd.leaseResp = leaseResp + //if the lock is available,get the lock. + //else read the lock + txn := kv.Txn(context.TODO()) + txn.If(clientv3.Compare(clientv3.CreateRevision("/lock/active"), "=", 0)). + Then(clientv3.OpPut("/lock/active", etcd.hostIP, clientv3.WithLease(leaseResp.ID))). + Else(clientv3.OpGet("/lock/active")) + txnResp, err := txn.Commit() + if err != nil { + logrus.Errorf("failed to commit a etcd transaction: %v", err) + } + if !txnResp.Succeeded { + _, err = lease.Revoke(context.TODO(), leaseResp.ID) + return false, string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value), err + } + return true, etcd.hostIP, err +} + +//ActiveKillItself cancels the renew of lease. +func (etcd *EtcdMgr) ActiveKillItself() bool { + _, err := etcd.client.Revoke(context.TODO(), etcd.leaseResp.ID) + if err != nil { + logrus.Errorf("failed to cancel a etcd lease: %v", err) + return false + } + logrus.Info("success to cancel a etcd lease") + return true +} + +//CloseTool close the tool used to implement supernode ha. +func (etcd *EtcdMgr) CloseTool() error { + logrus.Info("success to close a etcd client") + return etcd.client.Close() +} diff --git a/supernode/daemon/mgr/ha/manager.go b/supernode/daemon/mgr/ha/manager.go new file mode 100644 index 000000000..b25f37dfb --- /dev/null +++ b/supernode/daemon/mgr/ha/manager.go @@ -0,0 +1,129 @@ +package ha + +import ( + "github.com/dragonflyoss/Dragonfly/common/constants" + "github.com/dragonflyoss/Dragonfly/supernode/config" + + "github.com/sirupsen/logrus" +) + +//Manager is the struct to manager supernode ha. +type Manager struct { + advertiseIP string + useHa bool + nodeStatus int + tool Tool +} + +//NewManager produce the Manager object. +func NewManager(cfg *config.Config) (*Manager, error) { + toolMgr, err := NewEtcdMgr(cfg) + if err != nil { + logrus.Errorf("failed to initial the ha tool: %v", err) + return nil, err + } + return &Manager{ + advertiseIP: cfg.AdvertiseIP, + useHa: cfg.UseHA, + nodeStatus: constants.SupernodeUseHaInit, + tool: toolMgr, + }, nil +} + +//TODO(yunfeiyangbuaa): handle these log,errors and exceptions in the future + +//ElectDaemon is the main progress to implement active/standby switch. +func (ha *Manager) ElectDaemon(change chan int) { + messageChannel := make(chan string) + //a process to watch whether the active supernode is off. + go ha.watchActive(messageChannel) + //a process try to get the active supernode when the supernode is start. + go ha.tryStandbyToActive(change) + for { + if activeIP, ok := <-messageChannel; ok { + //the active node is off. + if activeIP == ActiveSupernodeOFF { + //if the previous active supernode is itself. + if ha.nodeStatus == constants.SupernodeUseHaActive { + ha.activeToStandby() + change <- constants.SupernodeUsehakill + } else { + ha.tryStandbyToActive(change) + } + } + } + } + +} + +//GetSupernodeStatus get supernode's status. +func (ha *Manager) GetSupernodeStatus() int { + if ha.useHa == false { + return constants.SupernodeUseHaFalse + } + return ha.nodeStatus +} + +//CompareAndSetSupernodeStatus set supernode's status. +func (ha *Manager) CompareAndSetSupernodeStatus(preStatus int, nowStatus int) bool { + if ha.nodeStatus == preStatus { + ha.nodeStatus = nowStatus + return true + } + logrus.Errorf("failed to set supernode status,the preStatus is %d not equal to %d", ha.nodeStatus, preStatus) + return false +} + +//CloseHaManager close the tool use to implement supernode ha. +func (ha *Manager) CloseHaManager() error { + return ha.tool.CloseTool() +} + +//GiveUpActiveStatus give up its active status because of unhealthy. +func (ha *Manager) GiveUpActiveStatus() bool { + return ha.tool.ActiveKillItself() +} + +//StandbyToActive change the status from standby to active. +func (ha *Manager) standbyToActive() { + if ha.nodeStatus == constants.SupernodeUseHaStandby { + ha.nodeStatus = constants.SupernodeUseHaActive + } else { + logrus.Warnf("%s is already active,can't set it active again", ha.advertiseIP) + } +} + +//ActiveToStandby change the status from active to standby. +func (ha *Manager) activeToStandby() { + if ha.nodeStatus == constants.SupernodeUseHaActive { + ha.nodeStatus = constants.SupernodeUseHaStandby + } else { + logrus.Warnf("%s is already standby,can't set it standby again", ha.advertiseIP) + } +} + +//TryStandbyToActive try to change the status from standby to active. +func (ha *Manager) tryStandbyToActive(change chan int) { + finished := make(chan bool, 10) + is, ip, err := ha.tool.TryBeActive(finished) + if err != nil { + logrus.Errorf("failed to try to change standby status to active status") + } + if is == true { + ha.standbyToActive() + logrus.Infof("%s obtain the active supernode status", ha.advertiseIP) + change <- constants.SupernodeUseHaActive + go ha.tool.ActiveResureItsStatus(finished) + <-finished + ha.activeToStandby() + logrus.Infof("%s finishes the active supernode status", ha.advertiseIP) + } else { + logrus.Infof("the other supernode %s obtain the active supernode status,keep watch on it", ip) + change <- constants.SupernodeUseHaStandby + } +} + +//WatchActive keep watch whether the active supernode is off. +func (ha *Manager) watchActive(messageChannel chan string) { + ha.tool.WatchActiveChange(messageChannel) +} diff --git a/supernode/daemon/mgr/ha/tool.go b/supernode/daemon/mgr/ha/tool.go new file mode 100644 index 000000000..9ff5e4a07 --- /dev/null +++ b/supernode/daemon/mgr/ha/tool.go @@ -0,0 +1,22 @@ +package ha + +//Tool is an interface that use etcd/zookeeper/yourImplement tools to make supernode be standby or active. +type Tool interface { + //WatchActiveChange keeps watching the status of active supernode. + WatchActiveChange(messageChannel chan string) + + //ObtainActiveInfo obtains the active supernode's info(Ip address and port). + ObtainActiveInfo(key string) (string, error) + + //TryBeActive try to make standby supernode to be active. + TryBeActive(finished chan bool) (bool, string, error) + + //ActiveResureItsStatus will keep to monitor to ensure this itself is still a active supernode now. + ActiveResureItsStatus(finished chan bool) bool + + //ActiveKillItself abandon the active status and the active supernode become standby supernode. + ActiveKillItself() bool + + //CloseTool close the tool. + CloseTool() error +} diff --git a/supernode/daemon/mgr/ha_mgr.go b/supernode/daemon/mgr/ha_mgr.go new file mode 100644 index 000000000..b4175b6e2 --- /dev/null +++ b/supernode/daemon/mgr/ha_mgr.go @@ -0,0 +1,19 @@ +package mgr + +//HaMgr is the interface to implement supernode Ha. +type HaMgr interface { + //ElectDaemonthe is the daemon progress to implement active/standby switch. + ElectDaemon(change chan int) + + //HagetSupernodeState get supernode's status. + GetSupernodeStatus() int + + //HaSetSupernodeState compare and set supernode's status. + CompareAndSetSupernodeStatus(preStatus int, nowStatus int) bool + + //CloseHaManager close the tool use to implement supernode ha + CloseHaManager() error + + //GiveUpActiveStatus give up its active status because of unhealthy + GiveUpActiveStatus() bool +}