From 0abe44c0018b2a577e0cd43d25622b7a15a809ce Mon Sep 17 00:00:00 2001 From: Ramkumar Chinchani Date: Mon, 13 Nov 2023 19:19:46 +0000 Subject: [PATCH] feat(cluster): initial commit for scale-out cluster Signed-off-by: Ramkumar Chinchani --- go.mod | 1 + go.sum | 2 + pkg/api/config/config.go | 8 ++++ pkg/api/controller.go | 7 ++++ pkg/cluster/proxy.go | 88 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 106 insertions(+) create mode 100644 pkg/cluster/proxy.go diff --git a/go.mod b/go.mod index 31441332e..73f9ef078 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 github.com/aws/aws-secretsmanager-caching-go v1.1.3 github.com/containers/image/v5 v5.30.0 + github.com/dchest/siphash v1.2.3 github.com/google/go-github/v52 v52.0.0 github.com/gorilla/securecookie v1.1.2 github.com/gorilla/sessions v1.2.2 diff --git a/go.sum b/go.sum index d24ccf77e..666d7269f 100644 --- a/go.sum +++ b/go.sum @@ -591,6 +591,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936 h1:foGzavPWwtoyBvjWyKJYDYsyzy+23iBV7NKTwdk+LRY= github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936/go.mod h1:ttKPnOepYt4LLzD+loXQ1rT6EmpyIYHro7TAJuIIlHo= github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= diff --git a/pkg/api/config/config.go b/pkg/api/config/config.go index f474843ff..279764be5 100644 --- a/pkg/api/config/config.go +++ b/pkg/api/config/config.go @@ -121,6 +121,13 @@ type SchedulerConfig struct { NumWorkers int } +// ClusterConfig is the scale-out configuration which is identical for all +// replicas +type ClusterConfig struct { + Members []string + HashKey string +} + type LDAPCredentials struct { BindDN string BindPassword string @@ -230,6 +237,7 @@ type Config struct { Log *LogConfig Extensions *extconf.ExtensionConfig Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"` + Cluster *ClusterConfig `json:"cluster" mapstructure:",omitempty"` } func New() *Config { diff --git a/pkg/api/controller.go b/pkg/api/controller.go index e8c167c38..3f0852856 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -124,6 +124,13 @@ func (c *Controller) Run() error { engine.Use(SessionAuditLogger(c.Audit)) } + /* + if c.Cluster != nil { + engine.Use(ProxyCluster) + } + + */ + c.Router = engine c.Router.UseEncodedPath() diff --git a/pkg/cluster/proxy.go b/pkg/cluster/proxy.go new file mode 100644 index 000000000..1a3c4e45a --- /dev/null +++ b/pkg/cluster/proxy.go @@ -0,0 +1,88 @@ +package cluster + +import ( + "fmt" + "net/http" + + "github.com/dchest/siphash" + + "zotregistry.dev/zot/pkg/api" + "zotregistry.dev/zot/pkg/api/constants" + zreg "zotregistry.dev/zot/pkg/regexp" +) + +type ProxyRouteHandler struct { + c *api.Controller +} + +func NewRouteHandler(c *api.Controller) *RouteHandler { + rh := &ProxyRouteHandler{c: c} + rh.SetupRoutes() + + // FIXME: this is a scale-out load balancer cluster so doesn't do replicas + + return rh +} + +func (rh *ProxyRouteHandler) SetupRoutes() { + prefixedRouter := rh.c.Router.PathPrefix(constants.RoutePrefix).Subrouter() + prefixedDistSpecRouter := prefixedRouter.NewRoute().Subrouter() + + prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/", zreg.NameRegexp.String()), proxyRequestResponse(rh.c.Config)()) +} + +func proxyRequestResponse(config rh.c.Config) func(http.HandlerFunc) http.HandlerFunc { + return func(next http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { + // if no cluster or single-node cluster, handle locally + if config.Cluster == nil || len(config.Cluster.Members) { + next.ServeHTTP(response, request) + } + + vars := mux.Vars(request) + + name, ok := vars["name"] + + if !ok || name == "" { + response.WriteHeader(http.StatusNotFound) + + return + } + + h := siphash.New(key) + h.Write([]byte(name) + sum64 := h.Sum64(nil) + + member := config.Cluster.Members[sum64%len(config.Cluster.Members)] + /* + + // from the member list and our DNS/IP address, figure out if this request should be handled locally + if member == localMember { + next.ServeHTTP(response, request) + } + + */ + handleHTTP(response, request) + }) + } +} + +func handleHTTP(w http.ResponseWriter, req *http.Request) { + resp, err := http.DefaultTransport.RoundTrip(req) + if err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + defer resp.Body.Close() + copyHeader(w.Header(), resp.Header) + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +}