Skip to content

Commit

Permalink
feat(cluster): initial commit for scale-out cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Ramkumar Chinchani <[email protected]>
  • Loading branch information
rchincha authored and vrajashkr committed May 7, 2024
1 parent be5ad66 commit 0abe44c
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6
github.com/aws/aws-secretsmanager-caching-go v1.1.3
github.com/containers/image/v5 v5.30.0
github.com/dchest/siphash v1.2.3
github.com/google/go-github/v52 v52.0.0
github.com/gorilla/securecookie v1.1.2
github.com/gorilla/sessions v1.2.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc=
github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936 h1:foGzavPWwtoyBvjWyKJYDYsyzy+23iBV7NKTwdk+LRY=
github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936/go.mod h1:ttKPnOepYt4LLzD+loXQ1rT6EmpyIYHro7TAJuIIlHo=
github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg=
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ type SchedulerConfig struct {
NumWorkers int
}

// ClusterConfig is the scale-out configuration which is identical for all
// replicas
type ClusterConfig struct {
Members []string
HashKey string
}

type LDAPCredentials struct {
BindDN string
BindPassword string
Expand Down Expand Up @@ -230,6 +237,7 @@ type Config struct {
Log *LogConfig
Extensions *extconf.ExtensionConfig
Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"`
Cluster *ClusterConfig `json:"cluster" mapstructure:",omitempty"`
}

func New() *Config {
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ func (c *Controller) Run() error {
engine.Use(SessionAuditLogger(c.Audit))
}

/*
if c.Cluster != nil {
engine.Use(ProxyCluster)
}
*/

c.Router = engine
c.Router.UseEncodedPath()

Expand Down
88 changes: 88 additions & 0 deletions pkg/cluster/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cluster

import (
"fmt"
"net/http"

"github.com/dchest/siphash"

"zotregistry.dev/zot/pkg/api"
"zotregistry.dev/zot/pkg/api/constants"
zreg "zotregistry.dev/zot/pkg/regexp"
)

type ProxyRouteHandler struct {
c *api.Controller
}

func NewRouteHandler(c *api.Controller) *RouteHandler {
rh := &ProxyRouteHandler{c: c}
rh.SetupRoutes()

// FIXME: this is a scale-out load balancer cluster so doesn't do replicas

return rh
}

func (rh *ProxyRouteHandler) SetupRoutes() {
prefixedRouter := rh.c.Router.PathPrefix(constants.RoutePrefix).Subrouter()
prefixedDistSpecRouter := prefixedRouter.NewRoute().Subrouter()

prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/", zreg.NameRegexp.String()), proxyRequestResponse(rh.c.Config)())
}

func proxyRequestResponse(config rh.c.Config) func(http.HandlerFunc) http.HandlerFunc {
return func(next http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
// if no cluster or single-node cluster, handle locally
if config.Cluster == nil || len(config.Cluster.Members) {
next.ServeHTTP(response, request)
}

vars := mux.Vars(request)

name, ok := vars["name"]

if !ok || name == "" {
response.WriteHeader(http.StatusNotFound)

return
}

h := siphash.New(key)
h.Write([]byte(name)
sum64 := h.Sum64(nil)

member := config.Cluster.Members[sum64%len(config.Cluster.Members)]
/*
// from the member list and our DNS/IP address, figure out if this request should be handled locally
if member == localMember {
next.ServeHTTP(response, request)
}
*/
handleHTTP(response, request)
})
}
}

func handleHTTP(w http.ResponseWriter, req *http.Request) {
resp, err := http.DefaultTransport.RoundTrip(req)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
defer resp.Body.Close()
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}

func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}

0 comments on commit 0abe44c

Please sign in to comment.