Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
70722: sql: add assignment casts for INSERTs r=mgartner a=mgartner

#### sql: rename pkg/sql/sem/tree/casts.go to cast.go

Release note: None

#### tree: add OID->OID cast map

This commit adds a new map that describes valid casts from OID to OID.
It introduces three cast contexts: explicit casts, assignment casts, and
implicit casts. See the comments for CastContext and castMap for
details.

This map will enable us to properly follow Postgres's casting behavior.
Most immediately, it will allow us to support assignment casts.

Future work includes moving volatility in castMap. In the longer term,
cast functions can be moved into castMap as well.

Release note: None

#### sql: set "char" type width to 1

The `"char"` type is a special single-character type. This commit
adds a `types.QChar` with a width one. It removes the `types.MakeQChar`
function so that it is impossible to create `"char"` types with any
other width.

Release note: None

#### sql: add assignment casts for INSERTs

Casts in Postgres are performed in one of three contexts [1]:

  1. An explicit context with `CAST(x AS T)` or `x::T`.
  2. An assignment context performed implicitly during an INSERT,
     UPSERT, or UPDATE.
  3. An implicit context during the evaluation of an expression. For
     example the DATE in `'2021-01-02'::DATE < now()` will be implicitly
     cast to a TIMESTAMPTZ so that the values can be compared.

Not all casts can be performed in all contexts. Postgres's pg_cast table
lists valid casts and specifies the maximum cast context in which each
can be performed. A cast with a max context of explicit can only be
performed in an explicit context. A cast with an assignment max context
can be performed in an explicit context or an assignment context. A cast
with an implicit max context can be performed in all contexts.

Much to my personal disappointment and frustration, there are valid
casts that are not listed in Postgres's pg_cast table. These casts are
called "automatic I/O conversions" and they allow casting most types to
and from the string types: TEXT, VARCHAR, CHAR, NAME, and "char" [2].
We cannot determine these casts' maximum cast context from the pg_cast
table, so we rely on the documentation which states that conversions to
string types are assignment casts and conversions from string types are
explicit casts [3].

--

This commit implements assignment casts for INSERTs. Follow up work will
implement assignment casts for UPSERTs and UPDATEs.

A cast performed in an assignment context has slightly different
behavior than the same cast performed in an explicit context. In an
assignment context, the cast will error if the width of the value is too
large for the given type. In an explicit context, the value will be
truncated to match the width. The one exception is assignment casts to
the special "char" type which do truncate values.

To support different cast behaviors for different contexts, a new
built-in, `crdb_internal.assignment_cast` has been introduced. This
function takes two arguments: a value and a type. Because SQL
does not have first-class types, a type cannot be passed directly to the
built-in. Instead, a `NULL` cast to a type is used as a workaround,
similar to the `json_populate_record` built-in. For example, an integer
can be assignment-cast to a string with:

    crdb_internal.assignment_cast(1::INT, NULL::STRING)

The optimizer is responsible for wrapping INSERT columns with the
assignment cast built-in function. If an insert column type `T1` is not
identical to the table's corresponding target column type `T2`, the
optimizer will check if there is a valid cast from `T1` to `T2` with a
maximum context that allows an assignment cast. If there is a such a
cast, a projection will wrap the column in the assignment cast built-in
function. If there is no such cast, a user error will be produced.

Some changes to prepared statement placeholder type inference were
required in order to better match Postgres's behavior (this is a
best-effort match thus far and there are still inconsistencies). Most
notably, widths and precision are no longer inferred for the types of
placeholders. The effect of this is that assignment casts will be
correctly added by the optimizer in order to make sure that values for
placeholders are correctly coerced to the target column type during
execution of a prepared insert.

The introduction of assignment casts fixes minor bugs and addresses some
inconsistencies with Postgres's behavior. In general, INSERTS now
successfully cast values to target table column types in more cases. As
one example, inserting a string into an integer column now succeeds:

    CREATE TABLE t (i INT)
    INSERT INTO t VALUES ('1'::STRING)

Prior to this commit there was logic that mimicked assignment casts, but
it was not correct. Bugs in the implementation caused incorrect behavior
when inserting into tables with computed columns. Most notably, a
computed column expression that referenced another column `c` was
evaluated with the value of `c` before the assignment cast was
performed. This resulted in incorrect values for computed columns in
some cases.

