From 51c2e66bedc4f205c900b92aaf33ad5197f9f85c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 29 Nov 2018 17:36:10 +0800 Subject: [PATCH] domain,session: simplify the session pool of domain (#8456) (#8501) --- domain/domain.go | 69 ++++++++++++++++++++++++++++++++++++++++++-- session/session.go | 7 ++++- session/tidb_test.go | 4 +-- 3 files changed, 73 insertions(+), 7 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 0763c93dc5ced..32aef3d5f9e3a 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -57,7 +57,7 @@ type Domain struct { ddl ddl.DDL m sync.Mutex SchemaValidator SchemaValidator - sysSessionPool *pools.ResourcePool + sysSessionPool *SessionPool exit chan struct{} etcdClient *clientv3.Client wg sync.WaitGroup @@ -495,7 +495,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio store: store, SchemaValidator: NewSchemaValidator(ddlLease), exit: make(chan struct{}), - sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout), + sysSessionPool: newSessionPool(capacity, factory), statsLease: statsLease, infoHandle: infoschema.NewHandle(store), } @@ -561,8 +561,71 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R return nil } +// SessionPool is a pool of session. +type SessionPool struct { + resources chan pools.Resource + factory pools.Factory + mu struct { + sync.RWMutex + closed bool + } +} + +func newSessionPool(cap int, factory pools.Factory) *SessionPool { + return &SessionPool{ + resources: make(chan pools.Resource, cap), + factory: factory, + } +} + +// Get gets a resource from the pool. +func (p *SessionPool) Get() (resource pools.Resource, err error) { + var ok bool + select { + case resource, ok = <-p.resources: + if !ok { + err = errors.New("session pool closed") + } + default: + resource, err = p.factory() + } + return +} + +// Put puts a resource into to pool. +func (p *SessionPool) Put(resource pools.Resource) { + p.mu.RLock() + defer p.mu.RUnlock() + if p.mu.closed { + resource.Close() + return + } + + select { + case p.resources <- resource: + default: + resource.Close() + } +} + +// Close closes the pool. +func (p *SessionPool) Close() { + p.mu.Lock() + if p.mu.closed { + p.mu.Unlock() + return + } + p.mu.closed = true + close(p.resources) + p.mu.Unlock() + + for r := range p.resources { + r.Close() + } +} + // SysSessionPool returns the system session pool. -func (do *Domain) SysSessionPool() *pools.ResourcePool { +func (do *Domain) SysSessionPool() *SessionPool { return do.sysSessionPool } diff --git a/session/session.go b/session/session.go index 0732b0f5b079e..d3dfe67b4d527 100644 --- a/session/session.go +++ b/session/session.go @@ -555,7 +555,12 @@ func sqlForLog(sql string) string { return executor.QueryReplacer.Replace(sql) } -func (s *session) sysSessionPool() *pools.ResourcePool { +type sessionPool interface { + Get() (pools.Resource, error) + Put(pools.Resource) +} + +func (s *session) sysSessionPool() sessionPool { return domain.GetDomain(s).SysSessionPool() } diff --git a/session/tidb_test.go b/session/tidb_test.go index 0a41ff89e6a05..d2c3c085e8f4d 100644 --- a/session/tidb_test.go +++ b/session/tidb_test.go @@ -143,7 +143,6 @@ func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) { c.Skip("make leak should check it") // TODO: testleak package should be able to find this leak. store, dom := newStoreWithBootstrap(c, s.dbName+"goroutine_leak") - defer dom.Close() defer store.Close() se, err := createSession(store) c.Assert(err, IsNil) @@ -162,8 +161,7 @@ func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) { }(se) } wg.Wait() - se.sysSessionPool().Close() - c.Assert(se.sysSessionPool().IsClosed(), Equals, true) + dom.Close() for i := 0; i < 300; i++ { // After and before should be Equal, but this test may be disturbed by other factors. // So I relax the strict check to make CI more stable.