Skip to content

Commit

Permalink
sqlproxyccl: update query cancel key during session migration
Browse files Browse the repository at this point in the history
This requires protecting certain fields using a mutex.

Release note: None
  • Loading branch information
rafiss committed Aug 5, 2022
1 parent 9942243 commit 043ea4a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 14 deletions.
19 changes: 11 additions & 8 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,37 +165,40 @@ var authenticate = func(
// we should merge them back in the future. Instead of having the writer as the
// other end, the writer should be the same connection. That way, a
// sqlproxyccl.Conn can be used to read-from, or write-to the same component.
var readTokenAuthResult = func(conn net.Conn) error {
var readTokenAuthResult = func(conn net.Conn) (*pgproto3.BackendKeyData, error) {
// This interceptor is discarded once this function returns. Just like
// pgproto3.NewFrontend, this serverConn object has an internal buffer.
// Discarding the buffer is fine since there won't be any other messages
// from the server once we receive the ReadyForQuery message because the
// caller (i.e. proxy) does not forward client messages until then.
serverConn := interceptor.NewFrontendConn(conn)

var backendKeyData *pgproto3.BackendKeyData
// The auth step should require only a few back and forths so 20 iterations
// should be enough.
var i int
for ; i < 20; i++ {
backendMsg, err := serverConn.ReadMsg()
if err != nil {
return newErrorf(codeBackendReadFailed, "unable to receive message from backend: %v", err)
return nil, newErrorf(codeBackendReadFailed, "unable to receive message from backend: %v", err)
}

switch tp := backendMsg.(type) {
case *pgproto3.AuthenticationOk, *pgproto3.ParameterStatus, *pgproto3.BackendKeyData:
// Do nothing.
case *pgproto3.AuthenticationOk, *pgproto3.ParameterStatus:
// Do nothing.
case *pgproto3.BackendKeyData:
backendKeyData = tp

case *pgproto3.ErrorResponse:
return newErrorf(codeAuthFailed, "authentication failed: %s", tp.Message)
return nil, newErrorf(codeAuthFailed, "authentication failed: %s", tp.Message)

case *pgproto3.ReadyForQuery:
return nil
return backendKeyData, nil

default:
return newErrorf(codeBackendDisconnected, "received unexpected backend message type: %v", tp)
return nil, newErrorf(codeBackendDisconnected, "received unexpected backend message type: %v", tp)
}
}

return newErrorf(codeBackendDisconnected, "authentication took more than %d iterations", i)
return nil, newErrorf(codeBackendDisconnected, "authentication took more than %d iterations", i)
}
11 changes: 7 additions & 4 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestReadTokenAuthResult(t *testing.T) {
require.NoError(t, err)
}()

err := readTokenAuthResult(cli)
_, err := readTokenAuthResult(cli)
require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
Expand All @@ -243,7 +243,7 @@ func TestReadTokenAuthResult(t *testing.T) {
require.NoError(t, err)
}()

err := readTokenAuthResult(cli)
_, err := readTokenAuthResult(cli)
require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
Expand All @@ -252,6 +252,7 @@ func TestReadTokenAuthResult(t *testing.T) {

t.Run("successful", func(t *testing.T) {
cli, srv := net.Pipe()
crdbBackendKeyData := &pgproto3.BackendKeyData{ProcessID: 42, SecretKey: 99}

go func() {
_, err := srv.Write((&pgproto3.AuthenticationOk{}).Encode(nil))
Expand All @@ -260,13 +261,15 @@ func TestReadTokenAuthResult(t *testing.T) {
_, err = srv.Write((&pgproto3.ParameterStatus{Name: "Server Version", Value: "1.3"}).Encode(nil))
require.NoError(t, err)

_, err = srv.Write((&pgproto3.BackendKeyData{ProcessID: uint32(42)}).Encode(nil))
_, err = srv.Write(crdbBackendKeyData.Encode(nil))
require.NoError(t, err)

_, err = srv.Write((&pgproto3.ReadyForQuery{}).Encode(nil))
require.NoError(t, err)
}()

require.NoError(t, readTokenAuthResult(cli))
receivedCrdbBackendKeyData, err := readTokenAuthResult(cli)
require.NoError(t, err)
require.Equal(t, crdbBackendKeyData, receivedCrdbBackendKeyData)
})
}
4 changes: 3 additions & 1 deletion pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,11 @@ func (c *connector) OpenTenantConnWithToken(
// only return once we've seen a ReadyForQuery message.
//
// NOTE: This will need to be updated when we implement query cancellation.
if err := readTokenAuthResult(serverConn); err != nil {
newBackendKeyData, err := readTokenAuthResult(serverConn)
if err != nil {
return nil, err
}
c.CancelInfo.setNewBackend(newBackendKeyData, serverConn.RemoteAddr().(*net.TCPAddr))
log.Infof(ctx, "connected to %s through token-based auth", serverConn.RemoteAddr())
return serverConn, nil
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn)
}

// Cancel requests are sent on a separate connection, and have no response,
// so we can close the connection immediately after handling them.
// so we can close the connection immediately, then handle the request. This
// prevents the client from using latency to learn if we are processing the
// request or not.
if cr := fe.CancelRequest; cr != nil {
_ = incomingConn.Close()
if err := handler.handleCancelRequest(cr, true /* allowForward */); err != nil {
Expand Down Expand Up @@ -435,6 +437,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn)
connBegin := timeutil.Now()
defer func() {
log.Infof(ctx, "closing after %.2fs", timeutil.Since(connBegin).Seconds())
handler.cancelInfoMap.deleteCancelInfo(connector.CancelInfo.proxySecretID())
}()

// Wrap the client connection with an error annotater. WARNING: The TLS
Expand Down

0 comments on commit 043ea4a

Please sign in to comment.