Skip to content

Commit

Permalink
Merge pull request #78219 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-77965

release-22.1: ccl/sqlproxyccl: rename RequestTransfer to TransferConnection, and make it sync
  • Loading branch information
jaylim-crl authored Mar 22, 2022
2 parents db57134 + 83a5163 commit 26c05e1
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 113 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ go_test(
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgproto3_v2//:pgproto3",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
31 changes: 28 additions & 3 deletions pkg/ccl/sqlproxyccl/conn_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,24 @@ func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) {

var errTransferCannotStart = errors.New("transfer cannot be started")

func (f *forwarder) runTransfer() (retErr error) {
// TransferConnection attempts a best-effort connection migration to an
// available SQL pod based on the load-balancing algorithm. If a transfer has
// already been started, or the forwarder has been closed, this returns an
// error. This is a best-effort process because there could be a situation
// where the forwarder is not in a state that is eligible for a connection
// migration.
//
// NOTE: If the forwarder hasn't been closed, TransferConnection has an
// invariant where the processors have been resumed prior to calling this
// method. When TransferConnection returns, it is guaranteed that processors
// will either be re-resumed, or the forwarder will be closed (in the case of a
// non-recoverable error).
//
// TODO(jaylim-crl): It would be nice to introduce transfer policies in the
// future. That way, we could either transfer to another random SQL pod, or to
// a specific SQL pod. If we do that, TransferConnection would take in some kind
// of policy parameter(s).
func (f *forwarder) TransferConnection() (retErr error) {
// A previous non-recoverable transfer would have closed the forwarder, so
// return right away.
if f.ctx.Err() != nil {
Expand All @@ -131,6 +148,10 @@ func (f *forwarder) runTransfer() (retErr error) {
// blocked I/Os as described above.
go func() {
<-ctx.Done()
// This Close call here in addition to the one in the defer callback
// below is on purpose. This would help unblock situations where we're
// blocked on sending/reading messages from connections that couldn't
// be handled with context.Context.
if !ctx.isRecoverable() {
f.Close()
}
Expand All @@ -151,9 +172,12 @@ func (f *forwarder) runTransfer() (retErr error) {
latencyDur := timeutil.Since(tBegin)
f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds())

// When TransferConnection returns, either the forwarder has been closed,
// or the processors have been resumed.
if !ctx.isRecoverable() {
log.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr)
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
f.Close()
} else {
// Transfer was successful.
if retErr == nil {
Expand Down Expand Up @@ -241,8 +265,9 @@ func transferConnection(
// TODO(jaylim-crl): There is a possibility where the same pod will get
// selected. Some ideas to solve this: pass in the remote address of
// serverConn to avoid choosing that pod, or maybe a filter callback?
// We can also consider adding a target pod as an argument to RequestTransfer.
// That way a central component gets to choose where the connections go.
// We can also consider adding a target pod as an argument to
// TransferConnection. That way a central component gets to choose where the
// connections go.
connectFn := connector.OpenTenantConnWithToken
if transferConnectionConnectorTestHook != nil {
connectFn = transferConnectionConnectorTestHook
Expand Down
9 changes: 0 additions & 9 deletions pkg/ccl/sqlproxyccl/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,6 @@ func (f *forwarder) Close() {
serverConn.Close()
}

// RequestTransfer asks the forwarder to attempt a connection migration in the
// background and returns immediately, without waiting for the outcome. The
// attempt is best-effort: it is a no-op when the forwarder is not in a state
// that is eligible for a connection migration, or when a transfer is already
// in progress or has already been requested.
func (f *forwarder) RequestTransfer() {
	go func() {
		// The result is intentionally discarded: this method has no return
		// value to surface it through, and transfer errors are logged
		// accordingly.
		_ = f.runTransfer()
	}()
}

// resumeProcessors starts both the request and response processors
// asynchronously. The forwarder will be closed if any of the processors
// return an error while resuming. This is idempotent as resume() will return
Expand Down
Loading

0 comments on commit 26c05e1

Please sign in to comment.