Skip to content

Commit

Permalink
sql/pgwire: handle broken connection better in processing loop
Browse files Browse the repository at this point in the history
Broken connections were not beinf detected correctly. Now, netutil
checks for all net.Errors more broadly. There's also a defensive check
in the pgwire loop so that if an error is mis-classified, we bail out
after a large number of repeated errors.

Release note (bug fix): Previously, some instances of a broken client
connection could cause an infinite loop while processing commands from
the client. This is fixed now.
  • Loading branch information
rafiss committed Oct 26, 2021
1 parent fb7bbfc commit 39067de
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 8 deletions.
17 changes: 13 additions & 4 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func (c *conn) serveImpl(
}

var terminateSeen bool

var authDone, ignoreUntilSync bool
var repeatedErrorCount int
for {
breakLoop, err := func() (bool, error) {
typ, n, err := c.readBuf.ReadTypedMsg(&c.rd)
Expand Down Expand Up @@ -482,11 +482,20 @@ func (c *conn) serveImpl(
if err != nil {
log.VEventf(ctx, 1, "pgwire: error processing message: %s", err)
ignoreUntilSync = true
// If we can't read data because the connection was closed or the context
// was canceled (e.g. during authentication), then we should break.
if netutil.IsClosedConnection(err) || errors.Is(err, context.Canceled) {
repeatedErrorCount++
const maxRepeatedErrorCount = 1 << 15
// If we can't read data because of any one of the following conditions,
// then we should break:
// 1. the connection was closed.
// 2. the context was canceled (e.g. during authentication).
// 3. we reached an arbitrary threshold of repeated errors.
if netutil.IsClosedConnection(err) ||
errors.Is(err, context.Canceled) ||
repeatedErrorCount > maxRepeatedErrorCount {
break
}
} else {
repeatedErrorCount = 0
}
if breakLoop {
break
Expand Down
9 changes: 8 additions & 1 deletion pkg/util/netutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ go_library(

go_test(
name = "netutil_test",
srcs = ["srv_test.go"],
srcs = [
"net_test.go",
"srv_test.go",
],
embed = [":netutil"],
deps = [
"//pkg/util/contextutil",
"@com_github_cockroachdb_cmux//:cmux",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
)
9 changes: 6 additions & 3 deletions pkg/util/netutil/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,13 @@ func (s *Server) ServeWith(
}
}

// IsClosedConnection returns true if err is cmux.ErrListenerClosed,
// grpc.ErrServerStopped, io.EOF, or the net package's errClosed.
// IsClosedConnection returns true if err is a non-temporary net.Error or is
// cmux.ErrListenerClosed, grpc.ErrServerStopped, io.EOF, or net.ErrClosed.
func IsClosedConnection(err error) bool {
return errors.IsAny(err, cmux.ErrListenerClosed, grpc.ErrServerStopped, io.EOF) ||
if netError := net.Error(nil); errors.As(err, &netError) {
return !netError.Temporary()
}
return errors.IsAny(err, cmux.ErrListenerClosed, grpc.ErrServerStopped, io.EOF, net.ErrClosed) ||
strings.Contains(err.Error(), "use of closed network connection")
}

Expand Down
93 changes: 93 additions & 0 deletions pkg/util/netutil/net_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package netutil

import (
"fmt"
"io"
"net"
"syscall"
"testing"

"github.com/cockroachdb/cmux"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

func TestIsClosedConnection(t *testing.T) {
for _, tc := range []struct {
err error
isClosedError bool
}{
{
fmt.Errorf("an error"),
false,
},
{
net.ErrClosed,
true,
},
{
cmux.ErrListenerClosed,
true,
},
{
grpc.ErrServerStopped,
true,
},
{
io.EOF,
true,
},
{
// TODO(rafi): should this be treated the same as EOF?
io.ErrUnexpectedEOF,
false,
},
{
&net.AddrError{Err: "addr", Addr: "err"},
true,
},
{
syscall.ECONNRESET,
true,
},
{
syscall.EADDRINUSE,
true,
},
{
syscall.ECONNABORTED,
true,
},
{
syscall.ECONNREFUSED,
true,
},
{
syscall.EBADMSG,
true,
},
{
syscall.EINTR,
false,
},
{
&contextutil.TimeoutError{},
false,
},
} {
assert.Equalf(t, tc.isClosedError, IsClosedConnection(tc.err),
"expected %q to be evaluated as %v", tc.err, tc.isClosedError,
)
}
}

0 comments on commit 39067de

Please sign in to comment.