Skip to content

Commit

Permalink
entrust: Close body to avoid some conn leaks (#485)
Browse files Browse the repository at this point in the history
Also ensure that we drain the body before closing to it to reuse the
existing connections when possible.

Co-authored-by: Anis Eleuch <[email protected]>
  • Loading branch information
vadmeste and Anis Eleuch authored Sep 3, 2024
1 parent c07d23a commit 1da59a0
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 17 deletions.
6 changes: 3 additions & 3 deletions cmd/kes/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func updateCmd(args []string) {
if err != nil {
cli.Fatalf("failed to download KES release information: %v", err)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

var response map[string]any
if err = json.NewDecoder(mem.LimitReader(resp.Body, MaxBody)).Decode(&response); err != nil {
Expand Down Expand Up @@ -195,7 +195,7 @@ func updateCmd(args []string) {
if err != nil {
cli.Fatalf("failed to download minisign signature: %v", err)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

bytes, err := io.ReadAll(io.LimitReader(resp.Body, int64(1*mem.MB)))
if err != nil {
Expand All @@ -218,7 +218,7 @@ func updateCmd(args []string) {
if err != nil {
cli.Fatalf("failed to download binary: %v", err)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

// If the outputFile does not exist we create an empty
// one such that selfupdate can do a successful rename
Expand Down
29 changes: 29 additions & 0 deletions internal/http/close.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 - MinIO, Inc. All rights reserved.
// Use of this source code is governed by the AGPLv3
// license that can be found in the LICENSE file.

package http

import (
"io"
)

// DrainBody close non nil response with any response Body.
// convenient wrapper to drain any remaining data on response body.
//
// Subsequently this allows golang http RoundTripper
// to reuse the same connection for future requests.
func DrainBody(respBody io.ReadCloser) {
// Callers should close resp.Body when done reading from it.
// If resp.Body is not closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to reuse a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
if respBody != nil {
// Drain any remaining Body and then close the connection.
// Without this closing connection would disallow re-using
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
io.Copy(io.Discard, respBody)
respBody.Close()
}
}
9 changes: 9 additions & 0 deletions internal/http/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func (r *Retry) Do(req *http.Request) (*http.Response, error) {

resp, err := r.Client.Do(req)
for N > 0 && (isTemporary(err) || (resp != nil && resp.StatusCode >= http.StatusInternalServerError)) {
if resp != nil {
DrainBody(resp.Body)
resp = nil
}

N--
var delay time.Duration
switch {
Expand Down Expand Up @@ -222,6 +227,10 @@ func (r *Retry) Do(req *http.Request) (*http.Response, error) {
resp, err = r.Client.Do(req) // Now, retry.
}
if isTemporary(err) {
if resp != nil {
DrainBody(resp.Body)
resp = nil
}
// If the request still fails with a temporary error
// we wrap the error to provide more information to the
// caller.
Expand Down
3 changes: 2 additions & 1 deletion internal/keystore/aws/secrets-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/secretsmanager"
"github.com/minio/kes"
xhttp "github.com/minio/kes/internal/http"
"github.com/minio/kes/internal/keystore"
kesdk "github.com/minio/kms-go/kes"
)
Expand Down Expand Up @@ -116,7 +117,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) {
if err != nil {
return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err}
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

return kes.KeyStoreState{
Latency: time.Since(start),
Expand Down
3 changes: 2 additions & 1 deletion internal/keystore/azure/key-vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azsecrets"
"github.com/minio/kes"
xhttp "github.com/minio/kes/internal/http"
"github.com/minio/kes/internal/keystore"
kesdk "github.com/minio/kms-go/kes"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) {
if err != nil {
return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err}
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

return kes.KeyStoreState{
Latency: time.Since(start),
Expand Down
9 changes: 6 additions & 3 deletions internal/keystore/entrust/keycontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (kc *KeyControl) Status(ctx context.Context) (kes.KeyStoreState, error) {
Err: fmt.Errorf("keycontrol: failed to fetch status: %v", err),
}
}
defer xhttp.DrainBody(resp.Body)
latency := time.Since(start)

if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -247,7 +248,7 @@ func (kc *KeyControl) Get(ctx context.Context, name string) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("keycontrol: failed to fetch key: %v", err)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode != http.StatusOK {
return nil, parseErrorResponse(resp)
Expand Down Expand Up @@ -295,6 +296,7 @@ func (kc *KeyControl) Delete(ctx context.Context, name string) error {
if err != nil {
return fmt.Errorf("keycontrol: failed to delete key: %v", err)
}
defer xhttp.DrainBody(resp.Body)
if resp.StatusCode != http.StatusOK {
return parseErrorResponse(resp)
}
Expand Down Expand Up @@ -380,6 +382,7 @@ func (kc *KeyControl) List(ctx context.Context, prefix string, n int) ([]string,
if err != nil {
return nil, "", fmt.Errorf("keycontrol: failed to list keys: %v", err)
}
defer xhttp.DrainBody(resp.Body)
if resp.StatusCode != http.StatusOK {
return nil, "", parseErrorResponse(resp)
}
Expand Down Expand Up @@ -491,7 +494,7 @@ func login(ctx context.Context, rt http.RoundTripper, endpoint, vaultID, usernam
if err != nil {
return "", time.Time{}, err
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode != http.StatusOK {
return "", time.Time{}, parseErrorResponse(resp)
Expand Down Expand Up @@ -539,7 +542,7 @@ func renewToken(ctx context.Context, rt http.RoundTripper, endpoint, token strin
if err != nil {
return "", time.Time{}, err
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode != http.StatusOK {
return "", time.Time{}, parseErrorResponse(resp)
Expand Down
4 changes: 2 additions & 2 deletions internal/keystore/fortanix/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) {
if err != nil {
return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err}
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

return kes.KeyStoreState{
Latency: time.Since(start),
Expand Down Expand Up @@ -476,7 +476,7 @@ func parseErrorResponse(resp *http.Response) error {
if resp.Body == nil {
return kesdk.NewError(resp.StatusCode, resp.Status)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

const MaxSize = 1 * mem.MiB
size := mem.Size(resp.ContentLength)
Expand Down
3 changes: 2 additions & 1 deletion internal/keystore/gcp/secret-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/minio/kes"
xhttp "github.com/minio/kes/internal/http"
"github.com/minio/kes/internal/keystore"
kesdk "github.com/minio/kms-go/kes"
gcpiterator "google.golang.org/api/iterator"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) {
if err != nil {
return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err}
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

return kes.KeyStoreState{
Latency: time.Since(start),
Expand Down
2 changes: 1 addition & 1 deletion internal/keystore/gemalto/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *client) Authenticate(ctx context.Context, endpoint string, login Creden
if err != nil {
return err
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode != http.StatusOK {
response, err := parseServerError(resp)
Expand Down
10 changes: 5 additions & 5 deletions internal/keystore/gemalto/key-secure.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) {
if err != nil {
return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err}
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

return kes.KeyStoreState{
Latency: time.Since(start),
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *Store) Create(ctx context.Context, name string, value []byte) error {
if err != nil {
return fmt.Errorf("gemalto: failed to create key '%s': %v", name, err)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode == http.StatusConflict {
return kesdk.ErrKeyExists
Expand Down Expand Up @@ -210,7 +210,7 @@ func (s *Store) Get(ctx context.Context, name string) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("gemalto: failed to access key '%s': %v", name, err)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode == http.StatusNotFound {
return nil, kesdk.ErrKeyNotFound
Expand Down Expand Up @@ -250,7 +250,7 @@ func (s *Store) Delete(ctx context.Context, name string) error {
if err != nil {
return fmt.Errorf("gemalto: failed to delete key '%s': %v", name, err)
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound {
// BUG(aead): The KeySecure server returns 404 NotFound if the
Expand Down Expand Up @@ -320,7 +320,7 @@ func (s *Store) List(ctx context.Context, prefix string, n int) ([]string, strin
if err != nil {
return nil, "", err
}
defer resp.Body.Close()
defer xhttp.DrainBody(resp.Body)

if resp.StatusCode != http.StatusOK {
response, err := parseServerError(resp)
Expand Down

0 comments on commit 1da59a0

Please sign in to comment.