diff --git a/.chglog/CHANGELOG.tpl.md b/.chglog/CHANGELOG.tpl.md new file mode 100755 index 0000000..d08c458 --- /dev/null +++ b/.chglog/CHANGELOG.tpl.md @@ -0,0 +1,41 @@ +{{ if .Versions -}} + + +{{ if .Unreleased.CommitGroups -}} +{{ range .Unreleased.CommitGroups -}} +### {{ .Title }} +{{ range .Commits -}} +- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }} +{{ end }} +{{ end -}} +{{ end -}} +{{ end -}} + +{{ range .Versions }} + +## {{ if .Tag.Previous }}[{{ .Tag.Name }}]{{ else }}{{ .Tag.Name }}{{ end }} - {{ datetime "2006-01-02" .Tag.Date }} +{{ range .CommitGroups -}} +### {{ .Title }} +{{ range .Commits -}} +- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }} +{{ end }} +{{ end -}} + +{{- if .NoteGroups -}} +{{ range .NoteGroups -}} +### {{ .Title }} +{{ range .Notes }} +{{ .Body }} +{{ end }} +{{ end -}} +{{ end -}} +{{ end -}} + +{{- if .Versions }} +[Unreleased]: {{ .Info.RepositoryURL }}/compare/{{ $latest := index .Versions 0 }}{{ $latest.Tag.Name }}...HEAD +{{ range .Versions -}} +{{ if .Tag.Previous -}} +[{{ .Tag.Name }}]: {{ $.Info.RepositoryURL }}/compare/{{ .Tag.Previous.Name }}...{{ .Tag.Name }} +{{ end -}} +{{ end -}} +{{ end -}} \ No newline at end of file diff --git a/.chglog/config.yml b/.chglog/config.yml new file mode 100755 index 0000000..ee1e4cd --- /dev/null +++ b/.chglog/config.yml @@ -0,0 +1,27 @@ +style: github +template: CHANGELOG.tpl.md +info: + title: CHANGELOG + repository_url: https://github.com/miun173/rebalance +options: + commits: + # filters: + # Type: + # - feat + # - fix + # - perf + # - refactor + commit_groups: + # title_maps: + # feat: Features + # fix: Bug Fixes + # perf: Performance Improvements + # refactor: Code Refactoring + header: + pattern: "^(\\w*)\\:\\s(.*)$" + pattern_maps: + - Type + - Subject + notes: + keywords: + - BREAKING CHANGE \ No newline at end of file diff --git a/.gitignore b/.gitignore index 0dc7b4b..76203a6 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/.vscode \ No newline at end of file +/.vscode +/output diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..daeb1ce --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,10 @@ + + + + +## v0.1.0 - 2019-12-27 +### Feature +- add round robin load balancer + + +[Unreleased]: https://github.com/miun173/rebalance/compare/v0.1.0...HEAD diff --git a/Makefile b/Makefile index e1c83c5..0dd3817 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,24 @@ +bench_cmd := go test -run=^$ github.com/miun173/rebalance/${package} -bench=. +run_cmd := go run cmd/* + +test: + go test -v -covermode=atomic ./... + run-proxy: - @go run cmd/proxy/main.go + @$(run_cmd) proxy run-sidecar: - @go run cmd/sidecar/main.go $(args) \ No newline at end of file + @$(run_cmd) sidecar join --url $(url) --service-ports $(service-ports) + +build: + @go build -ldflags="-w -s" -o output/rebalance ./cmd/... + +bench: + @cd proxy && $(bench_cmd) + +changelog: +ifdef version + @git-chglog --next-tag $(version) -o CHANGELOG.md +else + @git-chglog -o CHANGELOG.md +endif diff --git a/README.md b/README.md index 501aab1..81f2363 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,23 @@ Experimentation on server load balancing using Round Robin algorithm with self join. ## TODO -- [ ] proxy request to several ip - - [?] proxy request to an ip -- [ ] add self join & discovery \ No newline at end of file +- [x] proxy request to several ip + - [x] proxy request to an ip +- [ ] add self join & discovery + - [x] can join from proxied services +- [x] handle concurrent proxy requests + - [reff](https://kasvith.github.io/posts/lets-create-a-simple-lb-go) + +# Benchmarks +`$ make bench package=proxy` +``` +goos: linux +goarch: amd64 +pkg: github.com/miun173/rebalance/proxy +Benchmark4Upstream/200_microsecond/response/1000_req-4 1000000000 0.87 ns/op +Benchmark4Upstream/200_microsecond/response/10000_req-4 1 8659798397 ns/op +Benchmark4Upstream/20_microsecond/response/1000_req-4 1000000000 0.87 ns/op +Benchmark4Upstream/20_microsecond/response/10000_req-4 1 8673090255 ns/op +PASS +ok github.com/miun173/rebalance/proxy 79.571s +``` diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..4985185 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + rootCMD.Execute() +} diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 0000000..139ab77 --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,78 @@ +package main + +import ( + "errors" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/miun173/rebalance/proxy" + "github.com/miun173/rebalance/sidecar" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + // used for flags + url string + servicePorts string + + rootCMD = &cobra.Command{} + proxyCMD = &cobra.Command{ + Use: "proxy", + Short: "a reverse proxy", + Run: runProxy, + } + + sidecarCMD = &cobra.Command{ + Use: "sidecar", + Short: "a sidecar proxy", + } + + joinCMD = &cobra.Command{ + Use: "join", + Short: "join a service into proxy", + Example: "join --url 'http://127.0.0.1:9000'", + Run: runJoinProxy, + } +) + +func init() { + joinCMD.PersistentFlags().StringVar(&url, "url", "", "proxy service url") + joinCMD.PersistentFlags().StringVar(&servicePorts, "service-ports", "80", "services ports that will be proxied 80,8080,9000") + + sidecarCMD.AddCommand(joinCMD) + rootCMD.AddCommand(sidecarCMD) + rootCMD.AddCommand(proxyCMD) +} + +func runProxy(cmd *cobra.Command, args []string) { + fmt.Println("starting loadbalancer at :9000") + + sp := proxy.NewServiceProxy() + + signalCh := make(chan os.Signal, 1) + defer close(signalCh) + + signal.Notify(signalCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + go sp.Start() + go sp.RunHealthCheck() + + <-signalCh + log.Println("exiting...") +} + +func runJoinProxy(cmd *cobra.Command, args []string) { + if url == "" { + log.Fatal(errors.New("url should be in form 'http://127.0.0.1:9000'")) + } + + ports := strings.Split(servicePorts, ",") + sc := sidecar.NewSideCar(url) + if err := sc.Join(ports...); err != nil { + log.Fatal(err) + } +} diff --git a/go.mod b/go.mod index 9fdb5cf..426109b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,9 @@ module github.com/miun173/rebalance go 1.12 + +require ( + github.com/sirupsen/logrus v1.4.2 + github.com/spf13/cobra v0.0.5 + github.com/stretchr/testify v1.4.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3095d3b --- /dev/null +++ b/go.sum @@ -0,0 +1,57 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/proxy/service.go b/proxy/service.go new file mode 100644 index 0000000..922d9c4 --- /dev/null +++ b/proxy/service.go @@ -0,0 +1,39 @@ +package proxy + +import ( + "net/http/httputil" + "net/url" + "sync" +) + +// Service :nodoc: +type Service struct { + Proxy *httputil.ReverseProxy + URL *url.URL + isAlive bool + mutex sync.RWMutex +} + +// NewService :nodoc: +func NewService(p *httputil.ReverseProxy, u *url.URL) *Service { + return &Service{ + Proxy: p, + URL: u, + isAlive: true, + } +} + +// SetAlive :nodoc: +func (s *Service) SetAlive(alive bool) { + s.mutex.Lock() + s.isAlive = alive + s.mutex.Unlock() +} + +// IsAlive :nodoc: +func (s *Service) IsAlive() (alive bool) { + s.mutex.Lock() + alive = s.isAlive + s.mutex.Unlock() + return +} diff --git a/proxy/service_pool.go b/proxy/service_pool.go new file mode 100644 index 0000000..4b84897 --- /dev/null +++ b/proxy/service_pool.go @@ -0,0 +1,263 @@ +package proxy + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "net/http/httputil" + "net/http/pprof" + "net/url" + "time" + + // pprof + _ "net/http/pprof" + + log "github.com/sirupsen/logrus" +) + +type key int + +const ( + _Attempts key = 0 + _Retry key = 1 + + _MaxRetries int = 3 + _MaxAttempt int = 3 +) + +// ServiceProxy :nodoc: +type ServiceProxy struct { + mapURL map[string]string + services []*Service + currentService int +} + +// NewServiceProxy :nodoc: +func NewServiceProxy() *ServiceProxy { + return &ServiceProxy{ + mapURL: make(map[string]string), + services: make([]*Service, 0), + } +} + +// NextIndex :nodoc: +func (sp *ServiceProxy) NextIndex() int { + return sp.currentService % len(sp.services) +} + +// Start round robin server :nodoc: +func (sp *ServiceProxy) Start() { + m := &http.ServeMux{} + m.HandleFunc("/", sp.Handler) + m.HandleFunc("/rebalance/join", sp.HandleJoin) + + // Register pprof handlers + m.HandleFunc("/debug/pprof/", pprof.Index) + m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + m.HandleFunc("/debug/pprof/profile", pprof.Profile) + m.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + m.HandleFunc("/debug/pprof/trace", pprof.Trace) + + if err := http.ListenAndServe(":9000", m); err != nil { + log.Fatal(err) + } +} + +// AddServer :nodoc: +func (sp *ServiceProxy) AddServer(targetURL string) error { + if _, ok := sp.mapURL[targetURL]; ok { + return errors.New("server url already added") + } + + serviceURL, err := url.Parse(targetURL) + if err != nil { + return err + } + + if ok := isServiceAlive(serviceURL); !ok { + return errors.New("cannot dial service") + } + + sp.mapURL[targetURL] = targetURL + + proxy := httputil.NewSingleHostReverseProxy(serviceURL) + transport := &http.Transport{ + DisableCompression: true, + DisableKeepAlives: false, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + } + proxy.Transport = transport + + sp.services = append(sp.services, NewService(proxy, serviceURL)) + + return nil +} + +// getClientIP extracts the user IP address from req, if present. +func getClientIP(req *http.Request) (net.IP, error) { + ip, _, err := net.SplitHostPort(req.RemoteAddr) + if err != nil { + return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr) + } + + userIP := net.ParseIP(ip) + if userIP == nil { + return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr) + } + return userIP, nil +} + +// HandleJoin :nodoc: +func (sp *ServiceProxy) HandleJoin(w http.ResponseWriter, r *http.Request) { + ip, err := getClientIP(r) + if err != nil { + log.Fatal(err) + } + + port := ":" + r.URL.Query().Get("port") + + log.Println("requst join from host ", ip.String()+port) + if err := sp.AddServer("http://" + ip.String() + port); err != nil { + log.Error(err) + w.WriteHeader(http.StatusBadRequest) + resp, err := json.Marshal(map[string]interface{}{"error": err.Error()}) + if err != nil { + log.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Write(resp) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("success join")) +} + +// FindNextService find next alive service +func (sp *ServiceProxy) FindNextService() *Service { + if len(sp.services) == 0 { + return nil + } + + next := sp.NextIndex() + n := len(sp.services) + next + nservice := len(sp.services) + + for i := next; i < n; i++ { + idx := i % nservice + if sp.services[idx].IsAlive() { + sp.currentService = (sp.currentService + 1) % nservice + return sp.services[next] + } + } + + return nil +} + +// Handler :nodoc: +func (sp *ServiceProxy) Handler(w http.ResponseWriter, r *http.Request) { + // if the same request routing for few attempts with different backends, increase the count + attempts := getRetryAttemptsFromCtx(r, _Attempts) + if attempts > 3 { + log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) + http.Error(w, "Service not available", http.StatusServiceUnavailable) + return + } + + service := sp.FindNextService() + if service == nil { + http.Error(w, "service not available", http.StatusServiceUnavailable) + return + } + + service.Proxy.ErrorHandler = sp.ProxyErrorHandler(service) + service.Proxy.ServeHTTP(w, r) +} + +// ProxyErrorHandler :nodoc: +func (sp *ServiceProxy) ProxyErrorHandler(service *Service) func(w http.ResponseWriter, r *http.Request, err error) { + return func(w http.ResponseWriter, r *http.Request, err error) { + if err != nil { + log.Error(err) + } + + retries := getRetryAttemptsFromCtx(r, _Retry) + if retries < _MaxRetries { + select { + case <-time.After(10 * time.Millisecond): + ctx := context.WithValue(r.Context(), _Retry, retries+1) + service.Proxy.ServeHTTP(w, r.WithContext(ctx)) + } + + return + } + + service.SetAlive(false) + + // if the same request routing for few attempts with different backends, increase the count + attempts := getRetryAttemptsFromCtx(r, _Attempts) + log.Printf("%s(%s) attempting retry %d\n", r.RemoteAddr, r.URL.Path, attempts) + service := sp.FindNextService() + if service == nil { + http.Error(w, "service not available", http.StatusInternalServerError) + return + } + + ctx := context.WithValue(r.Context(), _Attempts, attempts+1) + sp.Handler(w, r.WithContext(ctx)) + } +} + +// HealthCheck check services health status +// mark service as alive if helathy +func (sp *ServiceProxy) HealthCheck() { + for _, s := range sp.services { + status := "up" + alive := isServiceAlive(s.URL) + s.SetAlive(alive) + if !alive { + status = "down" + } + + log.Printf("%s [%s]\n", s.URL, status) + } +} + +// RunHealthCheck run HealthCheck every 20 second +func (sp *ServiceProxy) RunHealthCheck() { + t := time.NewTicker(20 * time.Second) + for { + select { + case <-t.C: + log.Println("Starting health check...") + sp.HealthCheck() + log.Println("Health check completed") + } + } +} + +func getRetryAttemptsFromCtx(r *http.Request, retyAttempKey key) int { + if val, ok := r.Context().Value(retyAttempKey).(int); ok { + return val + } + + return 0 +} + +func isServiceAlive(u *url.URL) bool { + timeout := 2 * time.Second + conn, err := net.DialTimeout("tcp", u.Host, timeout) + if err != nil { + log.Println("Site unreachable, error: ", err) + return false + } + _ = conn.Close() + + return true +} diff --git a/proxy/service_pool_test.go b/proxy/service_pool_test.go new file mode 100644 index 0000000..a651136 --- /dev/null +++ b/proxy/service_pool_test.go @@ -0,0 +1,114 @@ +package proxy + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestHandler(t *testing.T) { + up1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello, client 1") + })) + + up2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello, client 2") + })) + + up3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello, client 3") + })) + + sp := NewServiceProxy() + sp.AddServer(up1.URL) + sp.AddServer(up2.URL) + sp.AddServer(up3.URL) + + ts := httptest.NewServer(http.HandlerFunc(sp.Handler)) + defer ts.Close() + + for i := 1; i <= 3; i++ { + res, err := http.Get(ts.URL) + assert.NoError(t, err) + + greeting, err := ioutil.ReadAll(res.Body) + res.Body.Close() + assert.NoError(t, err) + + assert.Equal(t, fmt.Sprintf("Hello, client %d", i), string(greeting)) + } + + res, err := http.Get(ts.URL) + assert.NoError(t, err) + + greeting, err := ioutil.ReadAll(res.Body) + res.Body.Close() + assert.NoError(t, err) + + assert.Equal(t, "Hello, client 1", string(greeting)) +} + +func Benchmark4Upstream(b *testing.B) { + b.Run("200 microsecond/response", func(b *testing.B) { + sp := NewServiceProxy() + for i := 0; i < 4; i++ { + up := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(time.Microsecond * 200) + fmt.Fprint(w, fmt.Sprintf("Hello, client %d", i)) + })) + + sp.AddServer(up.URL) + } + + ts := httptest.NewServer(http.HandlerFunc(sp.Handler)) + defer ts.Close() + + b.Run("1000 req", func(b *testing.B) { + for i := 0; i < 1000; i++ { + res, _ := http.Get(ts.URL) + res.Body.Close() + } + }) + + b.Run("10000 req", func(b *testing.B) { + for i := 0; i < 10000; i++ { + res, _ := http.Get(ts.URL) + res.Body.Close() + } + }) + }) + + b.Run("20 microsecond/response", func(b *testing.B) { + sp := NewServiceProxy() + for i := 0; i < 4; i++ { + up := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(time.Microsecond * 200) + fmt.Fprint(w, fmt.Sprintf("Hello, client %d", i)) + })) + + sp.AddServer(up.URL) + } + + ts := httptest.NewServer(http.HandlerFunc(sp.Handler)) + defer ts.Close() + + b.Run("1000 req", func(b *testing.B) { + for i := 0; i < 1000; i++ { + res, _ := http.Get(ts.URL) + res.Body.Close() + } + }) + + b.Run("10000 req", func(b *testing.B) { + for i := 0; i < 10000; i++ { + res, _ := http.Get(ts.URL) + res.Body.Close() + } + }) + }) +} diff --git a/sidecar/sidecar.go b/sidecar/sidecar.go index 1679f8f..70a8b0c 100644 --- a/sidecar/sidecar.go +++ b/sidecar/sidecar.go @@ -2,11 +2,10 @@ package sidecar import ( "errors" + "fmt" "io/ioutil" "log" "net/http" - - "github.com/miun173/rebalance/sidecar/config" ) // SideCar :nodoc: @@ -20,8 +19,24 @@ func NewSideCar(balancerURL string) *SideCar { } // Join joind load balancer cluster -func (sc *SideCar) Join() error { - resp, err := http.Get(sc.balancerURL + "/rebalance/join?port=" + config.ClientServicePort()) +func (sc *SideCar) Join(ports ...string) error { + if len(ports) == 0 { + url := sc.balancerURL + "/rebalance/join?port=80" + return join(url) + } + + for _, port := range ports { + url := fmt.Sprintf("%s%s", sc.balancerURL+"/rebalance/join?port=", port) + if err := join(url); err != nil { + return err + } + } + + return nil +} + +func join(url string) error { + resp, err := http.Get(url) if err != nil { return err }