diff --git a/cmd/kes/update.go b/cmd/kes/update.go index 2ba846c5..3893a263 100644 --- a/cmd/kes/update.go +++ b/cmd/kes/update.go @@ -138,7 +138,7 @@ func updateCmd(args []string) { if err != nil { cli.Fatalf("failed to download KES release information: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) var response map[string]any if err = json.NewDecoder(mem.LimitReader(resp.Body, MaxBody)).Decode(&response); err != nil { @@ -195,7 +195,7 @@ func updateCmd(args []string) { if err != nil { cli.Fatalf("failed to download minisign signature: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) bytes, err := io.ReadAll(io.LimitReader(resp.Body, int64(1*mem.MB))) if err != nil { @@ -218,7 +218,7 @@ func updateCmd(args []string) { if err != nil { cli.Fatalf("failed to download binary: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) // If the outputFile does not exist we create an empty // one such that selfupdate can do a successful rename diff --git a/internal/http/close.go b/internal/http/close.go new file mode 100644 index 00000000..ac336375 --- /dev/null +++ b/internal/http/close.go @@ -0,0 +1,29 @@ +// Copyright 2024 - MinIO, Inc. All rights reserved. +// Use of this source code is governed by the AGPLv3 +// license that can be found in the LICENSE file. + +package http + +import ( + "io" +) + +// DrainBody close non nil response with any response Body. +// convenient wrapper to drain any remaining data on response body. +// +// Subsequently this allows golang http RoundTripper +// to reuse the same connection for future requests. +func DrainBody(respBody io.ReadCloser) { + // Callers should close resp.Body when done reading from it. + // If resp.Body is not closed, the Client's underlying RoundTripper + // (typically Transport) may not be able to reuse a persistent TCP + // connection to the server for a subsequent "keep-alive" request. + if respBody != nil { + // Drain any remaining Body and then close the connection. + // Without this closing connection would disallow re-using + // the same connection for future uses. + // - http://stackoverflow.com/a/17961593/4465767 + io.Copy(io.Discard, respBody) + respBody.Close() + } +} diff --git a/internal/http/retry.go b/internal/http/retry.go index fa7ae9d3..52098742 100644 --- a/internal/http/retry.go +++ b/internal/http/retry.go @@ -188,6 +188,11 @@ func (r *Retry) Do(req *http.Request) (*http.Response, error) { resp, err := r.Client.Do(req) for N > 0 && (isTemporary(err) || (resp != nil && resp.StatusCode >= http.StatusInternalServerError)) { + if resp != nil { + DrainBody(resp.Body) + resp = nil + } + N-- var delay time.Duration switch { @@ -222,6 +227,10 @@ func (r *Retry) Do(req *http.Request) (*http.Response, error) { resp, err = r.Client.Do(req) // Now, retry. } if isTemporary(err) { + if resp != nil { + DrainBody(resp.Body) + resp = nil + } // If the request still fails with a temporary error // we wrap the error to provide more information to the // caller. diff --git a/internal/keystore/aws/secrets-manager.go b/internal/keystore/aws/secrets-manager.go index 96a9c0a9..ed5e3638 100644 --- a/internal/keystore/aws/secrets-manager.go +++ b/internal/keystore/aws/secrets-manager.go @@ -17,6 +17,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/secretsmanager" "github.com/minio/kes" + xhttp "github.com/minio/kes/internal/http" "github.com/minio/kes/internal/keystore" kesdk "github.com/minio/kms-go/kes" ) @@ -116,7 +117,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), diff --git a/internal/keystore/azure/key-vault.go b/internal/keystore/azure/key-vault.go index 654d0a23..d9704eca 100644 --- a/internal/keystore/azure/key-vault.go +++ b/internal/keystore/azure/key-vault.go @@ -16,6 +16,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azsecrets" "github.com/minio/kes" + xhttp "github.com/minio/kes/internal/http" "github.com/minio/kes/internal/keystore" kesdk "github.com/minio/kms-go/kes" ) @@ -63,7 +64,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), diff --git a/internal/keystore/entrust/keycontrol.go b/internal/keystore/entrust/keycontrol.go index a5d06a17..33a621c3 100644 --- a/internal/keystore/entrust/keycontrol.go +++ b/internal/keystore/entrust/keycontrol.go @@ -148,6 +148,7 @@ func (kc *KeyControl) Status(ctx context.Context) (kes.KeyStoreState, error) { Err: fmt.Errorf("keycontrol: failed to fetch status: %v", err), } } + defer xhttp.DrainBody(resp.Body) latency := time.Since(start) if resp.StatusCode != http.StatusOK { @@ -247,7 +248,7 @@ func (kc *KeyControl) Get(ctx context.Context, name string) ([]byte, error) { if err != nil { return nil, fmt.Errorf("keycontrol: failed to fetch key: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return nil, parseErrorResponse(resp) @@ -295,6 +296,7 @@ func (kc *KeyControl) Delete(ctx context.Context, name string) error { if err != nil { return fmt.Errorf("keycontrol: failed to delete key: %v", err) } + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return parseErrorResponse(resp) } @@ -380,6 +382,7 @@ func (kc *KeyControl) List(ctx context.Context, prefix string, n int) ([]string, if err != nil { return nil, "", fmt.Errorf("keycontrol: failed to list keys: %v", err) } + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return nil, "", parseErrorResponse(resp) } @@ -491,7 +494,7 @@ func login(ctx context.Context, rt http.RoundTripper, endpoint, vaultID, usernam if err != nil { return "", time.Time{}, err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return "", time.Time{}, parseErrorResponse(resp) @@ -539,7 +542,7 @@ func renewToken(ctx context.Context, rt http.RoundTripper, endpoint, token strin if err != nil { return "", time.Time{}, err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return "", time.Time{}, parseErrorResponse(resp) diff --git a/internal/keystore/fortanix/keystore.go b/internal/keystore/fortanix/keystore.go index 145ee3b0..358f4036 100644 --- a/internal/keystore/fortanix/keystore.go +++ b/internal/keystore/fortanix/keystore.go @@ -193,7 +193,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), @@ -476,7 +476,7 @@ func parseErrorResponse(resp *http.Response) error { if resp.Body == nil { return kesdk.NewError(resp.StatusCode, resp.Status) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) const MaxSize = 1 * mem.MiB size := mem.Size(resp.ContentLength) diff --git a/internal/keystore/gcp/secret-manager.go b/internal/keystore/gcp/secret-manager.go index b7a14609..5cb25b32 100644 --- a/internal/keystore/gcp/secret-manager.go +++ b/internal/keystore/gcp/secret-manager.go @@ -13,6 +13,7 @@ import ( "time" "github.com/minio/kes" + xhttp "github.com/minio/kes/internal/http" "github.com/minio/kes/internal/keystore" kesdk "github.com/minio/kms-go/kes" gcpiterator "google.golang.org/api/iterator" @@ -116,7 +117,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), diff --git a/internal/keystore/gemalto/client.go b/internal/keystore/gemalto/client.go index 41bc025e..79fd50c0 100644 --- a/internal/keystore/gemalto/client.go +++ b/internal/keystore/gemalto/client.go @@ -78,7 +78,7 @@ func (c *client) Authenticate(ctx context.Context, endpoint string, login Creden if err != nil { return err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { response, err := parseServerError(resp) diff --git a/internal/keystore/gemalto/key-secure.go b/internal/keystore/gemalto/key-secure.go index feae5159..b530513f 100644 --- a/internal/keystore/gemalto/key-secure.go +++ b/internal/keystore/gemalto/key-secure.go @@ -126,7 +126,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), @@ -167,7 +167,7 @@ func (s *Store) Create(ctx context.Context, name string, value []byte) error { if err != nil { return fmt.Errorf("gemalto: failed to create key '%s': %v", name, err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode == http.StatusConflict { return kesdk.ErrKeyExists @@ -210,7 +210,7 @@ func (s *Store) Get(ctx context.Context, name string) ([]byte, error) { if err != nil { return nil, fmt.Errorf("gemalto: failed to access key '%s': %v", name, err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode == http.StatusNotFound { return nil, kesdk.ErrKeyNotFound @@ -250,7 +250,7 @@ func (s *Store) Delete(ctx context.Context, name string) error { if err != nil { return fmt.Errorf("gemalto: failed to delete key '%s': %v", name, err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound { // BUG(aead): The KeySecure server returns 404 NotFound if the @@ -320,7 +320,7 @@ func (s *Store) List(ctx context.Context, prefix string, n int) ([]string, strin if err != nil { return nil, "", err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { response, err := parseServerError(resp)