From 72e83390b2ab35b055648dbd3eadfee180ba8d5e Mon Sep 17 00:00:00 2001 From: SachinVarghese Date: Thu, 20 Jan 2022 19:34:53 +0530 Subject: [PATCH 1/3] Updating inference logic to add node level request-response logging --- executor/predictor/predictor_process.go | 86 +++++++++++++++++++------ 1 file changed, 67 insertions(+), 19 deletions(-) diff --git a/executor/predictor/predictor_process.go b/executor/predictor/predictor_process.go index 1e63a5b02c..ed5c169514 100644 --- a/executor/predictor/predictor_process.go +++ b/executor/predictor/predictor_process.go @@ -118,7 +118,7 @@ func (p *PredictorProcess) transformInput(node *v1.PredictiveUnit, msg payload.S p.RoutingMutex.Unlock() return p.Client.TransformInput(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) } else { - return msg, nil + return nil, nil } } @@ -144,7 +144,7 @@ func (p *PredictorProcess) transformOutput(node *v1.PredictiveUnit, msg payload. } return p.Client.TransformOutput(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) } else { - return msg, nil + return nil, nil } } @@ -273,7 +273,7 @@ func (p *PredictorProcess) predictChildren(node *v1.PredictiveUnit, msg payload. return p.aggregate(node, cmsgs) } else { // Don't add routing for leaf nodes - return msg, nil + return nil, nil } } @@ -365,29 +365,77 @@ func (p *PredictorProcess) Predict(node *v1.PredictiveUnit, msg payload.SeldonPa if err != nil { return nil, err } - //Log Request - if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) - if err != nil { - return nil, err - } - } + + nextMsg := msg tmsg, err := p.transformInput(node, msg) + if tmsg != nil { + nextMsg = tmsg + } if err != nil { - return tmsg, err + //Log Error Request + if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) + if err != nil { + return nil, err + } + } + return nextMsg, err + } + cmsg, err := p.predictChildren(node, nextMsg) + if cmsg != nil { + nextMsg = cmsg } - cmsg, err := p.predictChildren(node, tmsg) if err != nil { - return cmsg, err + //Log Error Request + if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) + if err != nil { + return nil, err + } + } + return nextMsg, err } - response, err := p.transformOutput(node, cmsg) - // Log Response - if err == nil && node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, response, puid) - if err != nil { - return nil, err + + response, err := p.transformOutput(node, nextMsg) + + if response != nil { + //Log Output Transformer Request + if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, nextMsg, puid) + if err != nil { + return nil, err + } } + + // Log Output Transformer Response + if err == nil && node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, response, puid) + if err != nil { + return nil, err + } + } + } else { + //Log Request + if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) + if err != nil { + return nil, err + } + } + + if tmsg != nil { + // Log Response + if node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, tmsg, puid) + if err != nil { + return nil, err + } + } + } + + response = nextMsg } + if envEnableRoutingInjection { if routeResponse, err := util.InsertRouteToSeldonPredictPayload(response, &p.Routing); err == nil { return routeResponse, err From 728231828c399680123c21471df4c2e2041eeaac Mon Sep 17 00:00:00 2001 From: SachinVarghese Date: Fri, 21 Jan 2022 21:24:56 +0530 Subject: [PATCH 2/3] Update to component level logging --- executor/predictor/predictor_process.go | 136 +++++++++--------------- 1 file changed, 53 insertions(+), 83 deletions(-) diff --git a/executor/predictor/predictor_process.go b/executor/predictor/predictor_process.go index ed5c169514..6f096e5d34 100644 --- a/executor/predictor/predictor_process.go +++ b/executor/predictor/predictor_process.go @@ -82,7 +82,7 @@ func (p *PredictorProcess) getModelName(node *v1.PredictiveUnit) string { return modelName } -func (p *PredictorProcess) transformInput(node *v1.PredictiveUnit, msg payload.SeldonPayload) (payload.SeldonPayload, error) { +func (p *PredictorProcess) transformInput(node *v1.PredictiveUnit, msg payload.SeldonPayload, puid string) (tmsg payload.SeldonPayload, err error) { callModel := false callTransformInput := false if (*node).Type != nil { @@ -99,16 +99,15 @@ func (p *PredictorProcess) transformInput(node *v1.PredictiveUnit, msg payload.S modelName := p.getModelName(node) - if callModel { - msg, err := p.Client.Chain(p.Ctx, modelName, msg) - if err != nil { - return nil, err + if callModel || callTransformInput { + //Log Request + if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) + if err != nil { + return nil, err + } } - p.RoutingMutex.Lock() - p.Routing[node.Name] = -1 - p.RoutingMutex.Unlock() - return p.Client.Predict(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) - } else if callTransformInput { + msg, err := p.Client.Chain(p.Ctx, modelName, msg) if err != nil { return nil, err @@ -116,14 +115,28 @@ func (p *PredictorProcess) transformInput(node *v1.PredictiveUnit, msg payload.S p.RoutingMutex.Lock() p.Routing[node.Name] = -1 p.RoutingMutex.Unlock() - return p.Client.TransformInput(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) + + if callTransformInput { + tmsg, err = p.Client.TransformInput(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) + } else { + tmsg, err = p.Client.Predict(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) + } + if tmsg != nil && err == nil { + // Log Response + if node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, tmsg, puid) + if err != nil { + return nil, err + } + } + } + return tmsg, err } else { - return nil, nil + return msg, nil } - } -func (p *PredictorProcess) transformOutput(node *v1.PredictiveUnit, msg payload.SeldonPayload) (payload.SeldonPayload, error) { +func (p *PredictorProcess) transformOutput(node *v1.PredictiveUnit, msg payload.SeldonPayload, puid string) (payload.SeldonPayload, error) { callClient := false if (*node).Type != nil { switch *node.Type { @@ -142,9 +155,26 @@ func (p *PredictorProcess) transformOutput(node *v1.PredictiveUnit, msg payload. if err != nil { return nil, err } - return p.Client.TransformOutput(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) + //Log Request + if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) + if err != nil { + return nil, err + } + } + tmsg, err := p.Client.TransformOutput(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) + if tmsg != nil && err == nil { + // Log Response + if node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, tmsg, puid) + if err != nil { + return nil, err + } + } + } + return tmsg, err } else { - return nil, nil + return msg, nil } } @@ -224,10 +254,9 @@ func (p *PredictorProcess) aggregate(node *v1.PredictiveUnit, msg []payload.Seld } else { return msg[0], nil } - } -func (p *PredictorProcess) predictChildren(node *v1.PredictiveUnit, msg payload.SeldonPayload) (payload.SeldonPayload, error) { +func (p *PredictorProcess) predictChildren(node *v1.PredictiveUnit, msg payload.SeldonPayload, puid string) (payload.SeldonPayload, error) { if node.Children != nil && len(node.Children) > 0 { route, err := p.route(node, msg) if err != nil { @@ -273,7 +302,7 @@ func (p *PredictorProcess) predictChildren(node *v1.PredictiveUnit, msg payload. return p.aggregate(node, cmsgs) } else { // Don't add routing for leaf nodes - return nil, nil + return msg, nil } } @@ -366,75 +395,16 @@ func (p *PredictorProcess) Predict(node *v1.PredictiveUnit, msg payload.SeldonPa return nil, err } - nextMsg := msg - tmsg, err := p.transformInput(node, msg) - if tmsg != nil { - nextMsg = tmsg - } + tmsg, err := p.transformInput(node, msg, puid) if err != nil { - //Log Error Request - if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) - if err != nil { - return nil, err - } - } - return nextMsg, err - } - cmsg, err := p.predictChildren(node, nextMsg) - if cmsg != nil { - nextMsg = cmsg + return tmsg, err } + cmsg, err := p.predictChildren(node, tmsg, puid) if err != nil { - //Log Error Request - if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) - if err != nil { - return nil, err - } - } - return nextMsg, err + return cmsg, err } - response, err := p.transformOutput(node, nextMsg) - - if response != nil { - //Log Output Transformer Request - if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, nextMsg, puid) - if err != nil { - return nil, err - } - } - - // Log Output Transformer Response - if err == nil && node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, response, puid) - if err != nil { - return nil, err - } - } - } else { - //Log Request - if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) - if err != nil { - return nil, err - } - } - - if tmsg != nil { - // Log Response - if node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { - err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, tmsg, puid) - if err != nil { - return nil, err - } - } - } - - response = nextMsg - } + response, err := p.transformOutput(node, cmsg, puid) if envEnableRoutingInjection { if routeResponse, err := util.InsertRouteToSeldonPredictPayload(response, &p.Routing); err == nil { From 6bf82ab7cc41fdf1b76cdb50671970c117a247e1 Mon Sep 17 00:00:00 2001 From: SachinVarghese Date: Mon, 24 Jan 2022 16:23:05 +0530 Subject: [PATCH 3/3] Adding payload logging to combiner component --- executor/predictor/predictor_process.go | 35 +++++++++++++++++++------ 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/executor/predictor/predictor_process.go b/executor/predictor/predictor_process.go index 6f096e5d34..a8b35e450e 100644 --- a/executor/predictor/predictor_process.go +++ b/executor/predictor/predictor_process.go @@ -151,10 +151,6 @@ func (p *PredictorProcess) transformOutput(node *v1.PredictiveUnit, msg payload. modelName := p.getModelName(node) if callClient { - msg, err := p.Client.Chain(p.Ctx, modelName, msg) - if err != nil { - return nil, err - } //Log Request if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) @@ -162,6 +158,11 @@ func (p *PredictorProcess) transformOutput(node *v1.PredictiveUnit, msg payload. return nil, err } } + + msg, err := p.Client.Chain(p.Ctx, modelName, msg) + if err != nil { + return nil, err + } tmsg, err := p.Client.TransformOutput(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) if tmsg != nil && err == nil { // Log Response @@ -232,7 +233,7 @@ func (p *PredictorProcess) route(node *v1.PredictiveUnit, msg payload.SeldonPayl } } -func (p *PredictorProcess) aggregate(node *v1.PredictiveUnit, msg []payload.SeldonPayload) (payload.SeldonPayload, error) { +func (p *PredictorProcess) aggregate(node *v1.PredictiveUnit, cmsg []payload.SeldonPayload, msg payload.SeldonPayload, puid string) (payload.SeldonPayload, error) { callClient := false if (*node).Type != nil { switch *node.Type { @@ -247,12 +248,29 @@ func (p *PredictorProcess) aggregate(node *v1.PredictiveUnit, msg []payload.Seld modelName := p.getModelName(node) if callClient { + //Log Request + if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg, puid) + if err != nil { + return nil, err + } + } p.RoutingMutex.Lock() p.Routing[node.Name] = -1 p.RoutingMutex.Unlock() - return p.Client.Combine(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), msg, p.Meta.Meta) + tmsg, err := p.Client.Combine(p.Ctx, modelName, node.Endpoint.ServiceHost, p.getPort(node), cmsg, p.Meta.Meta) + if tmsg != nil && err == nil { + // Log Response + if node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, tmsg, puid) + if err != nil { + return nil, err + } + } + } + return tmsg, err } else { - return msg[0], nil + return cmsg[0], nil } } @@ -264,6 +282,7 @@ func (p *PredictorProcess) predictChildren(node *v1.PredictiveUnit, msg payload. } var cmsgs []payload.SeldonPayload if route == -1 { + cmsgs = make([]payload.SeldonPayload, len(node.Children)) var errs = make([]error, len(node.Children)) wg := sync.WaitGroup{} @@ -299,7 +318,7 @@ func (p *PredictorProcess) predictChildren(node *v1.PredictiveUnit, msg payload. return cmsgs[0], err } } - return p.aggregate(node, cmsgs) + return p.aggregate(node, cmsgs, msg, puid) } else { // Don't add routing for leaf nodes return msg, nil