Skip to content

Commit

Permalink
domain,session: simplify the session pool of domain (#8456) (#8501)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and zz-jason committed Nov 29, 2018
1 parent 2d1a3d5 commit 51c2e66
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
69 changes: 66 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Domain struct {
ddl ddl.DDL
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *pools.ResourcePool
sysSessionPool *SessionPool
exit chan struct{}
etcdClient *clientv3.Client
wg sync.WaitGroup
Expand Down Expand Up @@ -495,7 +495,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
store: store,
SchemaValidator: NewSchemaValidator(ddlLease),
exit: make(chan struct{}),
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout),
sysSessionPool: newSessionPool(capacity, factory),
statsLease: statsLease,
infoHandle: infoschema.NewHandle(store),
}
Expand Down Expand Up @@ -561,8 +561,71 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
return nil
}

// SessionPool is a pool of session.
type SessionPool struct {
resources chan pools.Resource
factory pools.Factory
mu struct {
sync.RWMutex
closed bool
}
}

func newSessionPool(cap int, factory pools.Factory) *SessionPool {
return &SessionPool{
resources: make(chan pools.Resource, cap),
factory: factory,
}
}

// Get gets a resource from the pool.
func (p *SessionPool) Get() (resource pools.Resource, err error) {
var ok bool
select {
case resource, ok = <-p.resources:
if !ok {
err = errors.New("session pool closed")
}
default:
resource, err = p.factory()
}
return
}

// Put puts a resource into to pool.
func (p *SessionPool) Put(resource pools.Resource) {
p.mu.RLock()
defer p.mu.RUnlock()
if p.mu.closed {
resource.Close()
return
}

select {
case p.resources <- resource:
default:
resource.Close()
}
}

// Close closes the pool.
func (p *SessionPool) Close() {
p.mu.Lock()
if p.mu.closed {
p.mu.Unlock()
return
}
p.mu.closed = true
close(p.resources)
p.mu.Unlock()

for r := range p.resources {
r.Close()
}
}

// SysSessionPool returns the system session pool.
func (do *Domain) SysSessionPool() *pools.ResourcePool {
func (do *Domain) SysSessionPool() *SessionPool {
return do.sysSessionPool
}

Expand Down
7 changes: 6 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,12 @@ func sqlForLog(sql string) string {
return executor.QueryReplacer.Replace(sql)
}

func (s *session) sysSessionPool() *pools.ResourcePool {
type sessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
}

func (s *session) sysSessionPool() sessionPool {
return domain.GetDomain(s).SysSessionPool()
}

Expand Down
4 changes: 1 addition & 3 deletions session/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) {
c.Skip("make leak should check it")
// TODO: testleak package should be able to find this leak.
store, dom := newStoreWithBootstrap(c, s.dbName+"goroutine_leak")
defer dom.Close()
defer store.Close()
se, err := createSession(store)
c.Assert(err, IsNil)
Expand All @@ -162,8 +161,7 @@ func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) {
}(se)
}
wg.Wait()
se.sysSessionPool().Close()
c.Assert(se.sysSessionPool().IsClosed(), Equals, true)
dom.Close()
for i := 0; i < 300; i++ {
// After and before should be Equal, but this test may be disturbed by other factors.
// So I relax the strict check to make CI more stable.
Expand Down

0 comments on commit 51c2e66

Please sign in to comment.