diff --git a/pkg/ccl/sqlproxyccl/forwarder.go b/pkg/ccl/sqlproxyccl/forwarder.go
index 247498d19835..6fc5315f8444 100644
--- a/pkg/ccl/sqlproxyccl/forwarder.go
+++ b/pkg/ccl/sqlproxyccl/forwarder.go
@@ -225,6 +225,29 @@ func (f *forwarder) Close() {
 	if serverConn != nil {
 		serverConn.Close()
 	}
+
+	// Broadcast all waiters in the processors. At this point, context has
+	// already been cancelled, so waking them up would terminate the waiter
+	// functions (e.g. waitResumed).
+	//
+	// This handles the following scenario:
+	// 1. Goroutine 1 invokes resumeProcessors, which starts up Goroutine 2
+	//    and 3 to resume the individual processors.
+	// 2. Goroutine 1 invokes waitResumed on the request processor, and waits
+	//    because the processor has not been resumed yet.
+	// 3. Goroutine 2 resumes the request processor, and broadcasts the
+	//    condition variable. At this point, if the client closes the session,
+	//    resume will return right away with an error, closing the forwarder.
+	// 4. Goroutine 1 wakes up due to the broadcast in (3), checks that the
+	//    resumed field is false, and goes back to wait. Without the broadcast
+	//    below, Goroutine 1 would be stuck waiting forever, causing a leak.
+	request, response := f.getProcessors()
+	if request != nil {
+		request.broadcast()
+	}
+	if response != nil {
+		response.broadcast()
+	}
 }
 
 // IsIdle returns true if the forwarder is idle, and false otherwise.
@@ -513,6 +536,15 @@ func (p *processor) resume(ctx context.Context) error {
 	return ctx.Err()
 }
 
+// broadcast sends a broadcast signal to the condition variable. This should
+// only be called within the Close function of the forwarder to wake all
+// blocked waiters after cancelling the forwarder's context.
+func (p *processor) broadcast() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.mu.cond.Broadcast()
+}
+
 // waitResumed waits until the processor has been resumed. This can be used to
 // ensure that suspend actually suspends the running processor, and there won't
 // be a race where the goroutines have not started running, and suspend returns.
diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go
index 4f13591a54db..31dba30c3c4d 100644
--- a/pkg/ccl/sqlproxyccl/proxy_handler.go
+++ b/pkg/ccl/sqlproxyccl/proxy_handler.go
@@ -407,7 +407,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
 	if err := f.run(fe.Conn, crdbConn); err != nil {
 		// Don't send to the client here for the same reason below.
 		handler.metrics.updateForError(err)
-		return err
+		return errors.Wrap(err, "running forwarder")
 	}
 
 	// Block until an error is received, or when the stopper starts quiescing,
diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go
index 18ce1a8fc0e9..d5850ea50175 100644
--- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go
+++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -1325,6 +1325,76 @@ func TestConnectionMigration(t *testing.T) {
 	}, 10*time.Second, 100*time.Millisecond)
 }
 
+// TestCurConnCountMetric ensures that the CurConnCount metric is accurate.
+// Previously, there was a regression where the CurConnCount metric wasn't
+// decremented whenever the connections were closed due to a goroutine leak.
+func TestCurConnCountMetric(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	// Start KV server, and enable session migration.
+	params, _ := tests.CreateTestServerParams()
+	s, _, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	// Start a single SQL pod.
+	tenantID := serverutils.TestTenantID()
+	tenants := startTestTenantPods(ctx, t, s, tenantID, 1)
+	defer func() {
+		for _, tenant := range tenants {
+			tenant.Stopper().Stop(ctx)
+		}
+	}()
+
+	// Register the SQL pod in the directory server.
+	tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
+	tds.CreateTenant(tenantID, "tenant-cluster")
+	tds.AddPod(tenantID, &tenant.Pod{
+		TenantID:       tenantID.ToUint64(),
+		Addr:           tenants[0].SQLAddr(),
+		State:          tenant.RUNNING,
+		StateTimestamp: timeutil.Now(),
+	})
+	require.NoError(t, tds.Start(ctx))
+
+	opts := &ProxyOptions{SkipVerify: true, DisableConnectionRebalancing: true}
+	opts.testingKnobs.directoryServer = tds
+	proxy, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
+	connectionString := fmt.Sprintf("postgres://testuser:hunter2@%s/?sslmode=require&options=--cluster=tenant-cluster-%s", addr, tenantID)
+
+	// Open 500 connections to the SQL pod.
+	const numConns = 500
+	var wg sync.WaitGroup
+	wg.Add(numConns)
+	for i := 0; i < numConns; i++ {
+		go func() {
+			defer wg.Done()
+
+			// Opens a new connection, pings it, and closes it right away.
+			// Ignore all connection errors.
+			conn, err := pgx.Connect(ctx, connectionString)
+			if err != nil {
+				return
+			}
+			_ = conn.Ping(ctx)
+			_ = conn.Close(ctx)
+		}()
+	}
+	wg.Wait()
+
+	// Ensure that the CurConnCount metric gets decremented to 0 whenever all
+	// the connections are closed.
+	testutils.SucceedsSoon(t, func() error {
+		val := proxy.metrics.CurConnCount.Value()
+		if val == 0 {
+			return nil
+		}
+		return errors.Newf("expected CurConnCount=0, but got %d", val)
+	})
+}
+
 func TestClusterNameAndTenantFromParams(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)