release-22.1: ccl/sqlproxyccl: fix inaccurate CurConnCount metric due to goroutine leak #82685

Merged: 1 commit, merged on Jun 10, 2022
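The fix, in short: the processor gains a terminal closed state, so that resume, waitResumed, and suspend fail fast once the processor can never run again, instead of blocking forever on a condition variable. Those permanently blocked goroutines were what kept connections from being torn down and left CurConnCount inflated. The sketch below is a minimal, hypothetical reconstruction of that guard for illustration only; the names mirror the diff (errProcessorClosed, resume, waitResumed), but the context handling, interceptors, and the rest of the forwarder are omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errProcessorResumed = errors.New("processor has already been resumed")
	errProcessorClosed  = errors.New("processor has been closed")
)

// processor is a stripped-down stand-in for the sqlproxy processor.
type processor struct {
	mu struct {
		sync.Mutex
		cond    *sync.Cond
		closed  bool // terminal state introduced by this fix
		resumed bool
	}
}

func newProcessor() *processor {
	p := &processor{}
	p.mu.cond = sync.NewCond(&p.mu.Mutex)
	return p
}

// resume transitions the processor into the resumed state. Once the
// processor is closed, every call fails fast with errProcessorClosed.
func (p *processor) resume() (retErr error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	defer func() {
		// A terminal error closes the processor for good and wakes up any
		// waiters so they cannot block (and leak) forever. The real resume
		// converts errProcessorResumed into nil for its callers.
		if retErr != nil && !errors.Is(retErr, errProcessorResumed) {
			p.mu.closed = true
		}
		p.mu.cond.Broadcast()
	}()
	if p.mu.closed {
		return errProcessorClosed
	}
	if p.mu.resumed {
		return errProcessorResumed
	}
	p.mu.resumed = true
	return nil
}

// waitResumed blocks until the processor is resumed, or returns
// errProcessorClosed if it will never be resumed again.
func (p *processor) waitResumed() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	for !p.mu.resumed {
		if p.mu.closed {
			return errProcessorClosed
		}
		p.mu.cond.Wait()
	}
	return nil
}

func main() {
	p := newProcessor()
	fmt.Println(p.resume())      // <nil>
	fmt.Println(p.resume())      // processor has already been resumed
	fmt.Println(p.waitResumed()) // <nil>
}
```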
33 changes: 28 additions & 5 deletions pkg/ccl/sqlproxyccl/forwarder.go
@@ -202,6 +202,8 @@ func (f *forwarder) Context() context.Context {
//
// Close implements the balancer.ConnectionHandle interface.
func (f *forwarder) Close() {
// Cancelling the forwarder's context and connections will automatically
// cause the processors to exit, and close themselves.
f.ctxCancel()

// Whenever Close is called while both of the processors are suspended, the
@@ -389,7 +391,10 @@ func makeLogicalClockFn() func() uint64 {
// cancellation of dials.
var aLongTimeAgo = timeutil.Unix(1, 0)

var errProcessorResumed = errors.New("processor has already been resumed")
var (
errProcessorResumed = errors.New("processor has already been resumed")
errProcessorClosed = errors.New("processor has been closed")
)

// processor must always be constructed through newProcessor.
type processor struct {
@@ -402,6 +407,7 @@ type processor struct {
mu struct {
syncutil.Mutex
cond *sync.Cond
closed bool
resumed bool
inPeek bool
suspendReq bool // Indicates that a suspend has been requested.
@@ -424,13 +430,15 @@ func newProcessor(logicalClockFn func() uint64, src, dst *interceptor.PGConn) *p

// resume starts the processor and blocks during the processing. When the
// processing has been terminated, this returns nil if the processor can be
// resumed again in the future. If an error (except errProcessorResumed) was
// returned, the processor should not be resumed again, and the forwarder should
// be closed.
func (p *processor) resume(ctx context.Context) error {
// resumed again in the future. If an error was returned, the processor should
// not be resumed again, and the forwarder must be closed.
func (p *processor) resume(ctx context.Context) (retErr error) {
enterResume := func() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.mu.closed {
return errProcessorClosed
}
if p.mu.resumed {
return errProcessorResumed
}
@@ -441,6 +449,10 @@ func (p *processor) resume(ctx context.Context) error {
exitResume := func() {
p.mu.Lock()
defer p.mu.Unlock()
// If there's an error, close the processor.
if retErr != nil {
p.mu.closed = true
}
p.mu.resumed = false
p.mu.cond.Broadcast()
}
@@ -495,6 +507,9 @@
}

if err := enterResume(); err != nil {
if errors.Is(err, errProcessorResumed) {
return nil
}
return err
}
defer exitResume()
@@ -524,6 +539,9 @@ func (p *processor) waitResumed(ctx context.Context) error {
if ctx.Err() != nil {
return ctx.Err()
}
if p.mu.closed {
return errProcessorClosed
}
p.mu.cond.Wait()
}
return nil
@@ -536,6 +554,11 @@ func (p *processor) suspend(ctx context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()

// If the processor has been closed, it cannot be suspended at all.
if p.mu.closed {
return errProcessorClosed
}

defer func() {
if p.mu.suspendReq {
p.mu.suspendReq = false
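resume blocks for the lifetime of the processing, so the forwarder drives it from its own goroutine and calls suspend from elsewhere. The snippet below is a hedged illustration of the contract after this change, modeled on the tests in this PR rather than on the forwarder's actual control loop; runProcessorOnce is a hypothetical helper, assumed to live in the sqlproxyccl package next to the processor type.

```go
// runProcessorOnce is a hypothetical, illustrative helper; it is not part of
// this PR's code.
func runProcessorOnce(ctx context.Context, p *processor) error {
	errCh := make(chan error, 1)
	// resume blocks while the processor runs, so it gets its own goroutine.
	// After this change, duplicate resume calls return nil instead of
	// errProcessorResumed.
	go func() { errCh <- p.resume(ctx) }()

	// Wait until the processor is actually running. With the closed flag,
	// this no longer blocks forever if the processor was closed meanwhile.
	if err := p.waitResumed(ctx); err != nil {
		return err
	}
	// suspend likewise refuses to operate on a closed processor.
	if err := p.suspend(ctx); err != nil {
		return err
	}
	// nil means the processor may be resumed again later; any error
	// (including errProcessorClosed) means the forwarder must be closed.
	return <-errCh
}
```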
18 changes: 9 additions & 9 deletions pkg/ccl/sqlproxyccl/forwarder_test.go
@@ -521,12 +521,15 @@ func TestSuspendResumeProcessor(t *testing.T) {
interceptor.NewPGConn(serverProxy),
)
require.EqualError(t, p.resume(ctx), context.Canceled.Error())
p.mu.Lock()
require.True(t, p.mu.closed)
p.mu.Unlock()

// Set resumed to true to simulate suspend loop.
p.mu.Lock()
p.mu.resumed = true
p.mu.Unlock()
require.EqualError(t, p.suspend(ctx), context.Canceled.Error())
require.EqualError(t, p.suspend(ctx), errProcessorClosed.Error())
})

t.Run("wait_for_resumed", func(t *testing.T) {
@@ -586,15 +589,15 @@ func TestSuspendResumeProcessor(t *testing.T) {
interceptor.NewPGConn(serverProxy),
)

// Ensure that everything will return a resumed error except 1.
// Ensure that two resume calls will return right away.
errCh := make(chan error, 2)
go func() { errCh <- p.resume(ctx) }()
go func() { errCh <- p.resume(ctx) }()
go func() { errCh <- p.resume(ctx) }()
err := <-errCh
require.EqualError(t, err, errProcessorResumed.Error())
require.NoError(t, err)
err = <-errCh
require.EqualError(t, err, errProcessorResumed.Error())
require.NoError(t, err)

// Suspend the last goroutine.
err = p.waitResumed(ctx)
@@ -604,7 +607,7 @@

// Validate suspension.
err = <-errCh
require.Nil(t, err)
require.NoError(t, err)
p.mu.Lock()
require.False(t, p.mu.resumed)
require.False(t, p.mu.inPeek)
@@ -694,10 +697,7 @@ func TestSuspendResumeProcessor(t *testing.T) {
// Wait until all resume calls except 1 have returned.
for i := 0; i < concurrency-1; i++ {
err := <-errResumeCh
// If error is not nil, it has to be an already resumed error.
if err != nil {
require.EqualError(t, err, errProcessorResumed.Error())
}
require.NoError(t, err)
}

// Wait until the last one returns. We can guarantee that this is for
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/proxy_handler.go
@@ -407,7 +407,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
if err := f.run(fe.Conn, crdbConn); err != nil {
// Don't send to the client here for the same reason below.
handler.metrics.updateForError(err)
return err
return errors.Wrap(err, "running forwarder")
}

// Block until an error is received, or when the stopper starts quiescing,
70 changes: 70 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -1325,6 +1325,76 @@ func TestConnectionMigration(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)
}

// TestCurConnCountMetric ensures that the CurConnCount metric is accurate.
// Previously, there was a regression where, due to a goroutine leak, the
// CurConnCount metric wasn't decremented whenever connections were closed.
func TestCurConnCountMetric(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// Start KV server.
params, _ := tests.CreateTestServerParams()
s, _, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

// Start a single SQL pod.
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, 1)
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register the SQL pod in the directory server.
tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
tds.CreateTenant(tenantID, "tenant-cluster")
tds.AddPod(tenantID, &tenant.Pod{
TenantID: tenantID.ToUint64(),
Addr: tenants[0].SQLAddr(),
State: tenant.RUNNING,
StateTimestamp: timeutil.Now(),
})
require.NoError(t, tds.Start(ctx))

opts := &ProxyOptions{SkipVerify: true, DisableConnectionRebalancing: true}
opts.testingKnobs.directoryServer = tds
proxy, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
connectionString := fmt.Sprintf("postgres://testuser:hunter2@%s/?sslmode=require&options=--cluster=tenant-cluster-%s", addr, tenantID)

// Open 500 connections to the SQL pod.
const numConns = 500
var wg sync.WaitGroup
wg.Add(numConns)
for i := 0; i < numConns; i++ {
go func() {
defer wg.Done()

// Opens a new connection, pings it, and closes it right away.
// Ignore all connection errors.
conn, err := pgx.Connect(ctx, connectionString)
if err != nil {
return
}
_ = conn.Ping(ctx)
_ = conn.Close(ctx)
}()
}
wg.Wait()

// Ensure that the CurConnCount metric gets decremented to 0 whenever all
// the connections are closed.
testutils.SucceedsSoon(t, func() error {
val := proxy.metrics.CurConnCount.Value()
if val == 0 {
return nil
}
return errors.Newf("expected CurConnCount=0, but got %d", val)
})
}

func TestClusterNameAndTenantFromParams(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
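The final assertion in TestCurConnCountMetric (poll until the gauge drains to zero) generalizes to any gauge-style metric. Below is a small hedged sketch that reuses the testutils and errors packages already imported by the test file; waitForGaugeZero is a hypothetical helper and not part of this PR.

```go
// waitForGaugeZero is a hypothetical helper: it retries until the supplied
// gauge reads zero, mirroring the SucceedsSoon block in TestCurConnCountMetric
// above.
func waitForGaugeZero(t *testing.T, name string, read func() int64) {
	t.Helper()
	testutils.SucceedsSoon(t, func() error {
		if v := read(); v != 0 {
			return errors.Newf("expected %s=0, but got %d", name, v)
		}
		return nil
	})
}
```

With it, the assertion above would read waitForGaugeZero(t, "CurConnCount", proxy.metrics.CurConnCount.Value).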