forked from vesoft-inc/nebula-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
session.go
109 lines (98 loc) · 2.86 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
package nebula_go
import (
"fmt"
"github.com/facebook/fbthrift/thrift/lib/go/thrift"
"github.com/vesoft-inc/nebula-go/v2/nebula"
graph "github.com/vesoft-inc/nebula-go/v2/nebula/graph"
)
// timezoneInfo holds the timezone settings that a Session hands to
// genResultSet when a response is converted into a ResultSet.
type timezoneInfo struct {
	// offset is the timezone offset.
	// NOTE(review): presumably seconds relative to UTC — confirm against the server.
	offset int32
	// name is the timezone name as raw bytes.
	name []byte
}
// Session pairs a session ID with a connection taken from a ConnectionPool
// and is the handle through which statements are executed.
type Session struct {
	// sessionID is sent along with every execute and signOut request.
	sessionID int64
	// connection is the connection currently in use; it is nil once the
	// session has been released.
	connection *connection
	// connPool supplies replacement connections and takes released ones back.
	connPool *ConnectionPool
	// log receives the session's info, warning and error messages.
	log Logger
	// timezoneInfo is embedded and passed to genResultSet for each response.
	timezoneInfo
}
// ExecuteJson is unsupported; its implementation is kept commented out below.
// func (session *Session) ExecuteJson(stmt string) (*graph.ExecutionResponse, error) {
// return session.graph.ExecuteJson(session.sessionID, []byte(stmt))
// }
// Execute runs stmt on the session's current connection and returns the
// response converted to a ResultSet.
//
// If the first attempt fails with a thrift.TransportException whose type is
// thrift.END_OF_FILE, the session swaps in another idle connection from the
// pool and retries the statement once. Any other failure is returned as is.
//
// An error is returned when the session is nil or has already been released,
// when the statement cannot be executed, when reconnecting fails, or when the
// response cannot be converted into a ResultSet.
func (session *Session) Execute(stmt string) (*ResultSet, error) {
	// Mirror Release: tolerate a nil receiver instead of panicking.
	if session == nil {
		return nil, fmt.Errorf("failed to execute: Session is nil")
	}
	if session.connection == nil {
		return nil, fmt.Errorf("failed to execute: Session has been released")
	}

	resp, err := session.connection.execute(session.sessionID, stmt)
	if err != nil {
		// Reconnect only if the transport is closed.
		transportErr, ok := err.(thrift.TransportException)
		if !ok {
			return nil, err
		}
		if transportErr.TypeID() != thrift.END_OF_FILE {
			// No need to reconnect.
			session.log.Error(fmt.Sprintf("Error info: %s", transportErr.Error()))
			return nil, transportErr
		}

		if reconnErr := session.reConnect(); reconnErr != nil {
			session.log.Error(fmt.Sprintf("Failed to reconnect, %s", reconnErr.Error()))
			return nil, reconnErr
		}
		session.log.Info(fmt.Sprintf("Successfully reconnect to host: %s, port: %d",
			session.connection.severAddress.Host, session.connection.severAddress.Port))

		// Execute with the new connection.
		resp, err = session.connection.execute(session.sessionID, stmt)
		if err != nil {
			return nil, err
		}
	}

	// Both the first attempt and the retry share this single parse step.
	resSet, err := genResultSet(resp, session.timezoneInfo)
	if err != nil {
		return nil, err
	}
	return resSet, nil
}
// reConnect replaces the session's connection with an idle one from the pool.
//
// The previous connection is handed back to the pool only after a replacement
// has been obtained. If the pool cannot supply one, the session keeps its
// current connection and the pool's error is returned unchanged.
func (session *Session) reConnect() error {
	newConn, err := session.connPool.getIdleConn()
	if err != nil {
		// Return the error as is: passing err.Error() to fmt.Errorf as the
		// format string would mangle any '%' contained in the message.
		return err
	}

	// Release the old connection to the pool before adopting the new one.
	session.connPool.release(session.connection)
	session.connection = newConn
	return nil
}
// Release signs the session out and hands its connection back to the pool.
//
// Calling it on a nil session does nothing, and calling it on a session that
// was already released only logs a warning. A failed sign-out is logged as a
// warning; the connection is returned to the pool regardless.
func (session *Session) Release() {
	switch {
	case session == nil:
		return
	case session.connection == nil:
		session.log.Warn("Session has been released")
		return
	}

	conn := session.connection
	err := conn.signOut(session.sessionID)
	if err != nil {
		session.log.Warn(fmt.Sprintf("Sign out failed, %s", err.Error()))
	}

	// Give the connection back and mark the session as released.
	session.connPool.release(conn)
	session.connection = nil
}
// IsError reports whether resp carries an error code other than
// nebula.ErrorCode_SUCCEEDED.
func IsError(resp *graph.ExecutionResponse) bool {
	code := resp.GetErrorCode()
	return code != nebula.ErrorCode_SUCCEEDED
}