From 7de09100810cc221446e9779ee41458314c74392 Mon Sep 17 00:00:00 2001 From: Kubeshop <174873053+ed382@users.noreply.github.com> Date: Wed, 21 Aug 2024 11:14:11 +0200 Subject: [PATCH 1/2] fix: http/2 stream leaks --- cmd/kubectl-testkube/commands/init.go | 1 + cmd/proxy/main.go | 1 + cmd/testworkflow-toolkit/artifacts/cloud_uploader.go | 1 + cmd/testworkflow-toolkit/commands/tarball.go | 7 ++++++- cmd/testworkflow-toolkit/commands/transfer.go | 1 + cmd/testworkflow-toolkit/common/tarball.go | 4 +--- internal/app/api/v1/executions.go | 4 ++-- internal/app/api/v1/testworkflowexecutions.go | 6 ++++-- pkg/api/v1/client/uploads.go | 1 + pkg/cloud/client/rest.go | 3 +++ pkg/cloud/data/artifact/artifacts_storage.go | 4 ++-- pkg/cloud/data/artifact/uploader.go | 1 + pkg/cloud/data/testworkflow/output.go | 8 ++++++-- pkg/repository/testworkflow/interface.go | 2 +- pkg/repository/testworkflow/minio_output_repository.go | 4 ++-- pkg/repository/testworkflow/mock_output_repository.go | 4 ++-- pkg/storage/artifacts.go | 2 +- pkg/storage/artifacts_mock.go | 4 ++-- pkg/storage/minio/artifacts_storage.go | 2 +- 19 files changed, 39 insertions(+), 21 deletions(-) diff --git a/cmd/kubectl-testkube/commands/init.go b/cmd/kubectl-testkube/commands/init.go index b0342b5d553..472b31332c0 100644 --- a/cmd/kubectl-testkube/commands/init.go +++ b/cmd/kubectl-testkube/commands/init.go @@ -115,6 +115,7 @@ func NewInitCmdDemo() *cobra.Command { if export { valuesResp, err := http.Get(demoValuesUrl) ui.ExitOnError("cannot fetch values", err) + defer valuesResp.Body.Close() valuesBytes, err := io.ReadAll(valuesResp.Body) ui.ExitOnError("cannot fetch values", err) values := string(valuesBytes) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 7af002e5d1f..671085fa0e7 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -45,6 +45,7 @@ func (DebugTransport) RoundTrip(r *http.Request) (*http.Response, error) { func proxyPass(res http.ResponseWriter, req *http.Request) { fmt.Printf("\n-------------\n") body, _ := io.ReadAll(req.Body) + req.Body.Close() fmt.Printf("%s\n", body) prefix := fmt.Sprintf("/api/v1/namespaces/%s/services/testkube-api-server:%d/proxy", *namespace, *apiPort) diff --git a/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go b/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go index 62aa587da94..0b56f0cfaae 100644 --- a/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go +++ b/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go @@ -104,6 +104,7 @@ func (d *cloudUploader) putObject(url string, path string, file io.Reader, size if err != nil { return err } + defer res.Body.Close() if res.StatusCode != http.StatusOK { b, _ := io.ReadAll(res.Body) return errors.Errorf("failed saving file: status code: %d / message: %s", res.StatusCode, string(b)) diff --git a/cmd/testworkflow-toolkit/commands/tarball.go b/cmd/testworkflow-toolkit/commands/tarball.go index 9330ecc3b48..06172bde9bf 100644 --- a/cmd/testworkflow-toolkit/commands/tarball.go +++ b/cmd/testworkflow-toolkit/commands/tarball.go @@ -23,7 +23,7 @@ func NewTarballCmd() *cobra.Command { os.Exit(0) } - for _, pair := range pairs { + processPair := func(pair string) { dirPath, url, found := strings.Cut(pair, "=") if !found { ui.Fail(fmt.Errorf("invalid tarball pair: %s", pair)) @@ -33,6 +33,7 @@ func NewTarballCmd() *cobra.Command { // Start downloading the file resp, err := http.Get(url) ui.ExitOnError("download the tarball", err) + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { ui.Fail(fmt.Errorf("failed to download the tarball: status code %d", resp.StatusCode)) @@ -44,6 +45,10 @@ func NewTarballCmd() *cobra.Command { ui.Fail(err) } } + + for _, pair := range pairs { + processPair(pair) + } }, } diff --git a/cmd/testworkflow-toolkit/commands/transfer.go b/cmd/testworkflow-toolkit/commands/transfer.go index d3c86a2ea81..04ab4630697 100644 --- a/cmd/testworkflow-toolkit/commands/transfer.go +++ b/cmd/testworkflow-toolkit/commands/transfer.go @@ -48,6 +48,7 @@ func NewTransferCmd() *cobra.Command { }() resp, err := http.Post(url, "application/tar+gzip", reader) ui.ExitOnError("send the tarball request", err) + _ = resp.Body.Close() if resp.StatusCode != http.StatusNoContent { ui.Fail(fmt.Errorf("failed to send the tarball: status code %d", resp.StatusCode)) diff --git a/cmd/testworkflow-toolkit/common/tarball.go b/cmd/testworkflow-toolkit/common/tarball.go index 1f2c38b3cab..cd687754347 100644 --- a/cmd/testworkflow-toolkit/common/tarball.go +++ b/cmd/testworkflow-toolkit/common/tarball.go @@ -79,9 +79,7 @@ func WriteTarball(stream io.Writer, dirPath string, files []string) error { return err } -func UnpackTarball(dirPath string, stream io.ReadCloser) error { - defer stream.Close() - +func UnpackTarball(dirPath string, stream io.Reader) error { // Process the files uncompressedStream, err := gzip.NewReader(stream) if err != nil { diff --git a/internal/app/api/v1/executions.go b/internal/app/api/v1/executions.go index 8cab531e84c..41605d61a52 100644 --- a/internal/app/api/v1/executions.go +++ b/internal/app/api/v1/executions.go @@ -429,7 +429,7 @@ func (s *TestkubeAPI) GetArtifactHandler() fiber.Handler { return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: db could not get execution result: %w", errPrefix, err)) } - var file io.Reader + var file io.ReadCloser var bucket string artifactsStorage := s.ArtifactsStorage folder := execution.Id @@ -451,8 +451,8 @@ func (s *TestkubeAPI) GetArtifactHandler() fiber.Handler { if err != nil { return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: could not download file: %w", errPrefix, err)) } + defer file.Close() - // SendStream promises to close file using io.Close() method return c.SendStream(file) } } diff --git a/internal/app/api/v1/testworkflowexecutions.go b/internal/app/api/v1/testworkflowexecutions.go index 3f3bc89230e..5aae948d2eb 100644 --- a/internal/app/api/v1/testworkflowexecutions.go +++ b/internal/app/api/v1/testworkflowexecutions.go @@ -191,13 +191,14 @@ func (s *TestkubeAPI) GetTestWorkflowExecutionLogsHandler() fiber.Handler { return s.ClientError(c, "get execution", err) } - reader, err := s.TestWorkflowOutput.ReadLog(ctx, executionID, execution.Workflow.Name) + rc, err := s.TestWorkflowOutput.ReadLog(ctx, executionID, execution.Workflow.Name) if err != nil { return s.InternalError(c, "can't get log", executionID, err) } + defer rc.Close() c.Context().SetContentType(mediaTypePlainText) - _, err = io.Copy(c.Response().BodyWriter(), reader) + _, err = io.Copy(c.Response().BodyWriter(), rc) return err } } @@ -407,6 +408,7 @@ func (s *TestkubeAPI) GetTestWorkflowArtifactHandler() fiber.Handler { if err != nil { return s.InternalError(c, errPrefix, "could not download file", err) } + defer file.Close() return c.SendStream(file) } diff --git a/pkg/api/v1/client/uploads.go b/pkg/api/v1/client/uploads.go index b54893620cc..446708e9b40 100644 --- a/pkg/api/v1/client/uploads.go +++ b/pkg/api/v1/client/uploads.go @@ -73,6 +73,7 @@ func (c CopyFileDirectClient) UploadFile(parentName string, parentType TestingTy if err != nil { return err } + defer resp.Body.Close() if err = httpResponseError(resp); err != nil { return fmt.Errorf("api %s returned error: %w", uri, err) diff --git a/pkg/cloud/client/rest.go b/pkg/cloud/client/rest.go index 113b9a99b2a..9e35bcbeba3 100644 --- a/pkg/cloud/client/rest.go +++ b/pkg/cloud/client/rest.go @@ -35,6 +35,7 @@ func (c RESTClient[T]) List() ([]T, error) { if err != nil { return nil, err } + defer resp.Body.Close() var orgsResponse ListResponse[T] err = json.NewDecoder(resp.Body).Decode(&orgsResponse) @@ -52,6 +53,7 @@ func (c RESTClient[T]) Get(id string) (e T, err error) { if err != nil { return e, err } + defer resp.Body.Close() if resp.StatusCode > 299 { d, err := io.ReadAll(resp.Body) @@ -87,6 +89,7 @@ func (c RESTClient[T]) Create(entity T, overridePath ...string) (e T, err error) if err != nil { return e, err } + defer resp.Body.Close() if resp.StatusCode > 299 { d, err := io.ReadAll(resp.Body) diff --git a/pkg/cloud/data/artifact/artifacts_storage.go b/pkg/cloud/data/artifact/artifacts_storage.go index a6410a86eb3..bd1055a2542 100644 --- a/pkg/cloud/data/artifact/artifacts_storage.go +++ b/pkg/cloud/data/artifact/artifacts_storage.go @@ -44,7 +44,7 @@ func (c *CloudArtifactsStorage) ListFiles(ctx context.Context, executionID, test return commandResponse.Artifacts, nil } -func (c *CloudArtifactsStorage) DownloadFile(ctx context.Context, file, executionID, testName, testSuiteName, testWorkflowName string) (io.Reader, error) { +func (c *CloudArtifactsStorage) DownloadFile(ctx context.Context, file, executionID, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error) { req := DownloadFileRequest{ File: file, ExecutionID: executionID, @@ -73,7 +73,7 @@ func (c *CloudArtifactsStorage) DownloadArchive(ctx context.Context, executionID return nil, errors.WithStack(ErrOperationNotSupported) } -func (c *CloudArtifactsStorage) getObject(ctx context.Context, url string) (io.Reader, error) { +func (c *CloudArtifactsStorage) getObject(ctx context.Context, url string) (io.ReadCloser, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err diff --git a/pkg/cloud/data/artifact/uploader.go b/pkg/cloud/data/artifact/uploader.go index ddeae1bf5d2..7bf7c90619b 100644 --- a/pkg/cloud/data/artifact/uploader.go +++ b/pkg/cloud/data/artifact/uploader.go @@ -81,6 +81,7 @@ func (u *CloudUploader) putObject(ctx context.Context, url string, object *scrap if err != nil { return errors.Wrap(err, "failed to send file to cloud") } + defer rsp.Body.Close() if rsp.StatusCode != http.StatusOK { return errors.Errorf("error getting file from presigned url: expected 200 OK response code, got %d", rsp.StatusCode) } diff --git a/pkg/cloud/data/testworkflow/output.go b/pkg/cloud/data/testworkflow/output.go index 5848b3e0e08..1744bf9ce1a 100644 --- a/pkg/cloud/data/testworkflow/output.go +++ b/pkg/cloud/data/testworkflow/output.go @@ -78,14 +78,16 @@ func (r *CloudOutputRepository) SaveLog(ctx context.Context, id, workflowName st if err != nil { return errors.Wrap(err, "failed to save file in cloud storage") } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return errors.Errorf("error saving file with presigned url: expected 200 OK response code, got %d", res.StatusCode) } return nil } -// ReadLog streams the output from Cloud -func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) { +// ReadLog streams the output from Cloud. +// The caller is responsible for closing the stream. +func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error) { url, err := r.PresignReadLog(ctx, id, workflowName) if err != nil { return nil, err @@ -99,8 +101,10 @@ func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName st return nil, errors.Wrap(err, "failed to get file from cloud storage") } if res.StatusCode != http.StatusOK { + _ = res.Body.Close() return nil, errors.Errorf("error getting file from presigned url: expected 200 OK response code, got %d", res.StatusCode) } + // Caller must close this stream. return res.Body, nil } diff --git a/pkg/repository/testworkflow/interface.go b/pkg/repository/testworkflow/interface.go index cea8a37053e..3ac3d2b0589 100644 --- a/pkg/repository/testworkflow/interface.go +++ b/pkg/repository/testworkflow/interface.go @@ -83,7 +83,7 @@ type OutputRepository interface { // SaveLog streams the output from the workflow to Minio SaveLog(ctx context.Context, id, workflowName string, reader io.Reader) error // ReadLog streams the output from Minio - ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) + ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error) // HasLog checks if there is an output in Minio HasLog(ctx context.Context, id, workflowName string) (bool, error) diff --git a/pkg/repository/testworkflow/minio_output_repository.go b/pkg/repository/testworkflow/minio_output_repository.go index 8bcb3e28128..ae2b3fd572c 100644 --- a/pkg/repository/testworkflow/minio_output_repository.go +++ b/pkg/repository/testworkflow/minio_output_repository.go @@ -47,12 +47,12 @@ func (m *MinioRepository) SaveLog(ctx context.Context, id, workflowName string, return m.storage.UploadFileToBucket(ctx, m.bucket, bucketFolder, id, reader, -1) } -func (m *MinioRepository) ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) { +func (m *MinioRepository) ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error) { file, _, err := m.storage.DownloadFileFromBucket(ctx, m.bucket, bucketFolder, id) if err != nil { return nil, err } - return file, nil + return io.NopCloser(file), nil } func (m *MinioRepository) HasLog(ctx context.Context, id, workflowName string) (bool, error) { diff --git a/pkg/repository/testworkflow/mock_output_repository.go b/pkg/repository/testworkflow/mock_output_repository.go index de7247db86e..179906d84e6 100644 --- a/pkg/repository/testworkflow/mock_output_repository.go +++ b/pkg/repository/testworkflow/mock_output_repository.go @@ -109,10 +109,10 @@ func (mr *MockOutputRepositoryMockRecorder) PresignSaveLog(arg0, arg1, arg2 inte } // ReadLog mocks base method. -func (m *MockOutputRepository) ReadLog(arg0 context.Context, arg1, arg2 string) (io.Reader, error) { +func (m *MockOutputRepository) ReadLog(arg0 context.Context, arg1, arg2 string) (io.ReadCloser, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReadLog", arg0, arg1, arg2) - ret0, _ := ret[0].(io.Reader) + ret0, _ := ret[0].(io.ReadCloser) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/storage/artifacts.go b/pkg/storage/artifacts.go index 9ce0815dc43..21f7dd3dd7d 100644 --- a/pkg/storage/artifacts.go +++ b/pkg/storage/artifacts.go @@ -12,7 +12,7 @@ type ArtifactsStorage interface { // ListFiles lists available files in the configured bucket ListFiles(ctx context.Context, executionId, testName, testSuiteName, testWorkflowName string) ([]testkube.Artifact, error) // DownloadFile downloads file from configured - DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.Reader, error) + DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error) // DownloadArchive downloads archive from configured DownloadArchive(ctx context.Context, executionId string, masks []string) (io.Reader, error) // UploadFile uploads file to configured bucket diff --git a/pkg/storage/artifacts_mock.go b/pkg/storage/artifacts_mock.go index 19db9eef45f..e4e2d7c3777 100644 --- a/pkg/storage/artifacts_mock.go +++ b/pkg/storage/artifacts_mock.go @@ -52,10 +52,10 @@ func (mr *MockArtifactsStorageMockRecorder) DownloadArchive(arg0, arg1, arg2 int } // DownloadFile mocks base method. -func (m *MockArtifactsStorage) DownloadFile(arg0 context.Context, arg1, arg2, arg3, arg4, arg5 string) (io.Reader, error) { +func (m *MockArtifactsStorage) DownloadFile(arg0 context.Context, arg1, arg2, arg3, arg4, arg5 string) (io.ReadCloser, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DownloadFile", arg0, arg1, arg2, arg3, arg4, arg5) - ret0, _ := ret[0].(io.Reader) + ret0, _ := ret[0].(io.ReadCloser) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/storage/minio/artifacts_storage.go b/pkg/storage/minio/artifacts_storage.go index 36b3ec99cff..5a617273e7a 100644 --- a/pkg/storage/minio/artifacts_storage.go +++ b/pkg/storage/minio/artifacts_storage.go @@ -23,7 +23,7 @@ func (c *ArtifactClient) ListFiles(ctx context.Context, executionId, testName, t } // DownloadFile downloads file from bucket from the config -func (c *ArtifactClient) DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.Reader, error) { +func (c *ArtifactClient) DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error) { return c.client.DownloadFile(ctx, executionId, file) } From 302165b3bafcd8ebb483863fe0b263a99631b6a2 Mon Sep 17 00:00:00 2001 From: Kubeshop <174873053+ed382@users.noreply.github.com> Date: Wed, 21 Aug 2024 17:50:57 +0200 Subject: [PATCH 2/2] fix: kubepug integration test --- .../kubepug/pkg/runner/runner_test.go | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/contrib/executor/kubepug/pkg/runner/runner_test.go b/contrib/executor/kubepug/pkg/runner/runner_test.go index aac948245d9..39034a34888 100644 --- a/contrib/executor/kubepug/pkg/runner/runner_test.go +++ b/contrib/executor/kubepug/pkg/runner/runner_test.go @@ -14,6 +14,8 @@ import ( "github.com/kubeshop/testkube/pkg/utils/test" ) +const validateAgainstKubernetesVersion string = "v1.28.3" + func TestRunString_Integration(t *testing.T) { test.IntegrationTest(t) t.Parallel() @@ -40,6 +42,8 @@ func TestRunString_Integration(t *testing.T) { execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -85,6 +89,8 @@ metadata: execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -173,6 +179,8 @@ spec: execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -208,6 +216,8 @@ func TestRunFileURI_Integration(t *testing.T) { execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -244,6 +254,8 @@ func TestRunFileURI_Integration(t *testing.T) { execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -287,6 +299,8 @@ func TestRunGitFile_Integration(t *testing.T) { execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -323,6 +337,8 @@ func TestRunGitFile_Integration(t *testing.T) { execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -370,6 +386,8 @@ func TestRunGitDirectory_Integration(t *testing.T) { execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -478,6 +496,8 @@ spec: execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", } @@ -566,6 +586,8 @@ spec: execution.Command = []string{"kubepug"} execution.Args = []string{ "--format=json", + "--k8s-version", + validateAgainstKubernetesVersion, "--input-file", "", "--k8s-version=v1.7.0", // last version apps/v1beta2/Deployment was valid