Skip to content

Commit

Permalink
Automate scale test
Browse files Browse the repository at this point in the history
Problem:
Non-functional scale test needs to be run manually.

Solution:
- Automate scale test.
- Use in-cluster Prometheus to collect CPU, memory and NGF metrics.
- Use Kubernetes API server to get NGF logs.

For development and troubleshooting, it is possible to run scale test
locally in Kind cluster. However, it is necessary to bring down
the number of HTTPRoutes to 50 or less (roughly).

Testing:
- Ran this test locally with 64 listeners, 50 routes and 50 upstreams.
- Ran this test on GKE with the default configuration.

Out of scope: ensuring this test runs successfully via GitHub pipeline.

Closes nginx#1368

Largely based on work by Ciara in
nginx#1804

Co-authored-by: Ciara Stacke <[email protected]>
  • Loading branch information
pleshakov and ciarams87 committed May 2, 2024
1 parent 8505f8b commit 2ba7288
Show file tree
Hide file tree
Showing 20 changed files with 1,536 additions and 435 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
//go:build scale
// +build scale

package scale
package framework

import (
"bytes"
"errors"
"fmt"
"os"
"path/filepath"
"io"
"text/template"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var gwTmplTxt = `apiVersion: gateway.networking.k8s.io/v1
const gwTmplTxt = `apiVersion: gateway.networking.k8s.io/v1
kind: Gateway
metadata:
name: gateway
Expand All @@ -33,7 +34,7 @@ spec:
{{- end -}}
{{- end -}}`

var hrTmplTxt = `apiVersion: gateway.networking.k8s.io/v1
const hrTmplTxt = `apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
name: {{ .Name }}
Expand All @@ -53,7 +54,7 @@ spec:
port: 80`

// nolint:all
var secretTmplTxt = `apiVersion: v1
const secretTmplTxt = `apiVersion: v1
kind: Secret
metadata:
name: {{ . }}
Expand All @@ -63,8 +64,7 @@ data:
tls.key: LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2UUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktjd2dnU2pBZ0VBQW9JQkFRQzZtTnJSdUZ2WXZoSE4KbXI3c1FvNUtKSUVDN3N6TFVrNExFeklSNS9yMEVaUjQ2RnRTaGJQd0ZuaXAwMFBxekhpVkhKYy92TjdkQTVLeApQS1VmdFJuQ1J6YldVaTZBZzJpRU93bXF6WUhGbVNpZkFlVjk0RlAxOGtSbjl1ckV3OEpiRXJIUncrVW51L25tCmFMRHF1eGpFTVBweGhuRklCSnYwK1R3djNEVGx6TjNwUlV6dnpidGZvZCtEVTZBSmR6N3Rid1dTNmR6MHc1Z2kKbW9RelZnbFpnVDBJek9FZkV3NVpWMnRMZllHZWRlRVJ1VjhtR041c09va3R2aGxsMU1udHRaMkZNVHgySmVjUQo3K0xBRm9YVnBTS2NjbUFVZ1JBM0xOOHdVZXBVTHZZdFhiUm1QTFc4SjFINmhFeHJHTHBiTERZNmpzbGxBNlZpCk0xMjVjU0hsQWdNQkFBRUNnZ0VBQnpaRE50bmVTdWxGdk9HZlFYaHRFWGFKdWZoSzJBenRVVVpEcUNlRUxvekQKWlV6dHdxbkNRNlJLczUyandWNTN4cU9kUU94bTNMbjNvSHdNa2NZcEliWW82MjJ2dUczYnkwaVEzaFlsVHVMVgpqQmZCcS9UUXFlL2NMdngvSkczQWhFNmJxdFRjZFlXeGFmTmY2eUtpR1dzZk11WVVXTWs4MGVJVUxuRmZaZ1pOCklYNTlSOHlqdE9CVm9Sa3hjYTVoMW1ZTDFsSlJNM3ZqVHNHTHFybmpOTjNBdWZ3ZGRpK1VDbGZVL2l0K1EvZkUKV216aFFoTlRpNVFkRWJLVStOTnYvNnYvb2JvandNb25HVVBCdEFTUE05cmxFemIralQ1WHdWQjgvLzRGY3VoSwoyVzNpcjhtNHVlQ1JHSVlrbGxlLzhuQmZ0eVhiVkNocVRyZFBlaGlPM1FLQmdRRGlrR3JTOTc3cjg3Y1JPOCtQClpoeXltNXo4NVIzTHVVbFNTazJiOTI1QlhvakpZL2RRZDVTdFVsSWE4OUZKZnNWc1JRcEhHaTFCYzBMaTY1YjIKazR0cE5xcVFoUmZ1UVh0UG9GYXRuQzlPRnJVTXJXbDVJN0ZFejZnNkNQMVBXMEg5d2hPemFKZUdpZVpNYjlYTQoybDdSSFZOcC9jTDlYbmhNMnN0Q1lua2Iwd0tCZ1FEUzF4K0crakEyUVNtRVFWNXA1RnRONGcyamsyZEFjMEhNClRIQ2tTazFDRjhkR0Z2UWtsWm5ZbUt0dXFYeXNtekJGcnZKdmt2eUhqbUNYYTducXlpajBEdDZtODViN3BGcVAKQWxtajdtbXI3Z1pUeG1ZMXBhRWFLMXY4SDNINGtRNVl3MWdrTWRybVJHcVAvaTBGaDVpaGtSZS9DOUtGTFVkSQpDcnJjTzhkUVp3S0JnSHA1MzRXVWNCMVZibzFlYStIMUxXWlFRUmxsTWlwRFM2TzBqeWZWSmtFb1BZSEJESnp2ClIrdzZLREJ4eFoyWmJsZ05LblV0YlhHSVFZd3lGelhNcFB5SGxNVHpiZkJhYmJLcDFyR2JVT2RCMXpXM09PRkgKcmppb21TUm1YNmxhaDk0SjRHU0lFZ0drNGw1SHhxZ3JGRDZ2UDd4NGRjUktJWFpLZ0w2dVJSSUpBb0dCQU1CVApaL2p5WStRNTBLdEtEZHUrYU9ORW4zaGxUN3hrNXRKN3NBek5rbWdGMU10RXlQUk9Xd1pQVGFJbWpRbk9qbHdpCldCZ2JGcXg0M2ZlQ1Z4ZXJ6V3ZEM0txaWJVbWpCTkNMTGtYeGh3ZEVteFQwVit2NzZGYzgwaTNNYVdSNnZZR08KditwVVovL0F6UXdJcWZ6dlVmV2ZxdStrMHlhVXhQOGNlcFBIRyt0bEFvR0FmQUtVVWhqeFU0Ym5vVzVwVUhKegpwWWZXZXZ5TW54NWZyT2VsSmRmNzlvNGMvMHhVSjh1eFBFWDFkRmNrZW96dHNpaVFTNkN6MENRY09XVWxtSkRwCnVrdERvVzM3VmNSQU1BVjY3NlgxQVZlM0UwNm5aL2g2Tkd4Z28rT042Q3pwL0lkMkJPUm9IMFAxa2RjY1NLT3kKMUtFZlNnb1B0c1N1eEpBZXdUZmxDMXc9Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K
`

var appTmplTxt = `apiVersion: v1
apiVersion: apps/v1
const appTmplTxt = `apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ . }}
Expand Down Expand Up @@ -105,25 +105,55 @@ var (
appTmpl = template.Must(template.New("app").Parse(appTmplTxt))
)

type Listener struct {
type listener struct {
Name string
HostnamePrefix string
SecretName string
}

type Route struct {
type route struct {
Name string
ListenerName string
HostnamePrefix string
BackendName string
}

func getPrereqDirName(manifestDir string) string {
return filepath.Join(manifestDir, "prereqs")
// ScaleObjects contains objects for scale testing.
type ScaleObjects struct {
// BaseObjects contains objects that are common to all scale iterations.
BaseObjects []client.Object
// ScaleIterationGroups contains objects for each scale iteration.
ScaleIterationGroups [][]client.Object
}

func generateScaleListenerManifests(numListeners int, manifestDir string, tls bool) error {
listeners := make([]Listener, 0)
func decodeObjects(reader io.Reader) ([]client.Object, error) {
var objects []client.Object

decoder := yaml.NewYAMLOrJSONDecoder(reader, 4096)
for {
obj := unstructured.Unstructured{}
if err := decoder.Decode(&obj); err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("error decoding resource: %w", err)
}

if len(obj.Object) == 0 {
continue
}

objects = append(objects, &obj)
}

return objects, nil
}

// GenerateScaleListenerObjects generates objects for a given number of listeners for the scale test.
func GenerateScaleListenerObjects(numListeners int, tls bool) (ScaleObjects, error) {
var result ScaleObjects

listeners := make([]listener, 0)
backends := make([]string, 0)
secrets := make([]string, 0)

Expand All @@ -138,13 +168,13 @@ func generateScaleListenerManifests(numListeners int, manifestDir string, tls bo
secrets = append(secrets, secretName)
}

listeners = append(listeners, Listener{
listeners = append(listeners, listener{
Name: listenerName,
HostnamePrefix: hostnamePrefix,
SecretName: secretName,
})

route := Route{
r := route{
Name: fmt.Sprintf("route-%d", i),
ListenerName: listenerName,
HostnamePrefix: hostnamePrefix,
Expand All @@ -153,80 +183,101 @@ func generateScaleListenerManifests(numListeners int, manifestDir string, tls bo

backends = append(backends, backendName)

if err := generateManifests(manifestDir, i, listeners, []Route{route}); err != nil {
return err
objects, err := generateManifests(listeners, []route{r})
if err != nil {
return ScaleObjects{}, err
}

result.ScaleIterationGroups = append(result.ScaleIterationGroups, objects)
}

if err := generateSecrets(getPrereqDirName(manifestDir), secrets); err != nil {
return err
secretObjects, err := generateSecrets(secrets)
if err != nil {
return ScaleObjects{}, err
}

return generateBackendAppManifests(getPrereqDirName(manifestDir), backends)
}
result.BaseObjects = append(result.BaseObjects, secretObjects...)

func generateSecrets(secretsDir string, secrets []string) error {
err := os.Mkdir(secretsDir, 0o750)
if err != nil && !os.IsExist(err) {
return err
backendObjects, err := generateBackendAppObjects(backends)
if err != nil {
return ScaleObjects{}, err
}

result.BaseObjects = append(result.BaseObjects, backendObjects...)

return result, nil
}

func generateSecrets(secrets []string) ([]client.Object, error) {
objects := make([]client.Object, 0, len(secrets))

for _, secret := range secrets {
var buf bytes.Buffer

if err = secretTmpl.Execute(&buf, secret); err != nil {
return err
if err := secretTmpl.Execute(&buf, secret); err != nil {
return nil, err
}

path := filepath.Join(secretsDir, fmt.Sprintf("%s.yaml", secret))

fmt.Println("Writing", path)
if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil {
return err
objs, err := decodeObjects(&buf)
if err != nil {
return nil, err
}

objects = append(objects, objs...)
}

return nil
return objects, nil
}

func generateScaleHTTPRouteManifests(numRoutes int, manifestDir string) error {
l := Listener{
// GenerateScaleHTTPRouteObjects generates objects for a given number of routes for the scale test.
func GenerateScaleHTTPRouteObjects(numRoutes int) (ScaleObjects, error) {
var result ScaleObjects

l := listener{
Name: "listener",
HostnamePrefix: "*",
}

backendName := "backend"

for i := 0; i < numRoutes; i++ {

route := Route{
r := route{
Name: fmt.Sprintf("route-%d", i),
HostnamePrefix: fmt.Sprintf("%d", i),
ListenerName: "listener",
BackendName: backendName,
}

var listeners []Listener
var listeners []listener
if i == 0 {
// only generate a Gateway on the first iteration
listeners = []Listener{l}
listeners = []listener{l}
}

if err := generateManifests(manifestDir, i, listeners, []Route{route}); err != nil {
return err
objects, err := generateManifests(listeners, []route{r})
if err != nil {
return ScaleObjects{}, err
}

result.ScaleIterationGroups = append(result.ScaleIterationGroups, objects)
}

backendObjects, err := generateBackendAppObjects([]string{backendName})
if err != nil {
return ScaleObjects{}, err
}

return generateBackendAppManifests(getPrereqDirName(manifestDir), []string{backendName})
result.BaseObjects = backendObjects

return result, nil
}

func generateManifests(outDir string, version int, listeners []Listener, routes []Route) error {
func generateManifests(listeners []listener, routes []route) ([]client.Object, error) {
var buf bytes.Buffer

if len(listeners) > 0 {
if err := gwTmpl.Execute(&buf, listeners); err != nil {
return err
return nil, err
}
}

Expand All @@ -236,42 +287,30 @@ func generateManifests(outDir string, version int, listeners []Listener, routes
}

if err := hrTmpl.Execute(&buf, r); err != nil {
return err
return nil, err
}
}

err := os.Mkdir(outDir, 0o750)
if err != nil && !os.IsExist(err) {
return err
}

filename := fmt.Sprintf("manifest-%d.yaml", version)
path := filepath.Join(outDir, filename)

fmt.Println("Writing", path)
return os.WriteFile(path, buf.Bytes(), 0o600)
return decodeObjects(&buf)
}

func generateBackendAppManifests(outDir string, backends []string) error {
err := os.Mkdir(outDir, 0o750)
if err != nil && !os.IsExist(err) {
return err
}
func generateBackendAppObjects(backends []string) ([]client.Object, error) {
objects := make([]client.Object, 0, 2*len(backends))

for _, backend := range backends {
var buf bytes.Buffer

if err = appTmpl.Execute(&buf, backend); err != nil {
return err
if err := appTmpl.Execute(&buf, backend); err != nil {
return nil, err
}

path := filepath.Join(outDir, fmt.Sprintf("%s.yaml", backend))

fmt.Println("Writing", path)
if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil {
return err
objs, err := decodeObjects(&buf)
if err != nil {
return nil, err
}

objects = append(objects, objs...)
}

return nil
return objects, nil
}
46 changes: 29 additions & 17 deletions tests/framework/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@ import (
"net/http"
"net/url"
"path"
"time"

"log/slog"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

// PortForward starts a port-forward to the specified Pod and returns the local port being forwarded.
func PortForward(config *rest.Config, namespace, podName string, stopCh chan struct{}) (int, error) {
// PortForward starts a port-forward to the specified Pod.
func PortForward(config *rest.Config, namespace, podName string, ports []string, stopCh <-chan struct{}) error {
roundTripper, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
return 0, fmt.Errorf("error creating roundtripper: %w", err)
return fmt.Errorf("error creating roundtripper: %w", err)
}

serverURL, err := url.Parse(config.Host)
if err != nil {
return 0, fmt.Errorf("error parsing rest config host: %w", err)
return fmt.Errorf("error parsing rest config host: %w", err)
}

serverURL.Path = path.Join(
Expand All @@ -33,25 +36,34 @@ func PortForward(config *rest.Config, namespace, podName string, stopCh chan str

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, serverURL)

readyCh := make(chan struct{}, 1)
out, errOut := new(bytes.Buffer), new(bytes.Buffer)

forwarder, err := portforward.New(dialer, []string{":80"}, stopCh, readyCh, out, errOut)
if err != nil {
return 0, fmt.Errorf("error creating port forwarder: %w", err)
forward := func() error {
readyCh := make(chan struct{}, 1)

forwarder, err := portforward.New(dialer, ports, stopCh, readyCh, out, errOut)
if err != nil {
return fmt.Errorf("error creating port forwarder: %w", err)
}

return forwarder.ForwardPorts()
}

go func() {
if err := forwarder.ForwardPorts(); err != nil {
panic(err)
for {
if err := forward(); err != nil {
slog.Error("error forwarding ports", "error", err)
slog.Info("retrying port forward in 100ms...")
}

select {
case <-stopCh:
return
case <-time.After(100 * time.Millisecond):
// retrying
}
}
}()

<-readyCh
ports, err := forwarder.GetPorts()
if err != nil {
return 0, fmt.Errorf("error getting ports being forwarded: %w", err)
}

return int(ports[0].Local), nil
return nil
}
Loading

0 comments on commit 2ba7288

Please sign in to comment.