Skip to content

Commit

Permalink
ccl/sqlproxyccl: complete connection migration support in the forwarder
Browse files Browse the repository at this point in the history
Informs #76000.

This commit completes the connection migration feature in the the forwarder
within sqlproxy. The idea is as described in the RFC.

A couple of new sqlproxy metrics have been added as well:
- proxy.conn_migration.success
- proxy.conn_migration.error_fatal
- proxy.conn_migration.error_recoverable
- proxy.conn_migration.attempted

For more details, see metrics.go in the sqlproxyccl package.

Release justification: This completes the first half of the connection
migration feature. This is low risk as part of the code is guarded behind the
connection migration feature, which is currently not being used in production.
To add on, CockroachCloud is the only user of sqlproxy.

Release note: None
  • Loading branch information
jaylim-crl committed Mar 10, 2022
1 parent 05b6f49 commit fddfd30
Show file tree
Hide file tree
Showing 10 changed files with 787 additions and 51 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_jackc_pgproto3_v2//:pgproto3",
Expand Down Expand Up @@ -78,6 +79,8 @@ go_test(
"//pkg/sql",
"//pkg/sql/pgwire",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
Expand All @@ -90,6 +93,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgproto3_v2//:pgproto3",
Expand Down
241 changes: 241 additions & 0 deletions pkg/ccl/sqlproxyccl/conn_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,254 @@ import (
"encoding/json"
"fmt"
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/interceptor"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pgproto3 "github.com/jackc/pgproto3/v2"
)

// defaultTransferTimeout corresponds to the timeout period for the connection
// migration process. If the timeout gets triggered, and we're in a non
// recoverable state, the connection will be closed.
//
// This is a variable instead of a constant to support testing hooks.
var defaultTransferTimeout = 15 * time.Second

var errTransferCannotStart = errors.New("transfer cannot be started")

// tryBeginTransfer returns true if the transfer can be started, and false
// otherwise. If the transfer can be started, it updates the state of the
// forwarder to indicate that a transfer is in progress.
func (f *forwarder) tryBeginTransfer() bool {
f.mu.Lock()
defer f.mu.Unlock()

// Transfer is already in progress. No concurrent transfers are allowed.
if f.mu.isTransferring {
return false
}

if !isSafeTransferPoint(f.mu.request, f.mu.response) {
return false
}

f.mu.isTransferring = true
return true
}

type transferContext struct {
context.Context
mu struct {
syncutil.Mutex
recoverableConn bool
}
}

func newTransferContext(backgroundCtx context.Context) (*transferContext, context.CancelFunc) {
transferCtx, cancel := context.WithTimeout(backgroundCtx, defaultTransferTimeout) // nolint:context
ctx := &transferContext{
Context: transferCtx,
}
ctx.mu.recoverableConn = true
return ctx, cancel
}

func (t *transferContext) markRecoverable(r bool) {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.recoverableConn = r
}

func (t *transferContext) isRecoverable() bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.mu.recoverableConn
}

func (f *forwarder) runTransfer() (retErr error) {
// A previous non-recoverable transfer would have closed the forwarder, so
// return right away.
if f.ctx.Err() != nil {
return f.ctx.Err()
}

if !f.tryBeginTransfer() {
return errTransferCannotStart
}
defer func() {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.isTransferring = false
}()

f.metrics.ConnMigrationAttemptedCount.Inc(1)

// Create a transfer context, and timeout handler which gets triggered
// whenever the context expires. We have to close the forwarder because
// the transfer may be blocked on I/O, and the only way for now is to close
// the connections. This then allow runTransfer to return and cleanup.
ctx, cancel := newTransferContext(f.ctx)
defer cancel()

go func() {
<-ctx.Done()
if !ctx.isRecoverable() {
f.Close()
}
}()

// Use a separate context for logging because f.ctx will be closed whenever
// the connection is non-recoverable.
logCtx := logtags.WithTags(context.Background(), logtags.FromContext(f.ctx))
defer func() {
if !ctx.isRecoverable() {
log.Infof(logCtx, "transfer failed: connection closed, err=%v", retErr)
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
} else {
// Transfer was successful.
if retErr == nil {
log.Infof(logCtx, "transfer successful")
f.metrics.ConnMigrationSuccessCount.Inc(1)
} else {
log.Infof(logCtx, "transfer failed: connection recovered, err=%v", retErr)
f.metrics.ConnMigrationErrorRecoverableCount.Inc(1)
}
f.resumeProcessors()
}
}()

f.mu.Lock()
defer f.mu.Unlock()

// Suspend both processors before starting the transfer.
f.mu.request.suspend(ctx)
f.mu.response.suspend(ctx)

// Transfer the connection.
newServerConn, err := transferConnection(ctx, f.connector, f.mu.clientConn, f.mu.serverConn)
if err != nil {
return errors.Wrap(err, "transferring connection")
}

// Transfer was successful.
clockFn := makeLogicalClockFn()
f.mu.serverConn.Close()
f.mu.serverConn = newServerConn
f.mu.request = newProcessor(clockFn, f.mu.clientConn, f.mu.serverConn)
f.mu.response = newProcessor(clockFn, f.mu.serverConn, f.mu.clientConn)
return nil
}

// transferConnection performs the transfer operation for the current server
// connection, and returns the a new connection to the server that the
// connection got transferred to.
func transferConnection(
ctx *transferContext, connector *connector, clientConn, serverConn *interceptor.PGConn,
) (_ *interceptor.PGConn, retErr error) {
ctx.markRecoverable(true)

// Context was cancelled.
if ctx.Err() != nil {
return nil, ctx.Err()
}

transferKey := uuid.MakeV4().String()

// Send the SHOW TRANSFER STATE statement. At this point, connection is
// non-recoverable because the message has already been sent to the server.
ctx.markRecoverable(false)
if err := runShowTransferState(serverConn, transferKey); err != nil {
return nil, errors.Wrap(err, "sending transfer request")
}

transferErr, state, revivalToken, err := waitForShowTransferState(
ctx, serverConn.ToFrontendConn(), clientConn, transferKey)
if err != nil {
return nil, errors.Wrap(err, "waiting for transfer state")
}

// Failures after this point are recoverable, and connections should not be
// terminated.
ctx.markRecoverable(true)

// If we consumed until ReadyForQuery without errors, but the transfer state
// response returns an error, we could still resume the connection, but the
// transfer process will need to be aborted.
//
// This case may happen pretty frequently (e.g. open transactions, temporary
// tables, etc.).
if transferErr != "" {
return nil, errors.Newf("%s", transferErr)
}

// Connect to a new SQL pod.
//
// TODO(jaylim-crl): There is a possibility where the same pod will get
// selected. Some ideas to solve this: pass in the remote address of
// serverConn to avoid choosing that pod, or maybe a filter callback?
// We can also consider adding a target pod as an argument to RequestTransfer.
// That way a central component gets to choose where the connections go.
netConn, err := connector.OpenTenantConnWithToken(ctx, revivalToken)
if err != nil {
return nil, errors.Wrap(err, "opening connection")
}
defer func() {
if retErr != nil {
netConn.Close()
}
}()
newServerConn := interceptor.NewPGConn(netConn)

// Deserialize session state within the new SQL pod.
if err := runAndWaitForDeserializeSession(
ctx, newServerConn.ToFrontendConn(), state,
); err != nil {
return nil, errors.Wrap(err, "deserializing session")
}

return newServerConn, nil
}

// isSafeTransferPoint returns true if we're at a point where we're safe to
// transfer, and false otherwise.
var isSafeTransferPoint = func(request *processor, response *processor) bool {
request.mu.Lock()
response.mu.Lock()
defer request.mu.Unlock()
defer response.mu.Unlock()

// Three conditions when evaluating a safe transfer point:
// 1. The last message sent to the SQL pod was a Sync(S) or SimpleQuery(Q),
// and a ReadyForQuery(Z) has been received after.
// 2. The last message sent to the SQL pod was a CopyDone(c), and a
// ReadyForQuery(Z) has been received after.
// 3. The last message sent to the SQL pod was a CopyFail(f), and a
// ReadyForQuery(Z) has been received after.

// The conditions above are not possible if this is true. They cannot be
// equal since the same logical clock is used.
if request.mu.lastMessageTransferredAt > response.mu.lastMessageTransferredAt {
return false
}

switch pgwirebase.ClientMessageType(request.mu.lastMessageType) {
case pgwirebase.ClientMessageType(0),
pgwirebase.ClientMsgSync,
pgwirebase.ClientMsgSimpleQuery,
pgwirebase.ClientMsgCopyDone,
pgwirebase.ClientMsgCopyFail:
return pgwirebase.ServerMessageType(response.mu.lastMessageType) == pgwirebase.ServerMsgReady
default:
return false
}
}

// runShowTransferState sends a SHOW TRANSFER STATE query with the input
// transferKey to the given writer. The transferKey will be used to uniquely
// identify the request when parsing the response messages in
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/sqlproxyccl/conn_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"github.com/stretchr/testify/require"
)

func TestIsSafeTransferPoint(t *testing.T) {
defer leaktest.AfterTest(t)()
// TODO(jaylim-crl): Tests.
}

func TestRunShowTransferState(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
Loading

0 comments on commit fddfd30

Please sign in to comment.