diff --git a/tests/scale/generate_manifests.go b/tests/framework/generate_manifests.go similarity index 66% rename from tests/scale/generate_manifests.go rename to tests/framework/generate_manifests.go index 88f752141d..46c7b92b32 100644 --- a/tests/scale/generate_manifests.go +++ b/tests/framework/generate_manifests.go @@ -1,17 +1,18 @@ -//go:build scale -// +build scale - -package scale +package framework import ( "bytes" + "errors" "fmt" - "os" - "path/filepath" + "io" "text/template" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/yaml" + "sigs.k8s.io/controller-runtime/pkg/client" ) -var gwTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 +const gwTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 kind: Gateway metadata: name: gateway @@ -33,7 +34,7 @@ spec: {{- end -}} {{- end -}}` -var hrTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 +const hrTmplTxt = `apiVersion: gateway.networking.k8s.io/v1 kind: HTTPRoute metadata: name: {{ .Name }} @@ -53,7 +54,7 @@ spec: port: 80` // nolint:all -var secretTmplTxt = `apiVersion: v1 +const secretTmplTxt = `apiVersion: v1 kind: Secret metadata: name: {{ . }} @@ -63,8 +64,7 @@ data: tls.key: LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2UUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktjd2dnU2pBZ0VBQW9JQkFRQzZtTnJSdUZ2WXZoSE4KbXI3c1FvNUtKSUVDN3N6TFVrNExFeklSNS9yMEVaUjQ2RnRTaGJQd0ZuaXAwMFBxekhpVkhKYy92TjdkQTVLeApQS1VmdFJuQ1J6YldVaTZBZzJpRU93bXF6WUhGbVNpZkFlVjk0RlAxOGtSbjl1ckV3OEpiRXJIUncrVW51L25tCmFMRHF1eGpFTVBweGhuRklCSnYwK1R3djNEVGx6TjNwUlV6dnpidGZvZCtEVTZBSmR6N3Rid1dTNmR6MHc1Z2kKbW9RelZnbFpnVDBJek9FZkV3NVpWMnRMZllHZWRlRVJ1VjhtR041c09va3R2aGxsMU1udHRaMkZNVHgySmVjUQo3K0xBRm9YVnBTS2NjbUFVZ1JBM0xOOHdVZXBVTHZZdFhiUm1QTFc4SjFINmhFeHJHTHBiTERZNmpzbGxBNlZpCk0xMjVjU0hsQWdNQkFBRUNnZ0VBQnpaRE50bmVTdWxGdk9HZlFYaHRFWGFKdWZoSzJBenRVVVpEcUNlRUxvekQKWlV6dHdxbkNRNlJLczUyandWNTN4cU9kUU94bTNMbjNvSHdNa2NZcEliWW82MjJ2dUczYnkwaVEzaFlsVHVMVgpqQmZCcS9UUXFlL2NMdngvSkczQWhFNmJxdFRjZFlXeGFmTmY2eUtpR1dzZk11WVVXTWs4MGVJVUxuRmZaZ1pOCklYNTlSOHlqdE9CVm9Sa3hjYTVoMW1ZTDFsSlJNM3ZqVHNHTHFybmpOTjNBdWZ3ZGRpK1VDbGZVL2l0K1EvZkUKV216aFFoTlRpNVFkRWJLVStOTnYvNnYvb2JvandNb25HVVBCdEFTUE05cmxFemIralQ1WHdWQjgvLzRGY3VoSwoyVzNpcjhtNHVlQ1JHSVlrbGxlLzhuQmZ0eVhiVkNocVRyZFBlaGlPM1FLQmdRRGlrR3JTOTc3cjg3Y1JPOCtQClpoeXltNXo4NVIzTHVVbFNTazJiOTI1QlhvakpZL2RRZDVTdFVsSWE4OUZKZnNWc1JRcEhHaTFCYzBMaTY1YjIKazR0cE5xcVFoUmZ1UVh0UG9GYXRuQzlPRnJVTXJXbDVJN0ZFejZnNkNQMVBXMEg5d2hPemFKZUdpZVpNYjlYTQoybDdSSFZOcC9jTDlYbmhNMnN0Q1lua2Iwd0tCZ1FEUzF4K0crakEyUVNtRVFWNXA1RnRONGcyamsyZEFjMEhNClRIQ2tTazFDRjhkR0Z2UWtsWm5ZbUt0dXFYeXNtekJGcnZKdmt2eUhqbUNYYTducXlpajBEdDZtODViN3BGcVAKQWxtajdtbXI3Z1pUeG1ZMXBhRWFLMXY4SDNINGtRNVl3MWdrTWRybVJHcVAvaTBGaDVpaGtSZS9DOUtGTFVkSQpDcnJjTzhkUVp3S0JnSHA1MzRXVWNCMVZibzFlYStIMUxXWlFRUmxsTWlwRFM2TzBqeWZWSmtFb1BZSEJESnp2ClIrdzZLREJ4eFoyWmJsZ05LblV0YlhHSVFZd3lGelhNcFB5SGxNVHpiZkJhYmJLcDFyR2JVT2RCMXpXM09PRkgKcmppb21TUm1YNmxhaDk0SjRHU0lFZ0drNGw1SHhxZ3JGRDZ2UDd4NGRjUktJWFpLZ0w2dVJSSUpBb0dCQU1CVApaL2p5WStRNTBLdEtEZHUrYU9ORW4zaGxUN3hrNXRKN3NBek5rbWdGMU10RXlQUk9Xd1pQVGFJbWpRbk9qbHdpCldCZ2JGcXg0M2ZlQ1Z4ZXJ6V3ZEM0txaWJVbWpCTkNMTGtYeGh3ZEVteFQwVit2NzZGYzgwaTNNYVdSNnZZR08KditwVVovL0F6UXdJcWZ6dlVmV2ZxdStrMHlhVXhQOGNlcFBIRyt0bEFvR0FmQUtVVWhqeFU0Ym5vVzVwVUhKegpwWWZXZXZ5TW54NWZyT2VsSmRmNzlvNGMvMHhVSjh1eFBFWDFkRmNrZW96dHNpaVFTNkN6MENRY09XVWxtSkRwCnVrdERvVzM3VmNSQU1BVjY3NlgxQVZlM0UwNm5aL2g2Tkd4Z28rT042Q3pwL0lkMkJPUm9IMFAxa2RjY1NLT3kKMUtFZlNnb1B0c1N1eEpBZXdUZmxDMXc9Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K ` -var appTmplTxt = `apiVersion: v1 -apiVersion: apps/v1 +const appTmplTxt = `apiVersion: apps/v1 kind: Deployment metadata: name: {{ . }} @@ -105,25 +105,55 @@ var ( appTmpl = template.Must(template.New("app").Parse(appTmplTxt)) ) -type Listener struct { +type listener struct { Name string HostnamePrefix string SecretName string } -type Route struct { +type route struct { Name string ListenerName string HostnamePrefix string BackendName string } -func getPrereqDirName(manifestDir string) string { - return filepath.Join(manifestDir, "prereqs") +// ScaleObjects contains objects for scale testing. +type ScaleObjects struct { + // BaseObjects contains objects that are common to all scale iterations. + BaseObjects []client.Object + // ScaleIterationGroups contains objects for each scale iteration. + ScaleIterationGroups [][]client.Object } -func generateScaleListenerManifests(numListeners int, manifestDir string, tls bool) error { - listeners := make([]Listener, 0) +func decodeObjects(reader io.Reader) ([]client.Object, error) { + var objects []client.Object + + decoder := yaml.NewYAMLOrJSONDecoder(reader, 4096) + for { + obj := unstructured.Unstructured{} + if err := decoder.Decode(&obj); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, fmt.Errorf("error decoding resource: %w", err) + } + + if len(obj.Object) == 0 { + continue + } + + objects = append(objects, &obj) + } + + return objects, nil +} + +// GenerateScaleListenerObjects generates objects for a given number of listeners for the scale test. +func GenerateScaleListenerObjects(numListeners int, tls bool) (ScaleObjects, error) { + var result ScaleObjects + + listeners := make([]listener, 0) backends := make([]string, 0) secrets := make([]string, 0) @@ -138,13 +168,13 @@ func generateScaleListenerManifests(numListeners int, manifestDir string, tls bo secrets = append(secrets, secretName) } - listeners = append(listeners, Listener{ + listeners = append(listeners, listener{ Name: listenerName, HostnamePrefix: hostnamePrefix, SecretName: secretName, }) - route := Route{ + r := route{ Name: fmt.Sprintf("route-%d", i), ListenerName: listenerName, HostnamePrefix: hostnamePrefix, @@ -153,44 +183,57 @@ func generateScaleListenerManifests(numListeners int, manifestDir string, tls bo backends = append(backends, backendName) - if err := generateManifests(manifestDir, i, listeners, []Route{route}); err != nil { - return err + objects, err := generateManifests(listeners, []route{r}) + if err != nil { + return ScaleObjects{}, err } + + result.ScaleIterationGroups = append(result.ScaleIterationGroups, objects) } - if err := generateSecrets(getPrereqDirName(manifestDir), secrets); err != nil { - return err + secretObjects, err := generateSecrets(secrets) + if err != nil { + return ScaleObjects{}, err } - return generateBackendAppManifests(getPrereqDirName(manifestDir), backends) -} + result.BaseObjects = append(result.BaseObjects, secretObjects...) -func generateSecrets(secretsDir string, secrets []string) error { - err := os.Mkdir(secretsDir, 0o750) - if err != nil && !os.IsExist(err) { - return err + backendObjects, err := generateBackendAppObjects(backends) + if err != nil { + return ScaleObjects{}, err } + result.BaseObjects = append(result.BaseObjects, backendObjects...) + + return result, nil +} + +func generateSecrets(secrets []string) ([]client.Object, error) { + objects := make([]client.Object, 0, len(secrets)) + for _, secret := range secrets { var buf bytes.Buffer - if err = secretTmpl.Execute(&buf, secret); err != nil { - return err + if err := secretTmpl.Execute(&buf, secret); err != nil { + return nil, err } - path := filepath.Join(secretsDir, fmt.Sprintf("%s.yaml", secret)) - - fmt.Println("Writing", path) - if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil { - return err + objs, err := decodeObjects(&buf) + if err != nil { + return nil, err } + + objects = append(objects, objs...) } - return nil + return objects, nil } -func generateScaleHTTPRouteManifests(numRoutes int, manifestDir string) error { - l := Listener{ +// GenerateScaleHTTPRouteObjects generates objects for a given number of routes for the scale test. +func GenerateScaleHTTPRouteObjects(numRoutes int) (ScaleObjects, error) { + var result ScaleObjects + + l := listener{ Name: "listener", HostnamePrefix: "*", } @@ -198,35 +241,43 @@ func generateScaleHTTPRouteManifests(numRoutes int, manifestDir string) error { backendName := "backend" for i := 0; i < numRoutes; i++ { - - route := Route{ + r := route{ Name: fmt.Sprintf("route-%d", i), HostnamePrefix: fmt.Sprintf("%d", i), ListenerName: "listener", BackendName: backendName, } - var listeners []Listener + var listeners []listener if i == 0 { // only generate a Gateway on the first iteration - listeners = []Listener{l} + listeners = []listener{l} } - if err := generateManifests(manifestDir, i, listeners, []Route{route}); err != nil { - return err + objects, err := generateManifests(listeners, []route{r}) + if err != nil { + return ScaleObjects{}, err } + result.ScaleIterationGroups = append(result.ScaleIterationGroups, objects) + } + + backendObjects, err := generateBackendAppObjects([]string{backendName}) + if err != nil { + return ScaleObjects{}, err } - return generateBackendAppManifests(getPrereqDirName(manifestDir), []string{backendName}) + result.BaseObjects = backendObjects + + return result, nil } -func generateManifests(outDir string, version int, listeners []Listener, routes []Route) error { +func generateManifests(listeners []listener, routes []route) ([]client.Object, error) { var buf bytes.Buffer if len(listeners) > 0 { if err := gwTmpl.Execute(&buf, listeners); err != nil { - return err + return nil, err } } @@ -236,42 +287,30 @@ func generateManifests(outDir string, version int, listeners []Listener, routes } if err := hrTmpl.Execute(&buf, r); err != nil { - return err + return nil, err } } - err := os.Mkdir(outDir, 0o750) - if err != nil && !os.IsExist(err) { - return err - } - - filename := fmt.Sprintf("manifest-%d.yaml", version) - path := filepath.Join(outDir, filename) - - fmt.Println("Writing", path) - return os.WriteFile(path, buf.Bytes(), 0o600) + return decodeObjects(&buf) } -func generateBackendAppManifests(outDir string, backends []string) error { - err := os.Mkdir(outDir, 0o750) - if err != nil && !os.IsExist(err) { - return err - } +func generateBackendAppObjects(backends []string) ([]client.Object, error) { + objects := make([]client.Object, 0, 2*len(backends)) for _, backend := range backends { var buf bytes.Buffer - if err = appTmpl.Execute(&buf, backend); err != nil { - return err + if err := appTmpl.Execute(&buf, backend); err != nil { + return nil, err } - path := filepath.Join(outDir, fmt.Sprintf("%s.yaml", backend)) - - fmt.Println("Writing", path) - if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil { - return err + objs, err := decodeObjects(&buf) + if err != nil { + return nil, err } + + objects = append(objects, objs...) } - return nil + return objects, nil } diff --git a/tests/framework/portforward.go b/tests/framework/portforward.go index 1efc16be3c..e8234fea7c 100644 --- a/tests/framework/portforward.go +++ b/tests/framework/portforward.go @@ -6,22 +6,25 @@ import ( "net/http" "net/url" "path" + "time" + + "log/slog" "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" ) -// PortForward starts a port-forward to the specified Pod and returns the local port being forwarded. -func PortForward(config *rest.Config, namespace, podName string, stopCh chan struct{}) (int, error) { +// PortForward starts a port-forward to the specified Pod. +func PortForward(config *rest.Config, namespace, podName string, ports []string, stopCh <-chan struct{}) error { roundTripper, upgrader, err := spdy.RoundTripperFor(config) if err != nil { - return 0, fmt.Errorf("error creating roundtripper: %w", err) + return fmt.Errorf("error creating roundtripper: %w", err) } serverURL, err := url.Parse(config.Host) if err != nil { - return 0, fmt.Errorf("error parsing rest config host: %w", err) + return fmt.Errorf("error parsing rest config host: %w", err) } serverURL.Path = path.Join( @@ -33,25 +36,34 @@ func PortForward(config *rest.Config, namespace, podName string, stopCh chan str dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, serverURL) - readyCh := make(chan struct{}, 1) out, errOut := new(bytes.Buffer), new(bytes.Buffer) - forwarder, err := portforward.New(dialer, []string{":80"}, stopCh, readyCh, out, errOut) - if err != nil { - return 0, fmt.Errorf("error creating port forwarder: %w", err) + forward := func() error { + readyCh := make(chan struct{}, 1) + + forwarder, err := portforward.New(dialer, ports, stopCh, readyCh, out, errOut) + if err != nil { + return fmt.Errorf("error creating port forwarder: %w", err) + } + + return forwarder.ForwardPorts() } go func() { - if err := forwarder.ForwardPorts(); err != nil { - panic(err) + for { + if err := forward(); err != nil { + slog.Error("error forwarding ports", "error", err) + slog.Info("retrying port forward in 100ms...") + } + + select { + case <-stopCh: + return + case <-time.After(100 * time.Millisecond): + // retrying + } } }() - <-readyCh - ports, err := forwarder.GetPorts() - if err != nil { - return 0, fmt.Errorf("error getting ports being forwarded: %w", err) - } - - return int(ports[0].Local), nil + return nil } diff --git a/tests/framework/prometheus.go b/tests/framework/prometheus.go new file mode 100644 index 0000000000..eefca29fe5 --- /dev/null +++ b/tests/framework/prometheus.go @@ -0,0 +1,292 @@ +package framework + +import ( + "context" + "encoding/csv" + "errors" + "fmt" + "log/slog" + "os" + "os/exec" + "time" + + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + prometheusNamespace = "prom" + prometheusReleaseName = "prom" +) + +var defaultPrometheusQueryTimeout = 2 * time.Second + +// PrometheusConfig is the configuration for installing Prometheus +type PrometheusConfig struct { + // ScrapeInterval is the interval at which Prometheus scrapes metrics. + ScrapeInterval time.Duration + // QueryTimeout is the timeout for Prometheus queries. + // Default is 2s. + QueryTimeout time.Duration +} + +// InstallPrometheus installs Prometheus in the cluster. +// It waits for Prometheus pods to be ready before returning. +func InstallPrometheus( + ctx context.Context, + rm ResourceManager, + cfg PrometheusConfig, +) (PrometheusInstance, error) { + output, err := exec.Command( + "helm", + "repo", + "add", + "prometheus-community", + "https://prometheus-community.github.io/helm-charts", + ).CombinedOutput() + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to add Prometheus helm repo: %w; output: %s", err, string(output)) + } + + output, err = exec.Command( + "helm", + "repo", + "update", + ).CombinedOutput() + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to update helm repos: %w; output: %s", err, string(output)) + } + + scrapeInterval := fmt.Sprintf("%ds", int(cfg.ScrapeInterval.Seconds())) + + output, err = exec.Command( + "helm", + "install", + prometheusReleaseName, + "prometheus-community/prometheus", + "--create-namespace", + "--namespace", prometheusNamespace, + "--set", fmt.Sprintf("server.global.scrape_interval=%s", scrapeInterval), + "--wait", + ).CombinedOutput() + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to install Prometheus: %w; output: %s", err, string(output)) + } + + pods, err := rm.GetPods(prometheusNamespace, client.MatchingLabels{ + "app.kubernetes.io/name": "prometheus", + }) + if err != nil { + return PrometheusInstance{}, fmt.Errorf("failed to get Prometheus pods: %w", err) + } + + if len(pods) == 0 { + return PrometheusInstance{}, errors.New("no Prometheus pods found") + } + + if len(pods) > 1 { + return PrometheusInstance{}, errors.New("multiple Prometheus pods found, expected one") + } + + pod := pods[0] + + if pod.Status.PodIP == "" { + return PrometheusInstance{}, errors.New("Prometheus pod has no IP") + } + + var queryTimeout time.Duration + if cfg.QueryTimeout == 0 { + queryTimeout = defaultPrometheusQueryTimeout + } else { + queryTimeout = cfg.QueryTimeout + } + + return PrometheusInstance{ + podIP: pod.Status.PodIP, + podName: pod.Name, + podNamespace: pod.Namespace, + queryTimeout: queryTimeout, + }, nil +} + +// UninstallPrometheus uninstalls Prometheus from the cluster. +func UninstallPrometheus() error { + output, err := exec.Command( + "helm", + "uninstall", + prometheusReleaseName, + "-n", prometheusNamespace, + ).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to uninstall Prometheus: %w; output: %s", err, string(output)) + } + + return nil +} + +const ( + // PrometheusPortForwardPort is the local port that will forward to the Prometheus API. + PrometheusPortForwardPort = 9090 + prometheusAPIPort = 9090 +) + +// PrometheusInstance represents a Prometheus instance in the cluster. +type PrometheusInstance struct { + podIP string + podName string + podNamespace string + portForward bool + queryTimeout time.Duration + + apiClient v1.API +} + +// PortForward starts port forwarding to the Prometheus instance. +func (ins *PrometheusInstance) PortForward(config *rest.Config, stopCh <-chan struct{}) error { + if ins.portForward { + panic("port forwarding already started") + } + + ins.portForward = true + + ports := []string{fmt.Sprintf("%d:%d", PrometheusPortForwardPort, prometheusAPIPort)} + return PortForward(config, ins.podNamespace, ins.podName, ports, stopCh) +} + +func (ins *PrometheusInstance) getAPIClient() (v1.API, error) { + var endpoint string + if ins.portForward { + endpoint = fmt.Sprintf("http://localhost:%d", PrometheusPortForwardPort) + } else { + // on GKE, test runner VM can access the pod directly + endpoint = fmt.Sprintf("http://%s:%d", ins.podIP, prometheusAPIPort) + } + + cfg := api.Config{ + Address: fmt.Sprintf("%s", endpoint), + } + + c, err := api.NewClient(cfg) + if err != nil { + return nil, err + } + + return v1.NewAPI(c), nil +} + +func (ins *PrometheusInstance) ensureAPIClient() error { + if ins.apiClient == nil { + api, err := ins.getAPIClient() + if err != nil { + return fmt.Errorf("failed to get Prometheus API client: %w", err) + } + ins.apiClient = api + } + + return nil +} + +// Query sends a query to Prometheus. +func (ins *PrometheusInstance) Query(query string) (model.Value, error) { + ctx, cancel := context.WithTimeout(context.Background(), ins.queryTimeout) + defer cancel() + + return ins.QueryWithCtx(ctx, query) +} + +// QueryWithCtx sends a query to Prometheus with the specified context. +func (ins *PrometheusInstance) QueryWithCtx(ctx context.Context, query string) (model.Value, error) { + if err := ins.ensureAPIClient(); err != nil { + return nil, err + } + + result, warnings, err := ins.apiClient.Query(ctx, query, time.Time{}) + if err != nil { + return nil, fmt.Errorf("failed to query Prometheus: %w", err) + } + + if len(warnings) > 0 { + slog.Info("Prometheus query returned warnings", + "query", query, + "warnings", warnings, + ) + } + + return result, nil +} + +// QueryRange sends a range query to Prometheus. +func (ins *PrometheusInstance) QueryRange(query string, promRange v1.Range) (model.Value, error) { + ctx, cancel := context.WithTimeout(context.Background(), ins.queryTimeout) + defer cancel() + + return ins.QueryRangeWithCtx(ctx, query, promRange) +} + +// QueryRangeWithCtx sends a range query to Prometheus with the specified context. +func (ins *PrometheusInstance) QueryRangeWithCtx(ctx context.Context, query string, promRange v1.Range) (model.Value, error) { + if err := ins.ensureAPIClient(); err != nil { + return nil, err + } + + result, warnings, err := ins.apiClient.QueryRange(ctx, query, promRange) + if err != nil { + return nil, fmt.Errorf("failed to query Prometheus: %w", err) + } + + if len(warnings) > 0 { + slog.Info("Prometheus range query returned warnings", + "query", query, + "range", promRange, + "warnings", warnings, + ) + } + + return result, nil +} + +// GetFirstValueOfPrometheusVector returns the first value of a Prometheus vector. +func GetFirstValueOfPrometheusVector(val model.Value) (float64, error) { + res, ok := val.(model.Vector) + if !ok { + return 0, fmt.Errorf("expected a vector, got %T", val) + } + + if len(res) == 0 { + return 0, errors.New("empty vector") + } + + return float64(res[0].Value), nil +} + +// WritePrometheusMatrixToCSVFile writes a Prometheus matrix to a CSV file. +func WritePrometheusMatrixToCSVFile(fileName string, value model.Value) error { + file, err := os.Create(fileName) + if err != nil { + return err + } + defer file.Close() + + csvWriter := csv.NewWriter(file) + + matrix, ok := value.(model.Matrix) + if !ok { + return fmt.Errorf("expected a matrix, got %T", value) + } + + for _, sample := range matrix { + for _, pair := range sample.Values { + record := []string{fmt.Sprint(pair.Timestamp.Unix()), pair.Value.String()} + if err := csvWriter.Write(record); err != nil { + return err + } + } + } + + csvWriter.Flush() + + return nil +} diff --git a/tests/framework/resourcemanager.go b/tests/framework/resourcemanager.go index fdc996c5d6..9b5c19475b 100644 --- a/tests/framework/resourcemanager.go +++ b/tests/framework/resourcemanager.go @@ -27,9 +27,12 @@ import ( "fmt" "io" "net/http" + "reflect" "strings" "time" + "k8s.io/client-go/util/retry" + apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -71,7 +74,17 @@ func (rm *ResourceManager) Apply(resources []client.Object) error { defer cancel() for _, resource := range resources { - if err := rm.K8sClient.Get(ctx, client.ObjectKeyFromObject(resource), resource); err != nil { + var obj client.Object + + unstructuredObj, ok := resource.(*unstructured.Unstructured) + if ok { + obj = unstructuredObj.DeepCopy() + } else { + t := reflect.TypeOf(resource).Elem() + obj = reflect.New(t).Interface().(client.Object) + } + + if err := rm.K8sClient.Get(ctx, client.ObjectKeyFromObject(resource), obj); err != nil { if !apierrors.IsNotFound(err) { return fmt.Errorf("error getting resource: %w", err) } @@ -83,7 +96,19 @@ func (rm *ResourceManager) Apply(resources []client.Object) error { continue } - if err := rm.K8sClient.Update(ctx, resource); err != nil { + // Some tests modify resources that are also modified by NGF (to update their status), so conflicts are possible + // For example, a Gateway resource. + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := rm.K8sClient.Get(ctx, client.ObjectKeyFromObject(resource), obj); err != nil { + return err + } + resource.SetResourceVersion(obj.GetResourceVersion()) + if err := rm.K8sClient.Update(ctx, resource); err != nil { + return err + } + return nil + }) + if err != nil { return fmt.Errorf("error updating resource: %w", err) } } @@ -112,8 +137,19 @@ func (rm *ResourceManager) ApplyFromFiles(files []string, namespace string) erro return nil } - obj.SetResourceVersion(fetchedObj.GetResourceVersion()) - if err := rm.K8sClient.Update(ctx, &obj); err != nil { + // Some tests modify resources that are also modified by NGF (to update their status), so conflicts are possible + // For example, a Gateway resource. + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := rm.K8sClient.Get(ctx, nsName, fetchedObj); err != nil { + return err + } + obj.SetResourceVersion(fetchedObj.GetResourceVersion()) + if err := rm.K8sClient.Update(ctx, &obj); err != nil { + return err + } + return nil + }) + if err != nil { return fmt.Errorf("error updating resource: %w", err) } @@ -137,7 +173,7 @@ func (rm *ResourceManager) Delete(resources []client.Object, opts ...client.Dele return nil } -// DeleteFromFile deletes Kubernetes resources defined within the provided YAML files. +// DeleteFromFiles deletes Kubernetes resources defined within the provided YAML files. func (rm *ResourceManager) DeleteFromFiles(files []string, namespace string) error { handlerFunc := func(obj unstructured.Unstructured) error { obj.SetNamespace(namespace) @@ -241,7 +277,13 @@ func (rm *ResourceManager) WaitForAppsToBeReady(namespace string) error { ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.CreateTimeout) defer cancel() - if err := rm.waitForPodsToBeReady(ctx, namespace); err != nil { + return rm.WaitForAppsToBeReadyWithCtx(ctx, namespace) +} + +// WaitForAppsToBeReadyWithCtx waits for all apps in the specified namespace to be ready or +// until the provided context is cancelled. +func (rm *ResourceManager) WaitForAppsToBeReadyWithCtx(ctx context.Context, namespace string) error { + if err := rm.WaitForPodsToBeReady(ctx, namespace); err != nil { return err } @@ -252,7 +294,9 @@ func (rm *ResourceManager) WaitForAppsToBeReady(namespace string) error { return rm.waitForGatewaysToBeReady(ctx, namespace) } -func (rm *ResourceManager) waitForPodsToBeReady(ctx context.Context, namespace string) error { +// WaitForPodsToBeReady waits for all Pods in the specified namespace to be ready or +// until the provided context is cancelled. +func (rm *ResourceManager) WaitForPodsToBeReady(ctx context.Context, namespace string) error { return wait.PollUntilContextCancel( ctx, 500*time.Millisecond, @@ -447,6 +491,37 @@ func (rm *ResourceManager) GetPodNames(namespace string, labels client.MatchingL return names, nil } +// GetPods returns all Pods in the specified namespace that match the given labels. +func (rm *ResourceManager) GetPods(namespace string, labels client.MatchingLabels) ([]core.Pod, error) { + ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.GetTimeout) + defer cancel() + + var podList core.PodList + if err := rm.K8sClient.List( + ctx, + &podList, + client.InNamespace(namespace), + labels, + ); err != nil { + return nil, fmt.Errorf("error getting list of Pods: %w", err) + } + + return podList.Items, nil +} + +// GetPod returns the Pod in the specified namespace with the given name. +func (rm *ResourceManager) GetPod(namespace, name string) (*core.Pod, error) { + ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.GetTimeout) + defer cancel() + + var pod core.Pod + if err := rm.K8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &pod); err != nil { + return nil, fmt.Errorf("error getting Pod: %w", err) + } + + return &pod, nil +} + // GetPodLogs returns the logs from the specified Pod func (rm *ResourceManager) GetPodLogs(namespace, name string, opts *core.PodLogOptions) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.GetTimeout) @@ -494,6 +569,24 @@ func (rm *ResourceManager) GetNGFDeployment(namespace, releaseName string) (*app return &deployment, nil } +// ScaleDeployment scales the Deployment to the specified number of replicas. +func (rm *ResourceManager) ScaleDeployment(namespace, name string, replicas int32) error { + ctx, cancel := context.WithTimeout(context.Background(), rm.TimeoutConfig.UpdateTimeout) + defer cancel() + + var deployment apps.Deployment + if err := rm.K8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &deployment); err != nil { + return fmt.Errorf("error getting Deployment: %w", err) + } + + deployment.Spec.Replicas = &replicas + if err := rm.K8sClient.Update(ctx, &deployment); err != nil { + return fmt.Errorf("error updating Deployment: %w", err) + } + + return nil +} + // GetReadyNGFPodNames returns the name(s) of the NGF Pod(s). func GetReadyNGFPodNames( k8sClient client.Client, diff --git a/tests/framework/results.go b/tests/framework/results.go index 7d5b8ad2ee..0e02a0037e 100644 --- a/tests/framework/results.go +++ b/tests/framework/results.go @@ -1,6 +1,7 @@ package framework import ( + "encoding/csv" "fmt" "io" "os" @@ -64,24 +65,48 @@ func WriteSystemInfoToFile(file *os.File, ci ClusterInfo, plus bool) error { return nil } -// GeneratePNG generates a PNG using gnuplot. -func GeneratePNG(resultsDir, inputFilename, outputFilename string) ([]byte, error) { +func generatePNG(resultsDir, inputFilename, outputFilename, configFilename string) error { pwd, err := os.Getwd() if err != nil { - return nil, err + return err } - gnuplotCfg := filepath.Join(filepath.Dir(pwd), "scripts", "requests-plot.gp") + gnuplotCfg := filepath.Join(filepath.Dir(pwd), "scripts", configFilename) files := fmt.Sprintf("inputfile='%s';outputfile='%s'", inputFilename, outputFilename) cmd := exec.Command("gnuplot", "-e", files, "-c", gnuplotCfg) cmd.Dir = resultsDir - return cmd.CombinedOutput() + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to generate PNG: %w; output: %s", err, string(output)) + } + + return nil +} + +// GenerateRequestsPNG generates a Requests PNG using gnuplot. +func GenerateRequestsPNG(resultsDir, inputFilename, outputFilename string) error { + return generatePNG(resultsDir, inputFilename, outputFilename, "requests-plot.gpgp") +} + +// GenerateTTRPNG generates a TTR PNG using gnuplot. +func GenerateTTRPNG(resultsDir, inputFilename, outputFilename string) error { + return generatePNG(resultsDir, inputFilename, outputFilename, "ttr-plot.gp") +} + +// GenerateCPUPNG generates a CPU usage PNG using gnuplot. +func GenerateCPUPNG(resultsDir, inputFilename, outputFilename string) error { + return generatePNG(resultsDir, inputFilename, outputFilename, "cpu-plot.gp") } -// WriteResults writes the vegeta metrics results to the results file in text format. -func WriteResults(resultsFile *os.File, metrics *Metrics) error { +// GenerateMemoryPNG generates a Memory usage PNG using gnuplot. +func GenerateMemoryPNG(resultsDir, inputFilename, outputFilename string) error { + return generatePNG(resultsDir, inputFilename, outputFilename, "memory-plot.gp") +} + +// WriteVegetaResults writes the vegeta metrics results to the results file in text format. +func WriteVegetaResults(resultsFile *os.File, metrics *Metrics) error { reporter := vegeta.NewTextReporter(&metrics.Metrics) return reporter.Report(resultsFile) @@ -96,7 +121,27 @@ func WriteContent(resultsFile *os.File, content string) error { return nil } -// NewCSVEncoder returns a vegeta CSV encoder. -func NewCSVEncoder(w io.Writer) vegeta.Encoder { +// NewVegetaCSVEncoder returns a vegeta CSV encoder. +func NewVegetaCSVEncoder(w io.Writer) vegeta.Encoder { return vegeta.NewCSVEncoder(w) } + +// NewCSVResultsWriter creates and returns a CSV results file and writer. +func NewCSVResultsWriter(resultsDir, fileName string, resultHeaders ...string) (*os.File, *csv.Writer, error) { + if err := os.MkdirAll(resultsDir, 0o750); err != nil { + return nil, nil, err + } + + file, err := os.OpenFile(filepath.Join(resultsDir, fileName), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644) + if err != nil { + return nil, nil, err + } + + writer := csv.NewWriter(file) + + if err = writer.Write(resultHeaders); err != nil { + return nil, nil, err + } + + return file, writer, nil +} diff --git a/tests/framework/timeout.go b/tests/framework/timeout.go index 2aee2e5a21..4b837c757c 100644 --- a/tests/framework/timeout.go +++ b/tests/framework/timeout.go @@ -6,6 +6,9 @@ type TimeoutConfig struct { // CreateTimeout represents the maximum time for a Kubernetes object to be created. CreateTimeout time.Duration + // UpdateTimeout represents the maximum time for a Kubernetes object to be updated. + UpdateTimeout time.Duration + // DeleteTimeout represents the maximum time for a Kubernetes object to be deleted. DeleteTimeout time.Duration @@ -29,6 +32,7 @@ type TimeoutConfig struct { func DefaultTimeoutConfig() TimeoutConfig { return TimeoutConfig{ CreateTimeout: 60 * time.Second, + UpdateTimeout: 60 * time.Second, DeleteTimeout: 10 * time.Second, GetTimeout: 10 * time.Second, ManifestFetchTimeout: 10 * time.Second, diff --git a/tests/scale/manifests/prom-clusterrole.yaml b/tests/scale/manifests/prom-clusterrole.yaml deleted file mode 100644 index f8aefdd36e..0000000000 --- a/tests/scale/manifests/prom-clusterrole.yaml +++ /dev/null @@ -1,44 +0,0 @@ -apiVersion: v1 -kind: Namespace -metadata: - name: prom ---- -apiVersion: v1 -kind: ServiceAccount -metadata: - name: prometheus - namespace: prom ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: prometheus - namespace: prom -rules: -- apiGroups: [""] - resources: - - nodes - - services - - endpoints - - pods - verbs: ["get", "list", "watch"] -- apiGroups: [""] - resources: - - configmaps - verbs: ["get"] -- nonResourceURLs: ["/metrics"] - verbs: ["get"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: prometheus - namespace: prom -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: prometheus -subjects: -- kind: ServiceAccount - name: prometheus - namespace: prom diff --git a/tests/scale/scale_test.go b/tests/scale/scale_test.go deleted file mode 100644 index 89f87a1d82..0000000000 --- a/tests/scale/scale_test.go +++ /dev/null @@ -1,259 +0,0 @@ -//go:build scale -// +build scale - -package scale - -import ( - "context" - "crypto/tls" - "encoding/csv" - "flag" - "fmt" - "net/http" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "testing" - "time" - - "k8s.io/apimachinery/pkg/util/wait" -) - -// testing flags -var ( - numIterations = flag.Int("i", 1, "number of times to scale the resource") - delay = flag.Duration("delay", 0, "delay between each scaling iteration") - version = flag.String("version", "1.2.0", "version of NGF under test") - plus = flag.Bool("plus", false, "nginx-plus enabled") -) - -func TestScale_Listeners(t *testing.T) { - ip := getIP(t) - url := fmt.Sprintf("http://%s/", ip) - - runScaleTest( - t, - []string{"# Listeners", "Time to Ready (s)", "Error"}, - func(dir string) error { - return generateScaleListenerManifests(*numIterations, dir, false /*non-tls*/) - }, - url, - ) -} - -func TestScale_HTTPSListeners(t *testing.T) { - ip := getIP(t) - url := fmt.Sprintf("https://%s/", ip) - - runScaleTest( - t, - []string{"# HTTPS Listeners", "Time to Ready (s)", "Error"}, - func(dir string) error { - return generateScaleListenerManifests(*numIterations, dir, true /*tls*/) - }, - url, - ) -} - -func TestScale_HTTPRoutes(t *testing.T) { - ip := getIP(t) - url := fmt.Sprintf("http://%s/", ip) - - runScaleTest( - t, - []string{"# HTTPRoutes", "Time to Ready (s)", "Error"}, - func(dir string) error { - return generateScaleHTTPRouteManifests(*numIterations, dir) - }, - url, - ) -} - -func runScaleTest( - t *testing.T, - resultHeaders []string, - generateManifests func(dir string) error, - url string, -) { - t.Helper() - manifestDir := t.Name() - - writer := newResultsWriter(t, t.Name(), resultHeaders...) - - if err := generateManifests(manifestDir); err != nil { - t.Fatalf("failed to generate manifests: %s", err) - } - - startTime := time.Now() - startUnix := fmt.Sprintf("%d", startTime.Unix()) - - if err := kubectlApply(getPrereqDirName(manifestDir)); err != nil { - t.Fatalf("failed to apply prerequisite resources: %s", err) - } - - t.Log("Waiting for all Pods to be Ready") - if err := kubectlWaitAllPodsReady(); err != nil { - t.Fatalf("failed to wait for all Pods to be Ready: %s", err) - } - - for i := 0; i < *numIterations; i++ { - t.Logf("Scaling up to %d resources", i) - - manifestFile := filepath.Join(manifestDir, fmt.Sprintf("manifest-%d.yaml", i)) - - if err := kubectlApply(manifestFile); err != nil { - t.Errorf("failed to scale up: %s", err) - } - - host := fmt.Sprintf("%d.example.com", i) - - t.Logf("Sending request to url %s with host %s...", url, host) - - ttr, err := waitForResponseForHost(url, host) - - seconds := ttr.Seconds() - record := []string{strconv.Itoa(i + 1), strconv.FormatFloat(seconds, 'f', -1, 64)} - if err != nil { - record = append(record, err.Error()) - } - - if err = writer.Write(record); err != nil { - t.Fatalf("failed to write time to ready to csv file: %s", err) - } - - time.Sleep(*delay) - } - - endTime := time.Now() - endUnix := fmt.Sprintf("%d", endTime.Unix()) - - // This accounts for prometheus 10s scraping window - endUnixPlusTen := fmt.Sprintf("%d", endTime.Add(10*time.Second).Unix()) - - records := [][]string{ - {"Test Start", "Test End", "Test End + 10s", "Duration"}, - {startUnix, endUnix, endUnixPlusTen, endTime.Sub(startTime).String()}, - } - - if err := writer.WriteAll(records); err != nil { - t.Logf("failed to write records to csv") - } -} - -func getIP(t *testing.T) string { - t.Helper() - - ip := os.Getenv("NGF_IP") - if ip == "" { - t.Fatalf("NGF_IP env var not set") - } - - return ip -} - -func newResultsWriter(t *testing.T, testName string, resultHeaders ...string) *csv.Writer { - t.Helper() - - versionDir := filepath.Join("results", *version) - if err := os.Mkdir(versionDir, 0o750); err != nil && !os.IsExist(err) { - t.Fatalf("failed to create results version directory: %s", err) - } - - testDirName := testName - if *plus { - testDirName += "_Plus" - } - - dir := filepath.Join(versionDir, testDirName) - if err := os.Mkdir(dir, 0o750); err != nil { - t.Fatalf("failed to create results test directory: %s", err) - } - - file, err := os.Create(filepath.Join(dir, "results.csv")) - if err != nil { - t.Fatalf("failed to create results csv file: %s", err) - } - - writer := csv.NewWriter(file) - - if err = writer.Write(resultHeaders); err != nil { - t.Fatalf("failed to write headers to csv file: %s", err) - } - - t.Cleanup(func() { - writer.Flush() - _ = file.Close() - }) - - return writer -} - -func kubectlApply(filename string) error { - if err := kubectlExec("apply", "-f", filename); err != nil { - return fmt.Errorf("error applying %s: %w", filename, err) - } - - return nil -} - -func kubectlWaitAllPodsReady() error { - if err := kubectlExec("wait", "pod", "--all", "--for=condition=Ready"); err != nil { - return fmt.Errorf("error waiting for all pods to be ready:%w", err) - } - - return nil -} - -func kubectlExec(arg ...string) error { - cmd := exec.Command("kubectl", arg...) - - return cmd.Run() -} - -func waitForResponseForHost(url, host string) (time.Duration, error) { - client := &http.Client{} - - if strings.HasPrefix(url, "https") { - customTransport := http.DefaultTransport.(*http.Transport) - customTransport.TLSClientConfig = &tls.Config{ - InsecureSkipVerify: true, // nolint: gosec - ServerName: host, - } - client.Transport = customTransport - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return 0, err - } - - req.Host = host - - start := time.Now() - - err = wait.PollUntilContextCancel( - ctx, - 200*time.Millisecond, - true, - func(ctx context.Context) (done bool, err error) { - resp, err := client.Do(req) - if err != nil { - fmt.Println("Retrying GET request", "error", err) - return false, err - } - - if resp.StatusCode == http.StatusOK { - return true, nil - } - - fmt.Println("Retrying GET request", "host", host, "status", resp.Status) - return false, nil - }) - - return time.Since(start), err -} diff --git a/tests/scripts/cpu-plot.gp b/tests/scripts/cpu-plot.gp new file mode 100644 index 0000000000..71584811a3 --- /dev/null +++ b/tests/scripts/cpu-plot.gp @@ -0,0 +1,20 @@ +set terminal png size 800,600 +set title "CPU Usage" +set datafile separator "," +set output outputfile . "" + +# X-axis settings +set xlabel "Timestamp" +set xdata time +set timefmt "%s" +set format x "%M:%S" +set xrange [*:*] +set xtics nomirror + +# Y-axis settings +set yrange [0:*] +set ylabel "CPU Usage (core seconds)" +set format y "%.2f" + +# Plotting data +plot inputfile using 1:2 with lines lw 2 notitle diff --git a/tests/scripts/create-gke-cluster.sh b/tests/scripts/create-gke-cluster.sh index 9d034e1c62..b5a2061f8b 100644 --- a/tests/scripts/create-gke-cluster.sh +++ b/tests/scripts/create-gke-cluster.sh @@ -6,6 +6,16 @@ ip_random_digit=$((1 + $RANDOM % 250)) IS_CI=${1:-false} +if [ -z "$GKE_MACHINE_TYPE" ]; then + # If the environment variable is not set, use a default value + GKE_MACHINE_TYPE="e2-medium" +fi + +if [ -z "$GKE_NUM_NODES" ]; then + # If the environment variable is not set, use a default value + GKE_NUM_NODES="3" +fi + gcloud container clusters create ${GKE_CLUSTER_NAME} \ --project ${GKE_PROJECT} \ --zone ${GKE_CLUSTER_ZONE} \ @@ -16,7 +26,9 @@ gcloud container clusters create ${GKE_CLUSTER_NAME} \ --master-ipv4-cidr 172.16.${ip_random_digit}.32/28 \ --metadata=block-project-ssh-keys=TRUE \ --monitoring=SYSTEM,POD,DEPLOYMENT \ - --logging=SYSTEM,WORKLOAD + --logging=SYSTEM,WORKLOAD \ + --machine-type ${GKE_MACHINE_TYPE} \ + --num-nodes ${GKE_NUM_NODES} # Add current IP to GKE master control node access, if this script is not invoked during a CI run. if [ "${IS_CI}" = "false" ]; then diff --git a/tests/scripts/memory-plot.gp b/tests/scripts/memory-plot.gp new file mode 100644 index 0000000000..c25614db70 --- /dev/null +++ b/tests/scripts/memory-plot.gp @@ -0,0 +1,21 @@ +# Define a function to convert bytes to Mebibytes +bytes_to_MiB(bytes) = bytes / (1024.0 * 1024.0) + +set terminal png size 800,600 +set title "Memory Usage" +set datafile separator "," +set output outputfile . "" + +# X-axis settings +set xlabel "Timestamp" +set xdata time +set timefmt "%s" +set format x "%M:%S" +set xrange [*:*] # Specify a range covering all timestamps + +# Y-axis settings +set yrange [0:*] +set ylabel "Memory Usage (MiB)" + +# Plotting data +plot inputfile using 1:(bytes_to_MiB($2)) with lines lw 2 notitle diff --git a/tests/scripts/ttr-plot.gp b/tests/scripts/ttr-plot.gp new file mode 100644 index 0000000000..97d0eb2891 --- /dev/null +++ b/tests/scripts/ttr-plot.gp @@ -0,0 +1,18 @@ +set terminal png size 800,600 +set title "Scaling resources" +set datafile separator "," +set output outputfile . "" + +# X-axis settings +set xrange [0:70] +set xtics 10 +set xlabel "# Resources" +set grid xtics + +# Y-axis settings +set yrange [0:*] +set ylabel "Time to Ready (s)" +set format y "%.1f" + +# Plotting data +plot inputfile using 1:2 with lines lw 2 notitle diff --git a/tests/scripts/vars.env-example b/tests/scripts/vars.env-example index c4163b0120..52138f113f 100644 --- a/tests/scripts/vars.env-example +++ b/tests/scripts/vars.env-example @@ -20,3 +20,5 @@ SOURCE_IP_RANGE= PLUS_ENABLED= NGF_VERSION= +GKE_MACHINE_TYPE= +GKE_NUM_NODES= diff --git a/tests/suite/dataplane_perf_test.go b/tests/suite/dataplane_perf_test.go index 743e79977c..ffb07d85f7 100644 --- a/tests/suite/dataplane_perf_test.go +++ b/tests/suite/dataplane_perf_test.go @@ -97,7 +97,7 @@ var _ = Describe("Dataplane performance", Ordered, Label("nfr", "performance"), } _, metrics := framework.RunLoadTest(cfg) - Expect(framework.WriteResults(outFile, &metrics)).To(Succeed()) + Expect(framework.WriteVegetaResults(outFile, &metrics)).To(Succeed()) _, err = fmt.Fprint(outFile, "```\n") Expect(err).ToNot(HaveOccurred()) diff --git a/tests/scale/manifests/scale-matches.yaml b/tests/suite/manifests/scale/matches.yaml similarity index 100% rename from tests/scale/manifests/scale-matches.yaml rename to tests/suite/manifests/scale/matches.yaml diff --git a/tests/scale/manifests/scale-upstreams.yaml b/tests/suite/manifests/scale/upstreams.yaml similarity index 100% rename from tests/scale/manifests/scale-upstreams.yaml rename to tests/suite/manifests/scale/upstreams.yaml diff --git a/tests/suite/scale_test.go b/tests/suite/scale_test.go new file mode 100644 index 0000000000..db87c32593 --- /dev/null +++ b/tests/suite/scale_test.go @@ -0,0 +1,817 @@ +package suite + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "text/template" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + core "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctlr "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/nginxinc/nginx-gateway-fabric/tests/framework" +) + +var _ = Describe("Scale test", Ordered, Label("nfr", "scale"), func() { + // One of the tests - scales upstream servers - requires a big cluster to provision 648 pods. + // On GKE, you can use the following configuration: + // - A Kubernetes cluster with 12 nodes on GKE + // - Node: n2d-standard-16 (16 vCPU, 64GB memory) + + var ( + matchesManifests = []string{ + "scale/matches.yaml", + } + upstreamsManifests = []string{ + "scale/upstreams.yaml", + } + + ns = &core.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "scale", + }, + } + + scrapeInterval = 15 * time.Second + queryRangeStep = 15 * time.Second + + resultsDir string + outFile *os.File + ngfPodName string + promInstance framework.PrometheusInstance + promPortForwardStopCh = make(chan struct{}) + ) + + const ( + httpListenerCount = 64 + httpsListenerCount = 64 + httpRouteCount = 1000 + upstreamServerCount = 648 + ) + + BeforeAll(func() { + // Scale tests need a dedicated NGF instance + // Because they analyze the logs of NGF and NGINX, and they don't want to analyze the logs of other tests. + teardown(releaseName) + + Expect(resourceManager.Apply([]client.Object{ns})).To(Succeed()) + + var err error + resultsDir, err = framework.CreateResultsDir("scale", version) + Expect(err).ToNot(HaveOccurred()) + + filename := filepath.Join(resultsDir, fmt.Sprintf("%s.md", version)) + outFile, err = framework.CreateResultsFile(filename) + Expect(err).ToNot(HaveOccurred()) + Expect(framework.WriteSystemInfoToFile(outFile, clusterInfo, *plusEnabled)).To(Succeed()) + + promCfg := framework.PrometheusConfig{ + ScrapeInterval: scrapeInterval, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + promInstance, err = framework.InstallPrometheus(ctx, resourceManager, promCfg) + Expect(err).ToNot(HaveOccurred()) + + k8sConfig := ctlr.GetConfigOrDie() + + if !clusterInfo.IsGKE { + Expect(promInstance.PortForward(k8sConfig, promPortForwardStopCh)).To(Succeed()) + } + }) + + BeforeEach(func() { + // Scale tests need a dedicated NGF instance per test. + // Because they analyze the logs of NGF and NGINX, and they don't want to analyze the logs of other tests. + cfg := getDefaultSetupCfg() + labelFilter := GinkgoLabelFilter() + cfg.nfr = isNFR(labelFilter) + setup(cfg) + + podNames, err := framework.GetReadyNGFPodNames(k8sClient, ngfNamespace, releaseName, timeoutConfig.GetTimeout) + Expect(err).ToNot(HaveOccurred()) + Expect(podNames).To(HaveLen(1)) + ngfPodName = podNames[0] + }) + + createResponseChecker := func(url, address string) func() error { + return func() error { + status, _, err := framework.Get(url, address, timeoutConfig.RequestTimeout) + if err != nil { + return fmt.Errorf("bad response: %w", err) + } + + if status != 200 { + return fmt.Errorf("unexpected status code: %d", status) + } + + return nil + } + } + + createMetricExistChecker := func(query string, getTime func() time.Time, modifyTime func()) func() error { + return func() error { + queryWithTimestamp := fmt.Sprintf("%s @ %d", query, getTime().Unix()) + + result, err := promInstance.Query(queryWithTimestamp) + if err != nil { + return fmt.Errorf("failed to query Prometheus: %w", err) + } + + if result.String() == "" { + modifyTime() + return errors.New("empty result") + } + + return nil + } + } + + createEndTimeFinder := func(query string, startTime time.Time, t *time.Time) func() error { + return func() error { + result, err := promInstance.QueryRange(query, v1.Range{ + Start: startTime, + End: *t, + Step: queryRangeStep, + }) + if err != nil { + return fmt.Errorf("failed to query Prometheus: %w", err) + } + + if result.String() == "" { + *t = time.Now() + return errors.New("empty result") + } + + return nil + } + } + + getFirstValueOfVector := func(query string) float64 { + result, err := promInstance.Query(query) + Expect(err).ToNot(HaveOccurred()) + + val, err := framework.GetFirstValueOfPrometheusVector(result) + Expect(err).ToNot(HaveOccurred()) + + return val + } + + getBuckets := func(query string) []bucket { + result, err := promInstance.Query(query) + Expect(err).ToNot(HaveOccurred()) + + res, ok := result.(model.Vector) + Expect(ok).To(BeTrue()) + + buckets := make([]bucket, 0, len(res)) + + for _, sample := range res { + le := sample.Metric["le"] + val := float64(sample.Value) + bucket := bucket{ + Le: string(le), + Val: int(val), + } + buckets = append(buckets, bucket) + } + + return buckets + } + + checkLogErrors := func(containerName string, substrings []string, fileName string) int { + logs, err := resourceManager.GetPodLogs(ngfNamespace, ngfPodName, &core.PodLogOptions{ + Container: containerName, + }) + Expect(err).ToNot(HaveOccurred()) + + logLines := strings.Split(logs, "\n") + errors := 0 + + outer: + for _, line := range logLines { + for _, substr := range substrings { + if strings.Contains(line, substr) { + errors++ + continue outer + } + } + } + + // attach full logs + if errors > 0 { + f, err := os.Create(fileName) + Expect(err).ToNot(HaveOccurred()) + defer f.Close() + + _, err = io.WriteString(f, logs) + Expect(err).ToNot(HaveOccurred()) + } + return errors + } + + runTestWithMetricsAndLogs := func(testName, testResultsDir string, test func()) { + var ( + metricExistTimeout = 2 * time.Minute + metricExistPolling = 1 * time.Second + ) + + startTime := time.Now() + + // We need to make sure that for the startTime, the metrics exists in Prometheus. + // if they don't exist, we increase the startTime and try again. + // Note: it's important that Polling interval in Eventually is greater than the startTime increment. + + getStartTime := func() time.Time { return startTime } + modifyStartTime := func() { startTime = startTime.Add(500 * time.Millisecond) } + + queries := []string{ + fmt.Sprintf(`container_memory_usage_bytes{pod="%s",container="nginx-gateway"}`, ngfPodName), + fmt.Sprintf(`container_cpu_usage_seconds_total{pod="%s",container="nginx-gateway"}`, ngfPodName), + // We don't need to check all nginx_gateway_fabric_* metrics, as they are collected at the same time + fmt.Sprintf(`nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`, ngfPodName), + } + + for _, q := range queries { + Eventually( + createMetricExistChecker( + q, + getStartTime, + modifyStartTime, + ), + ).WithTimeout(metricExistTimeout).WithPolling(metricExistPolling).Should(Succeed()) + } + + test() + + // We sleep for 2 scape intervals to ensure that Prometheus scrapes the metrics after the test() finishes + // before endTime, so that we don't lose any metric values like reloads. + time.Sleep(2 * scrapeInterval) + + endTime := time.Now() + + // Now we check that Prometheus has the metrics for the endTime + + // If the test duration is small (which can happen if you run the test with small number of resources), + // the rate query may not return any data. + // To ensure it returns data, we increase the startTime. + Eventually( + createEndTimeFinder( + fmt.Sprintf(`rate(container_cpu_usage_seconds_total{pod="%s",container="nginx-gateway"}[2m])`, ngfPodName), + startTime, + &endTime, + ), + ).WithTimeout(metricExistTimeout).WithPolling(metricExistPolling).Should(Succeed()) + + getEndTime := func() time.Time { return endTime } + noOpModifier := func() {} + + queries = []string{ + fmt.Sprintf(`container_memory_usage_bytes{pod="%s",container="nginx-gateway"}`, ngfPodName), + // We don't need to check all nginx_gateway_fabric_* metrics, as they are collected at the same time + fmt.Sprintf(`nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`, ngfPodName), + } + + for _, q := range queries { + Eventually( + createMetricExistChecker( + q, + getEndTime, + noOpModifier, + ), + ).WithTimeout(metricExistTimeout).WithPolling(metricExistPolling).Should(Succeed()) + } + + // Collect metric values + // For some metrics, generate PNGs + + result, err := promInstance.QueryRange( + fmt.Sprintf(`container_memory_usage_bytes{pod="%s",container="nginx-gateway"}`, ngfPodName), + v1.Range{ + Start: startTime, + End: endTime, + Step: queryRangeStep, + }, + ) + Expect(err).ToNot(HaveOccurred()) + + memCSV := filepath.Join(testResultsDir, "memory.csv") + + Expect(framework.WritePrometheusMatrixToCSVFile(memCSV, result)).To(Succeed()) + + Expect( + framework.GenerateMemoryPNG(testResultsDir, memCSV, "memory.png"), + ).To(Succeed()) + + Expect(os.Remove(memCSV)).To(Succeed()) + + result, err = promInstance.QueryRange( + fmt.Sprintf(`rate(container_cpu_usage_seconds_total{pod="%s",container="nginx-gateway"}[2m])`, ngfPodName), + v1.Range{ + Start: startTime, + End: endTime, + Step: queryRangeStep, + }, + ) + Expect(err).ToNot(HaveOccurred()) + + cpuCSV := filepath.Join(testResultsDir, "cpu.csv") + + Expect(framework.WritePrometheusMatrixToCSVFile(cpuCSV, result)).To(Succeed()) + + Expect( + framework.GenerateCPUPNG(testResultsDir, cpuCSV, "cpu.png"), + ).To(Succeed()) + + Expect(os.Remove(cpuCSV)).To(Succeed()) + + reloadCount := getFirstValueOfVector( + fmt.Sprintf( + `nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_total{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + reloadErrsCount := getFirstValueOfVector( + fmt.Sprintf( + `nginx_gateway_fabric_nginx_reload_errors_total{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reload_errors_total{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + reloadAvgTime := getFirstValueOfVector( + fmt.Sprintf( + `(nginx_gateway_fabric_nginx_reloads_milliseconds_sum{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_milliseconds_sum{pod="%s"} @ %d)`+ + ` / `+ + `(nginx_gateway_fabric_nginx_reloads_total{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_total{pod="%s"} @ %d)`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ngfPodName, + ngfPodName, + startTime.Unix(), + )) + + reloadBuckets := getBuckets( + fmt.Sprintf( + `nginx_gateway_fabric_nginx_reloads_milliseconds_bucket{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_nginx_reloads_milliseconds_bucket{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + eventsCount := getFirstValueOfVector( + fmt.Sprintf( + `nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + eventsAvgTime := getFirstValueOfVector( + fmt.Sprintf( + `(nginx_gateway_fabric_event_batch_processing_milliseconds_sum{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_sum{pod="%s"} @ %d)`+ + ` / `+ + `(nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_count{pod="%s"} @ %d)`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + eventsBuckets := getBuckets( + fmt.Sprintf( + `nginx_gateway_fabric_event_batch_processing_milliseconds_bucket{pod="%s"}`+ + ` - `+ + `nginx_gateway_fabric_event_batch_processing_milliseconds_bucket{pod="%s"} @ %d`, + ngfPodName, + ngfPodName, + startTime.Unix(), + ), + ) + + // Check container logs for errors + + ngfErrors := checkLogErrors( + "nginx-gateway", + []string{"error"}, + filepath.Join(testResultsDir, "ngf.log"), + ) + nginxErrors := checkLogErrors( + "nginx", + []string{"[error]", "[emerg]", "[crit]", "[alert]"}, + filepath.Join(testResultsDir, "nginx.log"), + ) + + // Check container restarts + + pod, err := resourceManager.GetPod(ngfNamespace, ngfPodName) + Expect(err).ToNot(HaveOccurred()) + + findRestarts := func(name string) int { + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name == name { + return int(containerStatus.RestartCount) + } + } + Fail(fmt.Sprintf("container %s not found", name)) + return 0 + } + + ngfRestarts := findRestarts("nginx-gateway") + nginxRestarts := findRestarts("nginx") + + // Write results + + results := scaleTestResults{ + Name: testName, + ReloadCount: int(reloadCount), + ReloadErrsCount: int(reloadErrsCount), + ReloadAvgTime: int(reloadAvgTime), + ReloadBuckets: reloadBuckets, + EventsCount: int(eventsCount), + EventsAvgTime: int(eventsAvgTime), + EventsBuckets: eventsBuckets, + NGFErrors: ngfErrors, + NginxErrors: nginxErrors, + NGFContainerRestarts: ngfRestarts, + NginxContainerRestarts: nginxRestarts, + } + + err = writeScaleResults(outFile, results) + Expect(err).ToNot(HaveOccurred()) + } + + runScaleResources := func(objects framework.ScaleObjects, testResultsDir string, protocol string) { + ttrCsvFile, writer, err := framework.NewCSVResultsWriter(testResultsDir, "ttr.csv") + Expect(err).ToNot(HaveOccurred()) + + Expect(resourceManager.Apply(objects.BaseObjects)).To(Succeed()) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + Expect(resourceManager.WaitForPodsToBeReady(ctx, ns.Name)).To(Succeed()) + + for i := 0; i < len(objects.ScaleIterationGroups); i++ { + Expect(resourceManager.Apply(objects.ScaleIterationGroups[i])).To(Succeed()) + + url := fmt.Sprintf("%s://%d.example.com", protocol, i) + + if protocol == "http" && portFwdPort != 0 { + url = fmt.Sprintf("%s://%d.example.com:%d", protocol, i, portFwdPort) + } else if protocol == "https" && portFwdHTTPSPort != 0 { + url = fmt.Sprintf("%s://%d.example.com:%d", protocol, i, portFwdHTTPSPort) + } + + startCheck := time.Now() + + Eventually( + createResponseChecker(url, address), + ).WithTimeout(30 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) + + ttr := time.Since(startCheck) + + seconds := ttr.Seconds() + record := []string{strconv.Itoa(i + 1), strconv.FormatFloat(seconds, 'f', -1, 64)} + + Expect(writer.Write(record)).To(Succeed()) + } + + writer.Flush() + Expect(ttrCsvFile.Close()).To(Succeed()) + + Expect( + framework.GenerateTTRPNG(testResultsDir, ttrCsvFile.Name(), "ttr.png"), + ).To(Succeed()) + + Expect(os.Remove(ttrCsvFile.Name())).To(Succeed()) + + Expect(resourceManager.Delete(objects.BaseObjects)).To(Succeed()) + for i := 0; i < len(objects.ScaleIterationGroups); i++ { + Expect(resourceManager.Delete(objects.ScaleIterationGroups[i])).To(Succeed()) + } + } + + runScaleUpstreams := func() { + Expect(resourceManager.ApplyFromFiles(upstreamsManifests, ns.Name)).To(Succeed()) + Expect(resourceManager.WaitForAppsToBeReady(ns.Name)).To(Succeed()) + + url := "http://hello.example.com" + if portFwdPort != 0 { + url = fmt.Sprintf("http://hello.example.com:%d", portFwdPort) + } + + Eventually( + createResponseChecker(url, address), + ).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) + + Expect( + resourceManager.ScaleDeployment(ns.Name, "backend", upstreamServerCount), + ).To(Succeed()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + Expect(resourceManager.WaitForPodsToBeReady(ctx, ns.Name)).To(Succeed()) + + Eventually( + createResponseChecker(url, address), + ).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) + + Expect( + resourceManager.DeleteFromFiles(upstreamsManifests, ns.Name), + ).To(Succeed()) + } + + setNamespace := func(objects framework.ScaleObjects) { + for _, obj := range objects.BaseObjects { + obj.SetNamespace(ns.Name) + } + for _, objs := range objects.ScaleIterationGroups { + for _, obj := range objs { + obj.SetNamespace(ns.Name) + } + } + } + + It(fmt.Sprintf("scales HTTP listeners to %d", httpListenerCount), func() { + const testName = "TestScale_Listeners" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0755)).To(Succeed()) + + objects, err := framework.GenerateScaleListenerObjects(httpListenerCount, false /*non-tls*/) + Expect(err).ToNot(HaveOccurred()) + + setNamespace(objects) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleResources( + objects, + testResultsDir, + "http", + ) + }, + ) + + Expect( + os.RemoveAll(filepath.Join(testResultsDir, "manifests")), + ).To(Succeed()) + }) + + It(fmt.Sprintf("scales HTTPS listeners to %d", httpsListenerCount), func() { + const testName = "TestScale_HTTPSListeners" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0o755)).To(Succeed()) + + objects, err := framework.GenerateScaleListenerObjects(httpsListenerCount, true /*tls*/) + Expect(err).ToNot(HaveOccurred()) + + setNamespace(objects) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleResources( + objects, + testResultsDir, + "https", + ) + }, + ) + + Expect( + os.RemoveAll(filepath.Join(testResultsDir, "manifests")), + ).To(Succeed()) + }) + + It(fmt.Sprintf("scales HTTP routes to %d", httpRouteCount), func() { + const testName = "TestScale_HTTPRoutes" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0o755)).To(Succeed()) + + objects, err := framework.GenerateScaleHTTPRouteObjects(httpRouteCount) + Expect(err).ToNot(HaveOccurred()) + + setNamespace(objects) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleResources( + objects, + testResultsDir, + "http", + ) + }, + ) + + Expect( + os.RemoveAll(filepath.Join(testResultsDir, "manifests")), + ).To(Succeed()) + }) + + It(fmt.Sprintf("scales upstream servers to %d", upstreamServerCount), func() { + const testName = "TestScale_UpstreamServers" + + testResultsDir := filepath.Join(resultsDir, testName) + Expect(os.MkdirAll(testResultsDir, 0o755)).To(Succeed()) + + runTestWithMetricsAndLogs( + testName, + testResultsDir, + func() { + runScaleUpstreams() + }, + ) + }) + + It("scale HTTP matches", func() { + const testName = "TestScale_HTTPMatches" + + Expect(resourceManager.ApplyFromFiles(matchesManifests, ns.Name)).To(Succeed()) + Expect(resourceManager.WaitForAppsToBeReady(ns.Name)).To(Succeed()) + + var port int + if portFwdPort != 0 { + port = portFwdPort + } else { + port = 80 + } + + addr := fmt.Sprintf("%s:%d", address, port) + + baseURL := "http://cafe.example.com" + + text := fmt.Sprintf("\n## Test %s\n\n", testName) + + _, err := fmt.Fprint(outFile, text) + Expect(err).ToNot(HaveOccurred()) + + run := func(t framework.Target) { + cfg := framework.LoadTestConfig{ + Targets: []framework.Target{t}, + Rate: 1000, + Duration: 30 * time.Second, + Description: "First matches", + Proxy: addr, + ServerName: "cafe.example.com", + } + _, metrics := framework.RunLoadTest(cfg) + + _, err = fmt.Fprintln(outFile, "```text") + Expect(err).ToNot(HaveOccurred()) + Expect(framework.WriteVegetaResults(outFile, &metrics)).To(Succeed()) + _, err = fmt.Fprintln(outFile, "```") + Expect(err).ToNot(HaveOccurred()) + } + + run(framework.Target{ + Method: "GET", + URL: fmt.Sprintf("%s%s", baseURL, "/latte"), + Header: map[string][]string{ + "header-1": {"header-1-val"}, + }, + }) + + run(framework.Target{ + Method: "GET", + URL: fmt.Sprintf("%s%s", baseURL, "/latte"), + Header: map[string][]string{ + "header-50": {"header-50-val"}, + }, + }) + + Expect(resourceManager.DeleteFromFiles(matchesManifests, ns.Name)).To(Succeed()) + }) + + AfterEach(func() { + teardown(releaseName) + }) + + AfterAll(func() { + close(promPortForwardStopCh) + Expect(framework.UninstallPrometheus()).To(Succeed()) + Expect(resourceManager.Delete([]client.Object{ns})).To(Succeed()) + Expect(outFile.Close()).To(Succeed()) + + // restoring NGF shared among tests in the suite + cfg := getDefaultSetupCfg() + labelFilter := GinkgoLabelFilter() + cfg.nfr = isNFR(labelFilter) + setup(cfg) + }) +}) + +type bucket struct { + Le string + Val int +} + +type scaleTestResults struct { + Name string + + ReloadCount int + ReloadErrsCount int + ReloadAvgTime int + ReloadBuckets []bucket + + EventsCount int + EventsAvgTime int + EventsBuckets []bucket + + NGFErrors int + NginxErrors int + + NGFContainerRestarts int + NginxContainerRestarts int +} + +const scaleResultTemplate = ` +## Test {{ .Name }} + +### Reloads + +- Total: {{ .ReloadCount }} +- Total Errors: {{ .ReloadErrsCount }} +- Average Time: {{ .ReloadAvgTime }}ms +- Reload distribution: +{{- range .ReloadBuckets }} + - {{ .Le }}ms: {{ .Val }} +{{- end }} + +### Event Batch Processing + +- Total: {{ .EventsCount }} +- Average Time: {{ .EventsAvgTime }}ms +- Event Batch Processing distribution: +{{- range .EventsBuckets }} + - {{ .Le }}ms: {{ .Val }} +{{- end }} + +### Errors + +- NGF errors: {{ .NGFErrors }} +- NGF container restarts: {{ .NGFContainerRestarts }} +- NGINX errors: {{ .NginxErrors }} +- NGINX container restarts: {{ .NginxContainerRestarts }} + +### Graphs and Logs + +See [output directory](./{{ .Name }}) for more details. +The logs are attached only if there are errors. +` + +func writeScaleResults(dest io.Writer, results scaleTestResults) error { + tmpl, err := template.New("results").Parse(scaleResultTemplate) + if err != nil { + return err + } + + return tmpl.Execute(dest, results) +} diff --git a/tests/suite/system_suite_test.go b/tests/suite/system_suite_test.go index 7c1218d15f..fad17d8e41 100644 --- a/tests/suite/system_suite_test.go +++ b/tests/suite/system_suite_test.go @@ -4,6 +4,7 @@ import ( "context" "embed" "flag" + "fmt" "path" "path/filepath" "runtime" @@ -64,8 +65,9 @@ var ( manifests embed.FS k8sClient client.Client resourceManager framework.ResourceManager - portForwardStopCh = make(chan struct{}, 1) + portForwardStopCh chan struct{} portFwdPort int + portFwdHTTPSPort int timeoutConfig framework.TimeoutConfig localChartPath string address string @@ -75,8 +77,10 @@ var ( ) const ( - releaseName = "ngf-test" - ngfNamespace = "nginx-gateway" + releaseName = "ngf-test" + ngfNamespace = "nginx-gateway" + ngfHTTPForwardedPort = 10080 + ngfHTTPSForwardedPort = 10443 ) type setupConfig struct { @@ -179,8 +183,12 @@ func setup(cfg setupConfig, extraInstallArgs ...string) { Expect(podNames).ToNot(BeEmpty()) if *serviceType != "LoadBalancer" { - portFwdPort, err = framework.PortForward(k8sConfig, installCfg.Namespace, podNames[0], portForwardStopCh) + ports := []string{fmt.Sprintf("%d:80", ngfHTTPForwardedPort), fmt.Sprintf("%d:443", ngfHTTPSForwardedPort)} + portForwardStopCh = make(chan struct{}) + err = framework.PortForward(k8sConfig, installCfg.Namespace, podNames[0], ports, portForwardStopCh) address = "127.0.0.1" + portFwdPort = ngfHTTPForwardedPort + portFwdHTTPSPort = ngfHTTPSForwardedPort } else { address, err = resourceManager.GetLBIPAddress(installCfg.Namespace) } @@ -189,7 +197,9 @@ func setup(cfg setupConfig, extraInstallArgs ...string) { func teardown(relName string) { if portFwdPort != 0 { - portForwardStopCh <- struct{}{} + close(portForwardStopCh) + portFwdPort = 0 + portFwdHTTPSPort = 0 } cfg := framework.InstallationConfig{ @@ -242,15 +252,18 @@ var _ = BeforeSuite(func() { cfg.nfr = isNFR(labelFilter) // Skip deployment if: - // - running upgrade test (this test will deploy its own version) - // - running longevity teardown (deployment will already exist) - // - running telemetry test (NGF will be deployed as part of the test) - if strings.Contains(labelFilter, "upgrade") || - strings.Contains(labelFilter, "longevity-teardown") || - strings.Contains(labelFilter, "telemetry") || - strings.Contains(labelFilter, "graceful-recovery") { - - cfg.deploy = false + skipSubstrings := []string{ + "upgrade", // - running upgrade test (this test will deploy its own version) + "longevity-teardown", // - running longevity teardown (deployment will already exist) + "telemetry", // - running telemetry test (NGF will be deployed as part of the test) + "scale", // - running scale test (this test will deploy its own version) + "graceful-recovery", // - running graceful recovery test (this test will deploy its own version) + } + for _, s := range skipSubstrings { + if strings.Contains(labelFilter, s) { + cfg.deploy = false + break + } } // use a different release name for longevity to allow us to filter on a specific label when collecting diff --git a/tests/suite/upgrade_test.go b/tests/suite/upgrade_test.go index 09b3ecb8cf..a6fe32b2bf 100644 --- a/tests/suite/upgrade_test.go +++ b/tests/suite/upgrade_test.go @@ -157,7 +157,7 @@ var _ = Describe("Upgrade testing", Label("nfr", "upgrade"), func() { } buf := new(bytes.Buffer) - encoder := framework.NewCSVEncoder(buf) + encoder := framework.NewVegetaCSVEncoder(buf) for _, res := range results { res := res Expect(encoder.Encode(&res)).To(Succeed()) @@ -173,8 +173,9 @@ var _ = Describe("Upgrade testing", Label("nfr", "upgrade"), func() { csvFile.Close() pngName := framework.CreateResultsFilename("png", scheme, *plusEnabled) - output, err := framework.GeneratePNG(resultsDir, csvName, pngName) - Expect(err).ToNot(HaveOccurred(), string(output)) + Expect( + framework.GenerateRequestsPNG(resultsDir, csvName, pngName), + ).To(Succeed()) metricsCh <- &metricsRes }(test) @@ -251,7 +252,7 @@ var _ = Describe("Upgrade testing", Label("nfr", "upgrade"), func() { _, err := fmt.Fprint(resultsFile, res.testName) Expect(err).ToNot(HaveOccurred()) - Expect(framework.WriteResults(resultsFile, res.metrics)).To(Succeed()) + Expect(framework.WriteVegetaResults(resultsFile, res.metrics)).To(Succeed()) link := fmt.Sprintf("\n\n![%[1]v.png](%[1]v.png)\n", res.scheme) if *plusEnabled {