Commit

ccl/sqlproxyccl: fix inaccurate CurConnCount metric due to goroutine leak

Previously, a processor could return from resuming because the client's
connection was closed _before_ waitResumed even had a chance to wake up and
check the resumed field. When that happened, the connection goroutine would be
blocked forever, and the CurConnCount metric would never be decremented, even
though the connection had already been terminated.

When the client's connection is closed, the forwarder's context is cancelled
as well. The ideal behavior would be to terminate all waiters when that
happens, but the current code does not do that. This commit fixes that issue
by sending a broadcast signal to all waiters in the processors whenever the
forwarder is closed.

Release note: None
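
To make the race concrete, here is a minimal, self-contained sketch of the condition-variable pattern involved. This is not the sqlproxyccl code: proc, waitResumed, and broadcast below are simplified stand-ins, but they mirror the waiter/broadcast interaction described above and implemented in the diff that follows.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// proc is a simplified stand-in for a processor: waiters block on a
// condition variable until the resumed flag is observed as true.
type proc struct {
	mu      sync.Mutex
	cond    *sync.Cond
	resumed bool
}

func newProc() *proc {
	p := &proc{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// waitResumed blocks until resumed is true, or until ctx is cancelled. If
// nobody broadcasts after the context is cancelled, the waiter sleeps in
// cond.Wait() forever -- the leak described in the commit message.
func (p *proc) waitResumed(ctx context.Context) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	for !p.resumed {
		if err := ctx.Err(); err != nil {
			return err
		}
		p.cond.Wait()
	}
	return nil
}

// broadcast wakes up all blocked waiters so they can re-check their
// predicates (and notice the cancelled context).
func (p *proc) broadcast() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.cond.Broadcast()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	p := newProc()

	done := make(chan error, 1)
	go func() { done <- p.waitResumed(ctx) }()

	// Simulate the client closing the session before the processor is ever
	// marked as resumed: cancel the context (as closing the forwarder does),
	// then broadcast so the waiter wakes up and observes the cancellation.
	time.Sleep(50 * time.Millisecond)
	cancel()
	p.broadcast()

	fmt.Println("waiter returned:", <-done) // waiter returned: context canceled
}

Without the final broadcast, the waiter in this sketch would block indefinitely, which is the analogue of the leaked connection goroutine that kept CurConnCount from being decremented.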
jaylim-crl committed Jun 9, 2022
1 parent ba73ede commit b5ff88f
Showing 3 changed files with 103 additions and 1 deletion.
32 changes: 32 additions & 0 deletions pkg/ccl/sqlproxyccl/forwarder.go
@@ -225,6 +225,29 @@ func (f *forwarder) Close() {
	if serverConn != nil {
		serverConn.Close()
	}

	// Broadcast all waiters in the processors. At this point, the context has
	// already been cancelled, so waking them up will terminate the waiter
	// functions (e.g. waitResumed).
	//
	// This handles the following scenario:
	// 1. Goroutine 1 invokes resumeProcessors, which starts up Goroutines 2
	//    and 3 to resume the individual processors.
	// 2. Goroutine 1 invokes waitResumed on the request processor, and waits
	//    because the processor has not been resumed yet.
	// 3. Goroutine 2 resumes the request processor, and broadcasts on the
	//    condition variable. At this point, if the client closes the session,
	//    resume will return right away with an error, closing the forwarder.
	// 4. Goroutine 1 wakes up due to the broadcast in (3), sees that the
	//    resumed field is false, and goes back to waiting. Goroutine 1 is now
	//    stuck waiting forever, resulting in a leak.
	request, response := f.getProcessors()
	if request != nil {
		request.broadcast()
	}
	if response != nil {
		response.broadcast()
	}
}

// IsIdle returns true if the forwarder is idle, and false otherwise.
@@ -513,6 +536,15 @@ func (p *processor) resume(ctx context.Context) error {
	return ctx.Err()
}

// broadcast sends a broadcast signal to the condition variable. This should
// only be called within the Close function of the forwarder to wake all
// blocked waiters after cancelling the forwarder's context.
func (p *processor) broadcast() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.mu.cond.Broadcast()
}

// waitResumed waits until the processor has been resumed. This can be used to
// ensure that suspend actually suspends the running processor, and that there
// is no race where suspend returns before the goroutines have started running.
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/proxy_handler.go
@@ -407,7 +407,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
	if err := f.run(fe.Conn, crdbConn); err != nil {
		// Don't send to the client here for the same reason below.
		handler.metrics.updateForError(err)
-		return err
+		return errors.Wrap(err, "running forwarder")
	}

	// Block until an error is received, or when the stopper starts quiescing,
Expand Down
70 changes: 70 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -1325,6 +1325,76 @@ func TestConnectionMigration(t *testing.T) {
	}, 10*time.Second, 100*time.Millisecond)
}

// TestCurConnCountMetric ensures that the CurConnCount metric is accurate.
// Previously, there was a regression where, due to a goroutine leak, the
// CurConnCount metric wasn't decremented when connections were closed.
func TestCurConnCountMetric(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	// Start KV server, and enable session migration.
	params, _ := tests.CreateTestServerParams()
	s, _, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)

	// Start a single SQL pod.
	tenantID := serverutils.TestTenantID()
	tenants := startTestTenantPods(ctx, t, s, tenantID, 1)
	defer func() {
		for _, tenant := range tenants {
			tenant.Stopper().Stop(ctx)
		}
	}()

	// Register the SQL pod in the directory server.
	tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
	tds.CreateTenant(tenantID, "tenant-cluster")
	tds.AddPod(tenantID, &tenant.Pod{
		TenantID:       tenantID.ToUint64(),
		Addr:           tenants[0].SQLAddr(),
		State:          tenant.RUNNING,
		StateTimestamp: timeutil.Now(),
	})
	require.NoError(t, tds.Start(ctx))

	opts := &ProxyOptions{SkipVerify: true, DisableConnectionRebalancing: true}
	opts.testingKnobs.directoryServer = tds
	proxy, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
	connectionString := fmt.Sprintf("postgres://testuser:hunter2@%s/?sslmode=require&options=--cluster=tenant-cluster-%s", addr, tenantID)

	// Open 500 connections to the SQL pod.
	const numConns = 500
	var wg sync.WaitGroup
	wg.Add(numConns)
	for i := 0; i < numConns; i++ {
		go func() {
			defer wg.Done()

			// Opens a new connection, pings the server, and closes it right
			// away. Ignore all connection errors.
			conn, err := pgx.Connect(ctx, connectionString)
			if err != nil {
				return
			}
			_ = conn.Ping(ctx)
			conn.Close(ctx)
		}()
	}
	wg.Wait()

	// Ensure that the CurConnCount metric gets decremented to 0 whenever all
	// the connections are closed.
	testutils.SucceedsSoon(t, func() error {
		val := proxy.metrics.CurConnCount.Value()
		if val == 0 {
			return nil
		}
		return errors.Newf("expected CurConnCount=0, but got %d", val)
	})
}

func TestClusterNameAndTenantFromParams(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
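
As an aside, TestCurConnCountMetric above relies on the proxy's own CurConnCount metric (plus the leaktest helper) to catch the regression. A cruder, generic way to notice this class of leak in other code is to compare goroutine counts before and after the connection churn. A purely illustrative sketch, not part of the commit:

package main

import (
	"fmt"
	"runtime"
	"time"
)

// goroutineCount waits briefly for finished goroutines to exit and then
// returns the current goroutine count.
func goroutineCount() int {
	time.Sleep(100 * time.Millisecond)
	return runtime.NumGoroutine()
}

func main() {
	before := goroutineCount()

	// ... run the connection churn workload here (e.g. open and close many
	// client connections against the proxy) ...

	after := goroutineCount()
	if after > before {
		fmt.Printf("possible goroutine leak: %d goroutines before, %d after\n", before, after)
	}
}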