In addition, assignment casts make the special logic for rounding
decimal values in optbuilder obsolete. The builtin function
`crdb_internal.round_decimal_values` and related logic in optbuilder
will be removed once assignment casts are implemented for UPSERTs and
UPDATEs.

Fixes #69327
Fixes #69665

[1] https://www.postgresql.org/docs/current/typeconv.html
[2] https://www.postgresql.org/docs/13/catalog-pg-cast.html#CATALOG-PG-CAST
[3] https://www.postgresql.org/docs/13/sql-createcast.html#SQL-CREATECAST-NOTES

Release note (sql change): Implicit casts performed during INSERT
statements now more closely follow Postgres's behavior. Several minor
bugs related to these types of casts have been fixed.


70950: instancestorage: Add SQL test for sql_instances. r=knz a=rimadeodhar

This PR adds a unit test to verify that the sql_instances
table can be accessed through SQL API.

Release note: None

71412: sqlproxyccl: rework sqlproxy connection throttler r=JeffSwenson a=JeffSwenson

This change switches the sqlproxy connection throttling logic back to
exponential backoff. The tokenbucket approach was introduced by
PR #69041. There are a few behavior differences between this and the
original exponential backoff implementation.

1. The throttling logic is maintained per (ip, tenant) instead of per
   (ip). Some platform as a service provides share a single outbound ip
   address between multiple clients. These users would occasionaly see
   throttling caused by a second user sharing their IP.
2. The throttling logic was triggered before there was an authentication
   failure. It takes ~100ms-1000ms to authenticate with the tenant
   process.  Any requests that arrived after the first request, but
   before it was processed, would trigger the throttle. Now, we only
   trigger the throttle in response to an explict authorization error.

Release note: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: rimadeodhar <[email protected]>
Co-authored-by: Jeff <[email protected]>
  • Loading branch information
