Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: http/2 stream leaks #5775

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func NewInitCmdDemo() *cobra.Command {
if export {
valuesResp, err := http.Get(demoValuesUrl)
ui.ExitOnError("cannot fetch values", err)
defer valuesResp.Body.Close()
valuesBytes, err := io.ReadAll(valuesResp.Body)
ui.ExitOnError("cannot fetch values", err)
values := string(valuesBytes)
Expand Down
1 change: 1 addition & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (DebugTransport) RoundTrip(r *http.Request) (*http.Response, error) {
func proxyPass(res http.ResponseWriter, req *http.Request) {
fmt.Printf("\n-------------\n")
body, _ := io.ReadAll(req.Body)
req.Body.Close()
fmt.Printf("%s\n", body)

prefix := fmt.Sprintf("/api/v1/namespaces/%s/services/testkube-api-server:%d/proxy", *namespace, *apiPort)
Expand Down
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/artifacts/cloud_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (d *cloudUploader) putObject(url string, path string, file io.Reader, size
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
b, _ := io.ReadAll(res.Body)
return errors.Errorf("failed saving file: status code: %d / message: %s", res.StatusCode, string(b))
Expand Down
7 changes: 6 additions & 1 deletion cmd/testworkflow-toolkit/commands/tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewTarballCmd() *cobra.Command {
os.Exit(0)
}

for _, pair := range pairs {
processPair := func(pair string) {
dirPath, url, found := strings.Cut(pair, "=")
if !found {
ui.Fail(fmt.Errorf("invalid tarball pair: %s", pair))
Expand All @@ -33,6 +33,7 @@ func NewTarballCmd() *cobra.Command {
// Start downloading the file
resp, err := http.Get(url)
ui.ExitOnError("download the tarball", err)
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
ui.Fail(fmt.Errorf("failed to download the tarball: status code %d", resp.StatusCode))
Expand All @@ -44,6 +45,10 @@ func NewTarballCmd() *cobra.Command {
ui.Fail(err)
}
}

for _, pair := range pairs {
processPair(pair)
}
},
}

Expand Down
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/commands/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewTransferCmd() *cobra.Command {
}()
resp, err := http.Post(url, "application/tar+gzip", reader)
ui.ExitOnError("send the tarball request", err)
_ = resp.Body.Close()

if resp.StatusCode != http.StatusNoContent {
ui.Fail(fmt.Errorf("failed to send the tarball: status code %d", resp.StatusCode))
Expand Down
4 changes: 1 addition & 3 deletions cmd/testworkflow-toolkit/common/tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func WriteTarball(stream io.Writer, dirPath string, files []string) error {
return err
}

func UnpackTarball(dirPath string, stream io.ReadCloser) error {
defer stream.Close()

func UnpackTarball(dirPath string, stream io.Reader) error {
// Process the files
uncompressedStream, err := gzip.NewReader(stream)
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions contrib/executor/kubepug/pkg/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/kubeshop/testkube/pkg/utils/test"
)

const validateAgainstKubernetesVersion string = "v1.28.3"

func TestRunString_Integration(t *testing.T) {
test.IntegrationTest(t)
t.Parallel()
Expand All @@ -40,6 +42,8 @@ func TestRunString_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -85,6 +89,8 @@ metadata:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -173,6 +179,8 @@ spec:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -208,6 +216,8 @@ func TestRunFileURI_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -244,6 +254,8 @@ func TestRunFileURI_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -287,6 +299,8 @@ func TestRunGitFile_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -323,6 +337,8 @@ func TestRunGitFile_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -370,6 +386,8 @@ func TestRunGitDirectory_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -478,6 +496,8 @@ spec:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -566,6 +586,8 @@ spec:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
"--k8s-version=v1.7.0", // last version apps/v1beta2/Deployment was valid
Expand Down
4 changes: 2 additions & 2 deletions internal/app/api/v1/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *TestkubeAPI) GetArtifactHandler() fiber.Handler {
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: db could not get execution result: %w", errPrefix, err))
}

var file io.Reader
var file io.ReadCloser
var bucket string
artifactsStorage := s.ArtifactsStorage
folder := execution.Id
Expand All @@ -451,8 +451,8 @@ func (s *TestkubeAPI) GetArtifactHandler() fiber.Handler {
if err != nil {
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: could not download file: %w", errPrefix, err))
}
defer file.Close()

// SendStream promises to close file using io.Close() method
return c.SendStream(file)
}
}
Expand Down
6 changes: 4 additions & 2 deletions internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,14 @@ func (s *TestkubeAPI) GetTestWorkflowExecutionLogsHandler() fiber.Handler {
return s.ClientError(c, "get execution", err)
}

reader, err := s.TestWorkflowOutput.ReadLog(ctx, executionID, execution.Workflow.Name)
rc, err := s.TestWorkflowOutput.ReadLog(ctx, executionID, execution.Workflow.Name)
if err != nil {
return s.InternalError(c, "can't get log", executionID, err)
}
defer rc.Close()

c.Context().SetContentType(mediaTypePlainText)
_, err = io.Copy(c.Response().BodyWriter(), reader)
_, err = io.Copy(c.Response().BodyWriter(), rc)
return err
}
}
Expand Down Expand Up @@ -407,6 +408,7 @@ func (s *TestkubeAPI) GetTestWorkflowArtifactHandler() fiber.Handler {
if err != nil {
return s.InternalError(c, errPrefix, "could not download file", err)
}
defer file.Close()

return c.SendStream(file)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/api/v1/client/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (c CopyFileDirectClient) UploadFile(parentName string, parentType TestingTy
if err != nil {
return err
}
defer resp.Body.Close()

if err = httpResponseError(resp); err != nil {
return fmt.Errorf("api %s returned error: %w", uri, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/cloud/client/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (c RESTClient[T]) List() ([]T, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()

var orgsResponse ListResponse[T]
err = json.NewDecoder(resp.Body).Decode(&orgsResponse)
Expand All @@ -52,6 +53,7 @@ func (c RESTClient[T]) Get(id string) (e T, err error) {
if err != nil {
return e, err
}
defer resp.Body.Close()

if resp.StatusCode > 299 {
d, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -87,6 +89,7 @@ func (c RESTClient[T]) Create(entity T, overridePath ...string) (e T, err error)
if err != nil {
return e, err
}
defer resp.Body.Close()

if resp.StatusCode > 299 {
d, err := io.ReadAll(resp.Body)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloud/data/artifact/artifacts_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (c *CloudArtifactsStorage) ListFiles(ctx context.Context, executionID, test
return commandResponse.Artifacts, nil
}

func (c *CloudArtifactsStorage) DownloadFile(ctx context.Context, file, executionID, testName, testSuiteName, testWorkflowName string) (io.Reader, error) {
func (c *CloudArtifactsStorage) DownloadFile(ctx context.Context, file, executionID, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error) {
req := DownloadFileRequest{
File: file,
ExecutionID: executionID,
Expand Down Expand Up @@ -73,7 +73,7 @@ func (c *CloudArtifactsStorage) DownloadArchive(ctx context.Context, executionID
return nil, errors.WithStack(ErrOperationNotSupported)
}

func (c *CloudArtifactsStorage) getObject(ctx context.Context, url string) (io.Reader, error) {
func (c *CloudArtifactsStorage) getObject(ctx context.Context, url string) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/data/artifact/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (u *CloudUploader) putObject(ctx context.Context, url string, object *scrap
if err != nil {
return errors.Wrap(err, "failed to send file to cloud")
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
return errors.Errorf("error getting file from presigned url: expected 200 OK response code, got %d", rsp.StatusCode)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/cloud/data/testworkflow/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@ func (r *CloudOutputRepository) SaveLog(ctx context.Context, id, workflowName st
if err != nil {
return errors.Wrap(err, "failed to save file in cloud storage")
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return errors.Errorf("error saving file with presigned url: expected 200 OK response code, got %d", res.StatusCode)
}
return nil
}

// ReadLog streams the output from Cloud
func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) {
// ReadLog streams the output from Cloud.
// The caller is responsible for closing the stream.
func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error) {
url, err := r.PresignReadLog(ctx, id, workflowName)
if err != nil {
return nil, err
Expand All @@ -99,8 +101,10 @@ func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName st
return nil, errors.Wrap(err, "failed to get file from cloud storage")
}
if res.StatusCode != http.StatusOK {
_ = res.Body.Close()
return nil, errors.Errorf("error getting file from presigned url: expected 200 OK response code, got %d", res.StatusCode)
}
// Caller must close this stream.
return res.Body, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/testworkflow/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type OutputRepository interface {
// SaveLog streams the output from the workflow to Minio
SaveLog(ctx context.Context, id, workflowName string, reader io.Reader) error
// ReadLog streams the output from Minio
ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error)
ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error)
// HasLog checks if there is an output in Minio
HasLog(ctx context.Context, id, workflowName string) (bool, error)

Expand Down
4 changes: 2 additions & 2 deletions pkg/repository/testworkflow/minio_output_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (m *MinioRepository) SaveLog(ctx context.Context, id, workflowName string,
return m.storage.UploadFileToBucket(ctx, m.bucket, bucketFolder, id, reader, -1)
}

func (m *MinioRepository) ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) {
func (m *MinioRepository) ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error) {
file, _, err := m.storage.DownloadFileFromBucket(ctx, m.bucket, bucketFolder, id)
if err != nil {
return nil, err
}
return file, nil
return io.NopCloser(file), nil
}

func (m *MinioRepository) HasLog(ctx context.Context, id, workflowName string) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/repository/testworkflow/mock_output_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/storage/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ArtifactsStorage interface {
// ListFiles lists available files in the configured bucket
ListFiles(ctx context.Context, executionId, testName, testSuiteName, testWorkflowName string) ([]testkube.Artifact, error)
// DownloadFile downloads file from configured
DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.Reader, error)
DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error)
// DownloadArchive downloads archive from configured
DownloadArchive(ctx context.Context, executionId string, masks []string) (io.Reader, error)
// UploadFile uploads file to configured bucket
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/artifacts_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/storage/minio/artifacts_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *ArtifactClient) ListFiles(ctx context.Context, executionId, testName, t
}

// DownloadFile downloads file from bucket from the config
func (c *ArtifactClient) DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.Reader, error) {
func (c *ArtifactClient) DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error) {
return c.client.DownloadFile(ctx, executionId, file)
}

Expand Down
Loading