Skip to content

Commit

Permalink
ccl/sqlproxyccl: complete connection migration support in the forwarder
Browse files Browse the repository at this point in the history
Informs #76000.

This commit completes the connection migration feature in the the forwarder
within sqlproxy. The idea is as described in the RFC.

A couple of new sqlproxy metrics have been added as well:
- proxy.conn_migration.success
- proxy.conn_migration.error_fatal
- proxy.conn_migration.error_recoverable
- proxy.conn_migration.attempted

For more details, see metrics.go in the sqlproxyccl package.

Release justification: This completes the first half of the connection
migration feature. This is low risk as part of the code is guarded behind the
connection migration feature, which is currently not being used in production.
To add on, CockroachCloud is the only user of sqlproxy.

Release note: None
  • Loading branch information
jaylim-crl committed Mar 10, 2022
1 parent fae1e2b commit 9661512
Show file tree
Hide file tree
Showing 13 changed files with 1,277 additions and 93 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_jackc_pgproto3_v2//:pgproto3",
Expand Down Expand Up @@ -78,6 +79,8 @@ go_test(
"//pkg/sql",
"//pkg/sql/pgwire",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
Expand All @@ -90,6 +93,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgproto3_v2//:pgproto3",
Expand Down
266 changes: 265 additions & 1 deletion pkg/ccl/sqlproxyccl/conn_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,277 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/interceptor"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pgproto3 "github.com/jackc/pgproto3/v2"
)

// defaultTransferTimeout corresponds to the timeout period for the connection
// migration process. If the timeout gets triggered, and we're in a non
// recoverable state, the connection will be closed.
//
// This is a variable instead of a constant to support testing hooks.
var defaultTransferTimeout = 15 * time.Second

// Used in testing.
var transferConnectionConnectorTestHook func(context.Context, string) (net.Conn, error) = nil

type transferContext struct {
context.Context
mu struct {
syncutil.Mutex
recoverableConn bool
}
}

func newTransferContext(backgroundCtx context.Context) (*transferContext, context.CancelFunc) {
transferCtx, cancel := context.WithTimeout(backgroundCtx, defaultTransferTimeout) // nolint:context
ctx := &transferContext{
Context: transferCtx,
}
ctx.mu.recoverableConn = true
return ctx, cancel
}

func (t *transferContext) markRecoverable(r bool) {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.recoverableConn = r
}

func (t *transferContext) isRecoverable() bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.mu.recoverableConn
}

// tryBeginTransfer returns true if the transfer can be started, and false
// otherwise. If the transfer can be started, it updates the state of the
// forwarder to indicate that a transfer is in progress, and a cleanup function
// will be returned.
func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) {
f.mu.Lock()
defer f.mu.Unlock()

// Transfer is already in progress. No concurrent transfers are allowed.
if f.mu.isTransferring {
return false, nil
}

if !isSafeTransferPoint(f.mu.request, f.mu.response) {
return false, nil
}

f.mu.isTransferring = true

return true, func() {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.isTransferring = false
}
}

var errTransferCannotStart = errors.New("transfer cannot be started")

func (f *forwarder) runTransfer() (retErr error) {
// A previous non-recoverable transfer would have closed the forwarder, so
// return right away.
if f.ctx.Err() != nil {
return f.ctx.Err()
}

started, cleanupFn := f.tryBeginTransfer()
if !started {
return errTransferCannotStart
}
defer cleanupFn()

f.metrics.ConnMigrationAttemptedCount.Inc(1)

// Create a transfer context, and timeout handler which gets triggered
// whenever the context expires. We have to close the forwarder because
// the transfer may be blocked on I/O, and the only way for now is to close
// the connections. This then allow runTransfer to return and cleanup.
ctx, cancel := newTransferContext(f.ctx)
defer cancel()

go func() {
<-ctx.Done()
if !ctx.isRecoverable() {
f.Close()
}
}()

// Use a separate context for logging because f.ctx will be closed whenever
// the connection is non-recoverable.
logCtx := logtags.WithTags(context.Background(), logtags.FromContext(f.ctx))
defer func() {
if !ctx.isRecoverable() {
log.Infof(logCtx, "transfer failed: connection closed, err=%v", retErr)
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
} else {
// Transfer was successful.
if retErr == nil {
log.Infof(logCtx, "transfer successful")
f.metrics.ConnMigrationSuccessCount.Inc(1)
} else {
log.Infof(logCtx, "transfer failed: connection recovered, err=%v", retErr)
f.metrics.ConnMigrationErrorRecoverableCount.Inc(1)
}
if err := f.resumeProcessors(); err != nil {
log.Infof(logCtx, "unable to resume processors: %v", err)
f.Close()
}
}
}()

f.mu.Lock()
defer f.mu.Unlock()

// Suspend both processors before starting the transfer.
if err := f.mu.request.suspend(ctx); err != nil {
return errors.Wrap(err, "suspending request processor")
}
if err := f.mu.response.suspend(ctx); err != nil {
return errors.Wrap(err, "suspending response processor")
}

// Transfer the connection.
newServerConn, err := transferConnection(ctx, f.connector, f.mu.clientConn, f.mu.serverConn)
if err != nil {
return errors.Wrap(err, "transferring connection")
}

// Transfer was successful.
clockFn := makeLogicalClockFn()
f.mu.serverConn.Close()
f.mu.serverConn = newServerConn
f.mu.request = newProcessor(clockFn, f.mu.clientConn, f.mu.serverConn)
f.mu.response = newProcessor(clockFn, f.mu.serverConn, f.mu.clientConn)
return nil
}

