diff --git a/.github/workflows/ecosystem-tools.yaml b/.github/workflows/ecosystem-tools.yaml index 05dd8c7197..0c268534b5 100644 --- a/.github/workflows/ecosystem-tools.yaml +++ b/.github/workflows/ecosystem-tools.yaml @@ -27,7 +27,7 @@ jobs: go install github.com/swaggo/swag/cmd/swag@v1.16.2 go mod download sudo apt-get update - sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap haproxy jq # install skopeo git clone -b v1.12.0 https://github.com/containers/skopeo.git cd skopeo @@ -80,4 +80,37 @@ jobs: env: AWS_ACCESS_KEY_ID: fake AWS_SECRET_ACCESS_KEY: fake + - name: Run cloud scale-out tests + id: scale + run: | + make run-cloud-scale-out-tests + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake + continue-on-error: true + - name: print service logs for scale-out + run: | + find /tmp/zot-ft-logs -name '*.log' -print0 | xargs -0 cat + - name: multi-hop detection + id: multihop + run: | + if find /tmp/zot-ft-logs -name '*.log' -print0 | xargs -0 cat | grep 'cannot proxy an already proxied request'; then + echo "detected multi-hop" + exit 1 + else + exit 0 + fi + continue-on-error: true + - name: clean up scale-out logs + run: | + rm -r /tmp/zot-ft-logs + - name: fail job if error + if: ${{ steps.scale.outcome != 'success' || steps.multihop.outcome != 'success' }} + run: | + exit 1 + - name: Upload zb test results zip as build artifact + uses: actions/upload-artifact@v4 + with: + name: zb-cloud-scale-out-functional-results-${{ github.sha }} + path: ./zb-results/ - uses: ./.github/actions/teardown-localstack diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml index 3df6e3b29e..fafd42348b 100644 --- a/.github/workflows/nightly.yaml +++ b/.github/workflows/nightly.yaml @@ -3,14 +3,19 @@ on: schedule: - cron: '30 1 * * *' workflow_dispatch: + pull_request: + branches: + - main permissions: read-all -# Here we are running two tests: +# The following tests are run: # 1. run zot with local storage and dedupe disabled, push images, restart zot with dedupe enabled # task scheduler will start a dedupe all blobs process at zot startup and it shouldn't interfere with clients. # 2. run zot with s3 storage and dynamodb and dedupe enabled, push images, restart zot with dedupe false and no cache # task scheduler will start a restore all blobs process at zot startup, after it finishes all blobs should be restored to their original state (have content) +# 3. run many, many, many instances of zot with shared storage and metadata front-ended by HAProxy. start a long-running zb run with high concurrency and number of requests +# to achieve a long-running sustained load on the system. The system is expected to perform well without errors and return performance data after the test. 
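As background for the scale-out scenario described above: each repository is pinned to exactly one cluster member by hashing its name with SipHash (keyed by the cluster hashKey) and indexing into the members list, which is what pkg/api/proxy.go does in computeTargetMember further down in this change. A minimal standalone sketch of that selection logic, using illustrative member addresses and the example 16-byte key from the sample configs, is:

package main

import (
	"fmt"

	"github.com/dchest/siphash"
)

// pickMember mirrors computeTargetMember in pkg/api/proxy.go: hash the
// repository name with SipHash keyed by the 16-byte cluster hashKey and
// map the result onto the member list.
func pickMember(members []string, hashKey, repo string) string {
	h := siphash.New([]byte(hashKey))
	h.Write([]byte(repo))

	return members[h.Sum64()%uint64(len(members))]
}

func main() {
	// illustrative values; a real deployment takes these from the
	// "cluster" section of the zot config.
	members := []string{"127.0.0.1:9000", "127.0.0.1:9001", "127.0.0.1:9002"}
	for _, repo := range []string{"alpine", "debian", "ubuntu"} {
		fmt.Printf("%s -> %s\n", repo, pickMember(members, "loremipsumdolors", repo))
	}
}

Every member computes the same owner for a given repository, so whichever instance HAProxy picks, a request is forwarded at most once; a second forward is exactly what the multi-hop detection step above greps the logs for.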
jobs: dedupe: name: Dedupe/restore blobs @@ -195,3 +200,82 @@ jobs: - name: Run tests run: | ./examples/kind/kind-ci.sh + + cloud-scale-out-performance: + name: s3+dynamodb scale-out performance + runs-on: ubuntu-latest-16-cores + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + cache: false + go-version: 1.22.x + - name: Install dependencies + run: | + cd $GITHUB_WORKSPACE + go install github.com/swaggo/swag/cmd/swag@v1.16.2 + go mod download + sudo apt-get update + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap haproxy jq + # install skopeo + git clone -b v1.12.0 https://github.com/containers/skopeo.git + cd skopeo + make bin/skopeo + sudo cp bin/skopeo /usr/bin + skopeo -v + cd $GITHUB_WORKSPACE + - name: Log in to GitHub Docker Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ github.token }} + - uses: actions/setup-python@v5 + with: + python-version: '3.11' + - name: Install localstack + run: | + pip install --upgrade pyopenssl + pip install localstack==3.3.0 awscli-local[ver1] # install LocalStack cli and awslocal + docker pull ghcr.io/project-zot/ci-images/localstack:3.3.0 # Make sure to pull a working version of the image + localstack start -d # Start LocalStack in the background + + echo "Waiting for LocalStack startup..." # Wait 30 seconds for the LocalStack container + localstack wait -t 30 # to become ready before timing out + echo "Startup complete" + - name: Run cloud scale-out high scale performance tests + id: scale + run: | + make run-cloud-scale-out-high-scale-tests + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake + continue-on-error: true + - name: print service logs + run: | + sudo dmesg + cat /tmp/zot-logs/*.log + - name: multi-hop detection + id: multihop + run: | + if cat /tmp/zot-logs/*.log | grep 'cannot proxy an already proxied request'; then + echo "detected multi-hop" + exit 1 + else + exit 0 + fi + continue-on-error: true + - name: clean up logs + run: | + rm -r /tmp/zot-logs + - name: fail job if error + if: ${{ steps.scale.outcome != 'success' || steps.multihop.outcome != 'success' }} + run: | + exit 1 + - name: Upload zb test results zip as build artifact + if: steps.scale.outcome == 'success' + uses: actions/upload-artifact@v4 + with: + name: zb-cloud-scale-out-perf-results-${{ github.sha }} + path: ./zb-results/ + - uses: ./.github/actions/teardown-localstack diff --git a/Makefile b/Makefile index 75cb709206..2afab068e5 100644 --- a/Makefile +++ b/Makefile @@ -489,6 +489,17 @@ run-blackbox-tests: $(BATS_TEST_FILE_PATH) check-blackbox-prerequisites binary b echo running bats test "$(BATS_TEST_FILE_PATH)"; \ $(BATS) $(BATS_FLAGS) $(BATS_TEST_FILE_PATH) +.PHONY: run-cloud-scale-out-tests +run-cloud-scale-out-tests: check-blackbox-prerequisites check-awslocal binary bench test-prereq + echo running scale out bats test; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_no_auth.bats; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_basic_auth_tls.bats + +.PHONY: run-cloud-scale-out-high-scale-tests +run-cloud-scale-out-high-scale-tests: check-blackbox-prerequisites check-awslocal binary bench test-prereq + echo running cloud scale out bats high scale test; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats + .PHONY: run-blackbox-ci run-blackbox-ci: check-blackbox-prerequisites binary binary-minimal cli echo running CI bats tests concurently diff --git 
a/examples/scale-out-cluster-cloud/config-cluster-member0.json b/examples/scale-out-cluster-cloud/config-cluster-member0.json new file mode 100644 index 0000000000..f00b969ce3 --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member0.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9000" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/config-cluster-member1.json b/examples/scale-out-cluster-cloud/config-cluster-member1.json new file mode 100644 index 0000000000..468dabb48a --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member1.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9001" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/config-cluster-member2.json b/examples/scale-out-cluster-cloud/config-cluster-member2.json new file mode 100644 index 0000000000..84981d743a --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member2.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9002" 
+ }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/haproxy.cfg b/examples/scale-out-cluster-cloud/haproxy.cfg new file mode 100644 index 0000000000..ffb5664e99 --- /dev/null +++ b/examples/scale-out-cluster-cloud/haproxy.cfg @@ -0,0 +1,26 @@ +global + log /tmp/log local0 + log /tmp/log local1 notice + maxconn 2000 + stats timeout 30s + daemon + +defaults + log global + mode http + option httplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend zot + bind *:8080 + default_backend zot-cluster + +backend zot-cluster + balance roundrobin + cookie SERVER insert indirect nocache + server zot0 127.0.0.1:9000 cookie zot0 + server zot1 127.0.0.1:9001 cookie zot1 + server zot2 127.0.0.1:9002 cookie zot2 diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json new file mode 100644 index 0000000000..5d7f219612 --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9000", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json new file mode 100644 index 0000000000..bd269e3ff8 --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9001", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + 
"127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json new file mode 100644 index 0000000000..5b87b5bbb8 --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9002", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/haproxy.cfg b/examples/scale-out-cluster-cloud/tls/haproxy.cfg new file mode 100644 index 0000000000..5a39636b1d --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/haproxy.cfg @@ -0,0 +1,25 @@ +global + log /tmp/log local0 + log /tmp/log local1 notice + maxconn 2000 + stats timeout 30s + +defaults + log global + mode tcp + option tcplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend zot + bind *:8080 + default_backend zot-cluster + +backend zot-cluster + balance roundrobin + cookie SERVER insert indirect nocache + server zot0 127.0.0.1:9000 cookie zot0 + server zot1 127.0.0.1:9001 cookie zot1 + server zot2 127.0.0.1:9002 cookie zot2 diff --git a/pkg/api/config/config.go b/pkg/api/config/config.go index 279764be5d..f06fdd5ca2 100644 --- a/pkg/api/config/config.go +++ b/pkg/api/config/config.go @@ -121,11 +121,19 @@ type SchedulerConfig struct { NumWorkers int } -// ClusterConfig is the scale-out configuration which is identical for all -// replicas +// contains the scale-out configuration which is identical for all zot replicas. type ClusterConfig struct { - Members []string - HashKey string + // contains the "host:port" of all the zot instances participating + // in the cluster. + Members []string `json:"members" mapstructure:"members"` + + // contains the hash key that is required for siphash. + // must be a 128-bit (16-byte) key + // https://github.com/dchest/siphash?tab=readme-ov-file#func-newkey-byte-hashhash64 + HashKey string `json:"hashKey" mapstructure:"hashKey"` + + // contains client TLS config. 
+ TLS *TLSConfig `json:"tls" mapstructure:"tls"` } type LDAPCredentials struct { @@ -237,7 +245,7 @@ type Config struct { Log *LogConfig Extensions *extconf.ExtensionConfig Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"` - Cluster *ClusterConfig `json:"cluster" mapstructure:",omitempty"` + Cluster *ClusterConfig `json:"cluster" mapstructure:",omitempty"` } func New() *Config { diff --git a/pkg/api/constants/consts.go b/pkg/api/constants/consts.go index df3cec8eac..5afe00a9f9 100644 --- a/pkg/api/constants/consts.go +++ b/pkg/api/constants/consts.go @@ -31,4 +31,9 @@ const ( DeletePermission = "delete" // behaviour actions. DetectManifestCollisionPermission = "detectManifestCollision" + // zot scale-out hop count header. + ScaleOutHopCountHeader = "X-Zot-Cluster-Hop-Count" + // log string keys. + // these can be used together with the logger to add context to a log message. + RepositoryLogKey = "repository" ) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 3f0852856f..ffa6d11d5c 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -58,6 +58,13 @@ func NewController(config *config.Config) *Controller { var controller Controller logger := log.NewLogger(config.Log.Level, config.Log.Output) + + if config.Cluster != nil { + // add global context of address:port for the instance's loggers only if + // scale-out cluster mode is enabled + host := fmt.Sprintf("%s:%s", config.HTTP.Address, config.HTTP.Port) + logger.Logger = logger.Logger.With().Str("host", host).Logger() + } controller.Config = config controller.Log = logger @@ -124,13 +131,6 @@ func (c *Controller) Run() error { engine.Use(SessionAuditLogger(c.Audit)) } - /* - if c.Cluster != nil { - engine.Use(ProxyCluster) - } - - */ - c.Router = engine c.Router.UseEncodedPath() diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 4d312ceee5..8903b9eee7 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -958,6 +958,434 @@ func TestBlobReferenced(t *testing.T) { }) } +// tests for shared-storage scale-out cluster. +func TestScaleOutRequestProxy(t *testing.T) { + // when there is only one member, no proxying is expected and the responses should be correct. 
+ Convey("Given a zot scale out cluster in http mode with only 1 member", t, func() { + port := test.GetFreePort() + clusterMembers := make([]string, 1) + clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port) + + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + + Convey("Controller should start up and respond without error", func() { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Should upload images and fetch valid responses for repo tags list", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for _, repoName := range reposToTest { + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0") + So(err, ShouldBeNil) + + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + }) + }) + + // when only one member in the cluster is online, an error is expected when there is a + // request proxied to an offline member. + Convey("Given a scale out http cluster with only 1 online member", t, func() { + port := test.GetFreePort() + clusterMembers := make([]string, 3) + clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port) + clusterMembers[1] = "127.0.0.1:1" + clusterMembers[2] = "127.0.0.1:2" + + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + + Convey("Controller should start up and respond without error", func() { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Should fail to upload an image that is proxied to another instance", func() { + repoName := "alpine" + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0") + So(err, ShouldNotBeNil) + So(err.Error(), ShouldEqual, "can't post blob") + + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when there are multiple members in a cluster, requests are expected to return + // the same data for any member due to proxying. 
+ Convey("Given a zot scale out cluster in http mode with 3 members", t, func() { + numMembers := 3 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + Convey("All 3 controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Should upload images to repos and fetch same response from all 3 members", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for idx, repoName := range reposToTest { + img := CreateRandomImage() + + // Upload to each instance based on loop counter + err := UploadImage(img, test.GetBaseURL(ports[idx]), repoName, "1.0") + So(err, ShouldBeNil) + + // Query all 3 instances and expect the same response + for _, port := range ports { + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + } + }) + }) + + // this test checks for functionality when TLS and htpasswd auth are enabled. + // it primarily checks that headers are correctly copied over during the proxying process. 
+ Convey("Given a zot scale out cluster in https mode with auth enabled", t, func() { + numMembers := 3 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + username, _ := test.GenerateRandomString() + password, _ := test.GenerateRandomString() + htpasswdPath := test.MakeHtpasswdFileFromString(test.GetCredString(username, password)) + defer os.Remove(htpasswdPath) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.HTTP.Auth = &config.AuthConfig{ + HTPasswd: config.AuthHTPasswd{ + Path: htpasswdPath, + }, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: CACert, + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + Convey("All 3 controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().SetBasicAuth(username, password).Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Should upload images to repos and fetch same response from all 3 instances", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for idx, repoName := range reposToTest { + img := CreateRandomImage() + + // Upload to each instance based on loop counter + err := UploadImageWithBasicAuth(img, test.GetSecureBaseURL(ports[idx]), repoName, "1.0", username, password) + So(err, ShouldBeNil) + + // Query all 3 instances and expect the same response + for _, port := range ports { + resp, err := resty.R().SetBasicAuth(username, password).Get( + fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(port), repoName), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + } + }) + }) + + // when the RootCA file does not exist, expect an error + Convey("Given a zot scale out cluster in with 2 members and an incorrect RootCACert", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := 
test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when the server cert file does not exist, expect an error while proxying + Convey("Given a zot scale out cluster in with 2 members and an incorrect server cert", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: CACert, + Cert: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when the server key file does not exist, expect an error while proxying + Convey("Given a zot scale out cluster in with 2 members and an incorrect server key", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: 
&config.TLSConfig{ + CACert: CACert, + Cert: ServerCert, + Key: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) +} + func TestPrintTracebackOnPanic(t *testing.T) { Convey("Run server on unavailable port", t, func() { port := test.GetFreePort() diff --git a/pkg/api/proxy.go b/pkg/api/proxy.go new file mode 100644 index 0000000000..829b5bdfdc --- /dev/null +++ b/pkg/api/proxy.go @@ -0,0 +1,310 @@ +package api + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "strings" + + "github.com/dchest/siphash" + "github.com/gorilla/mux" + + "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/api/constants" + "zotregistry.dev/zot/pkg/common" +) + +var ErrFailedToParseIPAddress = errors.New("failed to parse IP address") + +// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure +// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster. +// based on the hash value of the repository name, the request will either be handled locally +// or proxied to another zot member in the cluster to get the data before sending a response to the client. +func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc { + return func(next http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { + config := ctrlr.Config + logger := ctrlr.Log + + // if no cluster or single-node cluster, handle locally. + if config.Cluster == nil || len(config.Cluster.Members) == 1 { + next.ServeHTTP(response, request) + + return + } + + // since the handler has been wrapped, it should be possible to get the name + // of the repository from the mux. + vars := mux.Vars(request) + name, ok := vars["name"] + + if !ok || name == "" { + response.WriteHeader(http.StatusNotFound) + + return + } + + // the target member is the only one which should do read/write for the dist-spec APIs + // for the given repository. + targetMember := computeTargetMember(config, name) + + // from the member list and our DNS/IP address, calculate if this request should be handled locally. + // handle locally, if the "address:port" is a match, else, proxy it to the target member. + localIPs, err := getLocalServerSockets(config.HTTP.Port) + if err != nil { + logger.Error().Str(constants.RepositoryLogKey, name). + Msg(fmt.Sprintf("failed to get local sockets. 
error: %s", err.Error())) + response.WriteHeader(http.StatusInternalServerError) + + return + } + + logger.Debug().Str(constants.RepositoryLogKey, name).Msg(fmt.Sprintf("local sockets: %v", localIPs)) + + targetMemberIPs, err := getTargetMemberServerSockets(targetMember) + if err != nil { + logger.Error().Str(constants.RepositoryLogKey, name). + Msg(fmt.Sprintf("failed to get target member sockets. error: %s", err.Error())) + response.WriteHeader(http.StatusInternalServerError) + + return + } + + logger.Debug().Str(constants.RepositoryLogKey, name).Msg(fmt.Sprintf("target member sockets: %v", targetMemberIPs)) + + // if the target member is the same as the local member, the current member should handle the request. + isLocal, err := isTargetMemberLocal(localIPs, targetMemberIPs) + if err != nil { + logger.Error().Str(constants.RepositoryLogKey, name). + Msg(fmt.Sprintf("failed to compare local and target sockets. error: %s", err.Error())) + response.WriteHeader(http.StatusInternalServerError) + + return + } + + if isLocal { + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally") + next.ServeHTTP(response, request) + + return + } + + // if the header contains a hop-count, return an error response as there should be no multi-hop + if request.Header.Get(constants.ScaleOutHopCountHeader) != "" { + logger.Fatal().Str("url", request.URL.String()). + Msg("failed to process request - cannot proxy an already proxied request") + + return + } + + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request") + + proxyResponse, err := proxyHTTPRequest(request.Context(), request, targetMember, ctrlr) + if err != nil { + http.Error(response, err.Error(), http.StatusInternalServerError) + logger.Error().Str(constants.RepositoryLogKey, name).Msg( + fmt.Sprintf("error while proxying request %s", err.Error()), + ) + + return + } + defer proxyResponse.Body.Close() + + copyHeader(response.Header(), proxyResponse.Header) + response.WriteHeader(proxyResponse.StatusCode) + _, _ = io.Copy(response, proxyResponse.Body) + }) + } +} + +// computes the target member using siphash. +// siphash was chosen to prevent against hash attacks where an attacker +// can target all requests to one given instance instead of balancing across the cluster +// resulting in a Denial-of-Service (DOS). +// ref: https://en.wikipedia.org/wiki/SipHash +func computeTargetMember(config *config.Config, name string) string { + h := siphash.New([]byte(config.Cluster.HashKey)) + h.Write([]byte(name)) + sum64 := h.Sum64() + + return config.Cluster.Members[sum64%uint64(len(config.Cluster.Members))] +} + +// returns if the target member input is the local instance or not. +// accepts a list of Local sockets (IP:Port) and a list of target sockets (IP:Port). +// return true if there is any match, otherwise, false. 
+func isTargetMemberLocal(localSockets []string, targetSockets []string) (bool, error) { + // if any of the targetMember IPs are present in localIPs, then it is a local member + for _, targetSocket := range targetSockets { + for _, localSocket := range localSockets { + localSplitIndex := strings.LastIndex(localSocket, ":") + local := []string{localSocket[:localSplitIndex], localSocket[localSplitIndex+1:]} + + localHost := net.ParseIP(local[0]) + if localHost == nil { + return false, fmt.Errorf("%w: %s", ErrFailedToParseIPAddress, local[0]) + } + localPort := local[1] + + targetSplitIndex := strings.LastIndex(targetSocket, ":") + target := []string{targetSocket[:targetSplitIndex], targetSocket[targetSplitIndex+1:]} + + targetHost := net.ParseIP(target[0]) + if targetHost == nil { + return false, fmt.Errorf("%w: %s", ErrFailedToParseIPAddress, target[0]) + } + targetPort := target[1] + + // we need to compare using the net package as IPv6 can be written in multiple forms - + // expanded and shortened which are inequal strings. + if localHost.Equal(targetHost) && (localPort == targetPort) { + return true, nil + } + } + } + + return false, nil +} + +// gets all the local server sockets (IP:Port). +func getLocalServerSockets(port string) ([]string, error) { + localIPs, err := common.GetLocalIPs() + if err != nil { + return []string{}, err + } + + localSockets := make([]string, len(localIPs)) + for idx, ip := range localIPs { + localSockets[idx] = fmt.Sprintf("%s:%s", ip, port) + } + + return localSockets, nil +} + +// gets all the server sockets of a target member (IP:Port). +// if the input is an IP address, returns the same targetMember in an array. +// if the input is a host name, performs a lookup and returns the server sockets. +func getTargetMemberServerSockets(targetMemberSocket string) ([]string, error) { + // host is the first part of the target member string + targetSplitIndex := strings.LastIndex(targetMemberSocket, ":") + target := []string{targetMemberSocket[:targetSplitIndex], targetMemberSocket[targetSplitIndex+1:]} + targetHost := target[0] + targetPort := target[1] + + addr := net.ParseIP(targetHost) + if addr != nil { + // this is an IP address, return as is + return []string{targetMemberSocket}, nil + } + // this is a hostname - try to resolve to an IP + resolvedAddrs, err := common.GetIPFromHostName(targetHost) + if err != nil { + return []string{}, err + } + + targetSockets := make([]string, len(resolvedAddrs)) + for idx, resolvedAddr := range resolvedAddrs { + targetSockets[idx] = fmt.Sprintf("%s:%s", resolvedAddr, targetPort) + } + + return targetSockets, nil +} + +// proxy the request to the target member and return a pointer to the response or an error. +func proxyHTTPRequest(ctx context.Context, req *http.Request, + targetMember string, ctrlr *Controller, +) (*http.Response, error) { + cloneURL := *req.URL + + proxyQueryScheme := "http" + if ctrlr.Config.HTTP.TLS != nil { + proxyQueryScheme = "https" + } + + cloneURL.Scheme = proxyQueryScheme + cloneURL.Host = targetMember + + clonedBody := cloneRequestBody(req) + + fwdRequest, err := http.NewRequestWithContext(ctx, req.Method, cloneURL.String(), clonedBody) + if err != nil { + return nil, err + } + + copyHeader(fwdRequest.Header, req.Header) + + // always set hop count to 0 for now. + // the handler wrapper above panics if it sees a request that + // already has a hop count but is due for proxying. 
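+	// as a result a request crosses at most one proxy hop: the member that the
+	// client (or load balancer) reached either serves it locally or forwards it
+	// directly to the member that owns the repository hash.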
+ fwdRequest.Header.Set(constants.ScaleOutHopCountHeader, "0") + + clientOpts := common.HTTPClientOptions{ + TLSEnabled: ctrlr.Config.HTTP.TLS != nil, + VerifyTLS: ctrlr.Config.HTTP.TLS != nil, // for now, always verify TLS when TLS mode is enabled + Host: targetMember, + } + + tlsConfig := ctrlr.Config.Cluster.TLS + if tlsConfig != nil { + clientOpts.CertOptions.ClientCertFile = tlsConfig.Cert + clientOpts.CertOptions.ClientKeyFile = tlsConfig.Key + clientOpts.CertOptions.RootCaCertFile = tlsConfig.CACert + } + + httpClient, err := common.CreateHTTPClient(&clientOpts) + if err != nil { + return nil, err + } + + resp, err := httpClient.Do(fwdRequest) + if err != nil { + return nil, err + } + + var clonedRespBody bytes.Buffer + + // copy out the contents into a new buffer as the response body + // stream should be closed to get all the data out. + _, _ = io.Copy(&clonedRespBody, resp.Body) + resp.Body.Close() + + // after closing the original body, substitute it with a new reader + // using the buffer that was just created. + // this buffer should be closed later by the consumer of the response. + resp.Body = io.NopCloser(bytes.NewReader(clonedRespBody.Bytes())) + + return resp, nil +} + +func cloneRequestBody(src *http.Request) io.Reader { + var bCloneForOriginal, bCloneForCopy bytes.Buffer + multiWriter := io.MultiWriter(&bCloneForOriginal, &bCloneForCopy) + numBytesCopied, _ := io.Copy(multiWriter, src.Body) + + // if the body is a type of io.NopCloser and length is 0, + // the Content-Length header is not sent in the proxied request. + // explicitly returning http.NoBody allows the implementation + // to set the header. + // ref: https://github.com/golang/go/issues/34295 + if numBytesCopied == 0 { + src.Body = http.NoBody + + return http.NoBody + } + + src.Body = io.NopCloser(&bCloneForOriginal) + + return bytes.NewReader(bCloneForCopy.Bytes()) +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} diff --git a/pkg/api/routes.go b/pkg/api/routes.go index b9001b5f57..f71d7ae5c1 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -127,40 +127,66 @@ func (rh *RouteHandler) SetupRoutes() { prefixedDistSpecRouter.Use(DistSpecAuthzHandler(rh.c)) } + clusterRouteProxy := ClusterProxy(rh.c) + // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#endpoints + // dist-spec APIs that need to be proxied are wrapped in clusterRouteProxy for scale-out proxying. + // these are handlers that have a repository name. 
{ prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/tags/list", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( - applyCORSHeaders(rh.ListTags))).Methods(http.MethodGet, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( + applyCORSHeaders(rh.ListTags), + ), + ), + ).Methods(http.MethodGet, http.MethodOptions) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)( - applyCORSHeaders(rh.CheckManifest))).Methods(http.MethodHead, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)( + applyCORSHeaders(rh.CheckManifest), + ), + ), + ).Methods(http.MethodHead, http.MethodOptions) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - applyCORSHeaders(rh.GetManifest)).Methods(http.MethodGet) + clusterRouteProxy( + applyCORSHeaders(rh.GetManifest), + ), + ).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - rh.UpdateManifest).Methods(http.MethodPut) + clusterRouteProxy(rh.UpdateManifest)).Methods(http.MethodPut) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - applyCORSHeaders(rh.DeleteManifest)).Methods(http.MethodDelete) + clusterRouteProxy( + applyCORSHeaders(rh.DeleteManifest), + ), + ).Methods(http.MethodDelete) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.CheckBlob).Methods(http.MethodHead) + clusterRouteProxy(rh.CheckBlob)).Methods(http.MethodHead) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.GetBlob).Methods(http.MethodGet) + clusterRouteProxy(rh.GetBlob)).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.DeleteBlob).Methods(http.MethodDelete) + clusterRouteProxy(rh.DeleteBlob)).Methods(http.MethodDelete) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/", zreg.NameRegexp.String()), - rh.CreateBlobUpload).Methods(http.MethodPost) + clusterRouteProxy(rh.CreateBlobUpload)).Methods(http.MethodPost) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.GetBlobUpload).Methods(http.MethodGet) + clusterRouteProxy(rh.GetBlobUpload)).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.PatchBlobUpload).Methods(http.MethodPatch) + clusterRouteProxy(rh.PatchBlobUpload)).Methods(http.MethodPatch) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.UpdateBlobUpload).Methods(http.MethodPut) + clusterRouteProxy(rh.UpdateBlobUpload)).Methods(http.MethodPut) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.DeleteBlobUpload).Methods(http.MethodDelete) + clusterRouteProxy(rh.DeleteBlobUpload)).Methods(http.MethodDelete) // support for OCI artifact references 
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/referrers/{digest}", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( - applyCORSHeaders(rh.GetReferrers))).Methods(http.MethodGet, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( + applyCORSHeaders(rh.GetReferrers), + ), + ), + ).Methods(http.MethodGet, http.MethodOptions) + + // handlers which work fine with a single node do not need proxying. + // catalog handler doesn't require proxying as the metadata and storage are shared. + // discover and the default path handlers are node-specific so do not require proxying. prefixedRouter.HandleFunc(constants.ExtCatalogPrefix, getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( applyCORSHeaders(rh.ListRepositories))).Methods(http.MethodGet, http.MethodOptions) diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index 4916cd200f..46f16b0d71 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -457,6 +457,11 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error { } } + // check validity of scale out cluster config + if err := validateClusterConfig(config, log); err != nil { + return err + } + return nil } @@ -1103,3 +1108,27 @@ func validateSync(config *config.Config, log zlog.Logger) error { return nil } + +func validateClusterConfig(config *config.Config, log zlog.Logger) error { + if config.Cluster != nil { + if len(config.Cluster.Members) == 0 { + log.Error().Err(zerr.ErrBadConfig). + Msg("cannot have 0 members in a scale out cluster") + + return zerr.ErrBadConfig + } + + // the allowed length is 16 as the siphash requires a 128 bit key. + // that translates to 16 characters * 8 bits each. + allowedHashKeyLength := 16 + if len(config.Cluster.HashKey) != allowedHashKeyLength { + log.Error().Err(zerr.ErrBadConfig). + Str("hashkey", config.Cluster.HashKey). 
+ Msg(fmt.Sprintf("hashKey for scale out cluster must have %d characters", allowedHashKeyLength)) + + return zerr.ErrBadConfig + } + } + + return nil +} diff --git a/pkg/cli/server/root_test.go b/pkg/cli/server/root_test.go index b0f7e94f70..9e7fab4e8a 100644 --- a/pkg/cli/server/root_test.go +++ b/pkg/cli/server/root_test.go @@ -2028,6 +2028,150 @@ func TestUpdateLDAPConfig(t *testing.T) { }) } +func TestClusterConfig(t *testing.T) { + baseExamplePath := "../../../examples/scale-out-cluster-cloud/" + + Convey("Should successfully load example configs for cloud", t, func() { + for memberIdx := 0; memberIdx < 3; memberIdx++ { + cfgFileToLoad := fmt.Sprintf("%s/config-cluster-member%d.json", baseExamplePath, memberIdx) + cfg := config.New() + err := cli.LoadConfiguration(cfg, cfgFileToLoad) + So(err, ShouldBeNil) + } + }) + + Convey("Should successfully load example TLS configs for cloud", t, func() { + for memberIdx := 0; memberIdx < 3; memberIdx++ { + cfgFileToLoad := fmt.Sprintf("%s/tls/config-cluster-member%d.json", baseExamplePath, memberIdx) + cfg := config.New() + err := cli.LoadConfiguration(cfg, cfgFileToLoad) + So(err, ShouldBeNil) + } + }) + + Convey("Should reject scale out cluster invalid cases", t, func() { + cfgFileContents, err := os.ReadFile(baseExamplePath + "config-cluster-member0.json") + So(err, ShouldBeNil) + + Convey("Should reject empty members list", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the members to an empty list + cfg.Cluster.Members = []string{} + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject missing members list", func() { + cfg := config.New() + + configStr := ` + { + "storage": { + "RootDirectory": "/tmp/example" + }, + "http": { + "address": "127.0.0.1", + "port": "800" + }, + "cluster" { + "hashKey": "loremipsumdolors" + } + }` + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + err = os.WriteFile(file.Name(), []byte(configStr), 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject missing hashkey", func() { + cfg := config.New() + + configStr := ` + { + "storage": { + "RootDirectory": "/tmp/example" + }, + "http": { + "address": "127.0.0.1", + "port": "800" + }, + "cluster" { + "members": ["127.0.0.1:9000"] + } + }` + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + err = os.WriteFile(file.Name(), []byte(configStr), 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject a hashkey that is too short", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the hashkey to a string shorter than 16 characters + cfg.Cluster.HashKey = "fifteencharacte" + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, 
ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject a hashkey that is too long", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the hashkey to a string longer than 16 characters + cfg.Cluster.HashKey = "seventeencharacte" + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + }) +} + // run cli and return output. func runCLIWithConfig(tempDir string, config string) (string, error) { port := GetFreePort() diff --git a/pkg/cluster/proxy.go b/pkg/cluster/proxy.go deleted file mode 100644 index 1a3c4e45ad..0000000000 --- a/pkg/cluster/proxy.go +++ /dev/null @@ -1,88 +0,0 @@ -package cluster - -import ( - "fmt" - "net/http" - - "github.com/dchest/siphash" - - "zotregistry.dev/zot/pkg/api" - "zotregistry.dev/zot/pkg/api/constants" - zreg "zotregistry.dev/zot/pkg/regexp" -) - -type ProxyRouteHandler struct { - c *api.Controller -} - -func NewRouteHandler(c *api.Controller) *RouteHandler { - rh := &ProxyRouteHandler{c: c} - rh.SetupRoutes() - - // FIXME: this is a scale-out load balancer cluster so doesn't do replicas - - return rh -} - -func (rh *ProxyRouteHandler) SetupRoutes() { - prefixedRouter := rh.c.Router.PathPrefix(constants.RoutePrefix).Subrouter() - prefixedDistSpecRouter := prefixedRouter.NewRoute().Subrouter() - - prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/", zreg.NameRegexp.String()), proxyRequestResponse(rh.c.Config)()) -} - -func proxyRequestResponse(config rh.c.Config) func(http.HandlerFunc) http.HandlerFunc { - return func(next http.HandlerFunc) http.HandlerFunc { - return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { - // if no cluster or single-node cluster, handle locally - if config.Cluster == nil || len(config.Cluster.Members) { - next.ServeHTTP(response, request) - } - - vars := mux.Vars(request) - - name, ok := vars["name"] - - if !ok || name == "" { - response.WriteHeader(http.StatusNotFound) - - return - } - - h := siphash.New(key) - h.Write([]byte(name) - sum64 := h.Sum64(nil) - - member := config.Cluster.Members[sum64%len(config.Cluster.Members)] - /* - - // from the member list and our DNS/IP address, figure out if this request should be handled locally - if member == localMember { - next.ServeHTTP(response, request) - } - - */ - handleHTTP(response, request) - }) - } -} - -func handleHTTP(w http.ResponseWriter, req *http.Request) { - resp, err := http.DefaultTransport.RoundTrip(req) - if err != nil { - http.Error(w, err.Error(), http.StatusServiceUnavailable) - return - } - defer resp.Body.Close() - copyHeader(w.Header(), resp.Header) - w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) -} - -func copyHeader(dst, src http.Header) { - for k, vv := range src { - for _, v := range vv { - dst.Add(k, v) - } - } -} diff --git a/pkg/common/common.go b/pkg/common/common.go index 3b9abe8bb7..74d84d56fe 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io/fs" + "net" "os" "regexp" "strings" @@ -145,3 +146,42 @@ func IsContextDone(ctx context.Context) bool { return false } } + +func GetLocalIPs() ([]string, error) { + var localIPs []string + + 
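+	// walk every local network interface and collect the addresses bound to it;
+	// callers compare these against the resolved target member to decide whether
+	// a request can be handled locally.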
ifaces, err := net.Interfaces() + if err != nil { + return []string{}, err + } + + for _, i := range ifaces { + addrs, err := i.Addrs() + if err != nil { + return localIPs, err + } + + for _, addr := range addrs { + if localIP, ok := addr.(*net.IPNet); ok { + localIPs = append(localIPs, localIP.IP.String()) + } + } + } + + return localIPs, nil +} + +func GetIPFromHostName(host string) ([]string, error) { + addrs, err := net.LookupIP(host) + if err != nil { + return []string{}, err + } + + ips := make([]string, 0, len(addrs)) + + for _, ip := range addrs { + ips = append(ips, ip.String()) + } + + return ips, nil +} diff --git a/test/scale-out/cloud_scale_out_basic_auth_tls.bats b/test/scale-out/cloud_scale_out_basic_auth_tls.bats new file mode 100644 index 0000000000..bf232d5cec --- /dev/null +++ b/test/scale-out/cloud_scale_out_basic_auth_tls.bats @@ -0,0 +1,75 @@ +# note: intended to be run as "make run-cloud-scale-out-tests". +# makefile target installs & checks all necessary tooling +# extra tools that are not covered in Makefile target needs to be added in verify_prerequisites() + +NUM_ZOT_INSTANCES=6 +ZOT_LOG_DIR=/tmp/zot-ft-logs/auth-tls + +load helpers_zot +load helpers_cloud +load helpers_haproxy + +function launch_zot_server() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir=${ZOT_ROOT_DIR} + + mkdir -p ${zot_root_dir} + mkdir -p ${ZOT_LOG_DIR} + + local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json" + local zot_log_file="${ZOT_LOG_DIR}/zot-${zot_server_address}-${zot_server_port}.log" + + create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file} + update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + update_zot_cfg_set_htpasswd_auth "${zot_config_file}" ${ZOT_HTPASSWD_PATH} + update_zot_cfg_set_tls "${zot_config_file}" ${ZOT_TLS_CERT_PATH} ${ZOT_TLS_KEY_PATH} ${ZOT_TLS_CA_CERT_PATH} + + echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3 + echo "config file: ${zot_config_file}" >&3 + echo "log file: ${zot_log_file}" >&3 + + zot_serve ${zot_config_file} + wait_zot_reachable ${zot_server_port} "https" +} + +function setup() { + # verify prerequisites are available + if ! 
$(verify_prerequisites); then + exit 1 + fi + + # setup S3 bucket and DynamoDB tables + setup_cloud_services + # setup htpasswd for local auth + setup_local_htpasswd + + generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do + launch_zot_server 127.0.0.1 $(( 10000 + $i )) + done + + # list all zot processes that were started + ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3 + + generate_haproxy_config ${HAPROXY_CFG_FILE} "https" + haproxy_start ${HAPROXY_CFG_FILE} + + # list haproxy processes that were started + ps -ef | grep "haproxy" | grep -v grep >&3 +} + +function teardown() { + local zot_root_dir=${ZOT_ROOT_DIR} + haproxy_stop_all + zot_stop_all + rm -rf ${zot_root_dir} + teardown_cloud_services +} + +@test "Check for successful zb run on haproxy frontend" { + # zb_run + zb_run "cloud-scale-out-basic-auth-tls-bats" "https://127.0.0.1:8000" 3 5 "${ZOT_AUTH_USER}:${ZOT_AUTH_PASS}" +} diff --git a/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats b/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats new file mode 100644 index 0000000000..446aa21a55 --- /dev/null +++ b/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats @@ -0,0 +1,74 @@ +# note: intended to be run as "make run-cloud-scale-out-high-scale-tests" +# the Makefile target installs & checks all necessary tooling +# extra tools that are not covered by the Makefile target need to be added in verify_prerequisites() + +NUM_ZOT_INSTANCES=6 + +load helpers_zot +load helpers_cloud +load helpers_haproxy + +function launch_zot_server() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir=${ZOT_ROOT_DIR} + + mkdir -p ${zot_root_dir} + mkdir -p /tmp/zot-logs + + local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json" + local zot_log_file="/tmp/zot-logs/zot-${zot_server_address}-${zot_server_port}.log" + + create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file} + update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + update_zot_cfg_set_htpasswd_auth "${zot_config_file}" ${ZOT_HTPASSWD_PATH} + update_zot_cfg_set_tls "${zot_config_file}" ${ZOT_TLS_CERT_PATH} ${ZOT_TLS_KEY_PATH} ${ZOT_TLS_CA_CERT_PATH} + + echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3 + echo "config file: ${zot_config_file}" >&3 + echo "log file: ${zot_log_file}" >&3 + + zot_serve ${zot_config_file} + wait_zot_reachable ${zot_server_port} "https" +} + +function setup() { + # verify prerequisites are available + if ! 
$(verify_prerequisites); then + exit 1 + fi + + # setup S3 bucket and DynamoDB tables + setup_cloud_services + # setup htpasswd for local auth + setup_local_htpasswd + + generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do + launch_zot_server 127.0.0.1 $(( 10000 + $i )) + done + + # list all zot processes that were started + ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3 + + generate_haproxy_config ${HAPROXY_CFG_FILE} "https" + haproxy_start ${HAPROXY_CFG_FILE} + + # list haproxy processes that were started + ps -ef | grep "haproxy" | grep -v grep >&3 +} + +function teardown() { + local zot_root_dir=${ZOT_ROOT_DIR} + haproxy_stop_all + zot_stop_all + rm -rf ${zot_root_dir} + teardown_cloud_services +} + +@test "Check for successful zb run on haproxy frontend" { + # zb_run + zb_run "cloud-scale-out-high-scale-bats" "https://127.0.0.1:8000" 10 100 "${ZOT_AUTH_USER}:${ZOT_AUTH_PASS}" +} diff --git a/test/scale-out/cloud_scale_out_no_auth.bats b/test/scale-out/cloud_scale_out_no_auth.bats new file mode 100644 index 0000000000..938a02a44a --- /dev/null +++ b/test/scale-out/cloud_scale_out_no_auth.bats @@ -0,0 +1,69 @@ +# note: intended to be run as "make run-cloud-scale-out-tests" +# the Makefile target installs & checks all necessary tooling +# extra tools that are not covered by the Makefile target need to be added in verify_prerequisites() + +NUM_ZOT_INSTANCES=6 +ZOT_LOG_DIR=/tmp/zot-ft-logs/no-auth + +load helpers_zot +load helpers_cloud +load helpers_haproxy + +function launch_zot_server() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir=${ZOT_ROOT_DIR} + + mkdir -p ${zot_root_dir} + mkdir -p ${ZOT_LOG_DIR} + + local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json" + local zot_log_file="${ZOT_LOG_DIR}/zot-${zot_server_address}-${zot_server_port}.log" + + create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file} + update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3 + echo "config file: ${zot_config_file}" >&3 + echo "log file: ${zot_log_file}" >&3 + + zot_serve ${zot_config_file} + wait_zot_reachable ${zot_server_port} +} + +function setup() { + # verify prerequisites are available + if ! 
$(verify_prerequisites); then + exit 1 + fi + + # setup S3 bucket and DynamoDB tables + setup_cloud_services + generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do + launch_zot_server 127.0.0.1 $(( 10000 + $i )) + done + + # list all zot processes that were started + ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3 + + generate_haproxy_config ${HAPROXY_CFG_FILE} "http" + haproxy_start ${HAPROXY_CFG_FILE} + + # list HAproxy processes that were started + ps -ef | grep "haproxy" | grep -v grep >&3 +} + +function teardown() { + local zot_root_dir=${ZOT_ROOT_DIR} + haproxy_stop_all + zot_stop_all + rm -rf ${zot_root_dir} + teardown_cloud_services +} + +@test "Check for successful zb run on haproxy frontend" { + # zb_run + zb_run "cloud-scale-out-no-auth-bats" "http://127.0.0.1:8000" 3 5 +} diff --git a/test/scale-out/helpers_cloud.bash b/test/scale-out/helpers_cloud.bash new file mode 100644 index 0000000000..a747e8d0c7 --- /dev/null +++ b/test/scale-out/helpers_cloud.bash @@ -0,0 +1,35 @@ +function setup_cloud_services() { + setup_s3 "us-east-2" "zot-storage-test" + setup_dynamodb "us-east-2" +} + +function teardown_cloud_services() { + delete_s3_bucket "zot-storage-test" + teardown_dynamodb "us-east-2" +} + +function setup_s3() { + local region=${1} + local bucket=${2} + awslocal s3 --region ${region} mb s3://${bucket} +} + +function delete_s3_bucket() { + local bucket=${1} + awslocal s3 rb s3://${bucket} --force +} + +function setup_dynamodb() { + local region=${1} + awslocal dynamodb --region ${region} \ + create-table \ + --table-name "BlobTable" \ + --attribute-definitions AttributeName=Digest,AttributeType=S \ + --key-schema AttributeName=Digest,KeyType=HASH \ + --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=5 +} + +function teardown_dynamodb() { + local region=${1} + awslocal dynamodb --region ${region} delete-table --table-name "BlobTable" +} diff --git a/test/scale-out/helpers_haproxy.bash b/test/scale-out/helpers_haproxy.bash new file mode 100644 index 0000000000..68cafa4a7b --- /dev/null +++ b/test/scale-out/helpers_haproxy.bash @@ -0,0 +1,71 @@ +HAPROXY_CFG_FILE="${BATS_FILE_TMPDIR}/haproxy/haproxy-test.cfg" + +function generate_haproxy_server_list() { + local num_instances=${1} + for ((i=0;i<${num_instances};i++)) do + local port=$(( 10000 + $i )) + echo " server zot${i} 127.0.0.1:${port}" + done +} + +# stops all haproxy instances started by the test +function haproxy_stop_all() { + pkill haproxy +} + +# starts one haproxy instance with the given config file +# expects the haproxy config to specify daemon mode +function haproxy_start() { + local haproxy_cfg_file=${1} + + # Check the config file + haproxy -f ${haproxy_cfg_file} -c >&3 + + # Start haproxy + haproxy -f ${haproxy_cfg_file} +} + +# generates HAproxy config for use in the test +function generate_haproxy_config() { + local haproxy_cfg_file="${1}" + local haproxy_root_dir="$(dirname ${haproxy_cfg_file})" + # can be either http or https + local protocol="${2}" + + mkdir -p ${haproxy_root_dir} + + local haproxy_mode='http' + if [ "$protocol" == 'https' ]; then + haproxy_mode='tcp' + fi + + cat > ${haproxy_cfg_file}<> ${haproxy_cfg_file} + + cat ${haproxy_cfg_file} >&3 +} diff --git a/test/scale-out/helpers_zot.bash b/test/scale-out/helpers_zot.bash new file mode 100644 index 0000000000..22dc5b92ab --- /dev/null +++ b/test/scale-out/helpers_zot.bash @@ -0,0 +1,273 @@ +ROOT_DIR=$(git rev-parse --show-toplevel) +OS=$(go env 
GOOS) +ARCH=$(go env GOARCH) +ZOT_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH} +ZLI_PATH=${ROOT_DIR}/bin/zli-${OS}-${ARCH} +ZOT_MINIMAL_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH}-minimal + +# basic auth +ZOT_AUTH_USER=poweruser +ZOT_AUTH_PASS=sup*rSecr9T +ZOT_CREDS_PATH="${BATS_FILE_TMPDIR}/creds" +ZOT_HTPASSWD_PATH="${ZOT_CREDS_PATH}/htpasswd" + +# zb +ZB_PATH=${ROOT_DIR}/bin/zb-${OS}-${ARCH} +ZB_RESULTS_PATH=${ROOT_DIR}/zb-results +ZB_CI_CD_OUTPUT_FILE=${ROOT_DIR}/ci-cd.json + +# zot scale out cluster +ZOT_CLUSTER_MEMBERS_PATCH_FILE="${BATS_FILE_TMPDIR}/members-patch.json" +ZOT_ROOT_DIR="${BATS_FILE_TMPDIR}/zot" +ZOT_TLS_CERT_PATH="${ROOT_DIR}/test/data/server.cert" +ZOT_TLS_KEY_PATH="${ROOT_DIR}/test/data/server.key" +ZOT_TLS_CA_CERT_PATH="${ROOT_DIR}/test/data/ca.crt" + +function verify_prerequisites { + if [ ! -f ${ZOT_PATH} ]; then + echo "you need to build ${ZOT_PATH} before running the tests" >&3 + return 1 + fi + + if [ ! -f ${ZB_PATH} ]; then + echo "you need to build ${ZB_PATH} before running the tests" >&3 + return 1 + fi + + if [ ! $(command -v skopeo) ]; then + echo "you need to install skopeo as a prerequisite to running the tests" >&3 + return 1 + fi + + if [ ! $(command -v awslocal) ] &>/dev/null; then + echo "you need to install aws cli as a prerequisite to running the tests" >&3 + return 1 + fi + + if [ ! $(command -v haproxy) ] &>/dev/null; then + echo "you need to install haproxy as a prerequisite to running the tests" >&3 + return 1 + fi + + return 0 +} + +function get_free_port(){ + while true + do + random_port=$(( ((RANDOM<<15)|RANDOM) % 49152 + 10000 )) + status="$(nc -z 127.0.0.1 $random_port < /dev/null &>/dev/null; echo $?)" + if [ "${status}" != "0" ]; then + free_port=${random_port}; + break; + fi + done + + echo ${free_port} +} + +function zot_serve() { + local config_file=${1} + ${ZOT_PATH} serve ${config_file} & +} + +# stops all zot instances started by the test +function zot_stop_all() { + pkill zot +} + +# waits for the zot server to be reachable +# leaving argument 2 blank or specifying "http" causes the function to use HTTP +# specifying "https" for argument 2 causes the function to use TLS +function wait_zot_reachable() { + local zot_port=${1} + local protocol=${2} + if [ -z "${protocol}" ]; then + protocol="http" + fi + + local zot_url="${protocol}://127.0.0.1:${zot_port}/v2/_catalog" + + local curl_opts=( + --connect-timeout 3 + --max-time 5 + --retry 20 + --retry-delay 1 + --retry-max-time 180 + --retry-connrefused + ) + + # since this is only a reachability check, we can disable cert verification + if [ "${protocol}" == "https" ]; then + curl_opts=(--insecure "${curl_opts[@]}") + fi + + curl "${curl_opts[@]}" ${zot_url} +} + +function zb_run() { + local test_name=${1} + local zot_address=${2} + local concurrent_reqs=${3} + local num_requests=${4} + local credentials=${5} + + if [ ! -d "${ZB_RESULTS_PATH}" ]; then + mkdir -p "${ZB_RESULTS_PATH}" + fi + + local zb_args=( + -c ${concurrent_reqs} + -n ${num_requests} + --src-cidr 127.0.10.0/24 + -o ci-cd + --skip-cleanup + ) + + if [ ! 
-z "${credentials}" ]; then + zb_args=(-A ${credentials} "${zb_args[@]}") + fi + + start=$(date +%s) + ${ZB_PATH} "${zb_args[@]}" ${zot_address} + stop=$(date +%s) + + runtime=$((stop-start)) + echo "Duration: ${runtime} seconds" >&3 + + if [ -f "${ZB_CI_CD_OUTPUT_FILE}" ]; then + mv "${ZB_CI_CD_OUTPUT_FILE}" "${ZB_RESULTS_PATH}/${test_name}-results.json" + fi +} + +function setup_local_htpasswd() { + create_htpasswd_file "${ZOT_CREDS_PATH}" "${ZOT_HTPASSWD_PATH}" ${ZOT_AUTH_USER} ${ZOT_AUTH_PASS} +} + +function create_htpasswd_file() { + local creds_dir_path="${1}" + local htpasswd_file_path="${2}" + local user=${3} + local password=${4} + + mkdir -p "${creds_dir_path}" + htpasswd -b -c -B "${htpasswd_file_path}" ${user} ${password} +} + +# given the number of zot instances, computes a list of cluster members +# and saves them as a JSON to a file that can be used with jq later. +function generate_zot_cluster_member_list() { + local num_zot_instances=${1} + local patch_file_path=${2} + local temp_file="${BATS_FILE_TMPDIR}/jq-member-dump.json" + echo "{\"cluster\":{\"members\":[]}}" > ${patch_file_path} + + for ((i=0;i<${num_zot_instances};i++)); do + local member="127.0.0.1:$(( 10000 + $i ))" + jq ".cluster.members += [\"${member}\"]" ${patch_file_path} > ${temp_file} && \ + mv ${temp_file} ${patch_file_path} + done + + echo "cluster members patch file" >&3 + cat ${patch_file_path} >&3 +} + +# patches an existing zot config file to add all the cluster members. +function update_zot_cluster_member_list_in_config_file() { + local zot_config_file=${1} + local zot_members_patch_file=${2} + local temp_file="${BATS_FILE_TMPDIR}/jq-mem-update-dump.json" + + jq -s '.[0] * .[1]' ${zot_config_file} ${zot_members_patch_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +} + +# generates and saves a base cloud config with shared storage +# given some basic parameters about the zot instance. +function create_zot_cloud_base_config_file() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir="${3}" + local zot_config_file="${4}" + local zot_log_file="${5}" + + cat > ${zot_config_file}< ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +} + +# updates an existing zot config file that already has an HTTP config +# to include TLS configuration. +# intended for use with create_zot_cloud_base_config_file() above. +function update_zot_cfg_set_tls() { + local zot_config_file="${1}" + local zot_cert_path="${2}" + local zot_key_path="${3}" + local zot_cacert_path="${4}" + local temp_file="${BATS_FILE_TMPDIR}/jq-tls-dump.json" + + # set zot TLS config + jq --arg zot_cert_path "${zot_cert_path}" --arg zot_key_path "${zot_key_path}" '(.http) += {tls: {cert: $zot_cert_path, key: $zot_key_path}}' \ + ${zot_config_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} + + jq --arg zot_cacert_path "${zot_cacert_path}" '(.cluster) += {tls: {cacert: $zot_cacert_path}}' \ + ${zot_config_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +}