ccl/sqlproxyccl: complete connection migration support in the forwarder #76805

Merged
merged 3 commits on Mar 13, 2022
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
@@ -39,6 +39,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_jackc_pgproto3_v2//:pgproto3",
@@ -78,6 +79,8 @@ go_test(
"//pkg/sql",
"//pkg/sql/pgwire",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
@@ -90,6 +93,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgproto3_v2//:pgproto3",
270 changes: 269 additions & 1 deletion pkg/ccl/sqlproxyccl/conn_migration.go
@@ -13,13 +13,281 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/interceptor"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pgproto3 "github.com/jackc/pgproto3/v2"
)

// defaultTransferTimeout corresponds to the timeout period for the connection
// migration process. If the timeout gets triggered and we're in a
// non-recoverable state, the connection will be closed.
//
// This is a variable instead of a constant to support testing hooks.
var defaultTransferTimeout = 15 * time.Second

// Used in testing.
var transferConnectionConnectorTestHook func(context.Context, string) (net.Conn, error) = nil
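// A minimal sketch (hypothetical test, not part of this PR) of the
// save/override/restore pattern these two variables enable. Assumes a
// _test.go file in this package with "testing" imported; "time" is already
// imported above.
//
//	func TestTransferTimeoutOverride(t *testing.T) {
//		defer func(d time.Duration) { defaultTransferTimeout = d }(defaultTransferTimeout)
//		defaultTransferTimeout = 500 * time.Millisecond
//		// Exercise runTransfer; a transfer blocked on I/O should now hit
//		// the timeout quickly instead of waiting the full 15 seconds.
//	}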

type transferContext struct {
context.Context
mu struct {
syncutil.Mutex
recoverableConn bool
}
}

func newTransferContext(backgroundCtx context.Context) (*transferContext, context.CancelFunc) {
transferCtx, cancel := context.WithTimeout(backgroundCtx, defaultTransferTimeout) // nolint:context
ctx := &transferContext{
Context: transferCtx,
}
ctx.mu.recoverableConn = true
return ctx, cancel
}

func (t *transferContext) markRecoverable(r bool) {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.recoverableConn = r
}

func (t *transferContext) isRecoverable() bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.mu.recoverableConn
}

// tryBeginTransfer returns true if the transfer can be started, and false
// otherwise. If the transfer can be started, it updates the state of the
// forwarder to indicate that a transfer is in progress, and returns a
// cleanup function.
func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) {
f.mu.Lock()
defer f.mu.Unlock()

// Transfer is already in progress. No concurrent transfers are allowed.
if f.mu.isTransferring {
return false, nil
}

if !isSafeTransferPoint(f.mu.request, f.mu.response) {
return false, nil
}

f.mu.isTransferring = true

return true, func() {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.isTransferring = false
}
}

var errTransferCannotStart = errors.New("transfer cannot be started")

func (f *forwarder) runTransfer() (retErr error) {
// A previous non-recoverable transfer would have closed the forwarder, so
// return right away.
if f.ctx.Err() != nil {
return f.ctx.Err()
}

started, cleanupFn := f.tryBeginTransfer()
if !started {
return errTransferCannotStart
}
defer cleanupFn()

f.metrics.ConnMigrationAttemptedCount.Inc(1)

// Create a transfer context, and a timeout handler which gets triggered
// whenever the context expires. We have to close the forwarder because
// the transfer may be blocked on I/O, and the only way to unblock it for
// now is to close the connections. This then allows runTransfer to return
// and clean up.
ctx, cancel := newTransferContext(f.ctx)
defer cancel()

// Use a separate handler for timeouts. This is the only way to handle
// blocked I/Os as described above.
go func() {
<-ctx.Done()
if !ctx.isRecoverable() {
f.Close()
}
}()

// Use a separate context for logging because f.ctx will be closed whenever
// the connection is non-recoverable.
//
// TODO(jaylim-crl): There's a possible "use of Span after Finish" issue
// where proxy_handler.handle returns before this function returns because
// we're calling f.Close() in the timeout goroutine. When handle returns,
// the context (with the span) gets cleaned up. Some ideas to fix this:
// (1) errgroup (?), (2) use the stopper instead of the go keyword - that
// should fork a new span, and avoid this issue.
logCtx := logtags.WithTags(context.Background(), logtags.FromContext(f.ctx))
defer func() {
if !ctx.isRecoverable() {
log.Infof(logCtx, "transfer failed: connection closed, err=%v", retErr)
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
} else {
// Transfer was successful.
if retErr == nil {
log.Infof(logCtx, "transfer successful")
f.metrics.ConnMigrationSuccessCount.Inc(1)
} else {
log.Infof(logCtx, "transfer failed: connection recovered, err=%v", retErr)
f.metrics.ConnMigrationErrorRecoverableCount.Inc(1)
}
if err := f.resumeProcessors(); err != nil {
log.Infof(logCtx, "unable to resume processors: %v", err)
f.Close()
}
}
}()

// Suspend both processors before starting the transfer.
request, response := f.getProcessors()
if err := request.suspend(ctx); err != nil {
return errors.Wrap(err, "suspending request processor")
}
if err := response.suspend(ctx); err != nil {
return errors.Wrap(err, "suspending response processor")
}

// Transfer the connection.
clientConn, serverConn := f.getConns()
newServerConn, err := transferConnection(ctx, f.connector, clientConn, serverConn)
if err != nil {
return errors.Wrap(err, "transferring connection")
}

// Transfer was successful.
f.replaceServerConn(newServerConn)
return nil
}
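// A sketch of how a caller might kick off a transfer without blocking.
// RequestTransfer is only mentioned in a TODO later in this file, so the
// method below is an assumption, not part of this diff; errors are already
// logged and counted inside runTransfer.
//
//	func (f *forwarder) RequestTransfer() {
//		go func() { _ = f.runTransfer() }()
//	}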

// transferConnection performs the transfer operation for the current server
// connection, and returns a new connection to the server that the connection
// was transferred to.
func transferConnection(
ctx *transferContext, connector *connector, clientConn, serverConn *interceptor.PGConn,
) (_ *interceptor.PGConn, retErr error) {
ctx.markRecoverable(true)

// Context was cancelled.
if ctx.Err() != nil {
return nil, ctx.Err()
}

transferKey := uuid.MakeV4().String()

// Send the SHOW TRANSFER STATE statement. At this point, connection is
// non-recoverable because the message has already been sent to the server.
ctx.markRecoverable(false)
if err := runShowTransferState(serverConn, transferKey); err != nil {
return nil, errors.Wrap(err, "sending transfer request")
}

transferErr, state, revivalToken, err := waitForShowTransferState(
ctx, serverConn.ToFrontendConn(), clientConn, transferKey)
if err != nil {
return nil, errors.Wrap(err, "waiting for transfer state")
}
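// For context: waitForShowTransferState is expected to parse the single row
// returned for the SHOW TRANSFER STATE query, which carries the transfer
// error (if any), the serialized session state, and a session revival token.
// The exact column names are an assumption here since they are not shown in
// this diff.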

// Failures after this point are recoverable, and connections should not be
// terminated.
ctx.markRecoverable(true)

// If we consumed until ReadyForQuery without errors, but the transfer state
// response returns an error, we could still resume the connection, but the
// transfer process will need to be aborted.
//
// This case may happen pretty frequently (e.g. open transactions, temporary
// tables, etc.).
if transferErr != "" {
return nil, errors.Newf("%s", transferErr)
}

// Connect to a new SQL pod.
//
// TODO(jaylim-crl): There is a possibility where the same pod will get
// selected. Some ideas to solve this: pass in the remote address of
// serverConn to avoid choosing that pod, or maybe a filter callback?
// We can also consider adding a target pod as an argument to RequestTransfer.
// That way a central component gets to choose where the connections go.
connectFn := connector.OpenTenantConnWithToken
if transferConnectionConnectorTestHook != nil {
connectFn = transferConnectionConnectorTestHook
}
netConn, err := connectFn(ctx, revivalToken)
if err != nil {
return nil, errors.Wrap(err, "opening connection")
}
defer func() {
if retErr != nil {
netConn.Close()
}
}()
newServerConn := interceptor.NewPGConn(netConn)

// Deserialize session state within the new SQL pod.
if err := runAndWaitForDeserializeSession(
ctx, newServerConn.ToFrontendConn(), state,
); err != nil {
return nil, errors.Wrap(err, "deserializing session")
}

return newServerConn, nil
}
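// Condensed, the exchange implemented above looks roughly like the following
// (an assumed message flow; the exact statements sent by runShowTransferState
// and runAndWaitForDeserializeSession are not all shown in this diff):
//
//	proxy   -> old pod: Query("SHOW TRANSFER STATE WITH '<key>'")
//	old pod -> proxy  : RowDescription, DataRow(err, state, token, key), ReadyForQuery
//	proxy   -> new pod: deserialize-session statement built from state
//	new pod -> proxy  : ... ReadyForQuery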

// isSafeTransferPoint returns true if we're at a point where we're safe to
// transfer, and false otherwise.
var isSafeTransferPoint = func(request *processor, response *processor) bool {
request.mu.Lock()
response.mu.Lock()
defer request.mu.Unlock()
defer response.mu.Unlock()

// Three conditions when evaluating a safe transfer point:
// 1. The last message sent to the SQL pod was a Sync(S) or SimpleQuery(Q),
// and a ReadyForQuery(Z) has been received after.
// 2. The last message sent to the SQL pod was a CopyDone(c), and a
// ReadyForQuery(Z) has been received after.
// 3. The last message sent to the SQL pod was a CopyFail(f), and a
// ReadyForQuery(Z) has been received after.

// None of the conditions above can hold if this is true. The two timestamps
// cannot be equal since the same logical clock is used (except during
// initialization).
if request.mu.lastMessageTransferredAt > response.mu.lastMessageTransferredAt {
return false
}

// We need to check zero values here to handle the initialization case
// since we would still want to be able to transfer connections which have
// not made any queries to the server.
switch pgwirebase.ClientMessageType(request.mu.lastMessageType) {
case pgwirebase.ClientMessageType(0),
pgwirebase.ClientMsgSync,
pgwirebase.ClientMsgSimpleQuery,
pgwirebase.ClientMsgCopyDone,
pgwirebase.ClientMsgCopyFail:

serverMsg := pgwirebase.ServerMessageType(response.mu.lastMessageType)
return serverMsg == pgwirebase.ServerMsgReady || serverMsg == pgwirebase.ServerMessageType(0)
default:
return false
}
}
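// A small illustrative check (hypothetical test) tying the constants matched
// above to their wire bytes in the Postgres protocol; assumes a _test.go file
// with "testing" and pgwirebase imported.
//
//	func TestSafeTransferPointMessageBytes(t *testing.T) {
//		if byte(pgwirebase.ClientMsgSync) != 'S' ||
//			byte(pgwirebase.ClientMsgSimpleQuery) != 'Q' ||
//			byte(pgwirebase.ClientMsgCopyDone) != 'c' ||
//			byte(pgwirebase.ClientMsgCopyFail) != 'f' ||
//			byte(pgwirebase.ServerMsgReady) != 'Z' {
//			t.Fatal("unexpected pgwire message byte")
//		}
//	}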

// runShowTransferState sends a SHOW TRANSFER STATE query with the input
// transferKey to the given writer. The transferKey will be used to uniquely
// identify the request when parsing the response messages in
// waitForShowTransferState.
@@ -28,7 +296,7 @@
// Unlike runAndWaitForDeserializeSession, we split the SHOW TRANSFER STATE
// operation into `run` and `wait` since doing so allows us to send the query
// ahead of time.
func runShowTransferState(w io.Writer, transferKey string) error {
var runShowTransferState = func(w io.Writer, transferKey string) error {
return writeQuery(w, "SHOW TRANSFER STATE WITH '%s'", transferKey)
}
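// writeQuery is defined elsewhere in this file and is not shown in this
// diff. A plausible minimal sketch of such a helper, using the pgproto3 and
// fmt packages already imported above (writeQuerySketch is a hypothetical
// name chosen to avoid clashing with the real definition):
func writeQuerySketch(w io.Writer, format string, args ...interface{}) error {
	// Encode a pgwire SimpleQuery ('Q') message and write it to the server.
	query := &pgproto3.Query{String: fmt.Sprintf(format, args...)}
	_, err := w.Write(query.Encode(nil))
	return err
}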
