From c23dd4f99a7ecf5e57de59781ae4366630cb17ff Mon Sep 17 00:00:00 2001 From: "Harris.Chu" <1726587+HarrisChu@users.noreply.github.com> Date: Thu, 20 Jun 2024 11:43:45 +0800 Subject: [PATCH] Support execute timeout (#344) * support execute timeout * update --- connection.go | 4 + nebula/graph/graphservice.go | 890 +++++++++++++++++++++++++++++++++-- nebula/graph/ttypes.go | 116 +++++ session.go | 44 +- session_pool.go | 100 ++-- 5 files changed, 1086 insertions(+), 68 deletions(-) diff --git a/connection.go b/connection.go index f51e9c66..179bd5a4 100644 --- a/connection.go +++ b/connection.go @@ -187,6 +187,10 @@ func (cn *connection) executeWithParameter(sessionID int64, stmt string, return resp, nil } +func (cn *connection) executeWithParameterTimeout(sessionID int64, stmt string, params map[string]*nebula.Value, timeoutMs int64) (*graph.ExecutionResponse, error) { + return cn.graph.ExecuteWithTimeout(sessionID, []byte(stmt), params, timeoutMs) +} + func (cn *connection) executeJson(sessionID int64, stmt string) ([]byte, error) { return cn.ExecuteJsonWithParameter(sessionID, stmt, map[string]*nebula.Value{}) } diff --git a/nebula/graph/graphservice.go b/nebula/graph/graphservice.go index e42798f5..3361d7fe 100644 --- a/nebula/graph/graphservice.go +++ b/nebula/graph/graphservice.go @@ -42,6 +42,12 @@ type GraphService interface { // Parameters: // - SessionId // - Stmt + // - ParameterMap + // - Timeout + ExecuteWithTimeout(ctx context.Context, sessionId int64, stmt []byte, parameterMap map[string]*nebula0.Value, timeout int64) (_r *ExecutionResponse, err error) + // Parameters: + // - SessionId + // - Stmt ExecuteJson(ctx context.Context, sessionId int64, stmt []byte) (_r []byte, err error) // Parameters: // - SessionId @@ -51,6 +57,7 @@ type GraphService interface { // Parameters: // - Req VerifyClientVersion(ctx context.Context, req *VerifyClientVersionReq) (_r *VerifyClientVersionResp, err error) + Health(ctx context.Context) (_r *HealthResp, err error) } type GraphServiceClientInterface interface { @@ -74,6 +81,12 @@ type GraphServiceClientInterface interface { // Parameters: // - SessionId // - Stmt + // - ParameterMap + // - Timeout + ExecuteWithTimeout(sessionId int64, stmt []byte, parameterMap map[string]*nebula0.Value, timeout int64) (_r *ExecutionResponse, err error) + // Parameters: + // - SessionId + // - Stmt ExecuteJson(sessionId int64, stmt []byte) (_r []byte, err error) // Parameters: // - SessionId @@ -83,6 +96,7 @@ type GraphServiceClientInterface interface { // Parameters: // - Req VerifyClientVersion(req *VerifyClientVersionReq) (_r *VerifyClientVersionResp, err error) + Health() (_r *HealthResp, err error) } type GraphServiceClient struct { @@ -193,6 +207,32 @@ func (p *GraphServiceClient) recvExecuteWithParameter() (value *ExecutionRespons return result.GetSuccess(), nil } +// Parameters: +// - SessionId +// - Stmt +// - ParameterMap +// - Timeout +func (p *GraphServiceClient) ExecuteWithTimeout(sessionId int64, stmt []byte, parameterMap map[string]*nebula0.Value, timeout int64) (_r *ExecutionResponse, err error) { + args := GraphServiceExecuteWithTimeoutArgs{ + SessionId : sessionId, + Stmt : stmt, + ParameterMap : parameterMap, + Timeout : timeout, + } + err = p.CC.SendMsg("executeWithTimeout", &args, thrift.CALL) + if err != nil { return } + return p.recvExecuteWithTimeout() +} + + +func (p *GraphServiceClient) recvExecuteWithTimeout() (value *ExecutionResponse, err error) { + var result GraphServiceExecuteWithTimeoutResult + err = p.CC.RecvMsg("executeWithTimeout", &result) + if err != nil { return } + + return result.GetSuccess(), nil +} + // Parameters: // - SessionId // - Stmt @@ -259,6 +299,22 @@ func (p *GraphServiceClient) recvVerifyClientVersion() (value *VerifyClientVersi return result.GetSuccess(), nil } +func (p *GraphServiceClient) Health() (_r *HealthResp, err error) { + var args GraphServiceHealthArgs + err = p.CC.SendMsg("health", &args, thrift.CALL) + if err != nil { return } + return p.recvHealth() +} + + +func (p *GraphServiceClient) recvHealth() (value *HealthResp, err error) { + var result GraphServiceHealthResult + err = p.CC.RecvMsg("health", &result) + if err != nil { return } + + return result.GetSuccess(), nil +} + type GraphServiceThreadsafeClient struct { GraphServiceClientInterface @@ -383,6 +439,34 @@ func (p *GraphServiceThreadsafeClient) recvExecuteWithParameter() (value *Execut return result.GetSuccess(), nil } +// Parameters: +// - SessionId +// - Stmt +// - ParameterMap +// - Timeout +func (p *GraphServiceThreadsafeClient) ExecuteWithTimeout(sessionId int64, stmt []byte, parameterMap map[string]*nebula0.Value, timeout int64) (_r *ExecutionResponse, err error) { + p.Mu.Lock() + defer p.Mu.Unlock() + args := GraphServiceExecuteWithTimeoutArgs{ + SessionId : sessionId, + Stmt : stmt, + ParameterMap : parameterMap, + Timeout : timeout, + } + err = p.CC.SendMsg("executeWithTimeout", &args, thrift.CALL) + if err != nil { return } + return p.recvExecuteWithTimeout() +} + + +func (p *GraphServiceThreadsafeClient) recvExecuteWithTimeout() (value *ExecutionResponse, err error) { + var result GraphServiceExecuteWithTimeoutResult + err = p.CC.RecvMsg("executeWithTimeout", &result) + if err != nil { return } + + return result.GetSuccess(), nil +} + // Parameters: // - SessionId // - Stmt @@ -455,6 +539,24 @@ func (p *GraphServiceThreadsafeClient) recvVerifyClientVersion() (value *VerifyC return result.GetSuccess(), nil } +func (p *GraphServiceThreadsafeClient) Health() (_r *HealthResp, err error) { + p.Mu.Lock() + defer p.Mu.Unlock() + var args GraphServiceHealthArgs + err = p.CC.SendMsg("health", &args, thrift.CALL) + if err != nil { return } + return p.recvHealth() +} + + +func (p *GraphServiceThreadsafeClient) recvHealth() (value *HealthResp, err error) { + var result GraphServiceHealthResult + err = p.CC.RecvMsg("health", &result) + if err != nil { return } + + return result.GetSuccess(), nil +} + type GraphServiceChannelClient struct { RequestChannel thrift.RequestChannel @@ -535,6 +637,25 @@ func (p *GraphServiceChannelClient) ExecuteWithParameter(ctx context.Context, se return result.GetSuccess(), nil } +// Parameters: +// - SessionId +// - Stmt +// - ParameterMap +// - Timeout +func (p *GraphServiceChannelClient) ExecuteWithTimeout(ctx context.Context, sessionId int64, stmt []byte, parameterMap map[string]*nebula0.Value, timeout int64) (_r *ExecutionResponse, err error) { + args := GraphServiceExecuteWithTimeoutArgs{ + SessionId : sessionId, + Stmt : stmt, + ParameterMap : parameterMap, + Timeout : timeout, + } + var result GraphServiceExecuteWithTimeoutResult + err = p.RequestChannel.Call(ctx, "executeWithTimeout", &args, &result) + if err != nil { return } + + return result.GetSuccess(), nil +} + // Parameters: // - SessionId // - Stmt @@ -580,6 +701,16 @@ func (p *GraphServiceChannelClient) VerifyClientVersion(ctx context.Context, req return result.GetSuccess(), nil } +func (p *GraphServiceChannelClient) Health(ctx context.Context) (_r *HealthResp, err error) { + args := GraphServiceHealthArgs{ + } + var result GraphServiceHealthResult + err = p.RequestChannel.Call(ctx, "health", &args, &result) + if err != nil { return } + + return result.GetSuccess(), nil +} + type GraphServiceProcessor struct { processorMap map[string]thrift.ProcessorFunctionContext @@ -616,16 +747,20 @@ func NewGraphServiceProcessor(handler GraphService) *GraphServiceProcessor { self9.processorMap["signout"] = &graphServiceProcessorSignout{handler:handler} self9.processorMap["execute"] = &graphServiceProcessorExecute{handler:handler} self9.processorMap["executeWithParameter"] = &graphServiceProcessorExecuteWithParameter{handler:handler} + self9.processorMap["executeWithTimeout"] = &graphServiceProcessorExecuteWithTimeout{handler:handler} self9.processorMap["executeJson"] = &graphServiceProcessorExecuteJson{handler:handler} self9.processorMap["executeJsonWithParameter"] = &graphServiceProcessorExecuteJsonWithParameter{handler:handler} self9.processorMap["verifyClientVersion"] = &graphServiceProcessorVerifyClientVersion{handler:handler} + self9.processorMap["health"] = &graphServiceProcessorHealth{handler:handler} self9.functionServiceMap["authenticate"] = "GraphService" self9.functionServiceMap["signout"] = "GraphService" self9.functionServiceMap["execute"] = "GraphService" self9.functionServiceMap["executeWithParameter"] = "GraphService" + self9.functionServiceMap["executeWithTimeout"] = "GraphService" self9.functionServiceMap["executeJson"] = "GraphService" self9.functionServiceMap["executeJsonWithParameter"] = "GraphService" self9.functionServiceMap["verifyClientVersion"] = "GraphService" + self9.functionServiceMap["health"] = "GraphService" return self9 } @@ -841,6 +976,61 @@ func (p *graphServiceProcessorExecuteWithParameter) RunContext(ctx context.Conte return &result, nil } +type graphServiceProcessorExecuteWithTimeout struct { + handler GraphService +} + +func (p *GraphServiceExecuteWithTimeoutResult) Exception() thrift.WritableException { + if p == nil { return nil } + return nil +} + +func (p *graphServiceProcessorExecuteWithTimeout) Read(iprot thrift.Protocol) (thrift.Struct, thrift.Exception) { + args := GraphServiceExecuteWithTimeoutArgs{} + if err := args.Read(iprot); err != nil { + return nil, err + } + iprot.ReadMessageEnd() + return &args, nil +} + +func (p *graphServiceProcessorExecuteWithTimeout) Write(seqId int32, result thrift.WritableStruct, oprot thrift.Protocol) (err thrift.Exception) { + var err2 error + messageType := thrift.REPLY + switch result.(type) { + case thrift.ApplicationException: + messageType = thrift.EXCEPTION + } + if err2 = oprot.WriteMessageBegin("executeWithTimeout", messageType, seqId); err2 != nil { + err = err2 + } + if err2 = result.Write(oprot); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.Flush(); err == nil && err2 != nil { + err = err2 + } + return err +} + +func (p *graphServiceProcessorExecuteWithTimeout) RunContext(ctx context.Context, argStruct thrift.Struct) (thrift.WritableStruct, thrift.ApplicationException) { + args := argStruct.(*GraphServiceExecuteWithTimeoutArgs) + var result GraphServiceExecuteWithTimeoutResult + if retval, err := p.handler.ExecuteWithTimeout(ctx, args.SessionId, args.Stmt, args.ParameterMap, args.Timeout); err != nil { + switch err.(type) { + default: + x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, "Internal error processing executeWithTimeout: " + err.Error()) + return x, x + } + } else { + result.Success = retval + } + return &result, nil +} + type graphServiceProcessorExecuteJson struct { handler GraphService } @@ -1006,6 +1196,60 @@ func (p *graphServiceProcessorVerifyClientVersion) RunContext(ctx context.Contex return &result, nil } +type graphServiceProcessorHealth struct { + handler GraphService +} + +func (p *GraphServiceHealthResult) Exception() thrift.WritableException { + if p == nil { return nil } + return nil +} + +func (p *graphServiceProcessorHealth) Read(iprot thrift.Protocol) (thrift.Struct, thrift.Exception) { + args := GraphServiceHealthArgs{} + if err := args.Read(iprot); err != nil { + return nil, err + } + iprot.ReadMessageEnd() + return &args, nil +} + +func (p *graphServiceProcessorHealth) Write(seqId int32, result thrift.WritableStruct, oprot thrift.Protocol) (err thrift.Exception) { + var err2 error + messageType := thrift.REPLY + switch result.(type) { + case thrift.ApplicationException: + messageType = thrift.EXCEPTION + } + if err2 = oprot.WriteMessageBegin("health", messageType, seqId); err2 != nil { + err = err2 + } + if err2 = result.Write(oprot); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.Flush(); err == nil && err2 != nil { + err = err2 + } + return err +} + +func (p *graphServiceProcessorHealth) RunContext(ctx context.Context, argStruct thrift.Struct) (thrift.WritableStruct, thrift.ApplicationException) { + var result GraphServiceHealthResult + if retval, err := p.handler.Health(ctx); err != nil { + switch err.(type) { + default: + x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, "Internal error processing health: " + err.Error()) + return x, x + } + } else { + result.Success = retval + } + return &result, nil +} + // HELPER FUNCTIONS AND STRUCTURES @@ -2034,62 +2278,96 @@ func (p *GraphServiceExecuteWithParameterResult) String() string { // Attributes: // - SessionId // - Stmt -type GraphServiceExecuteJsonArgs struct { +// - ParameterMap +// - Timeout +type GraphServiceExecuteWithTimeoutArgs struct { thrift.IRequest SessionId int64 `thrift:"sessionId,1" db:"sessionId" json:"sessionId"` Stmt []byte `thrift:"stmt,2" db:"stmt" json:"stmt"` + ParameterMap map[string]*nebula0.Value `thrift:"parameterMap,3" db:"parameterMap" json:"parameterMap"` + Timeout int64 `thrift:"timeout,4" db:"timeout" json:"timeout"` } -func NewGraphServiceExecuteJsonArgs() *GraphServiceExecuteJsonArgs { - return &GraphServiceExecuteJsonArgs{} +func NewGraphServiceExecuteWithTimeoutArgs() *GraphServiceExecuteWithTimeoutArgs { + return &GraphServiceExecuteWithTimeoutArgs{} } -func (p *GraphServiceExecuteJsonArgs) GetSessionId() int64 { +func (p *GraphServiceExecuteWithTimeoutArgs) GetSessionId() int64 { return p.SessionId } -func (p *GraphServiceExecuteJsonArgs) GetStmt() []byte { +func (p *GraphServiceExecuteWithTimeoutArgs) GetStmt() []byte { return p.Stmt } -type GraphServiceExecuteJsonArgsBuilder struct { - obj *GraphServiceExecuteJsonArgs + +func (p *GraphServiceExecuteWithTimeoutArgs) GetParameterMap() map[string]*nebula0.Value { + return p.ParameterMap } -func NewGraphServiceExecuteJsonArgsBuilder() *GraphServiceExecuteJsonArgsBuilder{ - return &GraphServiceExecuteJsonArgsBuilder{ - obj: NewGraphServiceExecuteJsonArgs(), +func (p *GraphServiceExecuteWithTimeoutArgs) GetTimeout() int64 { + return p.Timeout +} +type GraphServiceExecuteWithTimeoutArgsBuilder struct { + obj *GraphServiceExecuteWithTimeoutArgs +} + +func NewGraphServiceExecuteWithTimeoutArgsBuilder() *GraphServiceExecuteWithTimeoutArgsBuilder{ + return &GraphServiceExecuteWithTimeoutArgsBuilder{ + obj: NewGraphServiceExecuteWithTimeoutArgs(), } } -func (p GraphServiceExecuteJsonArgsBuilder) Emit() *GraphServiceExecuteJsonArgs{ - return &GraphServiceExecuteJsonArgs{ +func (p GraphServiceExecuteWithTimeoutArgsBuilder) Emit() *GraphServiceExecuteWithTimeoutArgs{ + return &GraphServiceExecuteWithTimeoutArgs{ SessionId: p.obj.SessionId, Stmt: p.obj.Stmt, + ParameterMap: p.obj.ParameterMap, + Timeout: p.obj.Timeout, } } -func (g *GraphServiceExecuteJsonArgsBuilder) SessionId(sessionId int64) *GraphServiceExecuteJsonArgsBuilder { +func (g *GraphServiceExecuteWithTimeoutArgsBuilder) SessionId(sessionId int64) *GraphServiceExecuteWithTimeoutArgsBuilder { g.obj.SessionId = sessionId return g } -func (g *GraphServiceExecuteJsonArgsBuilder) Stmt(stmt []byte) *GraphServiceExecuteJsonArgsBuilder { +func (g *GraphServiceExecuteWithTimeoutArgsBuilder) Stmt(stmt []byte) *GraphServiceExecuteWithTimeoutArgsBuilder { g.obj.Stmt = stmt return g } -func (g *GraphServiceExecuteJsonArgs) SetSessionId(sessionId int64) *GraphServiceExecuteJsonArgs { +func (g *GraphServiceExecuteWithTimeoutArgsBuilder) ParameterMap(parameterMap map[string]*nebula0.Value) *GraphServiceExecuteWithTimeoutArgsBuilder { + g.obj.ParameterMap = parameterMap + return g +} + +func (g *GraphServiceExecuteWithTimeoutArgsBuilder) Timeout(timeout int64) *GraphServiceExecuteWithTimeoutArgsBuilder { + g.obj.Timeout = timeout + return g +} + +func (g *GraphServiceExecuteWithTimeoutArgs) SetSessionId(sessionId int64) *GraphServiceExecuteWithTimeoutArgs { g.SessionId = sessionId return g } -func (g *GraphServiceExecuteJsonArgs) SetStmt(stmt []byte) *GraphServiceExecuteJsonArgs { +func (g *GraphServiceExecuteWithTimeoutArgs) SetStmt(stmt []byte) *GraphServiceExecuteWithTimeoutArgs { g.Stmt = stmt return g } -func (p *GraphServiceExecuteJsonArgs) Read(iprot thrift.Protocol) error { +func (g *GraphServiceExecuteWithTimeoutArgs) SetParameterMap(parameterMap map[string]*nebula0.Value) *GraphServiceExecuteWithTimeoutArgs { + g.ParameterMap = parameterMap + return g +} + +func (g *GraphServiceExecuteWithTimeoutArgs) SetTimeout(timeout int64) *GraphServiceExecuteWithTimeoutArgs { + g.Timeout = timeout + return g +} + +func (p *GraphServiceExecuteWithTimeoutArgs) Read(iprot thrift.Protocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) } @@ -2110,6 +2388,14 @@ func (p *GraphServiceExecuteJsonArgs) Read(iprot thrift.Protocol) error { if err := p.ReadField2(iprot); err != nil { return err } + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } + case 4: + if err := p.ReadField4(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -2125,7 +2411,7 @@ func (p *GraphServiceExecuteJsonArgs) Read(iprot thrift.Protocol) error { return nil } -func (p *GraphServiceExecuteJsonArgs) ReadField1(iprot thrift.Protocol) error { +func (p *GraphServiceExecuteWithTimeoutArgs) ReadField1(iprot thrift.Protocol) error { if v, err := iprot.ReadI64(); err != nil { return thrift.PrependError("error reading field 1: ", err) } else { @@ -2134,7 +2420,7 @@ func (p *GraphServiceExecuteJsonArgs) ReadField1(iprot thrift.Protocol) error { return nil } -func (p *GraphServiceExecuteJsonArgs) ReadField2(iprot thrift.Protocol) error { +func (p *GraphServiceExecuteWithTimeoutArgs) ReadField2(iprot thrift.Protocol) error { if v, err := iprot.ReadBinary(); err != nil { return thrift.PrependError("error reading field 2: ", err) } else { @@ -2143,11 +2429,48 @@ func (p *GraphServiceExecuteJsonArgs) ReadField2(iprot thrift.Protocol) error { return nil } -func (p *GraphServiceExecuteJsonArgs) Write(oprot thrift.Protocol) error { - if err := oprot.WriteStructBegin("executeJson_args"); err != nil { +func (p *GraphServiceExecuteWithTimeoutArgs) ReadField3(iprot thrift.Protocol) error { + _, _, size, err := iprot.ReadMapBegin() + if err != nil { + return thrift.PrependError("error reading map begin: ", err) + } + tMap := make(map[string]*nebula0.Value, size) + p.ParameterMap = tMap + for i := 0; i < size; i ++ { + var _key13 string + if v, err := iprot.ReadString(); err != nil { + return thrift.PrependError("error reading field 0: ", err) + } else { + _key13 = v + } + _val14 := nebula0.NewValue() + if err := _val14.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", _val14), err) + } + p.ParameterMap[_key13] = _val14 + } + if err := iprot.ReadMapEnd(); err != nil { + return thrift.PrependError("error reading map end: ", err) + } + return nil +} + +func (p *GraphServiceExecuteWithTimeoutArgs) ReadField4(iprot thrift.Protocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 4: ", err) + } else { + p.Timeout = v + } + return nil +} + +func (p *GraphServiceExecuteWithTimeoutArgs) Write(oprot thrift.Protocol) error { + if err := oprot.WriteStructBegin("executeWithTimeout_args"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } if err := p.writeField1(oprot); err != nil { return err } if err := p.writeField2(oprot); err != nil { return err } + if err := p.writeField3(oprot); err != nil { return err } + if err := p.writeField4(oprot); err != nil { return err } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) } if err := oprot.WriteStructEnd(); err != nil { @@ -2155,7 +2478,7 @@ func (p *GraphServiceExecuteJsonArgs) Write(oprot thrift.Protocol) error { return nil } -func (p *GraphServiceExecuteJsonArgs) writeField1(oprot thrift.Protocol) (err error) { +func (p *GraphServiceExecuteWithTimeoutArgs) writeField1(oprot thrift.Protocol) (err error) { if err := oprot.WriteFieldBegin("sessionId", thrift.I64, 1); err != nil { return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:sessionId: ", p), err) } if err := oprot.WriteI64(int64(p.SessionId)); err != nil { @@ -2165,7 +2488,7 @@ func (p *GraphServiceExecuteJsonArgs) writeField1(oprot thrift.Protocol) (err er return err } -func (p *GraphServiceExecuteJsonArgs) writeField2(oprot thrift.Protocol) (err error) { +func (p *GraphServiceExecuteWithTimeoutArgs) writeField2(oprot thrift.Protocol) (err error) { if err := oprot.WriteFieldBegin("stmt", thrift.STRING, 2); err != nil { return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:stmt: ", p), err) } if err := oprot.WriteBinary(p.Stmt); err != nil { @@ -2175,8 +2498,321 @@ func (p *GraphServiceExecuteJsonArgs) writeField2(oprot thrift.Protocol) (err er return err } -func (p *GraphServiceExecuteJsonArgs) String() string { - if p == nil { +func (p *GraphServiceExecuteWithTimeoutArgs) writeField3(oprot thrift.Protocol) (err error) { + if err := oprot.WriteFieldBegin("parameterMap", thrift.MAP, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:parameterMap: ", p), err) } + if err := oprot.WriteMapBegin(thrift.STRING, thrift.STRUCT, len(p.ParameterMap)); err != nil { + return thrift.PrependError("error writing map begin: ", err) + } + for k, v := range p.ParameterMap { + if err := oprot.WriteString(string(k)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T. (0) field write error: ", p), err) } + if err := v.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", v), err) + } + } + if err := oprot.WriteMapEnd(); err != nil { + return thrift.PrependError("error writing map end: ", err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:parameterMap: ", p), err) } + return err +} + +func (p *GraphServiceExecuteWithTimeoutArgs) writeField4(oprot thrift.Protocol) (err error) { + if err := oprot.WriteFieldBegin("timeout", thrift.I64, 4); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:timeout: ", p), err) } + if err := oprot.WriteI64(int64(p.Timeout)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.timeout (4) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 4:timeout: ", p), err) } + return err +} + +func (p *GraphServiceExecuteWithTimeoutArgs) String() string { + if p == nil { + return "" + } + + sessionIdVal := fmt.Sprintf("%v", p.SessionId) + stmtVal := fmt.Sprintf("%v", p.Stmt) + parameterMapVal := fmt.Sprintf("%v", p.ParameterMap) + timeoutVal := fmt.Sprintf("%v", p.Timeout) + return fmt.Sprintf("GraphServiceExecuteWithTimeoutArgs({SessionId:%s Stmt:%s ParameterMap:%s Timeout:%s})", sessionIdVal, stmtVal, parameterMapVal, timeoutVal) +} + +// Attributes: +// - Success +type GraphServiceExecuteWithTimeoutResult struct { + thrift.IResponse + Success *ExecutionResponse `thrift:"success,0,optional" db:"success" json:"success,omitempty"` +} + +func NewGraphServiceExecuteWithTimeoutResult() *GraphServiceExecuteWithTimeoutResult { + return &GraphServiceExecuteWithTimeoutResult{} +} + +var GraphServiceExecuteWithTimeoutResult_Success_DEFAULT *ExecutionResponse +func (p *GraphServiceExecuteWithTimeoutResult) GetSuccess() *ExecutionResponse { + if !p.IsSetSuccess() { + return GraphServiceExecuteWithTimeoutResult_Success_DEFAULT + } +return p.Success +} +func (p *GraphServiceExecuteWithTimeoutResult) IsSetSuccess() bool { + return p != nil && p.Success != nil +} + +type GraphServiceExecuteWithTimeoutResultBuilder struct { + obj *GraphServiceExecuteWithTimeoutResult +} + +func NewGraphServiceExecuteWithTimeoutResultBuilder() *GraphServiceExecuteWithTimeoutResultBuilder{ + return &GraphServiceExecuteWithTimeoutResultBuilder{ + obj: NewGraphServiceExecuteWithTimeoutResult(), + } +} + +func (p GraphServiceExecuteWithTimeoutResultBuilder) Emit() *GraphServiceExecuteWithTimeoutResult{ + return &GraphServiceExecuteWithTimeoutResult{ + Success: p.obj.Success, + } +} + +func (g *GraphServiceExecuteWithTimeoutResultBuilder) Success(success *ExecutionResponse) *GraphServiceExecuteWithTimeoutResultBuilder { + g.obj.Success = success + return g +} + +func (g *GraphServiceExecuteWithTimeoutResult) SetSuccess(success *ExecutionResponse) *GraphServiceExecuteWithTimeoutResult { + g.Success = success + return g +} + +func (p *GraphServiceExecuteWithTimeoutResult) Read(iprot thrift.Protocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 0: + if err := p.ReadField0(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *GraphServiceExecuteWithTimeoutResult) ReadField0(iprot thrift.Protocol) error { + p.Success = NewExecutionResponse() + if err := p.Success.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err) + } + return nil +} + +func (p *GraphServiceExecuteWithTimeoutResult) Write(oprot thrift.Protocol) error { + if err := oprot.WriteStructBegin("executeWithTimeout_result"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if err := p.writeField0(oprot); err != nil { return err } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *GraphServiceExecuteWithTimeoutResult) writeField0(oprot thrift.Protocol) (err error) { + if p.IsSetSuccess() { + if err := oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) } + if err := p.Success.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Success), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) } + } + return err +} + +func (p *GraphServiceExecuteWithTimeoutResult) String() string { + if p == nil { + return "" + } + + var successVal string + if p.Success == nil { + successVal = "" + } else { + successVal = fmt.Sprintf("%v", p.Success) + } + return fmt.Sprintf("GraphServiceExecuteWithTimeoutResult({Success:%s})", successVal) +} + +// Attributes: +// - SessionId +// - Stmt +type GraphServiceExecuteJsonArgs struct { + thrift.IRequest + SessionId int64 `thrift:"sessionId,1" db:"sessionId" json:"sessionId"` + Stmt []byte `thrift:"stmt,2" db:"stmt" json:"stmt"` +} + +func NewGraphServiceExecuteJsonArgs() *GraphServiceExecuteJsonArgs { + return &GraphServiceExecuteJsonArgs{} +} + + +func (p *GraphServiceExecuteJsonArgs) GetSessionId() int64 { + return p.SessionId +} + +func (p *GraphServiceExecuteJsonArgs) GetStmt() []byte { + return p.Stmt +} +type GraphServiceExecuteJsonArgsBuilder struct { + obj *GraphServiceExecuteJsonArgs +} + +func NewGraphServiceExecuteJsonArgsBuilder() *GraphServiceExecuteJsonArgsBuilder{ + return &GraphServiceExecuteJsonArgsBuilder{ + obj: NewGraphServiceExecuteJsonArgs(), + } +} + +func (p GraphServiceExecuteJsonArgsBuilder) Emit() *GraphServiceExecuteJsonArgs{ + return &GraphServiceExecuteJsonArgs{ + SessionId: p.obj.SessionId, + Stmt: p.obj.Stmt, + } +} + +func (g *GraphServiceExecuteJsonArgsBuilder) SessionId(sessionId int64) *GraphServiceExecuteJsonArgsBuilder { + g.obj.SessionId = sessionId + return g +} + +func (g *GraphServiceExecuteJsonArgsBuilder) Stmt(stmt []byte) *GraphServiceExecuteJsonArgsBuilder { + g.obj.Stmt = stmt + return g +} + +func (g *GraphServiceExecuteJsonArgs) SetSessionId(sessionId int64) *GraphServiceExecuteJsonArgs { + g.SessionId = sessionId + return g +} + +func (g *GraphServiceExecuteJsonArgs) SetStmt(stmt []byte) *GraphServiceExecuteJsonArgs { + g.Stmt = stmt + return g +} + +func (p *GraphServiceExecuteJsonArgs) Read(iprot thrift.Protocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *GraphServiceExecuteJsonArgs) ReadField1(iprot thrift.Protocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.SessionId = v + } + return nil +} + +func (p *GraphServiceExecuteJsonArgs) ReadField2(iprot thrift.Protocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 2: ", err) + } else { + p.Stmt = v + } + return nil +} + +func (p *GraphServiceExecuteJsonArgs) Write(oprot thrift.Protocol) error { + if err := oprot.WriteStructBegin("executeJson_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if err := p.writeField1(oprot); err != nil { return err } + if err := p.writeField2(oprot); err != nil { return err } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *GraphServiceExecuteJsonArgs) writeField1(oprot thrift.Protocol) (err error) { + if err := oprot.WriteFieldBegin("sessionId", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:sessionId: ", p), err) } + if err := oprot.WriteI64(int64(p.SessionId)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.sessionId (1) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:sessionId: ", p), err) } + return err +} + +func (p *GraphServiceExecuteJsonArgs) writeField2(oprot thrift.Protocol) (err error) { + if err := oprot.WriteFieldBegin("stmt", thrift.STRING, 2); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:stmt: ", p), err) } + if err := oprot.WriteBinary(p.Stmt); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.stmt (2) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 2:stmt: ", p), err) } + return err +} + +func (p *GraphServiceExecuteJsonArgs) String() string { + if p == nil { return "" } @@ -2445,17 +3081,17 @@ func (p *GraphServiceExecuteJsonWithParameterArgs) ReadField3(iprot thrift.Prot tMap := make(map[string]*nebula0.Value, size) p.ParameterMap = tMap for i := 0; i < size; i ++ { - var _key13 string + var _key15 string if v, err := iprot.ReadString(); err != nil { return thrift.PrependError("error reading field 0: ", err) } else { - _key13 = v + _key15 = v } - _val14 := nebula0.NewValue() - if err := _val14.Read(iprot); err != nil { - return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", _val14), err) + _val16 := nebula0.NewValue() + if err := _val16.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", _val16), err) } - p.ParameterMap[_key13] = _val14 + p.ParameterMap[_key15] = _val16 } if err := iprot.ReadMapEnd(); err != nil { return thrift.PrependError("error reading map end: ", err) @@ -2899,4 +3535,196 @@ func (p *GraphServiceVerifyClientVersionResult) String() string { return fmt.Sprintf("GraphServiceVerifyClientVersionResult({Success:%s})", successVal) } +type GraphServiceHealthArgs struct { + thrift.IRequest +} + +func NewGraphServiceHealthArgs() *GraphServiceHealthArgs { + return &GraphServiceHealthArgs{} +} + +type GraphServiceHealthArgsBuilder struct { + obj *GraphServiceHealthArgs +} + +func NewGraphServiceHealthArgsBuilder() *GraphServiceHealthArgsBuilder{ + return &GraphServiceHealthArgsBuilder{ + obj: NewGraphServiceHealthArgs(), + } +} + +func (p GraphServiceHealthArgsBuilder) Emit() *GraphServiceHealthArgs{ + return &GraphServiceHealthArgs{ + } +} + +func (p *GraphServiceHealthArgs) Read(iprot thrift.Protocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *GraphServiceHealthArgs) Write(oprot thrift.Protocol) error { + if err := oprot.WriteStructBegin("health_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *GraphServiceHealthArgs) String() string { + if p == nil { + return "" + } + + return fmt.Sprintf("GraphServiceHealthArgs({})") +} + +// Attributes: +// - Success +type GraphServiceHealthResult struct { + thrift.IResponse + Success *HealthResp `thrift:"success,0,optional" db:"success" json:"success,omitempty"` +} + +func NewGraphServiceHealthResult() *GraphServiceHealthResult { + return &GraphServiceHealthResult{} +} + +var GraphServiceHealthResult_Success_DEFAULT *HealthResp +func (p *GraphServiceHealthResult) GetSuccess() *HealthResp { + if !p.IsSetSuccess() { + return GraphServiceHealthResult_Success_DEFAULT + } +return p.Success +} +func (p *GraphServiceHealthResult) IsSetSuccess() bool { + return p != nil && p.Success != nil +} + +type GraphServiceHealthResultBuilder struct { + obj *GraphServiceHealthResult +} + +func NewGraphServiceHealthResultBuilder() *GraphServiceHealthResultBuilder{ + return &GraphServiceHealthResultBuilder{ + obj: NewGraphServiceHealthResult(), + } +} + +func (p GraphServiceHealthResultBuilder) Emit() *GraphServiceHealthResult{ + return &GraphServiceHealthResult{ + Success: p.obj.Success, + } +} + +func (g *GraphServiceHealthResultBuilder) Success(success *HealthResp) *GraphServiceHealthResultBuilder { + g.obj.Success = success + return g +} + +func (g *GraphServiceHealthResult) SetSuccess(success *HealthResp) *GraphServiceHealthResult { + g.Success = success + return g +} + +func (p *GraphServiceHealthResult) Read(iprot thrift.Protocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 0: + if err := p.ReadField0(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *GraphServiceHealthResult) ReadField0(iprot thrift.Protocol) error { + p.Success = NewHealthResp() + if err := p.Success.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err) + } + return nil +} + +func (p *GraphServiceHealthResult) Write(oprot thrift.Protocol) error { + if err := oprot.WriteStructBegin("health_result"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if err := p.writeField0(oprot); err != nil { return err } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *GraphServiceHealthResult) writeField0(oprot thrift.Protocol) (err error) { + if p.IsSetSuccess() { + if err := oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) } + if err := p.Success.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Success), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) } + } + return err +} + +func (p *GraphServiceHealthResult) String() string { + if p == nil { + return "" + } + + var successVal string + if p.Success == nil { + successVal = "" + } else { + successVal = fmt.Sprintf("%v", p.Success) + } + return fmt.Sprintf("GraphServiceHealthResult({Success:%s})", successVal) +} + diff --git a/nebula/graph/ttypes.go b/nebula/graph/ttypes.go index a4b9f9eb..14ad6f8c 100644 --- a/nebula/graph/ttypes.go +++ b/nebula/graph/ttypes.go @@ -2452,3 +2452,119 @@ func (p *VerifyClientVersionReq) String() string { return fmt.Sprintf("VerifyClientVersionReq({Version:%s})", versionVal) } +// Attributes: +// - ErrorCode +type HealthResp struct { + ErrorCode int64 `thrift:"error_code,1,required" db:"error_code" json:"error_code"` +} + +func NewHealthResp() *HealthResp { + return &HealthResp{} +} + + +func (p *HealthResp) GetErrorCode() int64 { + return p.ErrorCode +} +type HealthRespBuilder struct { + obj *HealthResp +} + +func NewHealthRespBuilder() *HealthRespBuilder{ + return &HealthRespBuilder{ + obj: NewHealthResp(), + } +} + +func (p HealthRespBuilder) Emit() *HealthResp{ + return &HealthResp{ + ErrorCode: p.obj.ErrorCode, + } +} + +func (h *HealthRespBuilder) ErrorCode(errorCode int64) *HealthRespBuilder { + h.obj.ErrorCode = errorCode + return h +} + +func (h *HealthResp) SetErrorCode(errorCode int64) *HealthResp { + h.ErrorCode = errorCode + return h +} + +func (p *HealthResp) Read(iprot thrift.Protocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetErrorCode bool = false; + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + issetErrorCode = true + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetErrorCode{ + return thrift.NewProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field ErrorCode is not set")); + } + return nil +} + +func (p *HealthResp) ReadField1(iprot thrift.Protocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.ErrorCode = v + } + return nil +} + +func (p *HealthResp) Write(oprot thrift.Protocol) error { + if err := oprot.WriteStructBegin("HealthResp"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if err := p.writeField1(oprot); err != nil { return err } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *HealthResp) writeField1(oprot thrift.Protocol) (err error) { + if err := oprot.WriteFieldBegin("error_code", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:error_code: ", p), err) } + if err := oprot.WriteI64(int64(p.ErrorCode)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.error_code (1) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:error_code: ", p), err) } + return err +} + +func (p *HealthResp) String() string { + if p == nil { + return "" + } + + errorCodeVal := fmt.Sprintf("%v", p.ErrorCode) + return fmt.Sprintf("HealthResp({ErrorCode:%s})", errorCodeVal) +} + diff --git a/session.go b/session.go index 6a7da2af..bc896dca 100644 --- a/session.go +++ b/session.go @@ -56,16 +56,29 @@ func (session *Session) executeWithReconnect(f func() (interface{}, error)) (int func (session *Session) ExecuteWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) { session.mu.Lock() defer session.mu.Unlock() - if session.connection == nil { - return nil, fmt.Errorf("failed to execute: Session has been released") - } paramsMap, err := parseParams(params) if err != nil { return nil, err } + fn := func() (*graph.ExecutionResponse, error) { + return session.connection.executeWithParameter(session.sessionID, stmt, paramsMap) + } + return session.tryExecuteLocked(fn) + +} + +// Execute returns the result of the given query as a ResultSet +func (session *Session) Execute(stmt string) (*ResultSet, error) { + return session.ExecuteWithParameter(stmt, map[string]interface{}{}) +} + +func (session *Session) tryExecuteLocked(fn func() (*graph.ExecutionResponse, error)) (*ResultSet, error) { + if session.connection == nil { + return nil, fmt.Errorf("failed to execute: Session has been released") + } execFunc := func() (interface{}, error) { - resp, err := session.connection.executeWithParameter(session.sessionID, stmt, paramsMap) + resp, err := fn() if err != nil { return nil, err } @@ -75,7 +88,6 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]inte } return resSet, nil } - resp, err := session.executeWithReconnect(execFunc) if err != nil { return nil, err @@ -84,9 +96,25 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]inte } -// Execute returns the result of the given query as a ResultSet -func (session *Session) Execute(stmt string) (*ResultSet, error) { - return session.ExecuteWithParameter(stmt, map[string]interface{}{}) +func (session *Session) ExecuteWithTimeout(stmt string, timeoutMs int64) (*ResultSet, error) { + return session.ExecuteWithParameterTimeout(stmt, map[string]interface{}{}, timeoutMs) +} + +func (session *Session) ExecuteWithParameterTimeout(stmt string, params map[string]interface{}, timeoutMs int64) (*ResultSet, error) { + session.mu.Lock() + defer session.mu.Unlock() + if timeoutMs <= 0 { + return nil, fmt.Errorf("timeout should be a positive number") + } + paramsMap, err := parseParams(params) + if err != nil { + return nil, err + } + + fn := func() (*graph.ExecutionResponse, error) { + return session.connection.executeWithParameterTimeout(session.sessionID, stmt, paramsMap, timeoutMs) + } + return session.tryExecuteLocked(fn) } // ExecuteJson returns the result of the given query as a json string diff --git a/session_pool.go b/session_pool.go index e14121c2..551206c6 100644 --- a/session_pool.go +++ b/session_pool.go @@ -15,6 +15,7 @@ import ( "time" "github.com/vesoft-inc/nebula-go/v3/nebula" + "github.com/vesoft-inc/nebula-go/v3/nebula/graph" ) // SessionPool is a pool that manages sessions internally. @@ -96,19 +97,7 @@ func (pool *SessionPool) init() error { return nil } -// Execute returns the result of the given query as a ResultSet -// Notice there are some limitations: -// 1. The query should not be a plain space switch statement, e.g. "USE test_space", -// but queries like "use space xxx; match (v) return v" are accepted. -// 2. If the query contains statements like "USE ", the space will be set to the -// one in the pool config after the execution of the query. -// 3. The query should not change the user password nor drop a user. -func (pool *SessionPool) Execute(stmt string) (*ResultSet, error) { - return pool.ExecuteWithParameter(stmt, map[string]interface{}{}) -} - -// ExecuteWithParameter returns the result of the given query as a ResultSet -func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) { +func (pool *SessionPool) executeFn(execFunc func(s *pureSession) (*ResultSet, error)) (*ResultSet, error) { // Check if the pool is closed if pool.closed { return nil, fmt.Errorf("failed to execute: Session pool has been closed") @@ -130,16 +119,6 @@ func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]int pool.removeSessionFromIdle(session) pool.addSessionToActive(session) } - - // Execute the query - execFunc := func(s *pureSession) (*ResultSet, error) { - rs, err := s.executeWithParameter(stmt, params) - if err != nil { - return nil, err - } - return rs, nil - } - rs, err := pool.executeWithRetry(session, execFunc, pool.conf.retryGetSessionTimes) if err != nil { if !pool.enableHttp() { @@ -172,6 +151,51 @@ func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]int return rs, nil } +// Execute returns the result of the given query as a ResultSet +// Notice there are some limitations: +// 1. The query should not be a plain space switch statement, e.g. "USE test_space", +// but queries like "use space xxx; match (v) return v" are accepted. +// 2. If the query contains statements like "USE ", the space will be set to the +// one in the pool config after the execution of the query. +// 3. The query should not change the user password nor drop a user. +func (pool *SessionPool) Execute(stmt string) (*ResultSet, error) { + return pool.ExecuteWithParameter(stmt, map[string]interface{}{}) +} + +// ExecuteWithParameter returns the result of the given query as a ResultSet +func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) { + + // Execute the query + execFunc := func(s *pureSession) (*ResultSet, error) { + rs, err := s.executeWithParameter(stmt, params) + if err != nil { + return nil, err + } + return rs, nil + } + return pool.executeFn(execFunc) +} + +func (pool *SessionPool) ExecuteWithTimeout(stmt string, timeoutMs int64) (*ResultSet, error) { + return pool.ExecuteWithParameterTimeout(stmt, map[string]interface{}{}, timeoutMs) +} + +// ExecuteWithParameter returns the result of the given query as a ResultSet +func (pool *SessionPool) ExecuteWithParameterTimeout(stmt string, params map[string]interface{}, timeoutMs int64) (*ResultSet, error) { + // Execute the query + if timeoutMs <= 0 { + return nil, fmt.Errorf("timeout should be a positive number") + } + execFunc := func(s *pureSession) (*ResultSet, error) { + rs, err := s.executeWithParameterTimeout(stmt, params, timeoutMs) + if err != nil { + return nil, err + } + return rs, nil + } + return pool.executeFn(execFunc) +} + // ExecuteJson returns the result of the given query as a json string // Date and Datetime will be returned in UTC // @@ -630,15 +654,11 @@ func (session *pureSession) execute(stmt string) (*ResultSet, error) { return session.executeWithParameter(stmt, nil) } -func (session *pureSession) executeWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) { - paramsMap, err := parseParams(params) - if err != nil { - return nil, err - } +func (session *pureSession) executeFn(fn func() (*graph.ExecutionResponse, error)) (*ResultSet, error) { if session.connection == nil { return nil, fmt.Errorf("failed to execute: Session has been released") } - resp, err := session.connection.executeWithParameter(session.sessionID, stmt, paramsMap) + resp, err := fn() if err != nil { return nil, err } @@ -649,6 +669,28 @@ func (session *pureSession) executeWithParameter(stmt string, params map[string] return rs, nil } +func (session *pureSession) executeWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) { + paramsMap, err := parseParams(params) + if err != nil { + return nil, err + } + fn := func() (*graph.ExecutionResponse, error) { + return session.connection.executeWithParameter(session.sessionID, stmt, paramsMap) + } + return session.executeFn(fn) +} + +func (session *pureSession) executeWithParameterTimeout(stmt string, params map[string]interface{}, timeout int64) (*ResultSet, error) { + paramsMap, err := parseParams(params) + if err != nil { + return nil, err + } + fn := func() (*graph.ExecutionResponse, error) { + return session.connection.executeWithParameterTimeout(session.sessionID, stmt, paramsMap, timeout) + } + return session.executeFn(fn) +} + func (session *pureSession) close() { defer func() { if err := recover(); err != nil {