// transferConnection performs the transfer operation for the current server
// connection, and returns the a new connection to the server that the
// connection got transferred to.
func transferConnection(
ctx *transferContext, connector *connector, clientConn, serverConn *interceptor.PGConn,
) (_ *interceptor.PGConn, retErr error) {
ctx.markRecoverable(true)

// Context was cancelled.
if ctx.Err() != nil {
return nil, ctx.Err()
}

transferKey := uuid.MakeV4().String()

// Send the SHOW TRANSFER STATE statement. At this point, connection is
// non-recoverable because the message has already been sent to the server.
ctx.markRecoverable(false)
if err := runShowTransferState(serverConn, transferKey); err != nil {
return nil, errors.Wrap(err, "sending transfer request")
}

transferErr, state, revivalToken, err := waitForShowTransferState(
ctx, serverConn.ToFrontendConn(), clientConn, transferKey)
if err != nil {
return nil, errors.Wrap(err, "waiting for transfer state")
}

// Failures after this point are recoverable, and connections should not be
// terminated.
ctx.markRecoverable(true)

// If we consumed until ReadyForQuery without errors, but the transfer state
// response returns an error, we could still resume the connection, but the
// transfer process will need to be aborted.
//
// This case may happen pretty frequently (e.g. open transactions, temporary
// tables, etc.).
if transferErr != "" {
return nil, errors.Newf("%s", transferErr)
}

// Connect to a new SQL pod.
//
// TODO(jaylim-crl): There is a possibility where the same pod will get
// selected. Some ideas to solve this: pass in the remote address of
// serverConn to avoid choosing that pod, or maybe a filter callback?
// We can also consider adding a target pod as an argument to RequestTransfer.
// That way a central component gets to choose where the connections go.
connectFn := connector.OpenTenantConnWithToken
if transferConnectionConnectorTestHook != nil {
connectFn = transferConnectionConnectorTestHook
}
netConn, err := connectFn(ctx, revivalToken)
if err != nil {
return nil, errors.Wrap(err, "opening connection")
}
defer func() {
if retErr != nil {
netConn.Close()
}
}()
newServerConn := interceptor.NewPGConn(netConn)

// Deserialize session state within the new SQL pod.
if err := runAndWaitForDeserializeSession(
ctx, newServerConn.ToFrontendConn(), state,
); err != nil {
return nil, errors.Wrap(err, "deserializing session")
}

return newServerConn, nil
}

// isSafeTransferPoint returns true if we're at a point where we're safe to
// transfer, and false otherwise.
var isSafeTransferPoint = func(request *processor, response *processor) bool {
request.mu.Lock()
response.mu.Lock()
defer request.mu.Unlock()
defer response.mu.Unlock()

// Three conditions when evaluating a safe transfer point:
// 1. The last message sent to the SQL pod was a Sync(S) or SimpleQuery(Q),
// and a ReadyForQuery(Z) has been received after.
// 2. The last message sent to the SQL pod was a CopyDone(c), and a
// ReadyForQuery(Z) has been received after.
// 3. The last message sent to the SQL pod was a CopyFail(f), and a
// ReadyForQuery(Z) has been received after.

// The conditions above are not possible if this is true. They cannot be
// equal since the same logical clock is used (except during initialization).
if request.mu.lastMessageTransferredAt > response.mu.lastMessageTransferredAt {
return false
}

// We need to check zero values here to handle the initialization case
// since we would still want to be able to transfer connections which have
// not made any queries to the server.
switch pgwirebase.ClientMessageType(request.mu.lastMessageType) {
case pgwirebase.ClientMessageType(0),
pgwirebase.ClientMsgSync,
pgwirebase.ClientMsgSimpleQuery,
pgwirebase.ClientMsgCopyDone,
pgwirebase.ClientMsgCopyFail:

serverMsg := pgwirebase.ServerMessageType(response.mu.lastMessageType)
return serverMsg == pgwirebase.ServerMsgReady || serverMsg == pgwirebase.ServerMessageType(0)
default:
return false
}
}

// runShowTransferState sends a SHOW TRANSFER STATE query with the input
// transferKey to the given writer. The transferKey will be used to uniquely
// identify the request when parsing the response messages in
Expand All @@ -28,7 +292,7 @@ import (
// Unlike runAndWaitForDeserializeSession, we split the SHOW TRANSFER STATE
// operation into `run` and `wait` since doing so allows us to send the query
// ahead of time.
func runShowTransferState(w io.Writer, transferKey string) error {
var runShowTransferState = func(w io.Writer, transferKey string) error {
return writeQuery(w, "SHOW TRANSFER STATE WITH '%s'", transferKey)
}

Expand Down
Loading

0 comments on commit 9661512

Please sign in to comment.