Skip to content

Commit

Permalink
ccl/sqlproxyccl: add partial connection migration support to the forw…
Browse files Browse the repository at this point in the history
…arder

Informs #76000.

This commit adds connection migration support (without session deserialization)
to the forwarder within sqlproxy. The idea is as described in the RFC. The
session deserialization part will come in a follow-up PR.

One item that was overlooked is that connections that were obtained through
token-based authentication will still send back the initial connection data
(e.g. AuthenticationOk and ParameterStatus messages). To address that, we
added a new slurpInitialConnData function which will be used by the connector
when opening such a connection to slurp all unused messages. This ensures that
connections returned by connector.OpenTenantConnWithToken are ready to accept
queries.

Release note: None
  • Loading branch information
jaylim-crl committed Feb 25, 2022
1 parent bc8cbe6 commit 87b8ea5
Show file tree
Hide file tree
Showing 10 changed files with 2,350 additions and 49 deletions.
40 changes: 40 additions & 0 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,43 @@ var authenticate = func(clientConn, crdbConn net.Conn, throttleHook func(throttl
}
return newErrorf(codeBackendDisconnected, "authentication took more than %d iterations", i)
}

// slurpInitialConnData discards the initial connection data until we get a
// ReadyForQuery message. This will return an error if authentication failed.
// This will be used for the token-based authentication during connection
// migration.
var slurpInitialConnData = func(crdbConn net.Conn) error {
// Use pgproto3 directly for now even though there is an internal buffer
// within the chunkreader. This is fine since there won't be any other
// messages from the server once we receive the ReadyForQuery message.
//
// TODO(jaylim-crl): Refactor this and the authenticator logic to use
// interceptors directly.
be := pgproto3.NewFrontend(pgproto3.NewChunkReader(crdbConn), crdbConn)

// The auth step should require only a few back and forths so 20 iterations
// should be enough.
var i int
for ; i < 20; i++ {
backendMsg, err := be.Receive()
if err != nil {
return newErrorf(codeBackendReadFailed, "unable to receive message from backend: %v", err)
}

switch tp := backendMsg.(type) {
case *pgproto3.AuthenticationOk, *pgproto3.ParameterStatus, *pgproto3.BackendKeyData:
// Do nothing.

case *pgproto3.ErrorResponse:
return newErrorf(codeAuthFailed, "authentication failed: %s", tp.Message)

case *pgproto3.ReadyForQuery:
return nil

default:
return newErrorf(codeBackendDisconnected, "received unexpected backend message type: %v", tp)
}
}

return newErrorf(codeBackendDisconnected, "authentication took more than %d iterations", i)
}
57 changes: 57 additions & 0 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,60 @@ func TestAuthenticateUnexpectedMessage(t *testing.T) {
require.True(t, errors.As(err, &codeErr))
require.Equal(t, codeBackendDisconnected, codeErr.code)
}

func TestSlurpInitialConnData(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Run("unexpected message", func(t *testing.T) {
cli, srv := net.Pipe()
be := pgproto3.NewBackend(pgproto3.NewChunkReader(srv), srv)

go func() {
err := be.Send(&pgproto3.BindComplete{})
require.NoError(t, err)
}()

err := slurpInitialConnData(cli)
require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
require.Equal(t, codeBackendDisconnected, codeErr.code)
})

t.Run("error_response", func(t *testing.T) {
cli, srv := net.Pipe()
be := pgproto3.NewBackend(pgproto3.NewChunkReader(srv), srv)

go func() {
err := be.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "foo"})
require.NoError(t, err)
}()

err := slurpInitialConnData(cli)
require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
require.Equal(t, codeAuthFailed, codeErr.code)
})

t.Run("successful", func(t *testing.T) {
cli, srv := net.Pipe()
be := pgproto3.NewBackend(pgproto3.NewChunkReader(srv), srv)

go func() {
err := be.Send(&pgproto3.AuthenticationOk{})
require.NoError(t, err)

err = be.Send(&pgproto3.ParameterStatus{Name: "Server Version", Value: "1.3"})
require.NoError(t, err)

err = be.Send(&pgproto3.BackendKeyData{ProcessID: uint32(42)})
require.NoError(t, err)

err = be.Send(&pgproto3.ReadyForQuery{})
require.NoError(t, err)
}()

require.NoError(t, slurpInitialConnData(cli))
})
}
Loading

0 comments on commit 87b8ea5

Please sign in to comment.