Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add session manager #217

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/vesoft-inc/nebula-go/v3
go 1.13

require (
github.com/bits-and-blooms/bitset v1.3.0
github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295
github.com/stretchr/testify v1.7.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/bits-and-blooms/bitset v1.3.0 h1:h7mv5q31cthBTd7V4kLAZaIThj1e8vPGcSqpPue9KVI=
github.com/bits-and-blooms/bitset v1.3.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 h1:ZA+qQ3d2In0RNzVpk+D/nq1sjDSv+s1Wy2zrAPQAmsg=
Expand Down
157 changes: 157 additions & 0 deletions session_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package nebula_go

import (
"crypto/tls"
"fmt"
"github.com/bits-and-blooms/bitset"
"sync"
)

type ManagerConfig struct {
username string
password string
spaceName string
addresses []HostAddress
poolConfig PoolConfig
sslConfig *tls.Config
}

type SessionManager struct {
config ManagerConfig
mutex sync.Mutex
isClose bool
log Logger
pool *ConnectionPool
idleBitSet *bitset.BitSet
idleSessionMap map[uint]*SessionWrapper
sessionMap map[int64]*SessionWrapper
}

func NewSessionManager(config ManagerConfig, log Logger) (*SessionManager, error) {
if len(config.spaceName) == 0 {
return nil, fmt.Errorf("fail to create session manager: space name can not be empty")
}
config.poolConfig.validateConf(log)

manager := &SessionManager{
config: config,
log: log,
}
err := manager.initManager()
if err != nil {
return nil, err
}
return manager, nil
}

func (m *SessionManager) initManager() error {

var pool *ConnectionPool
var err error
if m.config.sslConfig != nil {
pool, err = NewConnectionPool(m.config.addresses, m.config.poolConfig, m.log)
} else {
pool, err = NewSslConnectionPool(m.config.addresses, m.config.poolConfig, m.config.sslConfig, m.log)
}

if err != nil {
return err
}
m.pool = pool

// idleBitSet: bitset contains [config.poolConfig.MaxConnPoolSize] bits
// idleBitSet: [1, 0, 1, .... 0], set value of position-i to 1 means there is an idle wrapper-session available in idleSessionMap
m.idleBitSet = bitset.New(uint(m.config.poolConfig.MaxConnPoolSize))

// sessionMap : <index of idleBitSet: *SessionWrapper>
m.idleSessionMap = make(map[uint]*SessionWrapper)

// all session get from manager, <sessionId, *SessionWrapper>
m.sessionMap = make(map[int64]*SessionWrapper)

return nil
}

// GetSession get a wrapper session from sessionManager
func (m *SessionManager) GetSession() (*SessionWrapper, error) {
m.mutex.Lock()
defer m.mutex.Unlock()

if m.isClose {
return nil, fmt.Errorf("fail to get new seesion, session manager has closed")
}

// if there are any available session
if m.idleBitSet.Count() != 0 {
index, ok := m.idleBitSet.NextSet(0)
if ok {
m.idleBitSet.Clear(index)
session := m.idleSessionMap[index]
delete(m.idleSessionMap, index)
return session, nil
}
}

// create new Session from pool
session, err := m.pool.GetSession(m.config.username, m.config.password)
if err != nil {
return nil, err
}

// change space
result, err := session.Execute("USE " + m.config.spaceName)
if err != nil {
return nil, err
}
if !result.IsSucceed() {
return nil, fmt.Errorf("fail to get new seesion, change space error")
}

sessionWrapper := &SessionWrapper{session: session, sessionManager: m, sessionId: session.GetSessionID()}
m.sessionMap[session.GetSessionID()] = sessionWrapper
return sessionWrapper, nil
}

// ReleaseSession release wrapper session to sessionManager
func (m *SessionManager) ReleaseSession(sessionWrapper *SessionWrapper) {
m.mutex.Lock()
defer m.mutex.Unlock()

// do nothing
if sessionWrapper == nil || sessionWrapper.isRelease || m.isClose {
return
}

// session is not created by manager, just return to pool
if _, ok := m.sessionMap[sessionWrapper.session.GetSessionID()]; !ok {
sessionWrapper.session.Release()
return
}

// acquire position from idleBitSet
setIndex, ok := m.idleBitSet.NextClear(0)
if !ok {
sessionWrapper.session.Release()
} else {
m.idleBitSet.Set(setIndex)
newWrapper := &SessionWrapper{session: sessionWrapper.session, sessionManager: m, sessionId: sessionWrapper.sessionId}
m.idleSessionMap[setIndex] = newWrapper
}
sessionWrapper.isRelease = true
sessionWrapper.session = nil
sessionWrapper.sessionManager = nil
}

// Close mark all wrapper session to release status and close connection pool
func (m *SessionManager) Close() {
m.mutex.Lock()
defer m.mutex.Unlock()

for _, v := range m.sessionMap {
v.isRelease = true
}
m.sessionMap = nil
m.idleSessionMap = nil
m.pool.Close()
m.isClose = true
}
178 changes: 178 additions & 0 deletions session_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package nebula_go

import (
"fmt"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func GetManagerConfig(minPoolSize int, maxPoolSize int) ManagerConfig {
return ManagerConfig{
username: "root",
password: "nebula",
addresses: []HostAddress{
{
Host: "127.0.0.1",
Port: 3699,
},
},
// schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/
spaceName: "basketballplayer",
poolConfig: GetPoolConfig(minPoolSize, maxPoolSize),
}
}

func GetPoolConfig(minPoolSize int, maxPoolSize int) PoolConfig {
return PoolConfig{
MinConnPoolSize: minPoolSize,
MaxConnPoolSize: maxPoolSize,
IdleTime: 0,
}
}

func TestSessionManager(t *testing.T) {
err := InitData()
if err != nil {
t.Fatalf(err.Error())
}
manager, err := NewSessionManager(GetManagerConfig(1, 2), DefaultLogger{})
if err != nil {
t.Fatalf("create session manager error, %s", err.Error())
}
defer manager.Close()
session, err := manager.GetSession()
if err != nil {
t.Fatalf("fail to get session from session manager, %s", err.Error())
}
// schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/
result, err := session.Execute("GO FROM \"player101\" OVER follow YIELD id($$);")
if err != nil || !result.IsSucceed() {
t.Fatalf("execute statment fails")
}
assert.True(t, len(result.GetRows()) != 0)
}

func BenchmarkSessionManager(b *testing.B) {
skipBenchmark(b)
err := InitData()
if err != nil {
b.Fatalf(err.Error())
}
manager, err := NewSessionManager(GetManagerConfig(10, 20), DefaultLogger{})
if err != nil {
b.Fail()
return
}
defer manager.Close()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
session, err := manager.GetSession()
if err != nil {
fmt.Sprintf("get session err: %v", err)
panic(err)
}

result, err := session.Execute("GO FROM \"player101\" OVER follow YIELD id($$)")
if err != nil || !result.IsSucceed() {
fmt.Sprintf("execute statment err: %v", err)
session.Release()
panic(err)
}
session.Release()
}
})
}

