Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/pgwire: send placeholder BackendKeyData #60281

Merged
merged 1 commit into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func authenticate(clientConn, crdbConn net.Conn) error {
case *pgproto3.ParameterStatus:
// Server sent status message; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered.
case *pgproto3.BackendKeyData:
// Server sent backend key data; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered.
case *pgproto3.ErrorResponse:
// Server has rejected the authentication response from the client and
// has closed the connection.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func TestAuthenticateUnexpectedMessage(t *testing.T) {
fe := pgproto3.NewFrontend(pgproto3.NewChunkReader(cli), cli)

go func() {
err := be.Send(&pgproto3.BackendKeyData{})
err := be.Send(&pgproto3.BindComplete{})
require.NoError(t, err)
beMsg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.BackendKeyData{})
require.Equal(t, beMsg, &pgproto3.BindComplete{})
}()

err := authenticate(srv, cli)
Expand Down
33 changes: 26 additions & 7 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,8 @@ func (c *conn) serveImpl(
dummyCh := make(chan error)
close(dummyCh)
procCh = dummyCh
// An initial readyForQuery message is part of the handshake.
c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)
c.msgBuilder.writeByte(byte(sql.IdleTxnBlock))
if err := c.msgBuilder.finishMsg(c.conn); err != nil {

if err := c.sendReadyForQuery(); err != nil {
reserved.Close(ctx)
return
}
Expand Down Expand Up @@ -688,14 +686,35 @@ func (c *conn) sendInitialConnData(
if err := c.sendParamStatus("is_superuser", superUserVal); err != nil {
return sql.ConnectionHandler{}, err
}
if err := c.sendReadyForQuery(); err != nil {
return sql.ConnectionHandler{}, err
}
return connHandler, nil
}

// An initial readyForQuery message is part of the handshake.
// sendReadyForQuery sends the final messages of the connection handshake.
// This includes a placeholder BackendKeyData message and a ServerMsgReady
// message indicating that there is no active transaction.
func (c *conn) sendReadyForQuery() error {
// Send the client a dummy BackendKeyData message. This is necessary for
// compatibility with tools that require this message. This information is
// normally used by clients to send a CancelRequest message:
// https://www.postgresql.org/docs/9.6/static/protocol-flow.html#AEN112861
// CockroachDB currently ignores all CancelRequests.
c.msgBuilder.initMsg(pgwirebase.ServerMsgBackendKeyData)
c.msgBuilder.putInt32(0)
c.msgBuilder.putInt32(0)
if err := c.msgBuilder.finishMsg(c.conn); err != nil {
return err
}

// An initial ServerMsgReady message is part of the handshake.
c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)
c.msgBuilder.writeByte(byte(sql.IdleTxnBlock))
if err := c.msgBuilder.finishMsg(c.conn); err != nil {
return sql.ConnectionHandler{}, err
return err
}
return connHandler, nil
return nil
}

// An error is returned iff the statement buffer has been closed. In that case,
Expand Down
44 changes: 44 additions & 0 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1521,3 +1521,47 @@ func TestSetSessionArguments(t *testing.T) {
t.Fatal(err)
}
}

// TestCancelQuery uses the pgwire-level query cancellation protocol provided
// by lib/pq to make sure that canceling a query has no effect, and makes sure
// the dummy BackendKeyData does not cause problems.
func TestCancelQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

cancelCtx, cancel := context.WithCancel(context.Background())
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
BeforeExecute: func(ctx context.Context, stmt string) {
if strings.Contains(stmt, "pg_sleep") {
cancel()
}
},
},
},
}
s, _, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(cancelCtx)

pgURL, cleanupFunc := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "TestCancelQuery" /* prefix */, url.User(security.RootUser),
)
defer cleanupFunc()

db, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer db.Close()

// Cancellation has no effect on ongoing query.
if _, err := db.QueryContext(cancelCtx, "select pg_sleep(0)"); err != nil {
t.Fatalf("unexpected error: %s", err)
}

// Context is already canceled, so error should come before execution.
if _, err := db.QueryContext(cancelCtx, "select 1"); err == nil {
t.Fatal("expected error")
} else if err.Error() != "context canceled" {
t.Fatalf("unexpected error: %s", err)
}
}
1 change: 1 addition & 0 deletions pkg/sql/pgwire/pgwirebase/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
ClientMsgTerminate ClientMessageType = 'X'

ServerMsgAuth ServerMessageType = 'R'
ServerMsgBackendKeyData ServerMessageType = 'K'
ServerMsgBindComplete ServerMessageType = '2'
ServerMsgCommandComplete ServerMessageType = 'C'
ServerMsgCloseComplete ServerMessageType = '3'
Expand Down
28 changes: 16 additions & 12 deletions pkg/sql/pgwire/pgwirebase/servermessagetype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions pkg/testutils/pgtest/pgtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,22 @@ func NewPGTest(ctx context.Context, addr, user string) (*PGTest, error) {
}
msgs, err := p.Until(false /* keepErrMsg */, &pgproto3.ReadyForQuery{})
foundCrdb := false
var backendKeyData *pgproto3.BackendKeyData
for _, msg := range msgs {
if s, ok := msg.(*pgproto3.ParameterStatus); ok && s.Name == "crdb_version" {
foundCrdb = true
}
if d, ok := msg.(*pgproto3.BackendKeyData); ok {
// We inspect the BackendKeyData outside of the loop since we only
// want to do the assertions if foundCrdb==true.
backendKeyData = d
}
}
if backendKeyData == nil {
return nil, errors.Errorf("did not receive BackendKeyData")
}
if foundCrdb && (backendKeyData.ProcessID != 0 || backendKeyData.SecretKey != 0) {
return nil, errors.Errorf("unexpected BackendKeyData: %+v", d)
}
p.isCockroachDB = foundCrdb
success = err == nil
Expand Down