Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49073
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
bb7133 authored and ti-chi-bot committed Jan 5, 2024
1 parent 63e621a commit 32321ac
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 21 deletions.
46 changes: 25 additions & 21 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ type clientConn struct {
lastActive time.Time // last active time
authPlugin string // default authentication plugin
isUnixSocket bool // connection is Unix Socket file
rsEncoder *column.ResultEncoder // rsEncoder is used to encode the string result to different charsets.
inputDecoder *util2.InputDecoder // inputDecoder is used to decode the different charsets of incoming strings to utf-8.
closeOnce sync.Once // closeOnce is used to make sure clientConn closes only once
rsEncoder *column.ResultEncoder // rsEncoder is used to encode the string result to different charsets
inputDecoder *util2.InputDecoder // inputDecoder is used to decode the different charsets of incoming strings to utf-8
socketCredUID uint32 // UID from the other end of the Unix Socket
// mu is used for cancelling the execution of current transaction.
mu struct {
Expand Down Expand Up @@ -339,28 +340,31 @@ func (cc *clientConn) Close() error {
return closeConn(cc, connections)
}

// closeConn should be idempotent.
// closeConn is idempotent and thread-safe.
// It will be called on the same `clientConn` more than once to avoid connection leak.
func closeConn(cc *clientConn, connections int) error {
metrics.ConnGauge.Set(float64(connections))
if cc.connectionID > 0 {
cc.server.dom.ReleaseConnID(cc.connectionID)
cc.connectionID = 0
}
if cc.bufReadConn != nil {
err := cc.bufReadConn.Close()
if err != nil {
// We need to expect connection might have already disconnected.
// This is because closeConn() might be called after a connection read-timeout.
logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
var err error
cc.closeOnce.Do(func() {
metrics.ConnGauge.Set(float64(connections))
if cc.connectionID > 0 {
cc.server.dom.ReleaseConnID(cc.connectionID)
cc.connectionID = 0
}
if cc.bufReadConn != nil {
err := cc.bufReadConn.Close()
if err != nil {
// We need to expect connection might have already disconnected.
// This is because closeConn() might be called after a connection read-timeout.
logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
}
}
}
// Close statements and session
// This will release advisory locks, row locks, etc.
if ctx := cc.getCtx(); ctx != nil {
return ctx.Close()
}
return nil
// Close statements and session
// This will release advisory locks, row locks, etc.
if ctx := cc.getCtx(); ctx != nil {
err = ctx.Close()
}
})
return err
}

func (cc *clientConn) closeWithoutLock() error {
Expand Down
96 changes: 96 additions & 0 deletions pkg/server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1998,3 +1999,98 @@ func TestEmptyOrgName(t *testing.T) {

testDispatch(t, inputs, 0)
}
<<<<<<< HEAD
=======

func TestStats(t *testing.T) {
var outBuffer bytes.Buffer

store := testkit.CreateMockStore(t)
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
server, err := NewServer(cfg, drv)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)

cc := &clientConn{
connectionID: 1,
salt: []byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A,
0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14,
},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41,
}

// No compression
vars := tk.Session().GetSessionVars()
m, err := cc.Stats(vars)
require.NoError(t, err)
require.Equal(t, "OFF", m["Compression"])
require.Equal(t, "", m["Compression_algorithm"])
require.Equal(t, 0, m["Compression_level"])

// zlib compression
vars.CompressionAlgorithm = mysql.CompressionZlib
m, err = cc.Stats(vars)
require.NoError(t, err)
require.Equal(t, "ON", m["Compression"])
require.Equal(t, "zlib", m["Compression_algorithm"])
require.Equal(t, mysql.ZlibCompressDefaultLevel, m["Compression_level"])

// zstd compression, with level 1
vars.CompressionAlgorithm = mysql.CompressionZstd
vars.CompressionLevel = 1
m, err = cc.Stats(vars)
require.NoError(t, err)
require.Equal(t, "ON", m["Compression"])
require.Equal(t, "zstd", m["Compression_algorithm"])
require.Equal(t, 1, m["Compression_level"])
}

func TestCloseConn(t *testing.T) {
var outBuffer bytes.Buffer

store, _ := testkit.CreateMockStoreAndDomain(t)
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
server, err := NewServer(cfg, drv)
require.NoError(t, err)

cc := &clientConn{
connectionID: 0,
salt: []byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A,
0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14,
},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41,
}

var wg sync.WaitGroup
const numGoroutines = 10
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
err := closeConn(cc, 1)
require.NoError(t, err)
}()
}
wg.Wait()
}
>>>>>>> 43825796a66 (server: make `clientConn()` thread-safe (#49073))

0 comments on commit 32321ac

Please sign in to comment.