From 83a5163d2d935bd662deab99b6c124e0d1f5e54b Mon Sep 17 00:00:00 2001
From: Jay
Date: Wed, 16 Mar 2022 18:52:02 +0000
Subject: [PATCH] ccl/sqlproxyccl: rename RequestTransfer to TransferConnection,
 and make it sync

Fixes a test flake in #77909.

This commit fixes the test flake described in the issue above. At the same
time, we rename RequestTransfer to TransferConnection and make the API
synchronous. Callers that want the previous asynchronous behavior should
invoke TransferConnection within a goroutine.

Release justification: sqlproxy-only change. The transfer API is also not
used anywhere outside of tests.

Release note: None
---
 pkg/ccl/sqlproxyccl/BUILD.bazel           |   1 +
 pkg/ccl/sqlproxyccl/conn_migration.go     |  31 +++-
 pkg/ccl/sqlproxyccl/forwarder.go          |   9 -
 pkg/ccl/sqlproxyccl/proxy_handler_test.go | 201 +++++++++++-----------
 4 files changed, 129 insertions(+), 113 deletions(-)

diff --git a/pkg/ccl/sqlproxyccl/BUILD.bazel b/pkg/ccl/sqlproxyccl/BUILD.bazel
index 85a6d2f6d7d9..60ca9b189b43 100644
--- a/pkg/ccl/sqlproxyccl/BUILD.bazel
+++ b/pkg/ccl/sqlproxyccl/BUILD.bazel
@@ -99,6 +99,7 @@ go_test(
         "@com_github_jackc_pgconn//:pgconn",
         "@com_github_jackc_pgproto3_v2//:pgproto3",
         "@com_github_jackc_pgx_v4//:pgx",
+        "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
diff --git a/pkg/ccl/sqlproxyccl/conn_migration.go b/pkg/ccl/sqlproxyccl/conn_migration.go
index 13aa9c3b2e0e..c9015feb7839 100644
--- a/pkg/ccl/sqlproxyccl/conn_migration.go
+++ b/pkg/ccl/sqlproxyccl/conn_migration.go
@@ -105,7 +105,24 @@ func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) {
 
 var errTransferCannotStart = errors.New("transfer cannot be started")
 
-func (f *forwarder) runTransfer() (retErr error) {
+// TransferConnection attempts a best-effort connection migration to an
+// available SQL pod based on the load-balancing algorithm. If a transfer has
+// already been started, or the forwarder has been closed, this returns an
+// error. This is a best-effort process because there could be a situation
+// where the forwarder is not in a state that is eligible for a connection
+// migration.
+//
+// NOTE: If the forwarder hasn't been closed, TransferConnection has an
+// invariant that the processors have been resumed prior to calling this
+// method. When TransferConnection returns, it is guaranteed that processors
+// will either be re-resumed, or the forwarder will be closed (in the case of
+// a non-recoverable error).
+//
+// TODO(jaylim-crl): It would be nice to introduce transfer policies in the
+// future. That way, we could either transfer to another random SQL pod, or to
+// a specific SQL pod. If we do that, TransferConnection would take in some kind
+// of policy parameter(s).
+func (f *forwarder) TransferConnection() (retErr error) {
 	// A previous non-recoverable transfer would have closed the forwarder, so
 	// return right away.
 	if f.ctx.Err() != nil {
@@ -131,6 +148,10 @@ func (f *forwarder) runTransfer() (retErr error) {
 	// blocked I/Os as described above.
 	go func() {
 		<-ctx.Done()
+		// This Close call here, in addition to the one in the defer callback
+		// below, is intentional. It helps unblock situations where we are
+		// blocked on sending/reading messages from connections that cannot
+		// be handled with context.Context.
 		if !ctx.isRecoverable() {
 			f.Close()
 		}
@@ -151,9 +172,12 @@ func (f *forwarder) runTransfer() (retErr error) {
 		latencyDur := timeutil.Since(tBegin)
 		f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds())
 
+		// When TransferConnection returns, either the forwarder has been
+		// closed, or the processors have been resumed.
 		if !ctx.isRecoverable() {
 			log.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr)
 			f.metrics.ConnMigrationErrorFatalCount.Inc(1)
+			f.Close()
 		} else {
 			// Transfer was successful.
 			if retErr == nil {
@@ -241,8 +265,9 @@ func transferConnection(
 	// TODO(jaylim-crl): There is a possibility where the same pod will get
 	// selected. Some ideas to solve this: pass in the remote address of
 	// serverConn to avoid choosing that pod, or maybe a filter callback?
-	// We can also consider adding a target pod as an argument to RequestTransfer.
-	// That way a central component gets to choose where the connections go.
+	// We can also consider adding a target pod as an argument to
+	// TransferConnection. That way a central component gets to choose where the
+	// connections go.
 	connectFn := connector.OpenTenantConnWithToken
 	if transferConnectionConnectorTestHook != nil {
 		connectFn = transferConnectionConnectorTestHook
diff --git a/pkg/ccl/sqlproxyccl/forwarder.go b/pkg/ccl/sqlproxyccl/forwarder.go
index 40d8ad1a01de..cb1535ea238a 100644
--- a/pkg/ccl/sqlproxyccl/forwarder.go
+++ b/pkg/ccl/sqlproxyccl/forwarder.go
@@ -148,15 +148,6 @@ func (f *forwarder) Close() {
 	serverConn.Close()
 }
 
-// RequestTransfer requests that the forwarder performs a best-effort connection
-// migration whenever it can. It is best-effort because this will be a no-op if
-// the forwarder is not in a state that is eligible for a connection migration.
-// If a transfer is already in progress, or has been requested, this is a no-op.
-func (f *forwarder) RequestTransfer() {
-	// Ignore the error here. These errors will be logged accordingly.
-	go func() { _ = f.runTransfer() }()
-}
-
 // resumeProcessors starts both the request and response processors
 // asynchronously. The forwarder will be closed if any of the processors
 // return an error while resuming. This is idempotent as resume() will return
diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go
index 934297c07be9..746bf7fc78c1 100644
--- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go
+++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -17,6 +17,7 @@ import (
 	"net"
 	"os"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -46,6 +47,7 @@ import (
 	"github.com/cockroachdb/errors"
 	pgproto3 "github.com/jackc/pgproto3/v2"
 	pgx "github.com/jackc/pgx/v4"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -806,7 +808,7 @@ func TestConnectionMigration(t *testing.T) {
 		}
 		return nil
 	}
-	_, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
+	proxy, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
 
 	// The tenant ID does not matter here since we stubbed RoutingRule.
 	connectionString := fmt.Sprintf("postgres://testuser:hunter2@%s/?sslmode=require&options=--cluster=tenant-cluster-28", addr)
@@ -828,6 +830,21 @@ func TestConnectionMigration(t *testing.T) {
 		return fmt.Sprintf("%s:%s", host, port)
 	}
 
+	// validateMiscMetrics ensures that our invariant of
+	// attempts = success + error_recoverable + error_fatal is valid, and all
+	// other transfer-related metrics were incremented as well.
+	validateMiscMetrics := func(t *testing.T) {
+		t.Helper()
+		totalAttempts := proxy.metrics.ConnMigrationSuccessCount.Count() +
+			proxy.metrics.ConnMigrationErrorRecoverableCount.Count() +
+			proxy.metrics.ConnMigrationErrorFatalCount.Count()
+		require.Equal(t, totalAttempts, proxy.metrics.ConnMigrationAttemptedCount.Count())
+		require.Equal(t, totalAttempts,
+			proxy.metrics.ConnMigrationAttemptedLatency.TotalCount())
+		require.Equal(t, totalAttempts,
+			proxy.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
+	}
+
 	// Test that connection transfers are successful. Note that if one sub-test
 	// fails, the remaining will fail as well since they all use the same
 	// forwarder instance.
@@ -878,10 +895,8 @@ func TestConnectionMigration(t *testing.T) {
 			require.NoError(t, err)
 
 			// Show that we get alternating SQL pods when we transfer.
-			f.RequestTransfer()
-			require.Eventually(t, func() bool {
-				return f.metrics.ConnMigrationSuccessCount.Count() == 1
-			}, 30*time.Second, 100*time.Millisecond)
+			require.NoError(t, f.TransferConnection())
+			require.Equal(t, int64(1), f.metrics.ConnMigrationSuccessCount.Count())
 			require.Equal(t, tenant2.SQLAddr(), queryAddr(t, tCtx, db))
 
 			var name string
@@ -891,26 +906,29 @@ func TestConnectionMigration(t *testing.T) {
 			_, err = db.Exec("SET application_name = 'bar'")
 			require.NoError(t, err)
 
-			f.RequestTransfer()
-			require.Eventually(t, func() bool {
-				return f.metrics.ConnMigrationSuccessCount.Count() == 2
-			}, 30*time.Second, 100*time.Millisecond)
+			require.NoError(t, f.TransferConnection())
+			require.Equal(t, int64(2), f.metrics.ConnMigrationSuccessCount.Count())
 			require.Equal(t, tenant1.SQLAddr(), queryAddr(t, tCtx, db))
 
 			require.NoError(t, db.QueryRow("SHOW application_name").Scan(&name))
 			require.Equal(t, "bar", name)
 
 			// Now attempt a transfer concurrently with requests.
-			closerCh := make(chan struct{})
+			initSuccessCount := f.metrics.ConnMigrationSuccessCount.Count()
+			subCtx, cancel := context.WithCancel(tCtx)
+			defer cancel()
+
+			var wg sync.WaitGroup
+			wg.Add(1)
 			go func() {
-				for i := 0; i < 10 && tCtx.Err() == nil; i++ {
-					f.RequestTransfer()
-					time.Sleep(500 * time.Millisecond)
+				defer wg.Done()
+				for subCtx.Err() == nil {
+					_ = f.TransferConnection()
+					time.Sleep(100 * time.Millisecond)
 				}
-				closerCh <- struct{}{}
 			}()
 
-			// This test runs for 5 seconds.
+			// This loop will run for approximately 5 seconds.
 			var tenant1Addr, tenant2Addr int
 			for i := 0; i < 100; i++ {
 				addr := queryAddr(t, tCtx, db)
@@ -923,35 +941,23 @@ func TestConnectionMigration(t *testing.T) {
 				time.Sleep(50 * time.Millisecond)
 			}
 
-			// In 5s, we should have at least 10 successful transfers. Just do
-			// an approximation here.
-			require.Eventually(t, func() bool {
-				return f.metrics.ConnMigrationSuccessCount.Count() >= 5
-			}, 30*time.Second, 100*time.Millisecond)
-			require.True(t, tenant1Addr > 2)
-			require.True(t, tenant2Addr > 2)
-			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorRecoverableCount.Count())
-			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
-
 			// Ensure that the goroutine terminates so other subtests are not
 			// affected.
-			<-closerCh
+			cancel()
+			wg.Wait()
 
-			// There's a chance that we still have an in-progress transfer, so
-			// attempt to wait.
-			require.Eventually(t, func() bool {
-				f.mu.Lock()
-				defer f.mu.Unlock()
-				return !f.mu.isTransferring
-			}, 10*time.Second, 25*time.Millisecond)
-
-			require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-				f.metrics.ConnMigrationSuccessCount.Count(),
-			)
-			require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-				f.metrics.ConnMigrationAttemptedLatency.TotalCount())
-			require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-				f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
+			// Ensure that some transfers were performed, and the forwarder isn't
+			// closed.
+			require.True(t, tenant1Addr >= 2)
+			require.True(t, tenant2Addr >= 2)
+			require.Nil(t, f.ctx.Err())
+
+			// Check metrics.
+			require.True(t, f.metrics.ConnMigrationSuccessCount.Count() > initSuccessCount+4)
+			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorRecoverableCount.Count())
+			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
+
+			validateMiscMetrics(t)
 		})
 
 		// Transfers should fail if there is an open transaction. These failed
@@ -961,8 +967,15 @@ func TestConnectionMigration(t *testing.T) {
 			initAddr := queryAddr(t, tCtx, db)
 
 			err = crdb.ExecuteTx(tCtx, db, nil /* txopts */, func(tx *gosql.Tx) error {
-				for i := 0; i < 10; i++ {
-					f.RequestTransfer()
+				// Run multiple times to ensure that the connection isn't closed.
+				for i := 0; i < 5; i++ {
+					err := f.TransferConnection()
+					if err == nil {
+						return errors.New("no error")
+					}
+					if !assert.Regexp(t, "cannot serialize", err.Error()) {
+						return errors.Wrap(err, "non-serialization error")
+					}
 					addr := queryAddr(t, tCtx, tx)
 					if initAddr != addr {
 						return errors.Newf(
@@ -971,44 +984,27 @@ func TestConnectionMigration(t *testing.T) {
 							addr,
 						)
 					}
-					time.Sleep(50 * time.Millisecond)
 				}
 				return nil
 			})
 			require.NoError(t, err)
 
-			// Make sure there are no pending transfers.
-			func() {
-				f.mu.Lock()
-				defer f.mu.Unlock()
-				require.False(t, f.mu.isTransferring)
-			}()
-
-			// Just check that we have half of what we requested since we cannot
-			// guarantee that the transfer will run within 50ms.
-			require.True(t, f.metrics.ConnMigrationErrorRecoverableCount.Count() >= 5)
-			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
+			// None of the migrations should succeed, and the forwarder should
+			// still be active.
+			require.Nil(t, f.ctx.Err())
 			require.Equal(t, initSuccessCount, f.metrics.ConnMigrationSuccessCount.Count())
-			prevErrorRecoverableCount := f.metrics.ConnMigrationErrorRecoverableCount.Count()
+			require.Equal(t, int64(5), f.metrics.ConnMigrationErrorRecoverableCount.Count())
+			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
 
 			// Once the transaction is closed, transfers should work.
-			f.RequestTransfer()
-			require.Eventually(t, func() bool {
-				return f.metrics.ConnMigrationSuccessCount.Count() == initSuccessCount+1
-			}, 30*time.Second, 100*time.Millisecond)
+			require.NoError(t, f.TransferConnection())
 			require.NotEqual(t, initAddr, queryAddr(t, tCtx, db))
-			require.Equal(t, prevErrorRecoverableCount, f.metrics.ConnMigrationErrorRecoverableCount.Count())
+			require.Nil(t, f.ctx.Err())
+			require.Equal(t, initSuccessCount+1, f.metrics.ConnMigrationSuccessCount.Count())
+			require.Equal(t, int64(5), f.metrics.ConnMigrationErrorRecoverableCount.Count())
 			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
-			require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-				f.metrics.ConnMigrationAttemptedLatency.TotalCount())
-			require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-				f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
-
-			// We have already asserted metrics above, so transfer must have
-			// been completed.
-			f.mu.Lock()
-			defer f.mu.Unlock()
-			require.False(t, f.mu.isTransferring)
+
+			validateMiscMetrics(t)
 		})
 
 		// Transfer timeout caused by dial issues should not close the session.
@@ -1023,23 +1019,18 @@ func TestConnectionMigration(t *testing.T) {
 			lookupAddrDelayDuration = 10 * time.Second
 			defer testutils.TestingHook(&defaultTransferTimeout, 3*time.Second)()
 
-			f.RequestTransfer()
-			require.Eventually(t, func() bool {
-				return f.metrics.ConnMigrationErrorRecoverableCount.Count() == initErrorRecoverableCount+1
-			}, 30*time.Second, 100*time.Millisecond)
+			err := f.TransferConnection()
+			require.Error(t, err)
+			require.Regexp(t, "injected delays", err.Error())
 			require.Equal(t, initAddr, queryAddr(t, tCtx, db))
+			require.Nil(t, f.ctx.Err())
 
+			require.Equal(t, initSuccessCount, f.metrics.ConnMigrationSuccessCount.Count())
+			require.Equal(t, initErrorRecoverableCount+1,
+				f.metrics.ConnMigrationErrorRecoverableCount.Count())
 			require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
-			require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-				f.metrics.ConnMigrationAttemptedLatency.TotalCount())
-			require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-				f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
-
-			// We have already asserted metrics above, so transfer must have
-			// been completed.
-			f.mu.Lock()
-			defer f.mu.Unlock()
-			require.False(t, f.mu.isTransferring)
+
+			validateMiscMetrics(t)
 		})
 	})
 
@@ -1083,6 +1074,9 @@ func TestConnectionMigration(t *testing.T) {
 			t.Fatal("no connection")
 		}
 
+		initSuccessCount := f.metrics.ConnMigrationSuccessCount.Count()
+		initErrorRecoverableCount := f.metrics.ConnMigrationErrorRecoverableCount.Count()
+
 		// Set up forwarder hooks.
 		prevTenant1 := true
 		f.connector.testingKnobs.lookupAddr = func(ctx context.Context) (string, error) {
@@ -1103,7 +1097,7 @@ func TestConnectionMigration(t *testing.T) {
 		errCh := make(chan error, 1)
 		go func() {
 			goCh <- struct{}{}
-			_, err = conn.ExecContext(tCtx, "SELECT pg_sleep(10)")
+			_, err := conn.ExecContext(tCtx, "SELECT pg_sleep(10)")
 			errCh <- err
 		}()
 
@@ -1115,34 +1109,39 @@ func TestConnectionMigration(t *testing.T) {
 		// to make sure pg_sleep is running, but that seems unnecessary for just
 		// one test.
 		<-goCh
-		time.Sleep(250 * time.Millisecond)
-		f.RequestTransfer()
+		time.Sleep(2 * time.Second)
+		// This should be an error because the transfer timed out.
+		require.Error(t, f.TransferConnection())
 
 		// Connection should be closed because this is a non-recoverable error,
 		// i.e. timeout after sending the request, but before fully receiving
 		// its response.
-		require.Eventually(t, func() bool {
-			err := conn.PingContext(tCtx)
-			return err != nil && strings.Contains(err.Error(), "bad connection")
-		}, 30*time.Second, 100*time.Millisecond)
+		err = conn.PingContext(tCtx)
+		require.Error(t, err)
+		require.Regexp(t, "(closed|bad connection)", err.Error())
 
 		select {
 		case <-time.After(10 * time.Second):
 			t.Fatalf("require that pg_sleep query terminates")
 		case err = <-errCh:
-			require.NotNil(t, err)
-			require.Regexp(t, "bad connection", err.Error())
+			require.Error(t, err)
+			require.Regexp(t, "(closed|bad connection)", err.Error())
 		}
 
-		require.Eventually(t, func() bool {
-			return f.metrics.ConnMigrationErrorFatalCount.Count() == 1
-		}, 30*time.Second, 100*time.Millisecond)
-		require.NotNil(t, f.ctx.Err())
-		require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
-			f.metrics.ConnMigrationAttemptedLatency.TotalCount())
+		require.EqualError(t, f.ctx.Err(), context.Canceled.Error())
+		require.Equal(t, initSuccessCount, f.metrics.ConnMigrationSuccessCount.Count())
+		require.Equal(t, initErrorRecoverableCount, f.metrics.ConnMigrationErrorRecoverableCount.Count())
+		require.Equal(t, int64(1), f.metrics.ConnMigrationErrorFatalCount.Count())
+
+		totalAttempts := f.metrics.ConnMigrationSuccessCount.Count() +
+			f.metrics.ConnMigrationErrorRecoverableCount.Count() +
+			f.metrics.ConnMigrationErrorFatalCount.Count()
+		require.Equal(t, totalAttempts, f.metrics.ConnMigrationAttemptedCount.Count())
+		require.Equal(t, totalAttempts,
+			f.metrics.ConnMigrationAttemptedLatency.TotalCount())
 		// Here, we get a transfer timeout in response, so the message size
 		// should not be recorded.
-		require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count()-1,
+		require.Equal(t, totalAttempts-1,
 			f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
 	})
}
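
As a usage sketch (not part of the diff above): a caller inside the sqlproxyccl
package that still wants the old fire-and-forget semantics of RequestTransfer
could wrap the now-synchronous API as shown below. The transferAsync helper
name is hypothetical; only the forwarder type and its TransferConnection
method come from this patch.

// transferAsync wraps the synchronous TransferConnection in a goroutine,
// similar to what the removed RequestTransfer did internally (which simply
// discarded the error).
func transferAsync(ctx context.Context, f *forwarder) {
	go func() {
		// TransferConnection blocks until the transfer attempt finishes.
		// A recoverable failure leaves the forwarder usable; a fatal one
		// closes it, so the error is only logged here.
		if err := f.TransferConnection(); err != nil {
			log.Infof(ctx, "connection transfer failed: %v", err)
		}
	}()
}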