From 010d4b328ea2d22112b0ee4e4a20f1296eb7cc8b Mon Sep 17 00:00:00 2001 From: Paul Balogh Date: Mon, 17 Apr 2023 16:41:17 -0500 Subject: [PATCH] Migrate the implementation for pods.exec and jobs.wait to helpers Fixes #89 Signed-off-by: Paul Balogh --- README.md | 16 +++--- examples/create-job-wait.js | 29 ---------- examples/exec-command.js | 42 -------------- examples/job_operations.js | 14 +++-- examples/pod_operations.js | 14 ++++- examples/wait-job.js | 34 ----------- internal/testutils/fake.go | 6 ++ kubernetes.go | 12 ++-- kubernetes_test.go | 3 +- pkg/api/api.go | 15 ++++- pkg/helpers/helpers.go | 11 +++- pkg/helpers/jobs.go | 72 +++++++++++++++++++++++ pkg/helpers/jobs_test.go | 107 +++++++++++++++++++++++++++++++++++ pkg/helpers/pods.go | 64 +++++++++++++++++++++ pkg/helpers/pods_test.go | 3 +- pkg/helpers/services_test.go | 6 +- 16 files changed, 313 insertions(+), 135 deletions(-) delete mode 100644 examples/create-job-wait.js delete mode 100644 examples/exec-command.js delete mode 100644 examples/wait-job.js create mode 100644 pkg/helpers/jobs.go create mode 100644 pkg/helpers/jobs_test.go diff --git a/README.md b/README.md index 80c24c7..a6887cc 100644 --- a/README.md +++ b/README.md @@ -322,14 +322,14 @@ export default function () { ### (Deprecated) `Client.jobs` -| Method | Description | Example | -| ------------ | ------ | --------------------------------------- | -| apply | creates the Kubernetes resource given a YAML configuration || -| create | creates the Kubernetes resource given an object configuration | [create-job-wait.js](./examples/create-job-wait.js) | -| delete | removes the named Job | | -| get | returns the named Jobs | | -| list | returns a collection of Jobs | | -| wait | wait for all Jobs to complete | [wait-job.js](./examples/wait-job.js) | +| Method | Description | +| ------------ | ------ | +| apply | creates the Kubernetes resource given a YAML configuration | +| create | creates the Kubernetes resource given an object configuration | +| delete | removes the named Job | +| get | returns the named Jobs | +| list | returns a collection of Jobs | +| wait | wait for all Jobs to complete | ```javascript import { Kubernetes } from 'k6/x/kubernetes'; diff --git a/examples/create-job-wait.js b/examples/create-job-wait.js deleted file mode 100644 index 6a10a22..0000000 --- a/examples/create-job-wait.js +++ /dev/null @@ -1,29 +0,0 @@ - -import { sleep } from 'k6'; -import { Kubernetes } from 'k6/x/kubernetes'; - -function getJobNames(nameSpace, kubernetes) { - return kubernetes.jobs.list(nameSpace).map(function(job){ - return job.name - }) -} - -export default function () { - const kubernetes = new Kubernetes({ - // config_path: "/path/to/kube/config", ~/.kube/config by default - }) - const namespace = "default" - const jobName = "new-job" - const image = "perl" - const command = ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"] - - kubernetes.jobs.create({ - namespace: namespace, - name: jobName, - image: image, - command: command, - wait: "30s" - }) - console.log(jobName + " job completed") -} - diff --git a/examples/exec-command.js b/examples/exec-command.js deleted file mode 100644 index f6337f1..0000000 --- a/examples/exec-command.js +++ /dev/null @@ -1,42 +0,0 @@ - -import { sleep } from 'k6'; -import { Kubernetes } from 'k6/x/kubernetes'; - -export default function () { - const kubernetes = new Kubernetes({ - }) - const namespace = "default" - const podName = "new-pod" - const image = "busybox" - const command = ["sh", "-c", "sleep 300"] - - kubernetes.pods.create({ - namespace: namespace, - name: podName, - image: image, - command: command - }) - sleep(3) - - const newPod = kubernetes.pods.list(namespace).find(function(pod) { return pod.name == podName}) - if (!newPod) { - throw podName + " pod was not created" - } - - const container = newPod.spec.containers[0].name - const result = kubernetes.pods.exec({ - namespace: namespace, - pod: podName, - container: container.name, - command: ["echo", "'hello xk6'"], - stadin: [] - }) - - const stdout = String.fromCharCode(...result.stdout) - - if (stdout.includes("xk6")) { - console.log("command executed") - } else { - throw "command not executed correctly" - } -} diff --git a/examples/job_operations.js b/examples/job_operations.js index 470f072..a608d69 100644 --- a/examples/job_operations.js +++ b/examples/job_operations.js @@ -52,23 +52,25 @@ export default function () { describe('JSON-based resources', () => { const name = json.metadata.name const ns = json.metadata.namespace + const helpers = kubernetes.helpers(ns) let job - describe('Create our job using the JSON definition', () => { + describe('Create our job using the JSON definition and wait until completed', () => { job = kubernetes.create(json) expect(job.metadata, 'new job').to.have.property('uid') - }) - describe('Retrieve all available jobs', () => { - expect(kubernetes.list("Job.batch", ns).length, 'total jobs').to.be.at.least(1) - }) + let timeout = 10 + expect(helpers.waitJobCompleted(name, timeout), `job completion within ${timeout}s`).to.be.true - describe('Retrieve our job by name and namespace', () => { let fetched = kubernetes.get("Job.batch", name, ns) expect(job.metadata.uid, 'created and fetched uids').to.equal(fetched.metadata.uid) }) + describe('Retrieve all available jobs', () => { + expect(kubernetes.list("Job.batch", ns).length, 'total jobs').to.be.at.least(1) + }) + describe('Remove our jobs to cleanup', () => { kubernetes.delete("Job.batch", name, ns) }) diff --git a/examples/pod_operations.js b/examples/pod_operations.js index 85a0ea6..2f56b67 100644 --- a/examples/pod_operations.js +++ b/examples/pod_operations.js @@ -58,9 +58,21 @@ export default function () { expect(kubernetes.list("Pod", ns).length, 'total pods').to.be.at.least(1) }) - describe('Retrieve our pod by name and namespace', () => { + describe('Retrieve our pod by name and namespace, then execute a command within the pod', () => { let fetched = kubernetes.get("Pod", name, ns) expect(pod.metadata.uid, 'created and fetched uids').to.equal(fetched.metadata.uid) + + let greeting = 'hello xk6-kubernetes' + let exec = { + pod: name, + container: fetched.spec.containers[0].name, + command: ["echo", greeting] + } + let result = helpers.executeInPod(exec) + const stdout = String.fromCharCode(...result.stdout) + const stderr = String.fromCharCode(...result.stderr) + expect(stdout, 'execution result').to.contain(greeting) + expect(stderr, 'execution error').to.be.empty }) describe('Remove our pods to cleanup', () => { diff --git a/examples/wait-job.js b/examples/wait-job.js deleted file mode 100644 index 79d2d61..0000000 --- a/examples/wait-job.js +++ /dev/null @@ -1,34 +0,0 @@ - -import { sleep } from 'k6'; -import { Kubernetes } from 'k6/x/kubernetes'; - -function getJobNames(nameSpace, kubernetes) { - return kubernetes.jobs.list(nameSpace).map(function(job){ - return job.name - }) -} - -export default function () { - const kubernetes = new Kubernetes({ - // config_path: "/path/to/kube/config", ~/.kube/config by default - }) - const namespace = "default" - const jobName = "new-job" - const image = "perl" - const command = ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"] - - kubernetes.jobs.create({ - namespace: namespace, - name: jobName, - image: image, - command: command - }) - - const completed = kubernetes.jobs.wait({ - namespace: namespace, - name: jobName, - timeout: "30s" - }) - const jobStatus = completed? "completed": "not completed" - console.log(jobName + " " + jobStatus) -} diff --git a/internal/testutils/fake.go b/internal/testutils/fake.go index e518eb2..0e33aa0 100644 --- a/internal/testutils/fake.go +++ b/internal/testutils/fake.go @@ -2,6 +2,7 @@ package testutils import ( "fmt" + k8s "k8s.io/client-go/kubernetes" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" @@ -10,6 +11,11 @@ import ( "k8s.io/client-go/kubernetes/fake" ) +// NewFakeClientset creates a new instance of a fake Kubernetes clientset +func NewFakeClientset(objs ...runtime.Object) k8s.Interface { + return fake.NewSimpleClientset(objs...) +} + // NewFakeDynamic creates a new instance of a fake dynamic client with a default scheme func NewFakeDynamic(objs ...runtime.Object) (*dynamicfake.FakeDynamicClient, error) { scheme := runtime.NewScheme() diff --git a/kubernetes.go b/kubernetes.go index af114a1..2a5605b 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -135,8 +135,9 @@ func (mi *ModuleInstance) newClient(c goja.ConstructorCall) *goja.Object { if mi.dynamic == nil { k8s, err := api.NewFromConfig( api.KubernetesConfig{ - Config: config, - Context: ctx, + Clientset: obj.client, + Config: config, + Context: ctx, }, ) if err != nil { @@ -147,9 +148,10 @@ func (mi *ModuleInstance) newClient(c goja.ConstructorCall) *goja.Object { // Pre-configured dynamic client and RESTMapper are injected for unit testing k8s, err := api.NewFromConfig( api.KubernetesConfig{ - Client: mi.dynamic, - Mapper: mi.mapper, - Context: ctx, + Clientset: obj.client, + Client: mi.dynamic, + Mapper: mi.mapper, + Context: ctx, }, ) if err != nil { diff --git a/kubernetes_test.go b/kubernetes_test.go index bb7e459..7d0ef54 100644 --- a/kubernetes_test.go +++ b/kubernetes_test.go @@ -15,7 +15,6 @@ import ( "go.k6.io/k6/lib/testutils" "go.k6.io/k6/metrics" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/fake" ) // setupTestEnv should be called from each test to build the execution environment for the test @@ -49,7 +48,7 @@ func setupTestEnv(t *testing.T, objs ...runtime.Object) *goja.Runtime { require.True(t, ok) require.NoError(t, rt.Set("Kubernetes", m.Exports().Named["Kubernetes"])) - m.clientset = fake.NewSimpleClientset(objs...) + m.clientset = localutils.NewFakeClientset(objs...) dynamic, err := localutils.NewFakeDynamic() if err != nil { diff --git a/pkg/api/api.go b/pkg/api/api.go index f20ecc5..d24081e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -4,6 +4,7 @@ package api import ( "context" + k8s "k8s.io/client-go/kubernetes" "github.com/grafana/xk6-kubernetes/pkg/helpers" "github.com/grafana/xk6-kubernetes/pkg/resources" @@ -30,6 +31,8 @@ type KubernetesConfig struct { Context context.Context // kubernetes rest config Config *rest.Config + // Clientset provides access to various API-specific clients + Clientset k8s.Interface // Client is a pre-configured dynamic client. If provided, the rest config is not used Client dynamic.Interface // Mapper is a pre-configured RESTMapper. If provided, the rest config is not used @@ -38,8 +41,10 @@ type KubernetesConfig struct { // kubernetes holds references to implementation of the Kubernetes interface type kubernetes struct { - ctx context.Context + ctx context.Context + Clientset k8s.Interface *resources.Client + Config *rest.Config *restmapper.DeferredDiscoveryRESTMapper } @@ -75,8 +80,10 @@ func NewFromConfig(c KubernetesConfig) (Kubernetes, error) { } return &kubernetes{ - ctx: ctx, - Client: client, + ctx: ctx, + Clientset: c.Clientset, + Client: client, + Config: c.Config, }, nil } @@ -86,7 +93,9 @@ func (k *kubernetes) Helpers(namespace string) helpers.Helpers { } return helpers.NewHelper( k.ctx, + k.Clientset, k.Client, + k.Config, namespace, ) } diff --git a/pkg/helpers/helpers.go b/pkg/helpers/helpers.go index 329bdd2..8bf2d49 100644 --- a/pkg/helpers/helpers.go +++ b/pkg/helpers/helpers.go @@ -3,12 +3,15 @@ package helpers import ( "context" + k8s "k8s.io/client-go/kubernetes" "github.com/grafana/xk6-kubernetes/pkg/resources" + "k8s.io/client-go/rest" ) // Helpers offers Helper functions grouped by the objects they handle type Helpers interface { + JobHelper PodHelper ServiceHelper } @@ -16,14 +19,18 @@ type Helpers interface { // helpers struct holds the data required by the helpers type helpers struct { client *resources.Client + clientset k8s.Interface + config *rest.Config ctx context.Context namespace string } -// NewHelper creates a set of helper functions on the default namespace -func NewHelper(ctx context.Context, client *resources.Client, namespace string) Helpers { +// NewHelper creates a set of helper functions on the specified namespace +func NewHelper(ctx context.Context, clientset k8s.Interface, client *resources.Client, config *rest.Config, namespace string) Helpers { return &helpers{ client: client, + clientset: clientset, + config: config, ctx: ctx, namespace: namespace, } diff --git a/pkg/helpers/jobs.go b/pkg/helpers/jobs.go new file mode 100644 index 0000000..7b9dc3a --- /dev/null +++ b/pkg/helpers/jobs.go @@ -0,0 +1,72 @@ +package helpers + +import ( + "fmt" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + "time" +) + +// JobHelper defines helper functions for manipulating Jobs +type JobHelper interface { + // WaitJobCompleted waits for the Job to be completed for up to the given timeout (in seconds) and returns + // a boolean indicating if the status was reached. If the job is Failed an error is returned. + WaitJobCompleted(name string, timeout uint) (bool, error) +} + +// isCompleted returns if the job is completed or not. Returns an error if the job is failed. +func isCompleted(job *batchv1.Job) (bool, error) { + for _, condition := range job.Status.Conditions { + if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue { + return false, fmt.Errorf("job failed with reason: %v", condition.Reason) + } + if condition.Type == batchv1.JobComplete && condition.Status == corev1.ConditionTrue { + return true, nil + } + } + return false, nil +} + +func (h *helpers) WaitJobCompleted(name string, timeout uint) (bool, error) { + deadline := time.Duration(timeout) * time.Second + selector := fields.Set{ + "metadata.name": name, + }.AsSelector() + watcher, err := h.clientset.BatchV1().Jobs(h.namespace).Watch( + h.ctx, + metav1.ListOptions{ + FieldSelector: selector.String(), + }, + ) + if err != nil { + return false, err + } + defer watcher.Stop() + + for { + select { + case <-time.After(deadline): + return false, nil + case event := <-watcher.ResultChan(): + if event.Type == watch.Error { + return false, fmt.Errorf("error watching for job: %v", event.Object) + } + if event.Type == watch.Modified { + job, isJob := event.Object.(*batchv1.Job) + if !isJob { + return false, fmt.Errorf("received unknown object while watching for jobs") + } + completed, jobError := isCompleted(job) + if jobError != nil { + return false, jobError + } + if completed { + return true, nil + } + } + } + } +} diff --git a/pkg/helpers/jobs_test.go b/pkg/helpers/jobs_test.go new file mode 100644 index 0000000..fc4ff80 --- /dev/null +++ b/pkg/helpers/jobs_test.go @@ -0,0 +1,107 @@ +package helpers + +import ( + "context" + "github.com/grafana/xk6-kubernetes/pkg/resources" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" //nolint:typecheck + k8stest "k8s.io/client-go/testing" + "testing" + "time" + + "github.com/grafana/xk6-kubernetes/internal/testutils" +) + +const ( + jobName = "test-job" +) + +func TestWaitJobCompleted(t *testing.T) { + t.Parallel() + type TestCase struct { + test string + status string + delay time.Duration + expectError bool + expectedResult bool + timeout uint + } + + testCases := []TestCase{ + { + test: "job completed before timeout", + status: "Complete", + delay: 1 * time.Second, + expectError: false, + expectedResult: true, + timeout: 60, + }, + { + test: "timeout waiting for job to complete", + status: "Complete", + delay: 10 * time.Second, + expectError: false, + expectedResult: false, + timeout: 5, + }, + { + test: "job failed before timeout", + status: "Failed", + delay: 1 * time.Second, + expectError: true, + expectedResult: false, + timeout: 60, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.test, func(t *testing.T) { + t.Parallel() + + clientset := testutils.NewFakeClientset().(*fake.Clientset) + watcher := watch.NewRaceFreeFake() + clientset.PrependWatchReactor("jobs", k8stest.DefaultWatchReactor(watcher, nil)) + + fake, _ := testutils.NewFakeDynamic() + client := resources.NewFromClient(context.TODO(), fake).WithMapper(&testutils.FakeRESTMapper{}) + + fixture := NewHelper(context.TODO(), clientset, client, nil, "default") + job := testutils.NewJob(jobName, "default") + _, err := client.Structured().Create(job) + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + go func(tc TestCase) { + time.Sleep(tc.delay) + job = testutils.NewJobWithStatus(jobName, "default", tc.status) + _, e := client.Structured().Update(job) + if e != nil { + t.Errorf("unexpected error: %v", e) + return + } + watcher.Modify(job) + }(tc) + + result, err := fixture.WaitJobCompleted( + jobName, + tc.timeout, + ) + + if !tc.expectError && err != nil { + t.Errorf("unexpected error: %v", err) + return + } + if tc.expectError && err == nil { + t.Error("expected an error but none returned") + return + } + if result != tc.expectedResult { + t.Errorf("expected result %t but %t returned", tc.expectedResult, result) + return + } + }) + } +} diff --git a/pkg/helpers/pods.go b/pkg/helpers/pods.go index 9f4df09..71096e8 100644 --- a/pkg/helpers/pods.go +++ b/pkg/helpers/pods.go @@ -1,7 +1,10 @@ package helpers import ( + "bytes" "fmt" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" "time" "github.com/grafana/xk6-kubernetes/pkg/utils" @@ -11,6 +14,8 @@ import ( // PodHelper defines helper functions for manipulating Pods type PodHelper interface { + // ExecuteInPod executes a non-interactive command described in options and returns the stdout and stderr outputs + ExecuteInPod(options PodExecOptions) (*PodExecResult, error) // WaitPodRunning waits for the Pod to be running for up to given timeout (in seconds) and returns // a boolean indicating if the status was reached. If the pod is Failed returns error. WaitPodRunning(name string, timeout uint) (bool, error) @@ -35,3 +40,62 @@ func (h *helpers) WaitPodRunning(name string, timeout uint) (bool, error) { return false, nil }) } + +// PodExecOptions describe the command to be executed and the target container +type PodExecOptions struct { + Pod string // name of the Pod to execute the command in + Container string // name of the container to execute the command in + Command []string // command to be executed with its parameters + Stdin []byte // stdin to be supplied to the command + Timeout uint // number of seconds allowed to wait for completion +} + +// PodExecResult contains the output obtained from the execution of a command +type PodExecResult struct { + Stdout []byte + Stderr []byte +} + +func (h *helpers) ExecuteInPod(options PodExecOptions) (*PodExecResult, error) { + result := PodExecResult{} + _, err := utils.Retry(time.Duration(options.Timeout)*time.Second, time.Second, func() (bool, error) { + req := h.clientset.CoreV1().RESTClient(). + Post(). + Namespace(h.namespace). + Resource("pods"). + Name(options.Pod). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: options.Container, + Command: options.Command, + Stdin: true, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(h.config, "POST", req.URL()) + if err != nil { + return false, err + //return nil, err + } + + var stdout, stderr bytes.Buffer + stdin := bytes.NewReader(options.Stdin) + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: stdin, + Stdout: &stdout, + Stderr: &stderr, + Tty: true, + }) + if err != nil { + return false, err + } + + result = PodExecResult{ + Stdout: stdout.Bytes(), + Stderr: stderr.Bytes(), + } + return true, nil + }) + return &result, err +} diff --git a/pkg/helpers/pods_test.go b/pkg/helpers/pods_test.go index 74a7e46..e506836 100644 --- a/pkg/helpers/pods_test.go +++ b/pkg/helpers/pods_test.go @@ -85,7 +85,8 @@ func TestPods_Wait(t *testing.T) { t.Parallel() fake, _ := testutils.NewFakeDynamic() client := resources.NewFromClient(context.TODO(), fake).WithMapper(&testutils.FakeRESTMapper{}) - h := NewHelper(context.TODO(), client, testNamespace) + clientset := testutils.NewFakeClientset() + h := NewHelper(context.TODO(), clientset, client, nil, testNamespace) pod := buildPod() _, err := client.Structured().Create(pod) if err != nil { diff --git a/pkg/helpers/services_test.go b/pkg/helpers/services_test.go index 38078df..70ec55d 100644 --- a/pkg/helpers/services_test.go +++ b/pkg/helpers/services_test.go @@ -192,7 +192,8 @@ func Test_WaitServiceReady(t *testing.T) { } fake, _ := testutils.NewFakeDynamic(objs...) client := resources.NewFromClient(context.TODO(), fake).WithMapper(&testutils.FakeRESTMapper{}) - h := NewHelper(context.TODO(), client, "default") + clientset := testutils.NewFakeClientset() + h := NewHelper(context.TODO(), clientset, client, nil, "default") go func(tc TestCase) { if tc.updated == nil { @@ -266,7 +267,8 @@ func Test_GetServiceIP(t *testing.T) { fake, _ := testutils.NewFakeDynamic() client := resources.NewFromClient(context.TODO(), fake).WithMapper(&testutils.FakeRESTMapper{}) - h := NewHelper(context.TODO(), client, "default") + clientset := testutils.NewFakeClientset() + h := NewHelper(context.TODO(), clientset, client, nil, "default") svc := buildService() _, err := client.Structured().Create(svc)