4 people committed Oct 13, 2021
4 parents 2f2c517 + 17276b5 + 60ac3d8 + 2559b4b commit c908959
Show file tree
Hide file tree
Showing 71 changed files with 5,321 additions and 3,161 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2831,6 +2831,8 @@ may increase either contention or retry errors, or both.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.approximate_timestamp"></a><code>crdb_internal.approximate_timestamp(timestamp: <a href="decimal.html">decimal</a>) &rarr; <a href="timestamp.html">timestamp</a></code></td><td><span class="funcdesc"><p>Converts the crdb_internal_mvcc_timestamp column into an approximate timestamp.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.assignment_cast"></a><code>crdb_internal.assignment_cast(val: anyelement, type: anyelement) &rarr; anyelement</code></td><td><span class="funcdesc"><p>This function is used internally to perform assignment casts during mutations.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.check_consistency"></a><code>crdb_internal.check_consistency(stats_only: <a href="bool.html">bool</a>, start_key: <a href="bytes.html">bytes</a>, end_key: <a href="bytes.html">bytes</a>) &rarr; tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail}</code></td><td><span class="funcdesc"><p>Runs a consistency check on ranges touching the specified key range. an empty start or end key is treated as the minimum and maximum possible, respectively. stats_only should only be set to false when targeting a small number of ranges to avoid overloading the cluster. Each returned row contains the range ID, the status (a roachpb.CheckConsistencyResponse_Status), and verbose detail.</p>
<p>Example usage:
SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)</p>
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_test(
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/ccl/sqlproxyccl/tenantdirsvr",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/ccl/utilccl",
"//pkg/roachpb:with-mocks",
"//pkg/security",
Expand Down
93 changes: 64 additions & 29 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,25 @@ package sqlproxyccl
import (
"net"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/jackc/pgproto3/v2"
)

// authenticate handles the startup of the pgwire protocol to the point where
// the connections is considered authenticated. If that doesn't happen, it
// returns an error.
var authenticate = func(clientConn, crdbConn net.Conn) error {
var authenticate = func(clientConn, crdbConn net.Conn, throttleHook func(throttler.AttemptStatus) *pgproto3.ErrorResponse) error {
fe := pgproto3.NewBackend(pgproto3.NewChunkReader(clientConn), clientConn)
be := pgproto3.NewFrontend(pgproto3.NewChunkReader(crdbConn), crdbConn)

feSend := func(msg pgproto3.BackendMessage) error {
err := fe.Send(msg)
if err != nil {
return newErrorf(codeClientWriteFailed, "unable to send message %v to client: %v", msg, err)
}
return nil
}

// The auth step should require only a few back and forths so 20 iterations
// should be enough.
var i int
Expand All @@ -32,39 +41,18 @@ var authenticate = func(clientConn, crdbConn net.Conn) error {
return newErrorf(codeBackendReadFailed, "unable to receive message from backend: %v", err)
}

err = fe.Send(backendMsg)
if err != nil {
return newErrorf(
codeClientWriteFailed, "unable to send message %v to client: %v", backendMsg, err,
)
}

// Decide what to do based on the type of the server response.
// The cases in this switch are roughly sorted in the order the server will send them.
switch tp := backendMsg.(type) {
case *pgproto3.ReadyForQuery:
// Server has authenticated the connection successfully and is ready to
// serve queries.
return nil
case *pgproto3.AuthenticationOk:
// Server has authenticated the connection; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered which signifies that server
// is ready to serve queries.
case *pgproto3.ParameterStatus:
// Server sent status message; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered.
case *pgproto3.BackendKeyData:
// Server sent backend key data; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered.
case *pgproto3.ErrorResponse:
// Server has rejected the authentication response from the client and
// has closed the connection.
return newErrorf(codeAuthFailed, "authentication failed: %s", tp.Message)

// The backend is requesting the user to authenticate.
// Read the client response and forward it to server.
case
*pgproto3.AuthenticationCleartextPassword,
*pgproto3.AuthenticationMD5Password,
*pgproto3.AuthenticationSASL:
// The backend is requesting the user to authenticate.
// Read the client response and forward it to server.
if err = feSend(backendMsg); err != nil {
return err
}
fntMsg, err := fe.Receive()
if err != nil {
return newErrorf(codeClientReadFailed, "unable to receive message from client: %v", err)
Expand All @@ -75,6 +63,53 @@ var authenticate = func(clientConn, crdbConn net.Conn) error {
codeBackendWriteFailed, "unable to send message %v to backend: %v", fntMsg, err,
)
}

// Server has authenticated the connection; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered which signifies that server
// is ready to serve queries.
case *pgproto3.AuthenticationOk:
throttleError := throttleHook(throttler.AttemptOK)
if throttleError != nil {
if err = feSend(throttleError); err != nil {
return err
}
return newErrorf(codeProxyRefusedConnection, "connection attempt throttled")
}
if err = feSend(backendMsg); err != nil {
return err
}

// Server has rejected the authentication response from the client and
// has closed the connection.
case *pgproto3.ErrorResponse:
throttleError := throttleHook(throttler.AttemptInvalidCredentials)
if throttleError != nil {
if err = feSend(throttleError); err != nil {
return err
}
return newErrorf(codeProxyRefusedConnection, "connection attempt throttled")
}
if err = feSend(backendMsg); err != nil {
return err
}
return newErrorf(codeAuthFailed, "authentication failed: %s", tp.Message)

// Information provided by the server to the client before the connection is ready
// to accept queries. These are typically returned after AuthenticationOk and before
// ReadyForQuery.
case *pgproto3.ParameterStatus, *pgproto3.BackendKeyData:
if err = feSend(backendMsg); err != nil {
return err
}

// Server has authenticated the connection successfully and is ready to
// serve queries.
case *pgproto3.ReadyForQuery:
if err = feSend(backendMsg); err != nil {
return err
}
return nil

default:
return newErrorf(codeBackendDisconnected, "received unexpected backend message type: %v", tp)
}
Expand Down
90 changes: 83 additions & 7 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ import (
"net"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/jackc/pgproto3/v2"
"github.com/stretchr/testify/require"
)

var nilThrottleHook = func(state throttler.AttemptStatus) *pgproto3.ErrorResponse {
return nil
}

func TestAuthenticateOK(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -33,7 +38,7 @@ func TestAuthenticateOK(t *testing.T) {
require.Equal(t, beMsg, &pgproto3.ReadyForQuery{})
}()

require.NoError(t, authenticate(srv, cli))
require.NoError(t, authenticate(srv, cli, nilThrottleHook))
}

func TestAuthenticateClearText(t *testing.T) {
Expand Down Expand Up @@ -75,7 +80,76 @@ func TestAuthenticateClearText(t *testing.T) {
require.Equal(t, beMsg, &pgproto3.ReadyForQuery{})
}()

require.NoError(t, authenticate(srv, cli))
require.NoError(t, authenticate(srv, cli, nilThrottleHook))
}

func TestAuthenticateThrottled(t *testing.T) {
defer leaktest.AfterTest(t)()

server := func(t *testing.T, be *pgproto3.Backend, authResponse pgproto3.BackendMessage) {
require.NoError(t, be.Send(&pgproto3.AuthenticationCleartextPassword{}))

msg, err := be.Receive()
require.NoError(t, err)
require.Equal(t, msg, &pgproto3.PasswordMessage{Password: "password"})

require.NoError(t, be.Send(authResponse))
}

client := func(t *testing.T, fe *pgproto3.Frontend) {
msg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, msg, &pgproto3.AuthenticationCleartextPassword{})

require.NoError(t, fe.Send(&pgproto3.PasswordMessage{Password: "password"}))

msg, err = fe.Receive()
require.NoError(t, err)
require.Equal(t, msg, &pgproto3.ErrorResponse{Message: "throttled"})

// Try reading from the connection. This check ensures authorize
// swallowed the OK/Error response from the sql server.
_, err = fe.Receive()
require.Error(t, err)
}

type testCase struct {
name string
result pgproto3.BackendMessage
expectedStatus throttler.AttemptStatus
}
for _, tc := range []testCase{
{
name: "AuthenticationOkay",
result: &pgproto3.AuthenticationOk{},
expectedStatus: throttler.AttemptOK,
},
{
name: "AuthenticationError",
result: &pgproto3.ErrorResponse{Message: "wrong password"},
expectedStatus: throttler.AttemptInvalidCredentials,
},
} {
t.Run(tc.name, func(t *testing.T) {
proxyToServer, serverToProxy := net.Pipe()
proxyToClient, clientToProxy := net.Pipe()
sqlServer := pgproto3.NewBackend(pgproto3.NewChunkReader(serverToProxy), serverToProxy)
sqlClient := pgproto3.NewFrontend(pgproto3.NewChunkReader(clientToProxy), clientToProxy)

go server(t, sqlServer, &pgproto3.AuthenticationOk{})
go client(t, sqlClient)

err := authenticate(proxyToClient, proxyToServer, func(status throttler.AttemptStatus) *pgproto3.ErrorResponse {
require.Equal(t, throttler.AttemptOK, status)
return &pgproto3.ErrorResponse{Message: "throttled"}
})
require.Error(t, err)
require.Contains(t, err.Error(), "connection attempt throttled")

proxyToServer.Close()
proxyToClient.Close()
})
}
}

func TestAuthenticateError(t *testing.T) {
Expand All @@ -93,7 +167,7 @@ func TestAuthenticateError(t *testing.T) {
require.Equal(t, beMsg, &pgproto3.ErrorResponse{Severity: "FATAL", Code: "foo"})
}()

err := authenticate(srv, cli)
err := authenticate(srv, cli, nilThrottleHook)
require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
Expand All @@ -110,12 +184,14 @@ func TestAuthenticateUnexpectedMessage(t *testing.T) {
go func() {
err := be.Send(&pgproto3.BindComplete{})
require.NoError(t, err)
beMsg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.BindComplete{})
_, err = fe.Receive()
require.Error(t, err)
}()

err := authenticate(srv, cli)
err := authenticate(srv, cli, nilThrottleHook)

srv.Close()

require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
Expand Down
35 changes: 19 additions & 16 deletions pkg/ccl/sqlproxyccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ func updateMetricsAndSendErrToClient(err error, conn net.Conn, metrics *metrics)
SendErrToClient(conn, err)
}

// SendErrToClient will encode and pass back to the SQL client an error message.
// It can be called by the implementors of proxyHandler to give more
// information to the end user in case of a problem.
var SendErrToClient = func(conn net.Conn, err error) {
if err == nil || conn == nil {
return
}
func toPgError(err error) *pgproto3.ErrorResponse {
codeErr := (*codeError)(nil)
if errors.As(err, &codeErr) {
var msg string
Expand All @@ -60,19 +54,28 @@ var SendErrToClient = func(conn net.Conn, err error) {
} else {
pgCode = "08004" // rejected connection
}
_, _ = conn.Write((&pgproto3.ErrorResponse{
return &pgproto3.ErrorResponse{
Severity: "FATAL",
Code: pgCode,
Message: msg,
}).Encode(nil))
} else {
// Return a generic "internal server error" message.
_, _ = conn.Write((&pgproto3.ErrorResponse{
Severity: "FATAL",
Code: "08004", // rejected connection
Message: "internal server error",
}).Encode(nil))
}
}
// Return a generic "internal server error" message.
return &pgproto3.ErrorResponse{
Severity: "FATAL",
Code: "08004", // rejected connection
Message: "internal server error",
}
}

// SendErrToClient will encode and pass back to the SQL client an error message.
// It can be called by the implementors of proxyHandler to give more
// information to the end user in case of a problem.
var SendErrToClient = func(conn net.Conn, err error) {
if err == nil || conn == nil {
return
}
_, _ = conn.Write(toPgError(err).Encode(nil))
}

// ConnectionCopy does a bi-directional copy between the backend and frontend
Expand Down
26 changes: 17 additions & 9 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ type ProxyOptions struct {
// DrainTimeout if set, will close DRAINING connections that have been idle
// for this duration.
DrainTimeout time.Duration

// Token bucket policy used to throttle (IP, TenantID) connection pairs that
// have no history of successful authentication.
ThrottlePolicy throttler.BucketPolicy
// ThrottleBaseDelay is the initial exponential backoff triggered in
// response to the first connection failure.
ThrottleBaseDelay time.Duration
}

// proxyHandler is the default implementation of a proxy handler.
Expand Down Expand Up @@ -135,6 +134,8 @@ type proxyHandler struct {
certManager *certmgr.CertManager
}

var throttledError = newErrorf(codeProxyRefusedConnection, "connection attempt throttled")

// newProxyHandler will create a new proxy handler with configuration based on
// the provided options.
func newProxyHandler(
Expand Down Expand Up @@ -163,7 +164,7 @@ func newProxyHandler(
}

handler.throttleService = throttler.NewLocalService(
throttler.WithPolicy(handler.ThrottlePolicy),
throttler.WithBaseDelay(handler.ThrottleBaseDelay),
)

if handler.DirectoryAddr != "" {
Expand Down Expand Up @@ -260,9 +261,10 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
defer removeListener()

throttleTags := throttler.ConnectionTags{IP: ipAddr, TenantID: tenID.String()}
if err := handler.throttleService.LoginCheck(throttleTags); err != nil {
throttleTime, err := handler.throttleService.LoginCheck(throttleTags)
if err != nil {
log.Errorf(ctx, "throttler refused connection: %v", err.Error())
err = newErrorf(codeProxyRefusedConnection, "connection attempt throttled")
err = throttledError
updateMetricsAndSendErrToClient(err, conn, handler.metrics)
return err
}
Expand Down Expand Up @@ -397,14 +399,20 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
defer func() { _ = crdbConn.Close() }()

// Perform user authentication.
if err := authenticate(conn, crdbConn); err != nil {
if err := authenticate(conn, crdbConn, func(status throttler.AttemptStatus) *pgproto3.ErrorResponse {
err := handler.throttleService.ReportAttempt(ctx, throttleTags, throttleTime, status)
if err != nil {
log.Errorf(ctx, "throttler refused connection after authentication: %v", err.Error())
return toPgError(throttledError)
}
return nil
}); err != nil {
handler.metrics.updateForError(err)
log.Ops.Errorf(ctx, "authenticate: %s", err)
return err
}

handler.metrics.SuccessfulConnCount.Inc(1)
handler.throttleService.ReportSuccess(throttleTags)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
Loading

0 comments on commit c908959

Please sign in to comment.