Skip to content

Commit

Permalink
Merge #71940
Browse files Browse the repository at this point in the history
71940: sql/pgwire: handle broken connection better in processing loop r=otan,knz a=rafiss

Broken connections were not being detected correctly. Now, netutil
checks for all net.Errors more broadly. There's also a defensive check
in the pgwire loop so that if an error is mis-classified, we bail out
after a large number of repeated errors.

Release note (bug fix): Previously, some instances of a broken client
connection could cause an infinite loop while processing commands from
the client. This is fixed now.

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Oct 26, 2021
2 parents e4a177d + 39067de commit 7de965a
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 8 deletions.
17 changes: 13 additions & 4 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func (c *conn) serveImpl(
}

var terminateSeen bool

var authDone, ignoreUntilSync bool
var repeatedErrorCount int
for {
breakLoop, err := func() (bool, error) {
typ, n, err := c.readBuf.ReadTypedMsg(&c.rd)
Expand Down Expand Up @@ -482,11 +482,20 @@ func (c *conn) serveImpl(
if err != nil {
log.VEventf(ctx, 1, "pgwire: error processing message: %s", err)
ignoreUntilSync = true
// If we can't read data because the connection was closed or the context
// was canceled (e.g. during authentication), then we should break.
if netutil.IsClosedConnection(err) || errors.Is(err, context.Canceled) {
repeatedErrorCount++
const maxRepeatedErrorCount = 1 << 15
// If we can't read data because of any one of the following conditions,
// then we should break:
// 1. the connection was closed.
// 2. the context was canceled (e.g. during authentication).
// 3. we reached an arbitrary threshold of repeated errors.
if netutil.IsClosedConnection(err) ||
errors.Is(err, context.Canceled) ||
repeatedErrorCount > maxRepeatedErrorCount {
break
}
} else {
repeatedErrorCount = 0
}
if breakLoop {
break
Expand Down
9 changes: 8 additions & 1 deletion pkg/util/netutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ go_library(

go_test(
name = "netutil_test",
srcs = ["srv_test.go"],
srcs = [
"net_test.go",
"srv_test.go",
],
embed = [":netutil"],
deps = [
"//pkg/util/contextutil",
"@com_github_cockroachdb_cmux//:cmux",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
)
9 changes: 6 additions & 3 deletions pkg/util/netutil/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,13 @@ func (s *Server) ServeWith(
}
}

// IsClosedConnection returns true if err is cmux.ErrListenerClosed,
// grpc.ErrServerStopped, io.EOF, or the net package's errClosed.
// IsClosedConnection returns true if err is a non-temporary net.Error or is
// cmux.ErrListenerClosed, grpc.ErrServerStopped, io.EOF, or net.ErrClosed.
func IsClosedConnection(err error) bool {
return errors.IsAny(err, cmux.ErrListenerClosed, grpc.ErrServerStopped, io.EOF) ||
if netError := net.Error(nil); errors.As(err, &netError) {
return !netError.Temporary()
}
return errors.IsAny(err, cmux.ErrListenerClosed, grpc.ErrServerStopped, io.EOF, net.ErrClosed) ||
strings.Contains(err.Error(), "use of closed network connection")
}

Expand Down
93 changes: 93 additions & 0 deletions pkg/util/netutil/net_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package netutil

import (
"fmt"
"io"
"net"
"syscall"
"testing"

"github.com/cockroachdb/cmux"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

func TestIsClosedConnection(t *testing.T) {
for _, tc := range []struct {
err error
isClosedError bool
}{
{
fmt.Errorf("an error"),
false,
},
{
net.ErrClosed,
true,
},
{
cmux.ErrListenerClosed,
true,
},
{
grpc.ErrServerStopped,
true,
},
{
io.EOF,
true,
},
{
// TODO(rafi): should this be treated the same as EOF?
io.ErrUnexpectedEOF,
false,
},
{
&net.AddrError{Err: "addr", Addr: "err"},
true,
},
{
syscall.ECONNRESET,
true,
},
{
syscall.EADDRINUSE,
true,
},
{
syscall.ECONNABORTED,
true,
},
{
syscall.ECONNREFUSED,
true,
},
{
syscall.EBADMSG,
true,
},
{
syscall.EINTR,
false,
},
{
&contextutil.TimeoutError{},
false,
},
} {
assert.Equalf(t, tc.isClosedError, IsClosedConnection(tc.err),
"expected %q to be evaluated as %v", tc.err, tc.isClosedError,
)
}
}

0 comments on commit 7de965a

Please sign in to comment.