func BenchmarkSessionPool(b *testing.B) {
skipBenchmark(b)
err := InitData()
if err != nil {
b.Fatalf(err.Error())
}
config := GetManagerConfig(10, 20)
pool, err := NewConnectionPool(config.addresses, config.poolConfig, DefaultLogger{})
if err != nil {
b.Fail()
return
}
defer pool.Close()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
session, err := pool.GetSession(config.username, config.password)
if err != nil {
fmt.Sprintf("get session err: %v", err)
panic(err)
}

result, err := session.Execute("USE basketballplayer;GO FROM \"player101\" OVER follow YIELD id($$);")
if err != nil || !result.IsSucceed() {
fmt.Sprintf("execute statment err: %v", err)
session.Release()
panic(err)
}
session.Release()
}
})
}

func InitData() error {
config := GetManagerConfig(10, 20)
pool, err := NewConnectionPool(config.addresses, config.poolConfig, DefaultLogger{})
if err != nil {
return fmt.Errorf("test session manager, init data fail, %s", err.Error())
}
defer pool.Close()

session, err := pool.GetSession("root", "nebula")
defer session.Release()
if err != nil {
return fmt.Errorf("test session manager, init data, get session fail, %s", err.Error())
}

schema := "CREATE SPACE IF NOT EXISTS basketballplayer(partition_num=1, replica_factor=1, vid_type=fixed_string(30));" +
"USE basketballplayer;" +
"CREATE TAG IF NOT EXISTS player(name string, age int);" +
"CREATE EDGE IF NOT EXISTS follow(degree int);"

_, err = Execute(session, schema)
if err != nil {
return fmt.Errorf("test session manager, init data schema fail, %s", err.Error())
}

dataStatement := "INSERT VERTEX player(name, age) VALUES \"player101\":(\"Tony Parker\", 36), \"player100\":(\"Tim Duncan\", 42);"
_, err = Execute(session, dataStatement)
if err != nil {
return fmt.Errorf("test session manager, init data fail, %s", err.Error())
}

time.Sleep(2 * time.Second)

dataStatement = "INSERT EDGE follow(degree) VALUES \"player101\" -> \"player100\":(95);"
_, err = Execute(session, dataStatement)
if err != nil {
return fmt.Errorf("test session manager, init data fail, %s", err.Error())
}
return nil
}

func skipBenchmark(b *testing.B) {
if os.Getenv("session_manager_benchmark") != "true" {
b.Skip("skip session manager benchmark testing")
}
}

func Execute(session *Session, query string) (resp *ResultSet, err error) {
for i := 3; i > 0; i-- {
resp, err = session.Execute(query)
if err == nil && resp.IsSucceed() {
return
}
time.Sleep(2 * time.Second)
}
return
}
Loading