From 366e68637a73429a7e6dc8ff925775574aa8e9cc Mon Sep 17 00:00:00 2001 From: horoc Date: Wed, 10 Aug 2022 22:42:07 +0800 Subject: [PATCH 1/4] add session manager --- go.mod | 1 + go.sum | 2 + session_manager.go | 157 ++++++++++++++++++++++++++++++++++++++++ session_manager_test.go | 110 ++++++++++++++++++++++++++++ session_wrapper.go | 51 +++++++++++++ 5 files changed, 321 insertions(+) create mode 100644 session_manager.go create mode 100644 session_manager_test.go create mode 100644 session_wrapper.go diff --git a/go.mod b/go.mod index d858ca15..3d2d2b01 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/vesoft-inc/nebula-go/v3 go 1.13 require ( + github.com/bits-and-blooms/bitset v1.3.0 github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 github.com/stretchr/testify v1.7.0 ) diff --git a/go.sum b/go.sum index e449129e..4521d7f4 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/bits-and-blooms/bitset v1.3.0 h1:h7mv5q31cthBTd7V4kLAZaIThj1e8vPGcSqpPue9KVI= +github.com/bits-and-blooms/bitset v1.3.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 h1:ZA+qQ3d2In0RNzVpk+D/nq1sjDSv+s1Wy2zrAPQAmsg= diff --git a/session_manager.go b/session_manager.go new file mode 100644 index 00000000..1387e771 --- /dev/null +++ b/session_manager.go @@ -0,0 +1,157 @@ +package nebula_go + +import ( + "crypto/tls" + "fmt" + "github.com/bits-and-blooms/bitset" + "sync" +) + +type ManagerConfig struct { + username string + password string + spaceName string + addresses []HostAddress + poolConfig PoolConfig + sslConfig *tls.Config +} + +type SessionManager struct { + config ManagerConfig + mutex sync.Mutex + isClose bool + log Logger + pool *ConnectionPool + idleBitSet *bitset.BitSet + idleSessionMap map[uint]*SessionWrapper + sessionMap map[int64]*SessionWrapper +} + +func NewSessionManager(config ManagerConfig, log Logger) (*SessionManager, error) { + if len(config.spaceName) == 0 { + return nil, fmt.Errorf("fail to create session manager: space name can not be empty") + } + config.poolConfig.validateConf(log) + + manager := &SessionManager{ + config: config, + log: log, + } + err := manager.initManager() + if err != nil { + return nil, err + } + return manager, nil +} + +func (m *SessionManager) initManager() error { + + var pool *ConnectionPool + var err error + if m.config.sslConfig != nil { + pool, err = NewConnectionPool(m.config.addresses, m.config.poolConfig, m.log) + } else { + pool, err = NewSslConnectionPool(m.config.addresses, m.config.poolConfig, m.config.sslConfig, m.log) + } + + if err != nil { + return err + } + m.pool = pool + + // idleBitSet: bitset contains [config.poolConfig.MaxConnPoolSize] bits + // idleBitSet: [1, 0, 1, .... 0], set value of position-i to 1 means there is an idle wrapper-session available in idleSessionMap + m.idleBitSet = bitset.New(uint(m.config.poolConfig.MaxConnPoolSize)) + + // sessionMap : + m.idleSessionMap = make(map[uint]*SessionWrapper) + + // all session get from manager, + m.sessionMap = make(map[int64]*SessionWrapper) + + return nil +} + +// GetSession get a wrapper session from sessionManager +func (m *SessionManager) GetSession() (*SessionWrapper, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.isClose { + return nil, fmt.Errorf("fail to get new seesion, session manager has closed") + } + + // if there are any available session + if m.idleBitSet.Count() != 0 { + index, ok := m.idleBitSet.NextSet(0) + if ok { + m.idleBitSet.Clear(index) + session := m.idleSessionMap[index] + delete(m.idleSessionMap, index) + return session, nil + } + } + + // create new Session from pool + session, err := m.pool.GetSession(m.config.username, m.config.password) + if err != nil { + return nil, err + } + + // change space + result, err := session.Execute("USE " + m.config.spaceName) + if err != nil { + return nil, err + } + if !result.IsSucceed() { + return nil, fmt.Errorf("fail to get new seesion, change space error: %s", err.Error()) + } + + sessionWrapper := &SessionWrapper{session: session, sessionManager: m, sessionId: session.GetSessionID()} + m.sessionMap[session.GetSessionID()] = sessionWrapper + return sessionWrapper, nil +} + +// ReleaseSession release wrapper session to sessionManager +func (m *SessionManager) ReleaseSession(sessionWrapper *SessionWrapper) { + m.mutex.Lock() + defer m.mutex.Unlock() + + // do nothing + if sessionWrapper == nil || sessionWrapper.isRelease || m.isClose { + return + } + + // session is not created by manager, just return to pool + if _, ok := m.sessionMap[sessionWrapper.session.GetSessionID()]; !ok { + sessionWrapper.session.Release() + return + } + + // acquire position from idleBitSet + setIndex, ok := m.idleBitSet.NextClear(0) + if !ok { + sessionWrapper.session.Release() + } else { + m.idleBitSet.Set(setIndex) + newWrapper := &SessionWrapper{session: sessionWrapper.session, sessionManager: m, sessionId: sessionWrapper.sessionId} + m.idleSessionMap[setIndex] = newWrapper + } + sessionWrapper.isRelease = true + sessionWrapper.session = nil + sessionWrapper.sessionManager = nil +} + +// Close mark all wrapper session to release status and close connection pool +func (m *SessionManager) Close() { + m.mutex.Lock() + defer m.mutex.Unlock() + + for _, v := range m.sessionMap { + v.isRelease = true + } + m.sessionMap = nil + m.idleSessionMap = nil + m.pool.Close() + m.isClose = true +} diff --git a/session_manager_test.go b/session_manager_test.go new file mode 100644 index 00000000..688f4094 --- /dev/null +++ b/session_manager_test.go @@ -0,0 +1,110 @@ +package nebula_go + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func GetManagerConfig(minPoolSize int, maxPoolSize int) ManagerConfig { + return ManagerConfig{ + username: "root", + password: "", + addresses: []HostAddress{ + { + Host: "127.0.0.1", + Port: 9669, + }, + }, + // schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/ + spaceName: "basketballplayer", + poolConfig: GetPoolConfig(minPoolSize, maxPoolSize), + } +} + +func GetPoolConfig(minPoolSize int, maxPoolSize int) PoolConfig { + return PoolConfig{ + MinConnPoolSize: minPoolSize, + MaxConnPoolSize: maxPoolSize, + IdleTime: 0, + } +} + +func TestSessionManager(t *testing.T) { + manager, err := NewSessionManager(GetManagerConfig(1, 2), DefaultLogger{}) + if err != nil { + t.Fail() + return + } + defer manager.Close() + session, err := manager.GetSession() + if err != nil { + t.Fail() + return + } + // schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/ + result, err := session.Execute("GO FROM \"player101\" OVER follow YIELD id($$);") + if err != nil || !result.IsSucceed() { + t.Fail() + return + } + assert.True(t, len(result.GetRows()) != 0) +} + +// go test -bench=SessionManager -benchtime=10000x -run=^a +func BenchmarkSessionManager(b *testing.B) { + manager, err := NewSessionManager(GetManagerConfig(10, 20), DefaultLogger{}) + if err != nil { + b.Fail() + return + } + defer manager.Close() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + session, err := manager.GetSession() + if err != nil { + fmt.Sprintf("get session err: %v", err) + panic(err) + } + + result, err := session.Execute("GO FROM \"player101\" OVER follow YIELD id($$)") + if err != nil || !result.IsSucceed() { + fmt.Sprintf("execute statment err: %v", err) + session.Release() + panic(err) + } + session.Release() + } + }) +} + +// go test -bench=SessionPool -benchtime=10000x -run=^a +func BenchmarkSessionPool(b *testing.B) { + config := GetManagerConfig(10, 20) + pool, err := NewConnectionPool(config.addresses, config.poolConfig, DefaultLogger{}) + if err != nil { + b.Fail() + return + } + defer pool.Close() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + session, err := pool.GetSession(config.username, config.password) + if err != nil { + fmt.Sprintf("get session err: %v", err) + panic(err) + } + + result, err := session.Execute("USE basketballplayer;GO FROM \"player101\" OVER follow YIELD id($$);") + if err != nil || !result.IsSucceed() { + fmt.Sprintf("execute statment err: %v", err) + session.Release() + panic(err) + } + session.Release() + } + }) +} diff --git a/session_wrapper.go b/session_wrapper.go new file mode 100644 index 00000000..9b5cdb0a --- /dev/null +++ b/session_wrapper.go @@ -0,0 +1,51 @@ +package nebula_go + +import "fmt" + +type SessionWrapper struct { + session *Session + sessionManager *SessionManager + sessionId int64 + isRelease bool +} + +func (w *SessionWrapper) GetSessionID() (int64, error) { + if w.isRelease { + return -1, fmt.Errorf("can not get session id of a released session wrapper") + } + return w.session.sessionID, nil +} + +func (w *SessionWrapper) Release() { + if !w.isRelease { + w.sessionManager.ReleaseSession(w) + } +} + +func (w *SessionWrapper) ExecuteWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) { + if w.isRelease { + return nil, fmt.Errorf("can not execute statement by a released session wrapper") + } + return w.session.ExecuteWithParameter(stmt, params) +} + +func (w *SessionWrapper) Execute(stmt string) (*ResultSet, error) { + if w.isRelease { + return nil, fmt.Errorf("can not execute statement by a released session wrapper") + } + return w.ExecuteWithParameter(stmt, map[string]interface{}{}) +} + +func (w *SessionWrapper) ExecuteJson(stmt string) ([]byte, error) { + if w.isRelease { + return nil, fmt.Errorf("can not execute statement by a released session wrapper") + } + return w.session.ExecuteJsonWithParameter(stmt, map[string]interface{}{}) +} + +func (w *SessionWrapper) ExecuteJsonWithParameter(stmt string, params map[string]interface{}) ([]byte, error) { + if w.isRelease { + return nil, fmt.Errorf("can not execute statement by a released session wrapper") + } + return w.ExecuteJsonWithParameter(stmt, params) +} From f08bd3813ee7793fd4a3f31ecde74d67add1ab03 Mon Sep 17 00:00:00 2001 From: horoc Date: Thu, 11 Aug 2022 22:35:01 +0800 Subject: [PATCH 2/4] update unit test of session manager --- session_manager_test.go | 62 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/session_manager_test.go b/session_manager_test.go index 688f4094..24a3d5a5 100644 --- a/session_manager_test.go +++ b/session_manager_test.go @@ -2,19 +2,27 @@ package nebula_go import ( "fmt" - "testing" - "github.com/stretchr/testify/assert" + "os" + "testing" ) func GetManagerConfig(minPoolSize int, maxPoolSize int) ManagerConfig { return ManagerConfig{ username: "root", - password: "", + password: "nebula", addresses: []HostAddress{ { Host: "127.0.0.1", - Port: 9669, + Port: 3699, + }, + { + Host: "127.0.0.1", + Port: 3700, + }, + { + Host: "127.0.0.1", + Port: 3701, }, }, // schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/ @@ -32,6 +40,7 @@ func GetPoolConfig(minPoolSize int, maxPoolSize int) PoolConfig { } func TestSessionManager(t *testing.T) { + InitData(t) manager, err := NewSessionManager(GetManagerConfig(1, 2), DefaultLogger{}) if err != nil { t.Fail() @@ -40,13 +49,12 @@ func TestSessionManager(t *testing.T) { defer manager.Close() session, err := manager.GetSession() if err != nil { - t.Fail() - return + t.Fatalf("fail to get session from session manager, %s", err.Error()) } // schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/ result, err := session.Execute("GO FROM \"player101\" OVER follow YIELD id($$);") if err != nil || !result.IsSucceed() { - t.Fail() + t.Fatalf("execute statment fail, %s", err.Error()) return } assert.True(t, len(result.GetRows()) != 0) @@ -54,6 +62,7 @@ func TestSessionManager(t *testing.T) { // go test -bench=SessionManager -benchtime=10000x -run=^a func BenchmarkSessionManager(b *testing.B) { + skipBenchmark(b) manager, err := NewSessionManager(GetManagerConfig(10, 20), DefaultLogger{}) if err != nil { b.Fail() @@ -82,6 +91,7 @@ func BenchmarkSessionManager(b *testing.B) { // go test -bench=SessionPool -benchtime=10000x -run=^a func BenchmarkSessionPool(b *testing.B) { + skipBenchmark(b) config := GetManagerConfig(10, 20) pool, err := NewConnectionPool(config.addresses, config.poolConfig, DefaultLogger{}) if err != nil { @@ -108,3 +118,41 @@ func BenchmarkSessionPool(b *testing.B) { } }) } + +func InitData(t *testing.T) { + config := GetManagerConfig(10, 20) + pool, err := NewConnectionPool(config.addresses, config.poolConfig, DefaultLogger{}) + if err != nil { + t.Fatalf("test session manager, init data fail, %s", err.Error()) + } + defer pool.Close() + + session, err := pool.GetSession("root", "nebula") + defer session.Release() + if err != nil { + t.Fatalf("test session manager, init data, get session fail, %s", err.Error()) + } + + schema := "CREATE SPACE basketballplayer(partition_num=1, replica_factor=1, vid_type=fixed_string(30));" + + "USE basketballplayer;" + + "CREATE TAG player(name string, age int);" + + "CREATE EDGE follow(degree int);" + + _, err = session.Execute(schema) + if err != nil { + t.Fatalf("test session manager, init data schema fail, %s", err.Error()) + } + + dataStatement := "INSERT VERTEX player(name, age) VALUES \"player101\":(\"Tony Parker\", 36), \"player100\":(\"Tim Duncan\", 42);" + + "INSERT EDGE follow(degree) VALUES \"player101\" -> \"player100\":(95);" + _, err = session.Execute(dataStatement) + if err != nil { + t.Fatalf("test session manager, init data fail, %s", err.Error()) + } +} + +func skipBenchmark(b *testing.B) { + if os.Getenv("session_manager_benchmark") != "true" { + b.Skip("skip session manager benchmark testing") + } +} From 6bc3d28892e0bda105a47bf74ad4667348e128d6 Mon Sep 17 00:00:00 2001 From: horoc Date: Fri, 12 Aug 2022 00:00:59 +0800 Subject: [PATCH 3/4] fix error format --- session_manager.go | 2 +- session_manager_test.go | 76 ++++++++++++++++++++++++++--------------- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/session_manager.go b/session_manager.go index 1387e771..a8a53b8b 100644 --- a/session_manager.go +++ b/session_manager.go @@ -104,7 +104,7 @@ func (m *SessionManager) GetSession() (*SessionWrapper, error) { return nil, err } if !result.IsSucceed() { - return nil, fmt.Errorf("fail to get new seesion, change space error: %s", err.Error()) + return nil, fmt.Errorf("fail to get new seesion, change space error") } sessionWrapper := &SessionWrapper{session: session, sessionManager: m, sessionId: session.GetSessionID()} diff --git a/session_manager_test.go b/session_manager_test.go index 24a3d5a5..b2c7228a 100644 --- a/session_manager_test.go +++ b/session_manager_test.go @@ -2,9 +2,11 @@ package nebula_go import ( "fmt" - "github.com/stretchr/testify/assert" "os" "testing" + "time" + + "github.com/stretchr/testify/assert" ) func GetManagerConfig(minPoolSize int, maxPoolSize int) ManagerConfig { @@ -16,14 +18,6 @@ func GetManagerConfig(minPoolSize int, maxPoolSize int) ManagerConfig { Host: "127.0.0.1", Port: 3699, }, - { - Host: "127.0.0.1", - Port: 3700, - }, - { - Host: "127.0.0.1", - Port: 3701, - }, }, // schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/ spaceName: "basketballplayer", @@ -40,11 +34,13 @@ func GetPoolConfig(minPoolSize int, maxPoolSize int) PoolConfig { } func TestSessionManager(t *testing.T) { - InitData(t) + err := InitData() + if err != nil { + t.Fatalf(err.Error()) + } manager, err := NewSessionManager(GetManagerConfig(1, 2), DefaultLogger{}) if err != nil { - t.Fail() - return + t.Fatalf("create session manager error, %s", err.Error()) } defer manager.Close() session, err := manager.GetSession() @@ -54,15 +50,17 @@ func TestSessionManager(t *testing.T) { // schema : https://docs.nebula-graph.com.cn/3.1.0/2.quick-start/4.nebula-graph-crud/ result, err := session.Execute("GO FROM \"player101\" OVER follow YIELD id($$);") if err != nil || !result.IsSucceed() { - t.Fatalf("execute statment fail, %s", err.Error()) - return + t.Fatalf("execute statment fails") } assert.True(t, len(result.GetRows()) != 0) } -// go test -bench=SessionManager -benchtime=10000x -run=^a func BenchmarkSessionManager(b *testing.B) { skipBenchmark(b) + err := InitData() + if err != nil { + b.Fatalf(err.Error()) + } manager, err := NewSessionManager(GetManagerConfig(10, 20), DefaultLogger{}) if err != nil { b.Fail() @@ -89,9 +87,12 @@ func BenchmarkSessionManager(b *testing.B) { }) } -// go test -bench=SessionPool -benchtime=10000x -run=^a func BenchmarkSessionPool(b *testing.B) { skipBenchmark(b) + err := InitData() + if err != nil { + b.Fatalf(err.Error()) + } config := GetManagerConfig(10, 20) pool, err := NewConnectionPool(config.addresses, config.poolConfig, DefaultLogger{}) if err != nil { @@ -119,36 +120,44 @@ func BenchmarkSessionPool(b *testing.B) { }) } -func InitData(t *testing.T) { +func InitData() error { config := GetManagerConfig(10, 20) pool, err := NewConnectionPool(config.addresses, config.poolConfig, DefaultLogger{}) if err != nil { - t.Fatalf("test session manager, init data fail, %s", err.Error()) + return fmt.Errorf("test session manager, init data fail, %s", err.Error()) } defer pool.Close() session, err := pool.GetSession("root", "nebula") defer session.Release() if err != nil { - t.Fatalf("test session manager, init data, get session fail, %s", err.Error()) + return fmt.Errorf("test session manager, init data, get session fail, %s", err.Error()) } - schema := "CREATE SPACE basketballplayer(partition_num=1, replica_factor=1, vid_type=fixed_string(30));" + + schema := "CREATE SPACE IF NOT EXISTS basketballplayer(partition_num=1, replica_factor=1, vid_type=fixed_string(30));" + "USE basketballplayer;" + - "CREATE TAG player(name string, age int);" + - "CREATE EDGE follow(degree int);" + "CREATE TAG IF NOT EXISTS player(name string, age int);" + + "CREATE EDGE IF NOT EXISTS follow(degree int);" + + _, err = tryToExecute(session, schema) + if err != nil { + return fmt.Errorf("test session manager, init data schema fail, %s", err.Error()) + } - _, err = session.Execute(schema) + dataStatement := "INSERT VERTEX player(name, age) VALUES \"player101\":(\"Tony Parker\", 36), \"player100\":(\"Tim Duncan\", 42);" + _, err = tryToExecute(session, dataStatement) if err != nil { - t.Fatalf("test session manager, init data schema fail, %s", err.Error()) + return fmt.Errorf("test session manager, init data fail, %s", err.Error()) } - dataStatement := "INSERT VERTEX player(name, age) VALUES \"player101\":(\"Tony Parker\", 36), \"player100\":(\"Tim Duncan\", 42);" + - "INSERT EDGE follow(degree) VALUES \"player101\" -> \"player100\":(95);" - _, err = session.Execute(dataStatement) + time.Sleep(2 * time.Second) + + dataStatement = "INSERT EDGE follow(degree) VALUES \"player101\" -> \"player100\":(95);" + _, err = tryToExecute(session, dataStatement) if err != nil { - t.Fatalf("test session manager, init data fail, %s", err.Error()) + return fmt.Errorf("test session manager, init data fail, %s", err.Error()) } + return nil } func skipBenchmark(b *testing.B) { @@ -156,3 +165,14 @@ func skipBenchmark(b *testing.B) { b.Skip("skip session manager benchmark testing") } } + +func tryToExecute(session *Session, query string) (resp *ResultSet, err error) { + for i := 3; i > 0; i-- { + resp, err = session.Execute(query) + if err == nil && resp.IsSucceed() { + return + } + time.Sleep(2 * time.Second) + } + return +} From 15542f43464b19d945647f0d8a3dd84a72d131ec Mon Sep 17 00:00:00 2001 From: horoc Date: Fri, 12 Aug 2022 21:45:14 +0800 Subject: [PATCH 4/4] fix unit test error --- session_manager_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/session_manager_test.go b/session_manager_test.go index b2c7228a..170b711d 100644 --- a/session_manager_test.go +++ b/session_manager_test.go @@ -139,13 +139,13 @@ func InitData() error { "CREATE TAG IF NOT EXISTS player(name string, age int);" + "CREATE EDGE IF NOT EXISTS follow(degree int);" - _, err = tryToExecute(session, schema) + _, err = Execute(session, schema) if err != nil { return fmt.Errorf("test session manager, init data schema fail, %s", err.Error()) } dataStatement := "INSERT VERTEX player(name, age) VALUES \"player101\":(\"Tony Parker\", 36), \"player100\":(\"Tim Duncan\", 42);" - _, err = tryToExecute(session, dataStatement) + _, err = Execute(session, dataStatement) if err != nil { return fmt.Errorf("test session manager, init data fail, %s", err.Error()) } @@ -153,7 +153,7 @@ func InitData() error { time.Sleep(2 * time.Second) dataStatement = "INSERT EDGE follow(degree) VALUES \"player101\" -> \"player100\":(95);" - _, err = tryToExecute(session, dataStatement) + _, err = Execute(session, dataStatement) if err != nil { return fmt.Errorf("test session manager, init data fail, %s", err.Error()) } @@ -166,7 +166,7 @@ func skipBenchmark(b *testing.B) { } } -func tryToExecute(session *Session, query string) (resp *ResultSet, err error) { +func Execute(session *Session, query string) (resp *ResultSet, err error) { for i := 3; i > 0; i-- { resp, err = session.Execute(query) if err == nil && resp.IsSucceed() {