diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml
index 222cfbc42bd..81a08eb76b3 100644
--- a/.github/workflows/build_and_test.yaml
+++ b/.github/workflows/build_and_test.yaml
@@ -176,6 +176,19 @@ jobs:
     - name: Read Benchmark report
       run: cat test/benchmark/benchmark_report/benchmark_report.md
 
+  resilience-test:
+    runs-on: ubuntu-latest
+    if: ${{ ! startsWith(github.event_name, 'push') }}
+    needs: [build]
+    steps:
+    - uses: actions/checkout@v4.2.2
+    - uses: ./tools/github-actions/setup-deps
+    - name: Resilience Test
+      env:
+        KIND_NODE_TAG: v1.28.13
+        IMAGE_PULL_POLICY: IfNotPresent
+        CUSTOM_CNI: "true"
+      run: make resilience
 
   publish:
     runs-on: ubuntu-latest
diff --git a/test/resilience/embed.go b/test/resilience/embed.go
new file mode 100644
index 00000000000..8594becb669
--- /dev/null
+++ b/test/resilience/embed.go
@@ -0,0 +1,13 @@
+// Copyright Envoy Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+//go:build resilience
+
+package resilience
+
+import "embed"
+
+//go:embed testdata/*.yaml
+var Manifests embed.FS
diff --git a/test/resilience/resilience_test.go b/test/resilience/resilience_test.go
new file mode 100644
index 00000000000..6903860ed6e
--- /dev/null
+++ b/test/resilience/resilience_test.go
@@ -0,0 +1,40 @@
+// Copyright Envoy Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+//go:build resilience
+
+package resilience
+
+import (
+	"flag"
+	"io/fs"
+	"os"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+	"sigs.k8s.io/gateway-api/conformance/utils/flags"
+	"testing"
+
+	"github.com/envoyproxy/gateway/test/resilience/suite"
+	"github.com/envoyproxy/gateway/test/resilience/tests"
+	kubetest "github.com/envoyproxy/gateway/test/utils/kubernetes"
+)
+
+func TestResilience(t *testing.T) {
+	cli, _ := kubetest.NewClient(t)
+	// Parse the resilience test flags.
+	flag.Parse()
+	log.SetLogger(zap.New(zap.WriteTo(os.Stderr), zap.UseDevMode(true)))
+	rSuite, err := suite.NewResilienceTestSuite(
+		cli,
+		*suite.ReportSaveDir,
+		[]fs.FS{Manifests},
+		*flags.GatewayClassName,
+	)
+	if err != nil {
+		t.Fatalf("Failed to create the resilience test suite: %v", err)
+	}
+	t.Logf("Running %d resilience tests", len(tests.ResilienceTests))
+	rSuite.Run(t, tests.ResilienceTests)
+}
diff --git a/test/resilience/suite/flags.go b/test/resilience/suite/flags.go
new file mode 100644
index 00000000000..1004548570d
--- /dev/null
+++ b/test/resilience/suite/flags.go
@@ -0,0 +1,14 @@
+// Copyright Envoy Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+//go:build resilience
+
+package suite
+
+import "flag"
+
+var (
+	ReportSaveDir = flag.String("report-save-dir", "resilience_report", "The directory where the resilience test report is saved.")
+)
diff --git a/test/resilience/suite/suite.go b/test/resilience/suite/suite.go
new file mode 100644
index 00000000000..ec60c4f4236
--- /dev/null
+++ b/test/resilience/suite/suite.go
@@ -0,0 +1,115 @@
+// Copyright Envoy Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+//go:build resilience
+
+package suite
+
+import (
+	"context"
+	"github.com/envoyproxy/gateway/test/utils/kubernetes"
+	"io/fs"
+	"sigs.k8s.io/gateway-api/conformance/utils/roundtripper"
+	"testing"
+	"time"
+
+	opt "github.com/envoyproxy/gateway/internal/cmd/options"
+	kube "github.com/envoyproxy/gateway/internal/kubernetes"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/gateway-api/conformance/utils/config"
+)
+
+const (
+	ResilienceTestScaledKey = "resilience-test/scaled"
+	ResilienceTestClientKey = "resilience-test/client"
+	DefaultControllerName   = "gateway.envoyproxy.io/gatewayclass-controller"
+)
+
+type ResilienceTest struct {
+	ShortName   string
+	Description string
+	Test        func(*testing.T, *ResilienceTestSuite)
+}
+
+type ResilienceTestSuite struct {
+	Client         client.Client
+	TimeoutConfig  config.TimeoutConfig
+	ControllerName string
+	ReportSaveDir  string
+	KubeActions    *kubernetes.KubeActions
+	// Labels
+	scaledLabels map[string]string // indicates which resources are scaled
+
+	// Clients for internal use.
+	kubeClient       kube.CLIClient // required for getting logs from pods
+	ManifestFS       []fs.FS
+	GatewayClassName string
+	RoundTripper     roundtripper.RoundTripper
+}
+
+func NewResilienceTestSuite(client client.Client, reportDir string, manifestFS []fs.FS, gcn string) (*ResilienceTestSuite, error) {
+	var (
+		timeoutConfig = config.TimeoutConfig{}
+	)
+
+	// Reset some timeout config for the resilience test.
+	config.SetupTimeoutConfig(&timeoutConfig)
+	timeoutConfig.RouteMustHaveParents = 180 * time.Second
+	roundTripper := &roundtripper.DefaultRoundTripper{Debug: true, TimeoutConfig: timeoutConfig}
+	// Initialize the internal clients.
+	kubeClient, err := kube.NewCLIClient(opt.DefaultConfigFlags.ToRawKubeConfigLoader())
+	if err != nil {
+		return nil, err
+	}
+	KubeActions := kubernetes.NewKubeHelper(client, kubeClient)
+	return &ResilienceTestSuite{
+		Client:           client,
+		ManifestFS:       manifestFS,
+		TimeoutConfig:    timeoutConfig,
+		ControllerName:   DefaultControllerName,
+		ReportSaveDir:    reportDir,
+		GatewayClassName: gcn,
+		scaledLabels: map[string]string{
+			ResilienceTestScaledKey: "true",
+		},
+		KubeActions:  KubeActions,
+		kubeClient:   kubeClient,
+		RoundTripper: roundTripper,
+	}, nil
+}
+
+func (rts *ResilienceTestSuite) WithResCleanUp(ctx context.Context, t *testing.T, f func() (client.Object, error)) error {
+	res, err := f()
+	t.Cleanup(func() {
+		t.Logf("Cleaning up resilience test resources")
+		if deleteErr := rts.Client.Delete(ctx, res); deleteErr != nil {
+			t.Logf("Failed to delete resource during cleanup: %v", deleteErr)
+		}
+		t.Logf("Cleanup complete")
+	})
+	return err
+}
+
+func (rts *ResilienceTestSuite) Kube() *kubernetes.KubeActions {
+	return rts.KubeActions
+}
+
+func (rts *ResilienceTestSuite) Run(t *testing.T, tests []ResilienceTest) {
+	t.Logf("Running %d resilience tests", len(tests))
+	for _, test := range tests {
+		t.Logf("Running resilience test: %s", test.ShortName)
+		test.Test(t, rts)
+	}
+}
+
+func (rts *ResilienceTestSuite) RegisterCleanup(t *testing.T, ctx context.Context, object client.Object) {
+	t.Cleanup(func() {
+		t.Logf("Cleaning up resilience test resources")
+		if deleteErr := rts.Client.Delete(ctx, object); deleteErr != nil {
+			t.Logf("Failed to delete resource during cleanup: %v", deleteErr)
+		}
+		t.Logf("Cleanup complete")
+	})
+}
diff --git a/test/resilience/testdata/base.yaml b/test/resilience/testdata/base.yaml
new file mode 100644
index 00000000000..1be77ba9602
--- /dev/null
+++ b/test/resilience/testdata/base.yaml
@@ -0,0 +1,110 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: gateway-resilience
+---
+apiVersion: gateway.networking.k8s.io/v1
+kind: GatewayClass +metadata: + namespace: gateway-resilience + name: envoy-gateway +spec: + controllerName: gateway.envoyproxy.io/gatewayclass-controller + parametersRef: + group: gateway.envoyproxy.io + kind: EnvoyProxy + name: custom-proxy-config + namespace: gateway-resilience +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: all-namespaces + namespace: gateway-resilience +spec: + gatewayClassName: "{GATEWAY_CLASS_NAME}" + listeners: + - name: http + port: 80 + protocol: HTTP + allowedRoutes: + namespaces: + from: All +--- +apiVersion: v1 +kind: Service +metadata: + name: backend + namespace: gateway-resilience +spec: + selector: + app: backend + ports: + - protocol: TCP + port: 8080 + name: http11 + targetPort: 3000 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: backend + namespace: gateway-resilience + labels: + app: backend +spec: + replicas: 2 + selector: + matchLabels: + app: backend + template: + metadata: + labels: + app: backend + spec: + containers: + - name: backend + # From https://github.com/kubernetes-sigs/gateway-api/blob/main/conformance/echo-basic/echo-basic.go + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20231214-v1.0.0-140-gf544a46e + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SERVICE_NAME + value: backend + resources: + requests: + cpu: 10m +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: backend + namespace: gateway-resilience +spec: + parentRefs: + - name: all-namespaces + rules: + - matches: + - path: + type: PathPrefix + value: /welcome + backendRefs: + - name: backend + port: 8080 +--- +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyProxy +metadata: + name: custom-proxy-config + namespace: gateway-resilience +spec: + provider: + type: Kubernetes + kubernetes: + diff --git a/test/resilience/testdata/route_changes.yaml b/test/resilience/testdata/route_changes.yaml new file mode 100644 index 00000000000..2c8d98c0e8c --- /dev/null +++ b/test/resilience/testdata/route_changes.yaml @@ -0,0 +1,16 @@ +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: backend + namespace: gateway-resilience +spec: + parentRefs: + - name: all-namespaces + rules: + - matches: + - path: + type: PathPrefix + value: /route-change + backendRefs: + - name: backend + port: 8080 diff --git a/test/resilience/tests/envoygateway.go b/test/resilience/tests/envoygateway.go new file mode 100644 index 00000000000..1d8b8787879 --- /dev/null +++ b/test/resilience/tests/envoygateway.go @@ -0,0 +1,253 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. 
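+
+// The scenarios below exercise Envoy Gateway control-plane resilience: secondary replicas
+// serving xDS while the API server is unreachable, reconciliation of resources missed during
+// an outage, and leader election hand-off when the elected leader loses API server connectivity.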
+ +//go:build resilience + +package tests + +import ( + "context" + "github.com/envoyproxy/gateway/test/resilience/suite" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/gateway-api/conformance/utils/http" + "sigs.k8s.io/gateway-api/conformance/utils/kubernetes" + "sigs.k8s.io/gateway-api/conformance/utils/tlog" + "testing" + "time" +) + +const ( + namespace = "envoy-gateway-system" + envoygateway = "envoy-gateway" + targetString = "successfully acquired lease" + apiServerIP = "10.96.0.1" + timeout = 2 * time.Minute + policyName = "egress-rules" + leaseName = "5b9825d2.gateway.envoyproxy.io" + trashHold = 2 +) + +func init() { + ResilienceTests = append(ResilienceTests, EGResilience) +} + +var EGResilience = suite.ResilienceTest{ + ShortName: "EGResilience", + Description: "Envoygateway resilience test", + Test: func(t *testing.T, suite *suite.ResilienceTestSuite) { + ap := kubernetes.Applier{ + ManifestFS: suite.ManifestFS, + GatewayClass: suite.GatewayClassName, + ControllerName: "gateway.envoyproxy.io/gatewayclass-controller", + } + ap.MustApplyWithCleanup(t, suite.Client, suite.TimeoutConfig, "testdata/base.yaml", true) + + //this test will fail until https://github.com/envoyproxy/gateway/pull/4767/files is merged + t.Run("Secondary EnvoyGateway instances can serve an up to date xDS", func(t *testing.T) { + ctx := context.Background() + t.Log("Scaling down the deployment to 0 replicas") + err := suite.Kube().ScaleDeploymentAndWait(ctx, envoygateway, namespace, 0, time.Minute, false) + require.NoError(t, err, "Failed to scale deployment replicas") + + t.Log("Scaling up the deployment to 3 replicas") + err = suite.Kube().ScaleDeploymentAndWait(ctx, envoygateway, namespace, 3, time.Minute, false) + require.NoError(t, err, "Failed to scale deployment replicas") + + t.Log("Waiting for leader election") + // Ensure leadership was taken + name, err := suite.Kube().GetElectedLeader(context.Background(), namespace, leaseName, metav1.Now(), time.Minute*2) + require.NoError(t, err, "unable to detect leader election") + + t.Log("Marking the identified pod as leader") + suite.Kube().MarkAsLeader(namespace, name) + + // Pods rely on connectivity to the API server to participate in leader election processes. + // Without this connectivity, they cannot become leaders, in this test we won't bring it back. + // The secondary pods will continue to operate using their last known good configuration (xDS) + // and share it with envoy proxies accordingly. + t.Log("Simulating API server connection failure for all pods") + err = suite.WithResCleanUp(ctx, t, func() (client.Object, error) { + return suite.Kube().ManageEgress(ctx, apiServerIP, namespace, policyName, true, map[string]string{"app.kubernetes.io/name": "gateway-helm"}) + }) + require.NoError(t, err, "Failed to simulate API server connection failure") + + // The leader pod should go down, the standby pods remain + // When a leader pod loses connectivity to the API server, Kubernetes does not immediately terminate or stop the pod. + // Instead, the pod itself detects the loss of connectivity, initiates a graceful teardown process, and restarts to attempt + // to reconnect to the API server. + // The replica count for the deployment remains at 3 throughout the process. 
+ // Kubernetes does not schedule a new pod to replace the one that lost connectivity because the existing pod is not + // considered failed from Kubernetes’ perspective. It’s the responsibility of the application running inside the + // pod (e.g., the leader election logic) to handle reconnection attempts or restart itself. + t.Log("Verifying deployment scales down to 2 replica") + err = suite.Kube().CheckDeploymentReplicas(ctx, envoygateway, namespace, 2, time.Minute) + require.NoError(t, err, "Deployment did not scale down") + + ns := "gateway-resilience" + routeNN := types.NamespacedName{Name: "backend", Namespace: ns} + gwNN := types.NamespacedName{Name: "all-namespaces", Namespace: ns} + gwAddr := kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN) + + expectedResponse := http.ExpectedResponse{ + Request: http.Request{ + Path: "/welcome", + }, + Response: http.Response{ + StatusCode: 200, + }, + Namespace: ns, + } + + req := http.MakeRequest(t, &expectedResponse, gwAddr, "http", "http") + http.AwaitConvergence(t, trashHold, timeout, func(elapsed time.Duration) bool { + cReq, cRes, err := suite.RoundTripper.CaptureRoundTrip(req) + if err != nil { + tlog.Logf(t, "Request failed, not ready yet: %v (after %v)", err.Error(), elapsed) + return false + } + + if err := http.CompareRequest(t, &req, cReq, cRes, expectedResponse); err != nil { + tlog.Logf(t, "Response expectation failed for request: %+v not ready yet: %v (after %v)", req, err, elapsed) + return false + } + return true + }) + }) + + t.Run("EnvoyGateway reconciles missed resources and sync xDS after api server connectivity is restored", func(t *testing.T) { + err := suite.Kube().ScaleDeploymentAndWait(context.Background(), envoygateway, namespace, 0, timeout, false) + require.NoError(t, err, "Failed to scale deployment") + err = suite.Kube().ScaleDeploymentAndWait(context.Background(), envoygateway, namespace, 1, timeout, false) + require.NoError(t, err, "Failed to scale deployment") + + // Ensure leadership was taken + _, err = suite.Kube().GetElectedLeader(context.Background(), namespace, leaseName, metav1.Now(), timeout) + require.NoError(t, err, "unable to detect leader election") + + t.Log("Simulating API server down for all pods") + err = suite.WithResCleanUp(context.Background(), t, func() (client.Object, error) { + return suite.Kube().ManageEgress(context.Background(), apiServerIP, namespace, policyName, true, map[string]string{}) + }) + require.NoError(t, err, "unable to block api server connectivity") + + ap.MustApplyWithCleanup(t, suite.Client, suite.TimeoutConfig, "testdata/route_changes.yaml", true) + t.Log("backend routes changed") + + t.Log("restore API server connectivity") + _, err = suite.Kube().ManageEgress(context.Background(), apiServerIP, namespace, policyName, false, map[string]string{}) + require.NoError(t, err, "unable to unblock api server connectivity") + + err = suite.Kube().WaitForDeploymentReplicaCount(context.Background(), envoygateway, namespace, 1, time.Minute, false) + require.NoError(t, err, "Failed to ensure that pod is online") + _, err = suite.Kube().GetElectedLeader(context.Background(), namespace, leaseName, metav1.Now(), time.Minute*2) + require.NoError(t, err, "unable to detect leader election") + t.Log("eg is online") + ns := "gateway-resilience" + routeNN := types.NamespacedName{Name: "backend", Namespace: ns} + gwNN := types.NamespacedName{Name: "all-namespaces", Namespace: ns} + gwAddr := 
kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN) + + expectedResponse := http.ExpectedResponse{ + Request: http.Request{ + Path: "/route-change", + }, + Response: http.Response{ + StatusCode: 200, + }, + Namespace: ns, + } + + req := http.MakeRequest(t, &expectedResponse, gwAddr, "http", "http") + http.AwaitConvergence(t, trashHold, time.Minute, func(elapsed time.Duration) bool { + cReq, cRes, err := suite.RoundTripper.CaptureRoundTrip(req) + if err != nil { + tlog.Logf(t, "Request failed, not ready yet: %v (after %v)", err.Error(), elapsed) + return false + } + + if err := http.CompareRequest(t, &req, cReq, cRes, expectedResponse); err != nil { + tlog.Logf(t, "Response expectation failed for request: %+v not ready yet: %v (after %v)", req, err, elapsed) + return false + } + return true + }) + + require.NoError(t, err, "Failed during connectivity checkup") + }) + + t.Run("Leader election transitions when leader loses API server connection", func(t *testing.T) { + ctx := context.Background() + t.Log("Scaling down the deployment to 0 replicas") + err := suite.Kube().ScaleDeploymentAndWait(ctx, envoygateway, namespace, 0, time.Minute, false) + require.NoError(t, err, "Failed to scale deployment replicas") + + t.Log("Scaling up the deployment to 2 replicas") + err = suite.Kube().ScaleDeploymentAndWait(ctx, envoygateway, namespace, 2, time.Minute, false) + require.NoError(t, err, "Failed to scale deployment replicas") + + t.Log("Waiting for leader election") + // Ensure leadership was taken + name, err := suite.Kube().GetElectedLeader(context.Background(), namespace, leaseName, metav1.Now(), time.Minute*2) + require.NoError(t, err, "unable to detect leader election") + + t.Log("Marking the identified pod as leader") + suite.Kube().MarkAsLeader(namespace, name) + + t.Log("Simulating API server connection failure for the leader") + err = suite.WithResCleanUp(ctx, t, func() (client.Object, error) { + return suite.Kube().ManageEgress(ctx, apiServerIP, namespace, policyName, true, map[string]string{ + "leader": "true", + }) + }) + require.NoError(t, err, "Failed to simulate API server connection failure") + + // leader pod should go down, the standby remain + t.Log("Verifying deployment scales down to 1 replicas") + err = suite.Kube().CheckDeploymentReplicas(ctx, envoygateway, namespace, 1, time.Minute) + require.NoError(t, err, "Deployment did not scale down") + + // Ensure leadership was taken + newLeader, err := suite.Kube().GetElectedLeader(context.Background(), namespace, leaseName, metav1.Now(), time.Minute*2) + require.NoError(t, err, "unable to detect leader election") + require.NotEqual(t, newLeader, name, "new leader name should not be equal to the first leader") + ap.MustApplyWithCleanup(t, suite.Client, suite.TimeoutConfig, "testdata/route_changes.yaml", true) + t.Log("backend routes changed") + + ns := "gateway-resilience" + routeNN := types.NamespacedName{Name: "backend", Namespace: ns} + gwNN := types.NamespacedName{Name: "all-namespaces", Namespace: ns} + gwAddr := kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN) + + expectedResponse := http.ExpectedResponse{ + Request: http.Request{ + Path: "/route-change", + }, + Response: http.Response{ + StatusCode: 200, + }, + Namespace: ns, + } + + req := http.MakeRequest(t, &expectedResponse, gwAddr, "http", "http") + + http.AwaitConvergence(t, 
trashHold, timeout, func(elapsed time.Duration) bool {
+				cReq, cRes, err := suite.RoundTripper.CaptureRoundTrip(req)
+				if err != nil {
+					tlog.Logf(t, "Request failed, not ready yet: %v (after %v)", err.Error(), elapsed)
+					return false
+				}
+
+				if err := http.CompareRequest(t, &req, cReq, cRes, expectedResponse); err != nil {
+					tlog.Logf(t, "Response expectation failed for request: %+v not ready yet: %v (after %v)", req, err, elapsed)
+					return false
+				}
+				return true
+			})
+		})
+	},
+}
diff --git a/test/resilience/tests/envoyproxy.go b/test/resilience/tests/envoyproxy.go
new file mode 100644
index 00000000000..492e4530c4a
--- /dev/null
+++ b/test/resilience/tests/envoyproxy.go
@@ -0,0 +1,88 @@
+// Copyright Envoy Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+//go:build resilience
+
+package tests
+
+import (
+	"context"
+	"github.com/envoyproxy/gateway/test/resilience/suite"
+	"github.com/stretchr/testify/require"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/gateway-api/conformance/utils/http"
+	"sigs.k8s.io/gateway-api/conformance/utils/kubernetes"
+	"sigs.k8s.io/gateway-api/conformance/utils/tlog"
+	"testing"
+	"time"
+)
+
+func init() {
+	ResilienceTests = append(ResilienceTests, EPResilience)
+}
+
+var EPResilience = suite.ResilienceTest{
+	ShortName:   "EPResilience",
+	Description: "EnvoyProxy resilience test",
+	Test: func(t *testing.T, suite *suite.ResilienceTestSuite) {
+		ap := kubernetes.Applier{
+			ManifestFS:     suite.ManifestFS,
+			GatewayClass:   suite.GatewayClassName,
+			ControllerName: "gateway.envoyproxy.io/gatewayclass-controller",
+		}
+
+		ap.MustApplyWithCleanup(t, suite.Client, suite.TimeoutConfig, "testdata/base.yaml", true)
+
+		t.Run("Envoy proxies continue to work even when Envoy Gateway is offline", func(t *testing.T) {
+			ctx := context.Background()
+
+			t.Log("Scaling the Envoy Gateway deployment to 2 replicas")
+			err := suite.Kube().ScaleDeploymentAndWait(ctx, envoygateway, namespace, 2, time.Minute, false)
+			require.NoError(t, err, "Failed to scale deployment replicas")
+
+			t.Log("Ensuring the Envoy Gateway replicas are ready")
+			err = suite.Kube().CheckDeploymentReplicas(ctx, envoygateway, namespace, 2, time.Minute)
+			require.NoError(t, err, "Failed to check deployment replicas")
+
+			t.Log("Scaling the Envoy Gateway deployment down to 0 replicas")
+			err = suite.Kube().ScaleDeploymentAndWait(ctx, envoygateway, namespace, 0, time.Minute, false)
+			require.NoError(t, err, "Failed to scale deployment to 0 replicas")
+
+			t.Cleanup(func() {
+				err := suite.Kube().ScaleDeploymentAndWait(ctx, envoygateway, namespace, 1, time.Minute, false)
+				require.NoError(t, err, "Failed to restore replica count.")
+			})
+
+			ns := "gateway-resilience"
+			routeNN := types.NamespacedName{Name: "backend", Namespace: ns}
+			gwNN := types.NamespacedName{Name: "all-namespaces", Namespace: ns}
+			gwAddr := kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN)
+
+			expectedResponse := http.ExpectedResponse{
+				Request: http.Request{
+					Path: "/welcome",
+				},
+				Response: http.Response{
+					StatusCode: 200,
+				},
+				Namespace: ns,
+			}
+
+			req := http.MakeRequest(t, &expectedResponse, gwAddr, "http", "http")
+			http.AwaitConvergence(t, trashHold, timeout, func(elapsed time.Duration) bool {
+				cReq, cRes, err := suite.RoundTripper.CaptureRoundTrip(req)
+				if err != nil {
+					tlog.Logf(t, "Request failed, 
not ready yet: %v (after %v)", err.Error(), elapsed) + return false + } + if err := http.CompareRequest(t, &req, cReq, cRes, expectedResponse); err != nil { + tlog.Logf(t, "Response expectation failed for request: %+v not ready yet: %v (after %v)", req, err, elapsed) + return false + } + return true + }) + }) + }, +} diff --git a/test/resilience/tests/tests.go b/test/resilience/tests/tests.go new file mode 100644 index 00000000000..54a2d7bbde8 --- /dev/null +++ b/test/resilience/tests/tests.go @@ -0,0 +1,12 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +//go:build resilience + +package tests + +import "github.com/envoyproxy/gateway/test/resilience/suite" + +var ResilienceTests []suite.ResilienceTest diff --git a/test/utils/kubernetes/kube.go b/test/utils/kubernetes/kube.go new file mode 100644 index 00000000000..0660bfbc99e --- /dev/null +++ b/test/utils/kubernetes/kube.go @@ -0,0 +1,322 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package kubernetes + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + appsv1 "k8s.io/api/apps/v1" + coordinationv1 "k8s.io/api/coordination/v1" + networkingv1 "k8s.io/api/networking/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + kube "github.com/envoyproxy/gateway/internal/kubernetes" +) + +// NewKubeHelper consolidates common Kubernetes operations, including deployments, traffic management, and log probing. 
+func NewKubeHelper(client client.Client, kubeClient kube.CLIClient) *KubeActions { + return &KubeActions{ + Client: client, + CLIClient: kubeClient, + } +} + +type KubeActions struct { + client.Client + kube.CLIClient +} + +func (ka *KubeActions) ManageEgress(ctx context.Context, ip, namespace, policyName string, blockTraffic bool, scope map[string]string) (*networkingv1.NetworkPolicy, error) { + // Retrieve the existing NetworkPolicy, if it exists + existingPolicy := &networkingv1.NetworkPolicy{} + err := ka.Get(ctx, client.ObjectKey{Name: policyName, Namespace: namespace}, existingPolicy) + if err != nil && !kerrors.IsNotFound(err) { + return nil, fmt.Errorf("failed to get existing NetworkPolicy: %w", err) + } + + // Define the Egress rule based on the enforce parameter + egressRule := networkingv1.NetworkPolicyEgressRule{ + To: []networkingv1.NetworkPolicyPeer{ + { + IPBlock: &networkingv1.IPBlock{ + CIDR: "0.0.0.0/0", + Except: []string{ + ip + "/32", + }, + }, + }, + }, + } + // Define the NetworkPolicy object + netPolicy := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Namespace: namespace, + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: scope, + }, // Selects all pods in the namespace + PolicyTypes: []networkingv1.PolicyType{ + networkingv1.PolicyTypeEgress, + }, + Egress: []networkingv1.NetworkPolicyEgressRule{ + egressRule, + }, + }, + } + + // remove the policy + if !blockTraffic { + if err := ka.Client.Delete(ctx, netPolicy); err != nil { + return nil, fmt.Errorf("failed to delete NetworkPolicy: %w", err) + } + return nil, nil + } + + if kerrors.IsNotFound(err) { + // Create the NetworkPolicy if it doesn't exist + if err := ka.Client.Create(ctx, netPolicy); err != nil { + return nil, fmt.Errorf("failed to create NetworkPolicy: %w", err) + } + fmt.Printf("NetworkPolicy %s created.\n", netPolicy.Name) + } else { + // Update the existing NetworkPolicy + existingPolicy.Spec = netPolicy.Spec + if err := ka.Client.Update(ctx, existingPolicy); err != nil { + return nil, fmt.Errorf("failed to update NetworkPolicy: %w", err) + } + fmt.Printf("NetworkPolicy %s updated.\n", netPolicy.Name) + } + + return netPolicy, nil +} + +func (ka *KubeActions) ScaleDeploymentAndWait(ctx context.Context, deploymentName, namespace string, replicas int32, timeout time.Duration, prefix bool) error { + // Get the current deployment + deployment := &appsv1.Deployment{} + if prefix { + var err error + deployment, err = ka.getDepByPrefix(ctx, deploymentName, namespace) + if err != nil { + return err + } + } else { + err := ka.Client.Get(ctx, client.ObjectKey{Name: deploymentName, Namespace: namespace}, deployment) + if err != nil { + return err + } + } + + // Update the replicas count + deployment.Spec.Replicas = &replicas + + // Apply the update + err := ka.Client.Update(ctx, deployment) + if err != nil { + return err + } + + fmt.Printf("Deployment %s scaled to %d replicas\n", deployment.Name, replicas) + return ka.WaitForDeploymentReplicaCount(ctx, deployment.Name, namespace, replicas, timeout, false) +} + +func (ka *KubeActions) ScaleEnvoyProxy(envoyProxyName, namespace string, replicas int32) error { + ctx := context.Background() + + // Retrieve the existing EnvoyProxy resource + envoyProxy := &egv1a1.EnvoyProxy{} + err := ka.Client.Get(ctx, types.NamespacedName{Name: envoyProxyName, Namespace: namespace}, envoyProxy) + if err != nil { + return fmt.Errorf("failed to get EnvoyProxy: %w", err) + } + 
envoyProxy.Spec.Provider.Kubernetes = &egv1a1.EnvoyProxyKubernetesProvider{ + EnvoyDeployment: &egv1a1.KubernetesDeploymentSpec{ + Replicas: ptr.To[int32](replicas), + }, + } + + // Update the replicas count + envoyProxy.Spec.Provider.Kubernetes.EnvoyDeployment.Replicas = &replicas + + // Apply the update + err = ka.Client.Update(ctx, envoyProxy) + if err != nil { + return fmt.Errorf("failed to update EnvoyProxy: %w", err) + } + + return nil +} + +func (ka *KubeActions) MarkAsLeader(namespace, podName string) { + pod, err := ka.Kube().CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + panic(err.Error()) + } + + // Initialize the labels map if it's nil + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + + // Add or update the desired label + pod.Labels["leader"] = "true" + + // Update the Pod with the new label + updatedPod, err := ka.Kube().CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) + if err != nil { + panic(err.Error()) + } + + fmt.Printf("Pod %s updated with new label.\n", updatedPod.Name) +} + +func (ka *KubeActions) WaitForDeploymentReplicaCount(ctx context.Context, deploymentName, namespace string, replicas int32, timeout time.Duration, prefix bool) error { + start := time.Now() + + for { + // Check if the timeout has been reached + if time.Since(start) > timeout { + return errors.New("timeout reached waiting for deployment to scale") + } + + // Get the current deployment status + deployment := &appsv1.Deployment{} + + if prefix { + var err error + deployment, err = ka.getDepByPrefix(ctx, deploymentName, namespace) + if err != nil { + return err + } + } else { + err := ka.Get(ctx, client.ObjectKey{Name: deploymentName, Namespace: namespace}, deployment) + if err != nil { + return err + } + } + + // Check if the deployment has reached the desired number of replicas + if deployment.Status.ReadyReplicas == replicas { + fmt.Printf("Deployment %s scaled to %d replicas\n", deploymentName, replicas) + return nil + } + + // Wait before checking again + time.Sleep(5 * time.Second) + } +} + +func (ka *KubeActions) CheckDeploymentReplicas(ctx context.Context, prefix, namespace string, expectedReplicas int, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + deployment, err := ka.getDepByPrefix(ctx, prefix, namespace) + if err != nil { + return err + } + + if deployment != nil { + // Wait for the deployment to reach the expected replica count + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout reached: deployment %q did not reach %d replicas", deployment.Name, expectedReplicas) + default: + // Fetch the current status of the deployment + deployment, err := ka.Kube().AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get deployment %q: %w", deployment.Name, err) + } + + // Check the ready replica count + if int(deployment.Status.ReadyReplicas) == expectedReplicas { + fmt.Printf("Deployment %q reached %d replicas as expected.\n", deployment.Name, expectedReplicas) + return nil + } + + fmt.Printf("Waiting for deployment %q: ready replicas %d/%d\n", + deployment.Name, deployment.Status.ReadyReplicas, expectedReplicas) + time.Sleep(1 * time.Second) // Retry interval + } + } + } + return errors.New("deployment was not found") +} + +func (ka *KubeActions) getDepByPrefix(ctx context.Context, prefix string, namespace string) (*appsv1.Deployment, error) { + deployments, 
err := ka.Kube().AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to list deployments: %w", err) + } + + // Search for the deployment with the specified prefix + for _, dep := range deployments.Items { + if len(dep.Name) >= len(prefix) && dep.Name[:len(prefix)] == prefix { + return &dep, nil + } + } + return nil, errors.New("deployment not found") +} + +func (ka *KubeActions) GetElectedLeader(ctx context.Context, namespace, leaseName string, afterTime metav1.Time, timeout time.Duration) (string, error) { + // Create a context with a timeout + ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + for { + // Fetch the Lease object + lease, err := ka.getLease(ctxWithTimeout, namespace, leaseName) + if err != nil { + return "", fmt.Errorf("failed to get lease %s in namespace %s: %w", leaseName, namespace, err) + } + + // Check if RenewTime matches the condition + if lease.Spec.RenewTime != nil && lease.Spec.RenewTime.After(afterTime.Time) { + if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" { + return "", fmt.Errorf("lease %s does not have a valid holderIdentity", leaseName) + } + + // Return the leader pod name + hi := *lease.Spec.HolderIdentity + parts := strings.SplitN(hi, "_", 2) + + // Return the left part (pod name) + if len(parts) > 0 { + return parts[0], nil + } else { + return "", fmt.Errorf("lease %s does not have a valid holderIdentity", leaseName) + } + } + + // Sleep for a short interval before retrying to avoid excessive API calls + select { + case <-ctxWithTimeout.Done(): + return "", fmt.Errorf("timeout reached while waiting for lease renew time: %w", ctxWithTimeout.Err()) + case <-time.After(1 * time.Second): + // Retry after a delay + } + } +} + +func (ka *KubeActions) getLease(ctx context.Context, namespace, leaseName string) (*coordinationv1.Lease, error) { + // Fetch the Lease object + lease, err := ka.Kube().CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get lease %s in namespace %s: %w", leaseName, namespace, err) + } + + return lease, nil +} diff --git a/tools/hack/create-cluster.sh b/tools/hack/create-cluster.sh index c779aa0d6f3..ffb2408a669 100755 --- a/tools/hack/create-cluster.sh +++ b/tools/hack/create-cluster.sh @@ -8,11 +8,19 @@ METALLB_VERSION=${METALLB_VERSION:-"v0.13.10"} KIND_NODE_TAG=${KIND_NODE_TAG:-"v1.32.0"} NUM_WORKERS=${NUM_WORKERS:-""} IP_FAMILY=${IP_FAMILY:-"ipv4"} +CUSTOM_CNI=${CUSTOM_CNI:-"false"} + +if [ "$CUSTOM_CNI" = "true" ]; then + CNI_CONFIG="disableDefaultCNI: true" +else + CNI_CONFIG="disableDefaultCNI: false" +fi KIND_CFG=$(cat <<-EOM kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 networking: + ${CNI_CONFIG} ipFamily: ${IP_FAMILY} # it's to prevent inherit search domains from the host which slows down DNS resolution # and cause problems to IPv6 only clusters running on IPv4 host. 
@@ -44,7 +52,38 @@ ${KIND_CFG}
 EOF
 fi
 fi
-
+if [ "$CUSTOM_CNI" = "true" ]; then
+## Install the Cilium CLI
+# Determine the operating system
+OS=$(uname -s)
+case $OS in
+  Darwin)
+    CILIUM_CLI_VERSION=$(curl -s https://raw.githubusercontent.com/cilium/cilium-cli/main/stable.txt)
+    CLI_ARCH=amd64
+    if [ "$(uname -m)" = "arm64" ]; then CLI_ARCH=arm64; fi
+    curl -L --fail --remote-name-all "https://github.com/cilium/cilium-cli/releases/download/${CILIUM_CLI_VERSION}/cilium-darwin-${CLI_ARCH}.tar.gz"{,.sha256sum}
+    shasum -a 256 -c cilium-darwin-${CLI_ARCH}.tar.gz.sha256sum
+    tar xf cilium-darwin-${CLI_ARCH}.tar.gz
+    rm cilium-darwin-${CLI_ARCH}.tar.gz{,.sha256sum}
+    ;;
+  Linux)
+    CILIUM_CLI_VERSION=$(curl -s https://raw.githubusercontent.com/cilium/cilium-cli/main/stable.txt)
+    CLI_ARCH=amd64
+    if [ "$(uname -m)" = "aarch64" ]; then CLI_ARCH=arm64; fi
+    curl -L --fail --remote-name-all "https://github.com/cilium/cilium-cli/releases/download/${CILIUM_CLI_VERSION}/cilium-linux-${CLI_ARCH}.tar.gz"{,.sha256sum}
+    sha256sum --check cilium-linux-${CLI_ARCH}.tar.gz.sha256sum
+    tar xf cilium-linux-${CLI_ARCH}.tar.gz
+    rm cilium-linux-${CLI_ARCH}.tar.gz{,.sha256sum}
+    ;;
+  *)
+    echo "Unsupported operating system: $OS"
+    exit 1
+    ;;
+esac
+mkdir -p bin
+chmod +x cilium
+mv cilium bin
+fi
 
 ## Install MetalLB.
 kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/"${METALLB_VERSION}"/config/manifests/metallb-native.yaml
@@ -53,9 +92,6 @@ if [ -z "$needCreate" ]; then
   kubectl create secret generic -n metallb-system memberlist --from-literal=secretkey="$(openssl rand -base64 128)"
 fi
 
-# Wait for MetalLB to become available.
-kubectl rollout status -n metallb-system deployment/controller --timeout 5m
-kubectl rollout status -n metallb-system daemonset/speaker --timeout 5m
 
 # Apply config with addresses based on docker network IPAM.
 address_ranges=""
@@ -82,8 +118,8 @@ if [ -z "${address_ranges}" ]; then
   exit 1
 fi
 
-# Apply MetalLB IPAddressPool and L2Advertisement
-kubectl apply -f - <<EOF
+apply_metallb_ranges() {
+kubectl apply -f - <<EOF >/dev/null 2>&1
 apiVersion: metallb.io/v1beta1
 kind: IPAddressPool
 metadata:
@@ -102,3 +138,30 @@ spec:
   ipAddressPools:
   - kube-services
 EOF
+}
+
+RETRY_INTERVAL=5 # seconds
+TIMEOUT=120 # seconds
+ELAPSED_TIME=0
+
+if [ "$CUSTOM_CNI" = "true" ]; then
+  CILIUM_BIN="./bin/cilium"
+  $CILIUM_BIN install --wait --version 1.16.4
+  $CILIUM_BIN status --wait
+fi
+
+# Apply MetalLB IPAddressPool and L2Advertisement
+echo "Applying MetalLB configuration with retries..."
+while [ $ELAPSED_TIME -lt $TIMEOUT ]; do
+  if apply_metallb_ranges; then
+    echo "Configuration applied successfully."
+    exit 0
+  else
+    echo "Failed to apply configuration. Retrying in $RETRY_INTERVAL seconds..."
+  fi
+  sleep $RETRY_INTERVAL
+  ELAPSED_TIME=$((ELAPSED_TIME + RETRY_INTERVAL))
+done
+
+echo "Timed out waiting for the MetalLB configuration to apply."
+exit 1
diff --git a/tools/make/kube.mk b/tools/make/kube.mk
index ecb458e2791..ad9f04efc52 100644
--- a/tools/make/kube.mk
+++ b/tools/make/kube.mk
@@ -145,6 +145,9 @@ experimental-conformance: create-cluster kube-install-image kube-deploy run-expe
 .PHONY: benchmark
 benchmark: create-cluster kube-install-image kube-deploy-for-benchmark-test run-benchmark delete-cluster ## Create a kind cluster, deploy EG into it, run Envoy Gateway benchmark test, and clean up.
 
+.PHONY: resilience
+resilience: create-cluster kube-install-image kube-deploy run-resilience delete-cluster ## Create a kind cluster, deploy EG into it, run Envoy Gateway resilience test, and clean up.
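+# Note: the `resilience` target provisions its own kind cluster end to end; to iterate against a
+# cluster that already has Envoy Gateway deployed, the `run-resilience` target defined below can
+# be invoked directly.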
+ .PHONY: e2e e2e: create-cluster kube-install-image kube-deploy \ install-ratelimit install-eg-addons kube-install-examples-image \ @@ -177,6 +180,11 @@ else --run-test $(E2E_RUN_TEST) endif +.PHONY: run-resilience +run-resilience: ## Run resilience tests + @$(LOG_TARGET) + go test -v -tags resilience ./test/resilience --gateway-class=envoy-gateway + .PHONY: run-benchmark run-benchmark: install-benchmark-server prepare-ip-family ## Run benchmark tests @$(LOG_TARGET)