From 3d6153547f2c0f2c04d2d700217b88914141fc89 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Thu, 2 May 2024 15:50:36 -0400 Subject: [PATCH] Automate scale test Problem: Non-functional scale test needs to be run manually. Solution: - Automate scale test. - Use in-cluster Prometheus to collect CPU, memory and NGF metrics. - Use Kubernetes API server to get NGF logs. For development and troubleshooting, it is possible to run scale test locally in Kind cluster. However, it is necessary to bring down the number of HTTPRoutes to 50 or less (roughly). Testing: - Ran this test locally with 64 listeners, 50 routes and 50 upstreams with NGINX OSS. - Ran this test on GKE with the default configuration with NGINX OSS. Out of scope: ensuring this test runs successfully via GitHub pipeline. Closes https://github.com/nginxinc/nginx-gateway-fabric/issues/1368 Largely based on work by Ciara in https://github.com/nginxinc/nginx-gateway-fabric/pull/1804 Co-authored-by: Ciara Stacke --- .../generate_manifests.go | 183 ++-- tests/framework/portforward.go | 46 +- tests/framework/prometheus.go | 292 +++++++ tests/framework/request.go | 32 +- tests/framework/resourcemanager.go | 107 ++- tests/framework/results.go | 63 +- tests/framework/timeout.go | 4 + tests/scale/manifests/prom-clusterrole.yaml | 44 - tests/scale/scale_test.go | 259 ------ tests/scripts/cpu-plot.gp | 20 + tests/scripts/create-gke-cluster.sh | 14 +- tests/scripts/memory-plot.gp | 21 + tests/scripts/ttr-plot.gp | 18 + tests/scripts/vars.env-example | 2 + tests/suite/dataplane_perf_test.go | 2 +- .../manifests/scale/matches.yaml} | 0 .../manifests/scale/upstreams.yaml} | 0 tests/suite/scale_test.go | 817 ++++++++++++++++++ tests/suite/system_suite_test.go | 38 +- tests/suite/upgrade_test.go | 9 +- 20 files changed, 1536 insertions(+), 435 deletions(-) rename tests/{scale => framework}/generate_manifests.go (66%) create mode 100644 tests/framework/prometheus.go delete mode 100644 
tests/scale/manifests/prom-clusterrole.yaml delete mode 100644 tests/scale/scale_test.go create mode 100644 tests/scripts/cpu-plot.gp create mode 100644 tests/scripts/memory-plot.gp create mode 100644 tests/scripts/ttr-plot.gp rename tests/{scale/manifests/scale-matches.yaml => suite/manifests/scale/matches.yaml} (100%) rename tests/{scale/manifests/scale-upstreams.yaml => suite/manifests/scale/upstreams.yaml} (100%) create mode 100644 tests/suite/scale_test.go diff --git a/tests/scale/generate_manifests.go b/tests/framework/generate_manifests.go similarity index 66% rename from tests/scale/generate_manifests.go rename to tests/framework/generate_manifests.go index 88f752141d..46c7b92b32 100644 --- a/tests/scale/generate_manifests.go +++ b/tests/framework/generate_manifests.go @@ -1,17 +1,18 @@ -//go:build scale -// +build scale - -package scale +package framework import ( "bytes" + "errors" "fmt" - "os" - "path/filepath" + "io" "text/template" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/yaml" + "sigs.k8s.io/controller-runtime/pkg/client" ) -var gwTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 +const gwTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 kind: Gateway metadata: name: gateway @@ -33,7 +34,7 @@ spec: {{- end -}} {{- end -}}` -var hrTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 +const hrTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 kind: HTTPRoute metadata: name: {{ .Name }} @@ -53,7 +54,7 @@ spec: port: 80` // nolint:all -var secretTmplTxt = `apiVersion: v1 +const secretTmplTxt = `apiVersion: v1 kind: Secret metadata: name: {{ . 
}} @@ -63,8 +64,7 @@ data: tls.key: LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2UUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktjd2dnU2pBZ0VBQW9JQkFRQzZtTnJSdUZ2WXZoSE4KbXI3c1FvNUtKSUVDN3N6TFVrNExFeklSNS9yMEVaUjQ2RnRTaGJQd0ZuaXAwMFBxekhpVkhKYy92TjdkQTVLeApQS1VmdFJuQ1J6YldVaTZBZzJpRU93bXF6WUhGbVNpZkFlVjk0RlAxOGtSbjl1ckV3OEpiRXJIUncrVW51L25tCmFMRHF1eGpFTVBweGhuRklCSnYwK1R3djNEVGx6TjNwUlV6dnpidGZvZCtEVTZBSmR6N3Rid1dTNmR6MHc1Z2kKbW9RelZnbFpnVDBJek9FZkV3NVpWMnRMZllHZWRlRVJ1VjhtR041c09va3R2aGxsMU1udHRaMkZNVHgySmVjUQo3K0xBRm9YVnBTS2NjbUFVZ1JBM0xOOHdVZXBVTHZZdFhiUm1QTFc4SjFINmhFeHJHTHBiTERZNmpzbGxBNlZpCk0xMjVjU0hsQWdNQkFBRUNnZ0VBQnpaRE50bmVTdWxGdk9HZlFYaHRFWGFKdWZoSzJBenRVVVpEcUNlRUxvekQKWlV6dHdxbkNRNlJLczUyandWNTN4cU9kUU94bTNMbjNvSHdNa2NZcEliWW82MjJ2dUczYnkwaVEzaFlsVHVMVgpqQmZCcS9UUXFlL2NMdngvSkczQWhFNmJxdFRjZFlXeGFmTmY2eUtpR1dzZk11WVVXTWs4MGVJVUxuRmZaZ1pOCklYNTlSOHlqdE9CVm9Sa3hjYTVoMW1ZTDFsSlJNM3ZqVHNHTHFybmpOTjNBdWZ3ZGRpK1VDbGZVL2l0K1EvZkUKV216aFFoTlRpNVFkRWJLVStOTnYvNnYvb2JvandNb25HVVBCdEFTUE05cmxFemIralQ1WHdWQjgvLzRGY3VoSwoyVzNpcjhtNHVlQ1JHSVlrbGxlLzhuQmZ0eVhiVkNocVRyZFBlaGlPM1FLQmdRRGlrR3JTOTc3cjg3Y1JPOCtQClpoeXltNXo4NVIzTHVVbFNTazJiOTI1QlhvakpZL2RRZDVTdFVsSWE4OUZKZnNWc1JRcEhHaTFCYzBMaTY1YjIKazR0cE5xcVFoUmZ1UVh0UG9GYXRuQzlPRnJVTXJXbDVJN0ZFejZnNkNQMVBXMEg5d2hPemFKZUdpZVpNYjlYTQoybDdSSFZOcC9jTDlYbmhNMnN0Q1lua2Iwd0tCZ1FEUzF4K0crakEyUVNtRVFWNXA1RnRONGcyamsyZEFjMEhNClRIQ2tTazFDRjhkR0Z2UWtsWm5ZbUt0dXFYeXNtekJGcnZKdmt2eUhqbUNYYTducXlpajBEdDZtODViN3BGcVAKQWxtajdtbXI3Z1pUeG1ZMXBhRWFLMXY4SDNINGtRNVl3MWdrTWRybVJHcVAvaTBGaDVpaGtSZS9DOUtGTFVkSQpDcnJjTzhkUVp3S0JnSHA1MzRXVWNCMVZibzFlYStIMUxXWlFRUmxsTWlwRFM2TzBqeWZWSmtFb1BZSEJESnp2ClIrdzZLREJ4eFoyWmJsZ05LblV0YlhHSVFZd3lGelhNcFB5SGxNVHpiZkJhYmJLcDFyR2JVT2RCMXpXM09PRkgKcmppb21TUm1YNmxhaDk0SjRHU0lFZ0drNGw1SHhxZ3JGRDZ2UDd4NGRjUktJWFpLZ0w2dVJSSUpBb0dCQU1CVApaL2p5WStRNTBLdEtEZHUrYU9ORW4zaGxUN3hrNXRKN3NBek5rbWdGMU10RXlQUk9Xd1pQVGFJbWpRbk9qbHdpCldCZ2JGcXg0M2ZlQ1Z4ZXJ6V3ZEM0txaWJVbWpCTkNMTGtYeGh3ZEVteFQwVit2NzZGYzgwaTNNYVdSNnZZR08KditwVVovL0F6UXdJcWZ6
dlVmV2ZxdStrMHlhVXhQOGNlcFBIRyt0bEFvR0FmQUtVVWhqeFU0Ym5vVzVwVUhKegpwWWZXZXZ5TW54NWZyT2VsSmRmNzlvNGMvMHhVSjh1eFBFWDFkRmNrZW96dHNpaVFTNkN6MENRY09XVWxtSkRwCnVrdERvVzM3VmNSQU1BVjY3NlgxQVZlM0UwNm5aL2g2Tkd4Z28rT042Q3pwL0lkMkJPUm9IMFAxa2RjY1NLT3kKMUtFZlNnb1B0c1N1eEpBZXdUZmxDMXc9Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K ` -var appTmplTxt = `apiVersion: v1 -apiVersion: apps/v1 +const appTmplTxt = `apiVersion: apps/v1 kind: Deployment metadata: name: {{ . }} @@ -105,25 +105,55 @@ var ( appTmpl = template.Must(template.New("app").Parse(appTmplTxt)) ) -type Listener struct { +type listener struct { Name string HostnamePrefix string SecretName string } -type Route struct { +type route struct { Name string ListenerName string HostnamePrefix string BackendName string } -func getPrereqDirName(manifestDir string) string { - return filepath.Join(manifestDir, "prereqs") +// ScaleObjects contains objects for scale testing. +type ScaleObjects struct { + // BaseObjects contains objects that are common to all scale iterations. + BaseObjects []client.Object + // ScaleIterationGroups contains objects for each scale iteration. + ScaleIterationGroups [][]client.Object } -func generateScaleListenerManifests(numListeners int, manifestDir string, tls bool) error { - listeners := make([]Listener, 0) +func decodeObjects(reader io.Reader) ([]client.Object, error) { + var objects []client.Object + + decoder := yaml.NewYAMLOrJSONDecoder(reader, 4096) + for { + obj := unstructured.Unstructured{} + if err := decoder.Decode(&obj); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, fmt.Errorf("error decoding resource: %w", err) + } + + if len(obj.Object) == 0 { + continue + } + + objects = append(objects, &obj) + } + + return objects, nil +} + +// GenerateScaleListenerObjects generates objects for a given number of listeners for the scale test. 
+func GenerateScaleListenerObjects(numListeners int, tls bool) (ScaleObjects, error) { + var result ScaleObjects + + listeners := make([]listener, 0) backends := make([]string, 0) secrets := make([]string, 0) @@ -138,13 +168,13 @@ func generateScaleListenerManifests(numListeners int, manifestDir string, tls bo secrets = append(secrets, secretName) } - listeners = append(listeners, Listener{ + listeners = append(listeners, listener{ Name: listenerName, HostnamePrefix: hostnamePrefix, SecretName: secretName, }) - route := Route{ + r := route{ Name: fmt.Sprintf("route-%d", i), ListenerName: listenerName, HostnamePrefix: hostnamePrefix, @@ -153,44 +183,57 @@ func generateScaleListenerManifests(numListeners int, manifestDir string, tls bo backends = append(backends, backendName) - if err := generateManifests(manifestDir, i, listeners, []Route{route}); err != nil { - return err + objects, err := generateManifests(listeners, []route{r}) + if err != nil { + return ScaleObjects{}, err } + + result.ScaleIterationGroups = append(result.ScaleIterationGroups, objects) } - if err := generateSecrets(getPrereqDirName(manifestDir), secrets); err != nil { - return err + secretObjects, err := generateSecrets(secrets) + if err != nil { + return ScaleObjects{}, err } - return generateBackendAppManifests(getPrereqDirName(manifestDir), backends) -} + result.BaseObjects = append(result.BaseObjects, secretObjects...) -func generateSecrets(secretsDir string, secrets []string) error { - err := os.Mkdir(secretsDir, 0o750) - if err != nil && !os.IsExist(err) { - return err + backendObjects, err := generateBackendAppObjects(backends) + if err != nil { + return ScaleObjects{}, err } + result.BaseObjects = append(result.BaseObjects, backendObjects...) 
+ + return result, nil +} + +func generateSecrets(secrets []string) ([]client.Object, error) { + objects := make([]client.Object, 0, len(secrets)) + for _, secret := range secrets { var buf bytes.Buffer - if err = secretTmpl.Execute(&buf, secret); err != nil { - return err + if err := secretTmpl.Execute(&buf, secret); err != nil { + return nil, err } - path := filepath.Join(secretsDir, fmt.Sprintf("%s.yaml", secret)) - - fmt.Println("Writing", path) - if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil { - return err + objs, err := decodeObjects(&buf) + if err != nil { + return nil, err } + + objects = append(objects, objs...) } - return nil + return objects, nil } -func generateScaleHTTPRouteManifests(numRoutes int, manifestDir string) error { - l := Listener{ +// GenerateScaleHTTPRouteObjects generates objects for a given number of routes for the scale test. +func GenerateScaleHTTPRouteObjects(numRoutes int) (ScaleObjects, error) { + var result ScaleObjects + + l := listener{ Name: "listener", HostnamePrefix: "*", } @@ -198,35 +241,43 @@ func generateScaleHTTPRouteManifests(numRoutes int, manifestDir string) error { backendName := "backend" for i := 0; i < numRoutes; i++ { - - route := Route{ + r := route{ Name: fmt.Sprintf("route-%d", i), HostnamePrefix: fmt.Sprintf("%d", i), ListenerName: "listener", BackendName: backendName, } - var listeners []Listener + var listeners []listener if i == 0 { // only generate a Gateway on the first iteration - listeners = []Listener{l} + listeners = []listener{l} } - if err := generateManifests(manifestDir, i, listeners, []Route{route}); err != nil { - return err + objects, err := generateManifests(listeners, []route{r}) + if err != nil { + return ScaleObjects{}, err } + result.ScaleIterationGroups = append(result.ScaleIterationGroups, objects) + } + + backendObjects, err := generateBackendAppObjects([]string{backendName}) + if err != nil { + return ScaleObjects{}, err } - return 
generateBackendAppManifests(getPrereqDirName(manifestDir), []string{backendName}) + result.BaseObjects = backendObjects + + return result, nil } -func generateManifests(outDir string, version int, listeners []Listener, routes []Route) error { +func generateManifests(listeners []listener, routes []route) ([]client.Object, error) { var buf bytes.Buffer if len(listeners) > 0 { if err := gwTmpl.Execute(&buf, listeners); err != nil { - return err + return nil, err } } @@ -236,42 +287,30 @@ func generateManifests(outDir string, version int, listeners []Listener, routes } if err := hrTmpl.Execute(&buf, r); err != nil { - return err + return nil, err } } - err := os.Mkdir(outDir, 0o750) - if err != nil && !os.IsExist(err) { - return err - } - - filename := fmt.Sprintf("manifest-%d.yaml", version) - path := filepath.Join(outDir, filename) - - fmt.Println("Writing", path) - return os.WriteFile(path, buf.Bytes(), 0o600) + return decodeObjects(&buf) } -func generateBackendAppManifests(outDir string, backends []string) error { - err := os.Mkdir(outDir, 0o750) - if err != nil && !os.IsExist(err) { - return err - } +func generateBackendAppObjects(backends []string) ([]client.Object, error) { + objects := make([]client.Object, 0, 2*len(backends)) for _, backend := range backends { var buf bytes.Buffer - if err = appTmpl.Execute(&buf, backend); err != nil { - return err + if err := appTmpl.Execute(&buf, backend); err != nil { + return nil, err } - path := filepath.Join(outDir, fmt.Sprintf("%s.yaml", backend)) - - fmt.Println("Writing", path) - if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil { - return err + objs, err := decodeObjects(&buf) + if err != nil { + return nil, err } + + objects = append(objects, objs...) 
} - return nil + return objects, nil } diff --git a/tests/framework/portforward.go b/tests/framework/portforward.go index 1efc16be3c..e8234fea7c 100644 --- a/tests/framework/portforward.go +++ b/tests/framework/portforward.go @@ -6,22 +6,25 @@ import ( "net/http" "net/url" "path" + "time" + + "log/slog" "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" ) -// PortForward starts a port-forward to the specified Pod and returns the local port being forwarded. -func PortForward(config *rest.Config, namespace, podName string, stopCh chan struct{}) (int, error) { +// PortForward starts a port-forward to the specified Pod. +func PortForward(config *rest.Config, namespace, podName string, ports []string, stopCh <-chan struct{}) error { roundTripper, upgrader, err := spdy.RoundTripperFor(config) if err != nil { - return 0, fmt.Errorf("error creating roundtripper: %w", err) + return fmt.Errorf("error creating roundtripper: %w", err) } serverURL, err := url.Parse(config.Host) if err != nil { - return 0, fmt.Errorf("error parsing rest config host: %w", err) + return fmt.Errorf("error parsing rest config host: %w", err) } serverURL.Path = path.Join( @@ -33,25 +36,34 @@ func PortForward(config *rest.Config, namespace, podName string, stopCh chan str dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, serverURL) - readyCh := make(chan struct{}, 1) out, errOut := new(bytes.Buffer), new(bytes.Buffer) - forwarder, err := portforward.New(dialer, []string{":80"}, stopCh, readyCh, out, errOut) - if err != nil { - return 0, fmt.Errorf("error creating port forwarder: %w", err) + forward := func() error { + readyCh := make(chan struct{}, 1) + + forwarder, err := portforward.New(dialer, ports, stopCh, readyCh, out, errOut) + if err != nil { + return fmt.Errorf("error creating port forwarder: %w", err) + } + + return forwarder.ForwardPorts() } go func() { - if err := forwarder.ForwardPorts(); err != nil { - 
panic(err) + for { + if err := forward(); err != nil { + slog.Error("error forwarding ports", "error", err) + slog.Info("retrying port forward in 100ms...") + } + + select { + case <-stopCh: + return + case <-time.After(100 * time.Millisecond): + // retrying + } } }() - <-readyCh - ports, err := forwarder.GetPorts() - if err != nil { - return 0, fmt.Errorf("error getting ports being forwarded: %w", err) - } - - return int(ports[0].Local), nil + return nil } diff --git a/tests/framework/prometheus.go b/tests/framework/prometheus.go new file mode 100644 index 0000000000..eefca29fe5 --- /dev/null +++ b/tests/framework/prometheus.go @@ -0,0 +1,292 @@ +package framework + +import ( + "context" + "encoding/csv" + "errors" + "fmt" + "log/slog" + "os" + "os/exec" + "time" + + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + prometheusNamespace = "prom" + prometheusReleaseName = "prom" +) + +var defaultPrometheusQueryTimeout = 2 * time.Second + +// PrometheusConfig is the configuration for installing Prometheus +type PrometheusConfig struct { + // ScrapeInterval is the interval at which Prometheus scrapes metrics. + ScrapeInterval time.Duration + // QueryTimeout is the timeout for Prometheus queries. + // Default is 2s. + QueryTimeout time.Duration +} + +// InstallPrometheus installs Prometheus in the cluster. +// It waits for Prometheus pods to be ready before returning. 
+func InstallPrometheus( + ctx context.Context, + rm ResourceManager, + cfg PrometheusConfig, +) (PrometheusInstance, error) { + output, err := exec.Command( + "helm", + "repo", + "add", + "prometheus-community", + "https://prometheus-community.github.io/helm-charts", + ).CombinedOutput() + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to add Prometheus helm repo: %w; output: %s", err, string(output)) + } + + output, err = exec.Command( + "helm", + "repo", + "update", + ).CombinedOutput() + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to update helm repos: %w; output: %s", err, string(output)) + } + + scrapeInterval := fmt.Sprintf("%ds", int(cfg.ScrapeInterval.Seconds())) + + output, err = exec.Command( + "helm", + "install", + prometheusReleaseName, + "prometheus-community/prometheus", + "--create-namespace", + "--namespace", prometheusNamespace, + "--set", fmt.Sprintf("server.global.scrape_interval=%s", scrapeInterval), + "--wait", + ).CombinedOutput() + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to install Prometheus: %w; output: %s", err, string(output)) + } + + pods, err := rm.GetPods(prometheusNamespace, client.MatchingLabels{ + "app.kubernetes.io/name": "prometheus", + }) + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to get Prometheus pods: %w", err) + } + + if len(pods) == 0 { + return PrometheusInstance{}, errors.New("no Prometheus pods found") + } + + if len(pods) > 1 { + return PrometheusInstance{}, errors.New("multiple Prometheus pods found, expected one") + } + + pod := pods[0] + + if pod.Status.PodIP == "" { + return PrometheusInstance{}, errors.New("Prometheus pod has no IP") + } + + var queryTimeout time.Duration + if cfg.QueryTimeout == 0 { + queryTimeout = defaultPrometheusQueryTimeout + } else { + queryTimeout = cfg.QueryTimeout + } + + return PrometheusInstance{ + podIP: pod.Status.PodIP, + podName: pod.Name, + podNamespace: pod.Namespace, + queryTimeout: 
queryTimeout, + }, nil +} + +// UninstallPrometheus uninstalls Prometheus from the cluster. +func UninstallPrometheus() error { + output, err := exec.Command( + "helm", + "uninstall", + prometheusReleaseName, + "-n", prometheusNamespace, + ).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to uninstall Prometheus: %w; output: %s", err, string(output)) + } + + return nil +} + +const ( + // PrometheusPortForwardPort is the local port that will forward to the Prometheus API. + PrometheusPortForwardPort = 9090 + prometheusAPIPort = 9090 +) + +// PrometheusInstance represents a Prometheus instance in the cluster. +type PrometheusInstance struct { + podIP string + podName string + podNamespace string + portForward bool + queryTimeout time.Duration + + apiClient v1.API +} + +// PortForward starts port forwarding to the Prometheus instance. +func (ins *PrometheusInstance) PortForward(config *rest.Config, stopCh <-chan struct{}) error { + if ins.portForward { + panic("port forwarding already started") + } + + ins.portForward = true + + ports := []string{fmt.Sprintf("%d:%d", PrometheusPortForwardPort, prometheusAPIPort)} + return PortForward(config, ins.podNamespace, ins.podName, ports, stopCh) +} + +func (ins *PrometheusInstance) getAPIClient() (v1.API, error) { + var endpoint string + if ins.portForward { + endpoint = fmt.Sprintf("http://localhost:%d", PrometheusPortForwardPort) + } else { + // on GKE, test runner VM can access the pod directly + endpoint = fmt.Sprintf("http://%s:%d", ins.podIP, prometheusAPIPort) + } + + cfg := api.Config{ + Address: fmt.Sprintf("%s", endpoint), + } + + c, err := api.NewClient(cfg) + if err != nil { + return nil, err + } + + return v1.NewAPI(c), nil +} + +func (ins *PrometheusInstance) ensureAPIClient() error { + if ins.apiClient == nil { + api, err := ins.getAPIClient() + if err != nil { + return fmt.Errorf("failed to get Prometheus API client: %w", err) + } + ins.apiClient = api + } + + return nil +} + +// Query sends a 
query to Prometheus. +func (ins *PrometheusInstance) Query(query string) (model.Value, error) { + ctx, cancel := context.WithTimeout(context.Background(), ins.queryTimeout) + defer cancel() + + return ins.QueryWithCtx(ctx, query) +} + +// QueryWithCtx sends a query to Prometheus with the specified context. +func (ins *PrometheusInstance) QueryWithCtx(ctx context.Context, query string) (model.Value, error) { + if err := ins.ensureAPIClient(); err != nil { + return nil, err + } + + result, warnings, err := ins.apiClient.Query(ctx, query, time.Time{}) + if err != nil { + return nil, fmt.Errorf("failed to query Prometheus: %w", err) + } + + if len(warnings) > 0 { + slog.Info("Prometheus query returned warnings", + "query", query, + "warnings", warnings, + ) + } + + return result, nil +} + +// QueryRange sends a range query to Prometheus. +func (ins *PrometheusInstance) QueryRange(query string, promRange v1.Range) (model.Value, error) { + ctx, cancel := context.WithTimeout(context.Background(), ins.queryTimeout) + defer cancel() + + return ins.QueryRangeWithCtx(ctx, query, promRange) +} + +// QueryRangeWithCtx sends a range query to Prometheus with the specified context. +func (ins *PrometheusInstance) QueryRangeWithCtx(ctx context.Context, query string, promRange v1.Range) (model.Value, error) { + if err := ins.ensureAPIClient(); err != nil { + return nil, err + } + + result, warnings, err := ins.apiClient.QueryRange(ctx, query, promRange) + if err != nil { + return nil, fmt.Errorf("failed to query Prometheus: %w", err) + } + + if len(warnings) > 0 { + slog.Info("Prometheus range query returned warnings", + "query", query, + "range", promRange, + "warnings", warnings, + ) + } + + return result, nil +} + +// GetFirstValueOfPrometheusVector returns the first value of a Prometheus vector. 
+func GetFirstValueOfPrometheusVector(val model.Value) (float64, error) { + res, ok := val.(model.Vector) + if !ok { + return 0, fmt.Errorf("expected a vector, got %T", val) + } + + if len(res) == 0 { + return 0, errors.New("empty vector") + } + + return float64(res[0].Value), nil +} + +// WritePrometheusMatrixToCSVFile writes a Prometheus matrix to a CSV file. +func WritePrometheusMatrixToCSVFile(fileName string, value model.Value) error { + file, err := os.Create(fileName) + if err != nil { + return err + } + defer file.Close() + + csvWriter := csv.NewWriter(file) + + matrix, ok := value.(model.Matrix) + if !ok { + return fmt.Errorf("expected a matrix, got %T", value) + } + + for _, sample := range matrix { + for _, pair := range sample.Values { + record := []string{fmt.Sprint(pair.Timestamp.Unix()), pair.Value.String()} + if err := csvWriter.Write(record); err != nil { + return err + } + } + } + + csvWriter.Flush() + + return nil +} diff --git a/tests/framework/request.go b/tests/framework/request.go index 674a35ed45..588e5ddf1d 100644 --- a/tests/framework/request.go +++ b/tests/framework/request.go @@ -3,6 +3,7 @@ package framework import ( "bytes" "context" + "crypto/tls" "fmt" "net" "net/http" @@ -16,14 +17,27 @@ import ( func Get(url, address string, timeout time.Duration) (int, string, error) { dialer := &net.Dialer{} - http.DefaultTransport.(*http.Transport).DialContext = func( - ctx context.Context, - network, - addr string, - ) (net.Conn, error) { - split := strings.Split(addr, ":") - port := split[len(split)-1] - return dialer.DialContext(ctx, network, fmt.Sprintf("%s:%s", address, port)) + transport := &http.Transport{ + DialContext: func( + ctx context.Context, + network, + addr string, + ) (net.Conn, error) { + split := strings.Split(addr, ":") + port := split[len(split)-1] + return dialer.DialContext(ctx, network, fmt.Sprintf("%s:%s", address, port)) + }, + } + + client := &http.Client{ + Transport: transport, + Timeout: timeout, + } + + if 
strings.HasPrefix(url, "https") { + t := client.Transport.(*http.Transport).Clone() + t.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + client.Transport = t } ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -34,7 +48,7 @@ func Get(url, address string, timeout time.Duration) (int, string, error) { return 0, "", err } - resp, err := http.DefaultClient.Do(req) + resp, err := client.Do(req) if err != nil { return 0, "", err } diff --git a/tests/framework/resourcemanager.go b/tests/framework/resourcemanager.go index e62cb5db87..89f1772256 100644 --- a/tests/framework/resourcemanager.go +++ b/tests/framework/resourcemanager.go @@ -27,9 +27,12 @@ import ( "fmt" "io" "net/http" + "reflect" "strings" "time" + "k8s.io/client-go/util/retry" + apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -71,7 +74,17 @@ func (rm *ResourceManager) Apply(resources []client.Object) error { defer cancel() for _, resource := range resources { - if err := rm.K8sClient.Get(ctx, client.ObjectKeyFromObject(resource), resource); err != nil { + var obj client.Object + + unstructuredObj, ok := resource.(*unstructured.Unstructured) + if ok { + obj = unstructuredObj.DeepCopy() + } else { + t := reflect.TypeOf(resource).Elem() + obj = reflect.New(t).Interface().(client.Object) + } + + if err := rm.K8sClient.Get(ctx, client.ObjectKeyFromObject(resource), obj); err != nil { if !apierrors.IsNotFound(err) { return fmt.Errorf("error getting resource: %w", err) } @@ -83,7 +96,19 @@ func (rm *ResourceManager) Apply(resources []client.Object) error { continue } - if err := rm.K8sClient.Update(ctx, resource); err != nil { + // Some tests modify resources that are also modified by NGF (to update their status), so conflicts are possible + // For example, a Gateway resource. 
+ err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := rm.K8sClient.Get(ctx, client.ObjectKeyFromObject(resource), obj); err != nil { + return err + } + resource.SetResourceVersion(obj.GetResourceVersion()) + if err := rm.K8sClient.Update(ctx, resource); err != nil { + return err + } + return nil + }) + if err != nil { return fmt.Errorf("error updating resource: %w", err) } } @@ -112,8 +137,19 @@ func (rm *ResourceManager) ApplyFromFiles(files []string, namespace string) erro return nil } - obj.SetResourceVersion(fetchedObj.GetResourceVersion()) - if err := rm.K8sClient.Update(ctx, &obj); err != nil { + // Some tests modify resources that are also modified by NGF (to update their status), so conflicts are possible + // For example, a Gateway resource. + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := rm.K8sClient.Get(ctx, nsName, fetchedObj); err != nil { + return err + } + obj.SetResourceVersion(fetchedObj.GetResourceVersion()) + if err := rm.K8sClient.Update(ctx, &obj); err != nil { + return err + } + return nil + }) + if err != nil { return fmt.Errorf("error updating resource: %w", err) } @@ -137,7 +173,7 @@ func (rm *ResourceManager) Delete(resources []client.Object) error { return nil } -// DeleteFromFile deletes Kubernetes resources defined within the provided YAML files. +// DeleteFromFiles deletes Kubernetes resources defined within the provided YAML files. 
func (rm *ResourceManager) DeleteFromFiles(files []string, namespace string) error { handlerFunc := func(obj unstructured.Unstructured) error { obj.SetNamespace(namespace) @@ -241,7 +277,13 @@ func (rm *ResourceManager) WaitForAppsToBeReady(namespace string) error { ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.CreateTimeout) defer cancel() - if err := rm.waitForPodsToBeReady(ctx, namespace); err != nil { + return rm.WaitForAppsToBeReadyWithCtx(ctx, namespace) +} + +// WaitForAppsToBeReadyWithCtx waits for all apps in the specified namespace to be ready or +// until the provided context is cancelled. +func (rm *ResourceManager) WaitForAppsToBeReadyWithCtx(ctx context.Context, namespace string) error { + if err := rm.WaitForPodsToBeReady(ctx, namespace); err != nil { return err } @@ -252,7 +294,9 @@ func (rm *ResourceManager) WaitForAppsToBeReady(namespace string) error { return rm.waitForGatewaysToBeReady(ctx, namespace) } -func (rm *ResourceManager) waitForPodsToBeReady(ctx context.Context, namespace string) error { +// WaitForPodsToBeReady waits for all Pods in the specified namespace to be ready or +// until the provided context is cancelled. +func (rm *ResourceManager) WaitForPodsToBeReady(ctx context.Context, namespace string) error { return wait.PollUntilContextCancel( ctx, 500*time.Millisecond, @@ -447,6 +491,37 @@ func (rm *ResourceManager) GetPodNames(namespace string, labels client.MatchingL return names, nil } +// GetPods returns all Pods in the specified namespace that match the given labels. 
+func (rm *ResourceManager) GetPods(namespace string, labels client.MatchingLabels) ([]core.Pod, error) { + ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.GetTimeout) + defer cancel() + + var podList core.PodList + if err := rm.K8sClient.List( + ctx, + &podList, + client.InNamespace(namespace), + labels, + ); err != nil { + return nil, fmt.Errorf("error getting list of Pods: %w", err) + } + + return podList.Items, nil +} + +// GetPod returns the Pod in the specified namespace with the given name. +func (rm *ResourceManager) GetPod(namespace, name string) (*core.Pod, error) { + ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.GetTimeout) + defer cancel() + + var pod core.Pod + if err := rm.K8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &pod); err != nil { + return nil, fmt.Errorf("error getting Pod: %w", err) + } + + return &pod, nil +} + // GetPodLogs returns the logs from the specified Pod func (rm *ResourceManager) GetPodLogs(namespace, name string, opts *core.PodLogOptions) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.GetTimeout) @@ -494,6 +569,24 @@ func (rm *ResourceManager) GetNGFDeployment(namespace, releaseName string) (*app return &deployment, nil } +// ScaleDeployment scales the Deployment to the specified number of replicas. 
+func (rm *ResourceManager) ScaleDeployment(namespace, name string, replicas int32) error { + ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.UpdateTimeout) + defer cancel() + + var deployment apps.Deployment + if err := rm.K8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &deployment); err != nil { + return fmt.Errorf("error getting Deployment: %w", err) + } + + deployment.Spec.Replicas = &replicas + if err := rm.K8sClient.Update(ctx, &deployment); err != nil { + return fmt.Errorf("error updating Deployment: %w", err) + } + + return nil +} + // GetReadyNGFPodNames returns the name(s) of the NGF Pod(s). func GetReadyNGFPodNames( k8sClient client.Client, diff --git a/tests/framework/results.go b/tests/framework/results.go index 7d5b8ad2ee..0e02a0037e 100644 --- a/tests/framework/results.go +++ b/tests/framework/results.go @@ -1,6 +1,7 @@ package framework import ( + "encoding/csv" "fmt" "io" "os" @@ -64,24 +65,48 @@ func WriteSystemInfoToFile(file *os.File, ci ClusterInfo, plus bool) error { return nil } -// GeneratePNG generates a PNG using gnuplot. -func GeneratePNG(resultsDir, inputFilename, outputFilename string) ([]byte, error) { +func generatePNG(resultsDir, inputFilename, outputFilename, configFilename string) error { pwd, err := os.Getwd() if err != nil { - return nil, err + return err } - gnuplotCfg := filepath.Join(filepath.Dir(pwd), "scripts", "requests-plot.gp") + gnuplotCfg := filepath.Join(filepath.Dir(pwd), "scripts", configFilename) files := fmt.Sprintf("inputfile='%s';outputfile='%s'", inputFilename, outputFilename) cmd := exec.Command("gnuplot", "-e", files, "-c", gnuplotCfg) cmd.Dir = resultsDir - return cmd.CombinedOutput() + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to generate PNG: %w; output: %s", err, string(output)) + } + + return nil +} + +// GenerateRequestsPNG generates a Requests PNG using gnuplot. 
+func GenerateRequestsPNG(resultsDir, inputFilename, outputFilename string) error {
+	return generatePNG(resultsDir, inputFilename, outputFilename, "requests-plot.gp")
+}
+
+// GenerateTTRPNG generates a TTR PNG using gnuplot.
+func GenerateTTRPNG(resultsDir, inputFilename, outputFilename string) error {
+	return generatePNG(resultsDir, inputFilename, outputFilename, "ttr-plot.gp")
+}
+
+// GenerateCPUPNG generates a CPU usage PNG using gnuplot.
+func GenerateCPUPNG(resultsDir, inputFilename, outputFilename string) error {
+	return generatePNG(resultsDir, inputFilename, outputFilename, "cpu-plot.gp")
}
 
-// WriteResults writes the vegeta metrics results to the results file in text format.
-func WriteResults(resultsFile *os.File, metrics *Metrics) error {
+// GenerateMemoryPNG generates a Memory usage PNG using gnuplot.
+func GenerateMemoryPNG(resultsDir, inputFilename, outputFilename string) error {
+	return generatePNG(resultsDir, inputFilename, outputFilename, "memory-plot.gp")
+}
+
+// WriteVegetaResults writes the vegeta metrics results to the results file in text format.
+func WriteVegetaResults(resultsFile *os.File, metrics *Metrics) error {
 	reporter := vegeta.NewTextReporter(&metrics.Metrics)
 
 	return reporter.Report(resultsFile)
@@ -96,7 +121,27 @@ func WriteContent(resultsFile *os.File, content string) error {
 	return nil
 }
 
-// NewCSVEncoder returns a vegeta CSV encoder.
-func NewCSVEncoder(w io.Writer) vegeta.Encoder {
+// NewVegetaCSVEncoder returns a vegeta CSV encoder.
+func NewVegetaCSVEncoder(w io.Writer) vegeta.Encoder {
 	return vegeta.NewCSVEncoder(w)
 }
+
+// NewCSVResultsWriter creates and returns a CSV results file and writer.
+func NewCSVResultsWriter(resultsDir, fileName string, resultHeaders ...string) (*os.File, *csv.Writer, error) { + if err := os.MkdirAll(resultsDir, 0o750); err != nil { + return nil, nil, err + } + + file, err := os.OpenFile(filepath.Join(resultsDir, fileName), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644) + if err != nil { + return nil, nil, err + } + + writer := csv.NewWriter(file) + + if err = writer.Write(resultHeaders); err != nil { + return nil, nil, err + } + + return file, writer, nil +} diff --git a/tests/framework/timeout.go b/tests/framework/timeout.go index d49e988a50..f872977d6d 100644 --- a/tests/framework/timeout.go +++ b/tests/framework/timeout.go @@ -6,6 +6,9 @@ type TimeoutConfig struct { // CreateTimeout represents the maximum time for a Kubernetes object to be created. CreateTimeout time.Duration + // UpdateTimeout represents the maximum time for a Kubernetes object to be updated. + UpdateTimeout time.Duration + // DeleteTimeout represents the maximum time for a Kubernetes object to be deleted. 
DeleteTimeout time.Duration @@ -23,6 +26,7 @@ type TimeoutConfig struct { func DefaultTimeoutConfig() TimeoutConfig { return TimeoutConfig{ CreateTimeout: 60 * time.Second, + UpdateTimeout: 60 * time.Second, DeleteTimeout: 10 * time.Second, GetTimeout: 10 * time.Second, ManifestFetchTimeout: 10 * time.Second, diff --git a/tests/scale/manifests/prom-clusterrole.yaml b/tests/scale/manifests/prom-clusterrole.yaml deleted file mode 100644 index f8aefdd36e..0000000000 --- a/tests/scale/manifests/prom-clusterrole.yaml +++ /dev/null @@ -1,44 +0,0 @@ -apiVersion: v1 -kind: Namespace -metadata: - name: prom ---- -apiVersion: v1 -kind: ServiceAccount -metadata: - name: prometheus - namespace: prom ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: prometheus - namespace: prom -rules: -- apiGroups: [""] - resources: - - nodes - - services - - endpoints - - pods - verbs: ["get", "list", "watch"] -- apiGroups: [""] - resources: - - configmaps - verbs: ["get"] -- nonResourceURLs: ["/metrics"] - verbs: ["get"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: prometheus - namespace: prom -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: prometheus -subjects: -- kind: ServiceAccount - name: prometheus - namespace: prom diff --git a/tests/scale/scale_test.go b/tests/scale/scale_test.go deleted file mode 100644 index 89f87a1d82..0000000000 --- a/tests/scale/scale_test.go +++ /dev/null @@ -1,259 +0,0 @@ -//go:build scale -// +build scale - -package scale - -import ( - "context" - "crypto/tls" - "encoding/csv" - "flag" - "fmt" - "net/http" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "testing" - "time" - - "k8s.io/apimachinery/pkg/util/wait" -) - -// testing flags -var ( - numIterations = flag.Int("i", 1, "number of times to scale the resource") - delay = flag.Duration("delay", 0, "delay between each scaling iteration") - version = flag.String("version", "1.2.0", 
"version of NGF under test") - plus = flag.Bool("plus", false, "nginx-plus enabled") -) - -func TestScale_Listeners(t *testing.T) { - ip := getIP(t) - url := fmt.Sprintf("http://%s/", ip) - - runScaleTest( - t, - []string{"# Listeners", "Time to Ready (s)", "Error"}, - func(dir string) error { - return generateScaleListenerManifests(*numIterations, dir, false /*non-tls*/) - }, - url, - ) -} - -func TestScale_HTTPSListeners(t *testing.T) { - ip := getIP(t) - url := fmt.Sprintf("https://%s/", ip) - - runScaleTest( - t, - []string{"# HTTPS Listeners", "Time to Ready (s)", "Error"}, - func(dir string) error { - return generateScaleListenerManifests(*numIterations, dir, true /*tls*/) - }, - url, - ) -} - -func TestScale_HTTPRoutes(t *testing.T) { - ip := getIP(t) - url := fmt.Sprintf("http://%s/", ip) - - runScaleTest( - t, - []string{"# HTTPRoutes", "Time to Ready (s)", "Error"}, - func(dir string) error { - return generateScaleHTTPRouteManifests(*numIterations, dir) - }, - url, - ) -} - -func runScaleTest( - t *testing.T, - resultHeaders []string, - generateManifests func(dir string) error, - url string, -) { - t.Helper() - manifestDir := t.Name() - - writer := newResultsWriter(t, t.Name(), resultHeaders...) 
- - if err := generateManifests(manifestDir); err != nil { - t.Fatalf("failed to generate manifests: %s", err) - } - - startTime := time.Now() - startUnix := fmt.Sprintf("%d", startTime.Unix()) - - if err := kubectlApply(getPrereqDirName(manifestDir)); err != nil { - t.Fatalf("failed to apply prerequisite resources: %s", err) - } - - t.Log("Waiting for all Pods to be Ready") - if err := kubectlWaitAllPodsReady(); err != nil { - t.Fatalf("failed to wait for all Pods to be Ready: %s", err) - } - - for i := 0; i < *numIterations; i++ { - t.Logf("Scaling up to %d resources", i) - - manifestFile := filepath.Join(manifestDir, fmt.Sprintf("manifest-%d.yaml", i)) - - if err := kubectlApply(manifestFile); err != nil { - t.Errorf("failed to scale up: %s", err) - } - - host := fmt.Sprintf("%d.example.com", i) - - t.Logf("Sending request to url %s with host %s...", url, host) - - ttr, err := waitForResponseForHost(url, host) - - seconds := ttr.Seconds() - record := []string{strconv.Itoa(i + 1), strconv.FormatFloat(seconds, 'f', -1, 64)} - if err != nil { - record = append(record, err.Error()) - } - - if err = writer.Write(record); err != nil { - t.Fatalf("failed to write time to ready to csv file: %s", err) - } - - time.Sleep(*delay) - } - - endTime := time.Now() - endUnix := fmt.Sprintf("%d", endTime.Unix()) - - // This accounts for prometheus 10s scraping window - endUnixPlusTen := fmt.Sprintf("%d", endTime.Add(10*time.Second).Unix()) - - records := [][]string{ - {"Test Start", "Test End", "Test End + 10s", "Duration"}, - {startUnix, endUnix, endUnixPlusTen, endTime.Sub(startTime).String()}, - } - - if err := writer.WriteAll(records); err != nil { - t.Logf("failed to write records to csv") - } -} - -func getIP(t *testing.T) string { - t.Helper() - - ip := os.Getenv("NGF_IP") - if ip == "" { - t.Fatalf("NGF_IP env var not set") - } - - return ip -} - -func newResultsWriter(t *testing.T, testName string, resultHeaders ...string) *csv.Writer { - t.Helper() - - versionDir := 
filepath.Join("results", *version) - if err := os.Mkdir(versionDir, 0o750); err != nil && !os.IsExist(err) { - t.Fatalf("failed to create results version directory: %s", err) - } - - testDirName := testName - if *plus { - testDirName += "_Plus" - } - - dir := filepath.Join(versionDir, testDirName) - if err := os.Mkdir(dir, 0o750); err != nil { - t.Fatalf("failed to create results test directory: %s", err) - } - - file, err := os.Create(filepath.Join(dir, "results.csv")) - if err != nil { - t.Fatalf("failed to create results csv file: %s", err) - } - - writer := csv.NewWriter(file) - - if err = writer.Write(resultHeaders); err != nil { - t.Fatalf("failed to write headers to csv file: %s", err) - } - - t.Cleanup(func() { - writer.Flush() - _ = file.Close() - }) - - return writer -} - -func kubectlApply(filename string) error { - if err := kubectlExec("apply", "-f", filename); err != nil { - return fmt.Errorf("error applying %s: %w", filename, err) - } - - return nil -} - -func kubectlWaitAllPodsReady() error { - if err := kubectlExec("wait", "pod", "--all", "--for=condition=Ready"); err != nil { - return fmt.Errorf("error waiting for all pods to be ready:%w", err) - } - - return nil -} - -func kubectlExec(arg ...string) error { - cmd := exec.Command("kubectl", arg...) 
- - return cmd.Run() -} - -func waitForResponseForHost(url, host string) (time.Duration, error) { - client := &http.Client{} - - if strings.HasPrefix(url, "https") { - customTransport := http.DefaultTransport.(*http.Transport) - customTransport.TLSClientConfig = &tls.Config{ - InsecureSkipVerify: true, // nolint: gosec - ServerName: host, - } - client.Transport = customTransport - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return 0, err - } - - req.Host = host - - start := time.Now() - - err = wait.PollUntilContextCancel( - ctx, - 200*time.Millisecond, - true, - func(ctx context.Context) (done bool, err error) { - resp, err := client.Do(req) - if err != nil { - fmt.Println("Retrying GET request", "error", err) - return false, err - } - - if resp.StatusCode == http.StatusOK { - return true, nil - } - - fmt.Println("Retrying GET request", "host", host, "status", resp.Status) - return false, nil - }) - - return time.Since(start), err -} diff --git a/tests/scripts/cpu-plot.gp b/tests/scripts/cpu-plot.gp new file mode 100644 index 0000000000..71584811a3 --- /dev/null +++ b/tests/scripts/cpu-plot.gp @@ -0,0 +1,20 @@ +set terminal png size 800,600 +set title "CPU Usage" +set datafile separator "," +set output outputfile . 
"" + +# X-axis settings +set xlabel "Timestamp" +set xdata time +set timefmt "%s" +set format x "%M:%S" +set xrange [*:*] +set xtics nomirror + +# Y-axis settings +set yrange [0:*] +set ylabel "CPU Usage (core seconds)" +set format y "%.2f" + +# Plotting data +plot inputfile using 1:2 with lines lw 2 notitle diff --git a/tests/scripts/create-gke-cluster.sh b/tests/scripts/create-gke-cluster.sh index 9d034e1c62..b5a2061f8b 100644 --- a/tests/scripts/create-gke-cluster.sh +++ b/tests/scripts/create-gke-cluster.sh @@ -6,6 +6,16 @@ ip_random_digit=$((1 + $RANDOM % 250)) IS_CI=${1:-false} +if [ -z "$GKE_MACHINE_TYPE" ]; then + # If the environment variable is not set, use a default value + GKE_MACHINE_TYPE="e2-medium" +fi + +if [ -z "$GKE_NUM_NODES" ]; then + # If the environment variable is not set, use a default value + GKE_NUM_NODES="3" +fi + gcloud container clusters create ${GKE_CLUSTER_NAME} \ --project ${GKE_PROJECT} \ --zone ${GKE_CLUSTER_ZONE} \ @@ -16,7 +26,9 @@ gcloud container clusters create ${GKE_CLUSTER_NAME} \ --master-ipv4-cidr 172.16.${ip_random_digit}.32/28 \ --metadata=block-project-ssh-keys=TRUE \ --monitoring=SYSTEM,POD,DEPLOYMENT \ - --logging=SYSTEM,WORKLOAD + --logging=SYSTEM,WORKLOAD \ + --machine-type ${GKE_MACHINE_TYPE} \ + --num-nodes ${GKE_NUM_NODES} # Add current IP to GKE master control node access, if this script is not invoked during a CI run. if [ "${IS_CI}" = "false" ]; then diff --git a/tests/scripts/memory-plot.gp b/tests/scripts/memory-plot.gp new file mode 100644 index 0000000000..c25614db70 --- /dev/null +++ b/tests/scripts/memory-plot.gp @@ -0,0 +1,21 @@ +# Define a function to convert bytes to Mebibytes +bytes_to_MiB(bytes) = bytes / (1024.0 * 1024.0) + +set terminal png size 800,600 +set title "Memory Usage" +set datafile separator "," +set output outputfile . 
"" + +# X-axis settings +set xlabel "Timestamp" +set xdata time +set timefmt "%s" +set format x "%M:%S" +set xrange [*:*] # Specify a range covering all timestamps + +# Y-axis settings +set yrange [0:*] +set ylabel "Memory Usage (MiB)" + +# Plotting data +plot inputfile using 1:(bytes_to_MiB($2)) with lines lw 2 notitle diff --git a/tests/scripts/ttr-plot.gp b/tests/scripts/ttr-plot.gp new file mode 100644 index 0000000000..97d0eb2891 --- /dev/null +++ b/tests/scripts/ttr-plot.gp @@ -0,0 +1,18 @@ +set terminal png size 800,600 +set title "Scaling resources" +set datafile separator "," +set output outputfile . "" + +# X-axis settings +set xrange [0:70] +set xtics 10 +set xlabel "# Resources" +set grid xtics + +# Y-axis settings +set yrange [0:*] +set ylabel "Time to Ready (s)" +set format y "%.1f" + +# Plotting data +plot inputfile using 1:2 with lines lw 2 notitle diff --git a/tests/scripts/vars.env-example b/tests/scripts/vars.env-example index c4163b0120..52138f113f 100644 --- a/tests/scripts/vars.env-example +++ b/tests/scripts/vars.env-example @@ -20,3 +20,5 @@ SOURCE_IP_RANGE= PLUS_ENABLED= NGF_VERSION= +GKE_MACHINE_TYPE= +GKE_NUM_NODES= diff --git a/tests/suite/dataplane_perf_test.go b/tests/suite/dataplane_perf_test.go index 743e79977c..ffb07d85f7 100644 --- a/tests/suite/dataplane_perf_test.go +++ b/tests/suite/dataplane_perf_test.go @@ -97,7 +97,7 @@ var _ = Describe("Dataplane performance", Ordered, Label("nfr", "performance"), } _, metrics := framework.RunLoadTest(cfg) - Expect(framework.WriteResults(outFile, &metrics)).To(Succeed()) + Expect(framework.WriteVegetaResults(outFile, &metrics)).To(Succeed()) _, err = fmt.Fprint(outFile, "```\n") Expect(err).ToNot(HaveOccurred()) diff --git a/tests/scale/manifests/scale-matches.yaml b/tests/suite/manifests/scale/matches.yaml similarity index 100% rename from tests/scale/manifests/scale-matches.yaml rename to tests/suite/manifests/scale/matches.yaml diff --git a/tests/scale/manifests/scale-upstreams.yaml 
b/tests/suite/manifests/scale/upstreams.yaml similarity index 100% rename from tests/scale/manifests/scale-upstreams.yaml rename to tests/suite/manifests/scale/upstreams.yaml diff --git a/tests/suite/scale_test.go b/tests/suite/scale_test.go new file mode 100644 index 0000000000..db87c32593 --- /dev/null +++ b/tests/suite/scale_test.go @@ -0,0 +1,817 @@ +package suite + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "text/template" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + core "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctlr "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/nginxinc/nginx-gateway-fabric/tests/framework" +) + +var _ = Describe("Scale test", Ordered, Label("nfr", "scale"), func() { + // One of the tests - scales upstream servers - requires a big cluster to provision 648 pods. + // On GKE, you can use the following configuration: + // - A Kubernetes cluster with 12 nodes on GKE + // - Node: n2d-standard-16 (16 vCPU, 64GB memory) + + var ( + matchesManifests = []string{ + "scale/matches.yaml", + } + upstreamsManifests = []string{ + "scale/upstreams.yaml", + } + + ns = &core.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "scale", + }, + } + + scrapeInterval = 15 * time.Second + queryRangeStep = 15 * time.Second + + resultsDir string + outFile *os.File + ngfPodName string + promInstance framework.PrometheusInstance + promPortForwardStopCh = make(chan struct{}) + ) + + const ( + httpListenerCount = 64 + httpsListenerCount = 64 + httpRouteCount = 1000 + upstreamServerCount = 648 + ) + + BeforeAll(func() { + // Scale tests need a dedicated NGF instance + // Because they analyze the logs of NGF and NGINX, and they don't want to analyze the logs of other tests. 
+ teardown(releaseName) + + Expect(resourceManager.Apply([]client.Object{ns})).To(Succeed()) + + var err error + resultsDir, err = framework.CreateResultsDir("scale", version) + Expect(err).ToNot(HaveOccurred()) + + filename := filepath.Join(resultsDir, fmt.Sprintf("%s.md", version)) + outFile, err = framework.CreateResultsFile(filename) + Expect(err).ToNot(HaveOccurred()) + Expect(framework.WriteSystemInfoToFile(outFile, clusterInfo, *plusEnabled)).To(Succeed()) + + promCfg := framework.PrometheusConfig{ + ScrapeInterval: scrapeInterval, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + promInstance, err = framework.InstallPrometheus(ctx, resourceManager, promCfg) + Expect(err).ToNot(HaveOccurred()) + + k8sConfig := ctlr.GetConfigOrDie() + + if !clusterInfo.IsGKE { + Expect(promInstance.PortForward(k8sConfig, promPortForwardStopCh)).To(Succeed()) + } + }) + + BeforeEach(func() { + // Scale tests need a dedicated NGF instance per test. + // Because they analyze the logs of NGF and NGINX, and they don't want to analyze the logs of other tests. 
+ cfg := getDefaultSetupCfg() + labelFilter := GinkgoLabelFilter() + cfg.nfr = isNFR(labelFilter) + setup(cfg) + + podNames, err := framework.GetReadyNGFPodNames(k8sClient, ngfNamespace, releaseName, timeoutConfig.GetTimeout) + Expect(err).ToNot(HaveOccurred()) + Expect(podNames).To(HaveLen(1)) + ngfPodName = podNames[0] + }) + + createResponseChecker := func(url, address string) func() error { + return func() error { + status, _, err := framework.Get(url, address, timeoutConfig.RequestTimeout) + if err != nil { + return fmt.Errorf("bad response: %w", err) + } + + if status != 200 { + return fmt.Errorf("unexpected status code: %d", status) + } + + return nil + } + } + + createMetricExistChecker := func(query string, getTime func() time.Time, modifyTime func()) func() error { + return func() error { + queryWithTimestamp := fmt.Sprintf("%s @ %d", query, getTime().Unix()) + + result, err := promInstance.Query(queryWithTimestamp) + if err != nil { + return fmt.Errorf("failed to query Prometheus: %w", err) + } + + if result.String() == "" { + modifyTime() + return errors.New("empty result") + } + + return nil + } + } + + createEndTimeFinder := func(query string, startTime time.Time, t *time.Time) func() error { + return func() error { + result, err := promInstance.QueryRange(query, v1.Range{ + Start: startTime, + End: *t, + Step: queryRangeStep, + }) + if err != nil { + return fmt.Errorf("failed to query Prometheus: %w", err) + } + + if result.String() == "" { + *t = time.Now() + return errors.New("empty result") + } + + return nil + } + } + + getFirstValueOfVector := func(query string) float64 { + result, err := promInstance.Query(query) + Expect(err).ToNot(HaveOccurred()) + + val, err := framework.GetFirstValueOfPrometheusVector(result) + Expect(err).ToNot(HaveOccurred()) + + return val + } + + getBuckets := func(query string) []bucket { + result, err := promInstance.Query(query) + Expect(err).ToNot(HaveOccurred()) + + res, ok := result.(model.Vector) + 
Expect(ok).To(BeTrue()) + + buckets := make([]bucket, 0, len(res)) + + for _, sample := range res { + le := sample.Metric["le"] + val := float64(sample.Value) + bucket := bucket{ + Le: string(le), + Val: int(val), + } + buckets = append(buckets, bucket) + } + + return buckets + } + + checkLogErrors := func(containerName string, substrings []string, fileName string) int { + logs, err := resourceManager.GetPodLogs(ngfNamespace, ngfPodName, &core.PodLogOptions{ + Container: containerName, + }) + Expect(err).ToNot(HaveOccurred()) + + logLines := strings.Split(logs, "\n") + errors := 0 + + outer: + for _, line := range logLines { + for _, substr := range substrings { + if strings.Contains(line, substr) { + errors++ + continue outer + } + } + } + + // attach full logs + if errors > 0 { + f, err := os.Create(fileName) + Expect(err).ToNot(HaveOccurred()) + defer f.Close() + + _, err = io.WriteString(f, logs) + Expect(err).ToNot(HaveOccurred()) + } + return errors + } + + runTestWithMetricsAndLogs := func(testName, testResultsDir string, test func()) { + var ( + metricExistTimeout = 2 * time.Minute + metricExistPolling = 1 * time.Second + ) + + startTime := time.Now() + + // We need to make sure that for the startTime, the metrics exists in Prometheus. + // if they don't exist, we increase the startTime and try again. + // Note: it's important that Polling interval in Eventually is greater than the startTime increment. 
+ + getStartTime := func() time.Time { return startTime } + modifyStartTime := func() { startTime = startTime.Add(500 * time.Millisecond) } + + queries := []string{ + fmt.Sprintf(`container_memory_usage_bytes{pod="%s",container="nginx-gateway"}`, ngfPodName), + fmt.Sprintf(`container_cpu_usage_seconds_total{pod="%s",container="nginx-gateway"}`, ngfPodName), + // We don't need to check all nginx_gateway_fabric_* metrics, as they are collected at the same time + fmt.Sprintf(`nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`, ngfPodName), + } + + for _, q := range queries { + Eventually( + createMetricExistChecker( + q, + getStartTime, + modifyStartTime, + ), + ).WithTimeout(metricExistTimeout).WithPolling(metricExistPolling).Should(Succeed()) + } + + test() + + // We sleep for 2 scape intervals to ensure that Prometheus scrapes the metrics after the test() finishes + // before endTime, so that we don't lose any metric values like reloads. + time.Sleep(2 * scrapeInterval) + + endTime := time.Now() + + // Now we check that Prometheus has the metrics for the endTime + + // If the test duration is small (which can happen if you run the test with small number of resources), + // the rate query may not return any data. + // To ensure it returns data, we increase the startTime. 
+ Eventually( + createEndTimeFinder( + fmt.Sprintf(`rate(container_cpu_usage_seconds_total{pod="%s",container="nginx-gateway"}[2m])`, ngfPodName), + startTime, + &endTime, + ), + ).WithTimeout(metricExistTimeout).WithPolling(metricExistPolling).Should(Succeed()) + + getEndTime := func() time.Time { return endTime } + noOpModifier := func() {} + + queries = []string{ + fmt.Sprintf(`container_memory_usage_bytes{pod="%s",container="nginx-gateway"}`, ngfPodName), + // We don't need to check all nginx_gateway_fabric_* metrics, as they are collected at the same time + fmt.Sprintf(`nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`, ngfPodName), + } + + for _, q := range queries { + Eventually( + createMetricExistChecker( + q, + getEndTime, + noOpModifier, + ), + ).WithTimeout(metricExistTimeout).WithPolling(metricExistPolling).Should(Succeed()) + } + + // Collect metric values + // For some metrics, generate PNGs + + result, err := promInstance.QueryRange( + fmt.Sprintf(`container_memory_usage_bytes{pod="%s",container="nginx-gateway"}`, ngfPodName), + v1.Range{ + Start: startTime, + End: endTime, + Step: queryRangeStep, + }, + ) + Expect(err).ToNot(HaveOccurred()) + + memCSV := filepath.Join(testResultsDir, "memory.csv") + + Expect(framework.WritePrometheusMatrixToCSVFile(memCSV, result)).To(Succeed()) + + Expect( + framework.GenerateMemoryPNG(testResultsDir, memCSV, "memory.png"), + ).To(Succeed()) + + Expect(os.Remove(memCSV)).To(Succeed()) + + result, err = promInstance.QueryRange( + fmt.Sprintf(`rate(container_cpu_usage_seconds_total{pod="%s",container="nginx-gateway"}[2m])`, ngfPodName), + v1.Range{ + Start: startTime, + End: endTime, + Step: queryRangeStep, + }, + ) + Expect(err).ToNot(HaveOccurred()) + + cpuCSV := filepath.Join(testResultsDir, "cpu.csv") + + Expect(framework.WritePrometheusMatrixToCSVFile(cpuCSV, result)).To(Succeed()) + + Expect( + framework.GenerateCPUPNG(testResultsDir, cpuCSV, "cpu.png"), + ).To(Succeed()) + + 
Expect(os.Remove(cpuCSV)).To(Succeed()) + + reloadCount := getFirstValueOfVector( + fmt.Sprintf( + `nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_total{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + reloadErrsCount := getFirstValueOfVector( + fmt.Sprintf( + `nginx_gateway_fabric_nginx_reload_errors_total{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reload_errors_total{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + reloadAvgTime := getFirstValueOfVector( + fmt.Sprintf( + `(nginx_gateway_fabric_nginx_reloads_milliseconds_sum{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_milliseconds_sum{pod="%s"} @ %d)`+ + ` / `+ + `(nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_total{pod="%s"} @ %d)`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ngfPodName, + ngfPodName, + startTime.Unix(), + )) + + reloadBuckets := getBuckets( + fmt.Sprintf( + `nginx_gateway_fabric_nginx_reloads_milliseconds_bucket{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_milliseconds_bucket{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + eventsCount := getFirstValueOfVector( + fmt.Sprintf( + `nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + eventsAvgTime := getFirstValueOfVector( + fmt.Sprintf( + `(nginx_gateway_fabric_event_batch_processing_milliseconds_sum{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_sum{pod="%s"} @ %d)`+ + ` / `+ + `(nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"} @ %d)`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ngfPodName, + 
ngfPodName, + startTime.Unix(), + ), + ) + + eventsBuckets := getBuckets( + fmt.Sprintf( + `nginx_gateway_fabric_event_batch_processing_milliseconds_bucket{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_bucket{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + // Check container logs for errors + + ngfErrors := checkLogErrors( + "nginx-gateway", + []string{"error"}, + filepath.Join(testResultsDir, "ngf.log"), + ) + nginxErrors := checkLogErrors( + "nginx", + []string{"[error]", "[emerg]", "[crit]", "[alert]"}, + filepath.Join(testResultsDir, "nginx.log"), + ) + + // Check container restarts + + pod, err := resourceManager.GetPod(ngfNamespace, ngfPodName) + Expect(err).ToNot(HaveOccurred()) + + findRestarts := func(name string) int { + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name == name { + return int(containerStatus.RestartCount) + } + } + Fail(fmt.Sprintf("container %s not found", name)) + return 0 + } + + ngfRestarts := findRestarts("nginx-gateway") + nginxRestarts := findRestarts("nginx") + + // Write results + + results := scaleTestResults{ + Name: testName, + ReloadCount: int(reloadCount), + ReloadErrsCount: int(reloadErrsCount), + ReloadAvgTime: int(reloadAvgTime), + ReloadBuckets: reloadBuckets, + EventsCount: int(eventsCount), + EventsAvgTime: int(eventsAvgTime), + EventsBuckets: eventsBuckets, + NGFErrors: ngfErrors, + NginxErrors: nginxErrors, + NGFContainerRestarts: ngfRestarts, + NginxContainerRestarts: nginxRestarts, + } + + err = writeScaleResults(outFile, results) + Expect(err).ToNot(HaveOccurred()) + } + + runScaleResources := func(objects framework.ScaleObjects, testResultsDir string, protocol string) { + ttrCsvFile, writer, err := framework.NewCSVResultsWriter(testResultsDir, "ttr.csv") + Expect(err).ToNot(HaveOccurred()) + + Expect(resourceManager.Apply(objects.BaseObjects)).To(Succeed()) + + ctx, cancel := 
context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + Expect(resourceManager.WaitForPodsToBeReady(ctx, ns.Name)).To(Succeed()) + + for i := 0; i < len(objects.ScaleIterationGroups); i++ { + Expect(resourceManager.Apply(objects.ScaleIterationGroups[i])).To(Succeed()) + + url := fmt.Sprintf("%s://%d.example.com", protocol, i) + + if protocol == "http" && portFwdPort != 0 { + url = fmt.Sprintf("%s://%d.example.com:%d", protocol, i, portFwdPort) + } else if protocol == "https" && portFwdHTTPSPort != 0 { + url = fmt.Sprintf("%s://%d.example.com:%d", protocol, i, portFwdHTTPSPort) + } + + startCheck := time.Now() + + Eventually( + createResponseChecker(url, address), + ).WithTimeout(30 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) + + ttr := time.Since(startCheck) + + seconds := ttr.Seconds() + record := []string{strconv.Itoa(i + 1), strconv.FormatFloat(seconds, 'f', -1, 64)} + + Expect(writer.Write(record)).To(Succeed()) + } + + writer.Flush() + Expect(ttrCsvFile.Close()).To(Succeed()) + + Expect( + framework.GenerateTTRPNG(testResultsDir, ttrCsvFile.Name(), "ttr.png"), + ).To(Succeed()) + + Expect(os.Remove(ttrCsvFile.Name())).To(Succeed()) + + Expect(resourceManager.Delete(objects.BaseObjects)).To(Succeed()) + for i := 0; i < len(objects.ScaleIterationGroups); i++ { + Expect(resourceManager.Delete(objects.ScaleIterationGroups[i])).To(Succeed()) + } + } + + runScaleUpstreams := func() { + Expect(resourceManager.ApplyFromFiles(upstreamsManifests, ns.Name)).To(Succeed()) + Expect(resourceManager.WaitForAppsToBeReady(ns.Name)).To(Succeed()) + + url := "http://hello.example.com" + if portFwdPort != 0 { + url = fmt.Sprintf("http://hello.example.com:%d", portFwdPort) + } + + Eventually( + createResponseChecker(url, address), + ).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) + + Expect( + resourceManager.ScaleDeployment(ns.Name, "backend", upstreamServerCount), + ).To(Succeed()) + + ctx, cancel 
:= context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + Expect(resourceManager.WaitForPodsToBeReady(ctx, ns.Name)).To(Succeed()) + + Eventually( + createResponseChecker(url, address), + ).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) + + Expect( + resourceManager.DeleteFromFiles(upstreamsManifests, ns.Name), + ).To(Succeed()) + } + + setNamespace := func(objects framework.ScaleObjects) { + for _, obj := range objects.BaseObjects { + obj.SetNamespace(ns.Name) + } + for _, objs := range objects.ScaleIterationGroups { + for _, obj := range objs { + obj.SetNamespace(ns.Name) + } + } + } + + It(fmt.Sprintf("scales HTTP listeners to %d", httpListenerCount), func() { + const testName = "TestScale_Listeners" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0o755)).To(Succeed()) + + objects, err := framework.GenerateScaleListenerObjects(httpListenerCount, false /*non-tls*/) + Expect(err).ToNot(HaveOccurred()) + + setNamespace(objects) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleResources( + objects, + testResultsDir, + "http", + ) + }, + ) + + Expect( + os.RemoveAll(filepath.Join(testResultsDir, "manifests")), + ).To(Succeed()) + }) + + It(fmt.Sprintf("scales HTTPS listeners to %d", httpsListenerCount), func() { + const testName = "TestScale_HTTPSListeners" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0o755)).To(Succeed()) + + objects, err := framework.GenerateScaleListenerObjects(httpsListenerCount, true /*tls*/) + Expect(err).ToNot(HaveOccurred()) + + setNamespace(objects) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleResources( + objects, + testResultsDir, + "https", + ) + }, + ) + + Expect( + os.RemoveAll(filepath.Join(testResultsDir, "manifests")), + ).To(Succeed()) + }) + + It(fmt.Sprintf("scales HTTP routes to %d", httpRouteCount), func() { + 
const testName = "TestScale_HTTPRoutes" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0o755)).To(Succeed()) + + objects, err := framework.GenerateScaleHTTPRouteObjects(httpRouteCount) + Expect(err).ToNot(HaveOccurred()) + + setNamespace(objects) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleResources( + objects, + testResultsDir, + "http", + ) + }, + ) + + Expect( + os.RemoveAll(filepath.Join(testResultsDir, "manifests")), + ).To(Succeed()) + }) + + It(fmt.Sprintf("scales upstream servers to %d", upstreamServerCount), func() { + const testName = "TestScale_UpstreamServers" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0o755)).To(Succeed()) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleUpstreams() + }, + ) + }) + + It("scales HTTP matches", func() { + const testName = "TestScale_HTTPMatches" + + Expect(resourceManager.ApplyFromFiles(matchesManifests, ns.Name)).To(Succeed()) + Expect(resourceManager.WaitForAppsToBeReady(ns.Name)).To(Succeed()) + + var port int + if portFwdPort != 0 { + port = portFwdPort + } else { + port = 80 + } + + addr := fmt.Sprintf("%s:%d", address, port) + + baseURL := "http://cafe.example.com" + + text := fmt.Sprintf("\n## Test %s\n\n", testName) + + _, err := fmt.Fprint(outFile, text) + Expect(err).ToNot(HaveOccurred()) + + run := func(t framework.Target) { + cfg := framework.LoadTestConfig{ + Targets: []framework.Target{t}, + Rate: 1000, + Duration: 30 * time.Second, + Description: "First matches", + Proxy: addr, + ServerName: "cafe.example.com", + } + _, metrics := framework.RunLoadTest(cfg) + + _, err = fmt.Fprintln(outFile, "```text") + Expect(err).ToNot(HaveOccurred()) + Expect(framework.WriteVegetaResults(outFile, &metrics)).To(Succeed()) + _, err = fmt.Fprintln(outFile, "```") + Expect(err).ToNot(HaveOccurred()) + } + + run(framework.Target{ + Method: "GET", + URL: 
fmt.Sprintf("%s%s", baseURL, "/latte"), + Header: map[string][]string{ + "header-1": {"header-1-val"}, + }, + }) + + run(framework.Target{ + Method: "GET", + URL: fmt.Sprintf("%s%s", baseURL, "/latte"), + Header: map[string][]string{ + "header-50": {"header-50-val"}, + }, + }) + + Expect(resourceManager.DeleteFromFiles(matchesManifests, ns.Name)).To(Succeed()) + }) + + AfterEach(func() { + teardown(releaseName) + }) + + AfterAll(func() { + close(promPortForwardStopCh) + Expect(framework.UninstallPrometheus()).To(Succeed()) + Expect(resourceManager.Delete([]client.Object{ns})).To(Succeed()) + Expect(outFile.Close()).To(Succeed()) + + // restoring NGF shared among tests in the suite + cfg := getDefaultSetupCfg() + labelFilter := GinkgoLabelFilter() + cfg.nfr = isNFR(labelFilter) + setup(cfg) + }) +}) + +type bucket struct { + Le string + Val int +} + +type scaleTestResults struct { + Name string + + ReloadCount int + ReloadErrsCount int + ReloadAvgTime int + ReloadBuckets []bucket + + EventsCount int + EventsAvgTime int + EventsBuckets []bucket + + NGFErrors int + NginxErrors int + + NGFContainerRestarts int + NginxContainerRestarts int +} + +const scaleResultTemplate = ` +## Test {{ .Name }} + +### Reloads + +- Total: {{ .ReloadCount }} +- Total Errors: {{ .ReloadErrsCount }} +- Average Time: {{ .ReloadAvgTime }}ms +- Reload distribution: +{{- range .ReloadBuckets }} + - {{ .Le }}ms: {{ .Val }} +{{- end }} + +### Event Batch Processing + +- Total: {{ .EventsCount }} +- Average Time: {{ .EventsAvgTime }}ms +- Event Batch Processing distribution: +{{- range .EventsBuckets }} + - {{ .Le }}ms: {{ .Val }} +{{- end }} + +### Errors + +- NGF errors: {{ .NGFErrors }} +- NGF container restarts: {{ .NGFContainerRestarts }} +- NGINX errors: {{ .NginxErrors }} +- NGINX container restarts: {{ .NginxContainerRestarts }} + +### Graphs and Logs + +See [output directory](./{{ .Name }}) for more details. +The logs are attached only if there are errors. 
+` + +func writeScaleResults(dest io.Writer, results scaleTestResults) error { + tmpl, err := template.New("results").Parse(scaleResultTemplate) + if err != nil { + return err + } + + return tmpl.Execute(dest, results) +} diff --git a/tests/suite/system_suite_test.go b/tests/suite/system_suite_test.go index d7786aaa3d..2d29ceb431 100644 --- a/tests/suite/system_suite_test.go +++ b/tests/suite/system_suite_test.go @@ -4,6 +4,7 @@ import ( "context" "embed" "flag" + "fmt" "path" "path/filepath" "runtime" @@ -63,8 +64,9 @@ var ( manifests embed.FS k8sClient client.Client resourceManager framework.ResourceManager - portForwardStopCh = make(chan struct{}, 1) + portForwardStopCh chan struct{} portFwdPort int + portFwdHTTPSPort int timeoutConfig framework.TimeoutConfig localChartPath string address string @@ -74,8 +76,10 @@ var ( ) const ( - releaseName = "ngf-test" - ngfNamespace = "nginx-gateway" + releaseName = "ngf-test" + ngfNamespace = "nginx-gateway" + ngfHTTPForwardedPort = 10080 + ngfHTTPSForwardedPort = 10443 ) type setupConfig struct { @@ -177,8 +181,12 @@ func setup(cfg setupConfig, extraInstallArgs ...string) { Expect(podNames).ToNot(BeEmpty()) if *serviceType != "LoadBalancer" { - portFwdPort, err = framework.PortForward(k8sConfig, installCfg.Namespace, podNames[0], portForwardStopCh) + ports := []string{fmt.Sprintf("%d:80", ngfHTTPForwardedPort), fmt.Sprintf("%d:443", ngfHTTPSForwardedPort)} + portForwardStopCh = make(chan struct{}) + err = framework.PortForward(k8sConfig, installCfg.Namespace, podNames[0], ports, portForwardStopCh) address = "127.0.0.1" + portFwdPort = ngfHTTPForwardedPort + portFwdHTTPSPort = ngfHTTPSForwardedPort } else { address, err = resourceManager.GetLBIPAddress(installCfg.Namespace) } @@ -187,7 +195,9 @@ func setup(cfg setupConfig, extraInstallArgs ...string) { func teardown(relName string) { if portFwdPort != 0 { - portForwardStopCh <- struct{}{} + close(portForwardStopCh) + portFwdPort = 0 + portFwdHTTPSPort = 0 } cfg := 
framework.InstallationConfig{ @@ -240,13 +250,17 @@ var _ = BeforeSuite(func() { cfg.nfr = isNFR(labelFilter) // Skip deployment if: - // - running upgrade test (this test will deploy its own version) - // - running longevity teardown (deployment will already exist) - // - running telemetry test (NGF will be deployed as part of the test) - if strings.Contains(labelFilter, "upgrade") || - strings.Contains(labelFilter, "longevity-teardown") || - strings.Contains(labelFilter, "telemetry") { - cfg.deploy = false + skipSubstrings := []string{ + "upgrade", // - running upgrade test (this test will deploy its own version) + "longevity-teardown", // - running longevity teardown (deployment will already exist) + "telemetry", // - running telemetry test (NGF will be deployed as part of the test) + "scale", // - running scale test (this test will deploy its own version) + } + for _, s := range skipSubstrings { + if strings.Contains(labelFilter, s) { + cfg.deploy = false + break + } } // use a different release name for longevity to allow us to filter on a specific label when collecting diff --git a/tests/suite/upgrade_test.go b/tests/suite/upgrade_test.go index 09b3ecb8cf..a6fe32b2bf 100644 --- a/tests/suite/upgrade_test.go +++ b/tests/suite/upgrade_test.go @@ -157,7 +157,7 @@ var _ = Describe("Upgrade testing", Label("nfr", "upgrade"), func() { } buf := new(bytes.Buffer) - encoder := framework.NewCSVEncoder(buf) + encoder := framework.NewVegetaCSVEncoder(buf) for _, res := range results { res := res Expect(encoder.Encode(&res)).To(Succeed()) @@ -173,8 +173,9 @@ var _ = Describe("Upgrade testing", Label("nfr", "upgrade"), func() { csvFile.Close() pngName := framework.CreateResultsFilename("png", scheme, *plusEnabled) - output, err := framework.GeneratePNG(resultsDir, csvName, pngName) - Expect(err).ToNot(HaveOccurred(), string(output)) + Expect( + framework.GenerateRequestsPNG(resultsDir, csvName, pngName), + ).To(Succeed()) metricsCh <- &metricsRes }(test) @@ -251,7 +252,7 
@@ var _ = Describe("Upgrade testing", Label("nfr", "upgrade"), func() { _, err := fmt.Fprint(resultsFile, res.testName) Expect(err).ToNot(HaveOccurred()) - Expect(framework.WriteResults(resultsFile, res.metrics)).To(Succeed()) + Expect(framework.WriteVegetaResults(resultsFile, res.metrics)).To(Succeed()) link := fmt.Sprintf("\n\n![%[1]v.png](%[1]v.png)\n", res.scheme) if *plusEnabled {