Skip to content

Commit

Permalink
expose rest feedback endpoint in executor
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalSkolasinski committed May 15, 2020
1 parent 9dc1885 commit a00a124
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 8 deletions.
2 changes: 0 additions & 2 deletions executor/api/metric/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ const (
ModelImageMetric = "model_image"
ModelVersionMetric = "model_version"

PredictionServiceMetricName = "predictions"

ServerRequestsMetricName = "seldon_api_executor_server_requests_seconds"
ClientRequestsMetricName = "seldon_api_executor_client_requests_seconds"

Expand Down
41 changes: 37 additions & 4 deletions executor/api/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,16 @@ func (r *SeldonRestApi) Initialise() {
//v0.1 API
api01 := r.Router.PathPrefix("/api/v0.1").Methods("POST").Subrouter()
api01.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions))
api01.Handle("/feedback", r.wrapMetrics(metric.FeedbackHttpServiceName, r.feedback))
r.Router.NewRoute().Path("/api/v0.1/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status))
r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata))
r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata))
r.Router.NewRoute().PathPrefix("/api/v0.1/doc/").Handler(http.StripPrefix("/api/v0.1/doc/", http.FileServer(http.Dir("./openapi/"))))
//v1.0 API
api1 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter()
api1.Handle("/predictions", r.wrapMetrics(metric.PredictionServiceMetricName, r.predictions))
api10 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter()
api10.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions))
api10.Handle("/feedback", r.wrapMetrics(metric.FeedbackHttpServiceName, r.feedback))
r.Router.NewRoute().Path("/api/v1.0/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status))
r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata))
r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata))
r.Router.NewRoute().PathPrefix("/api/v1.0/doc/").Handler(http.StripPrefix("/api/v1.0/doc/", http.FileServer(http.Dir("./openapi/"))))

case api.ProtocolTensorflow:
Expand Down Expand Up @@ -260,6 +262,37 @@ func (r *SeldonRestApi) status(w http.ResponseWriter, req *http.Request) {
r.respondWithSuccess(w, http.StatusOK, resPayload)
}

func (r *SeldonRestApi) feedback(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()

// Apply tracing if active
if opentracing.IsGlobalTracerRegistered() {
var serverSpan opentracing.Span
ctx, serverSpan = setupTracing(ctx, req, TracingStatusName)
defer serverSpan.Finish()
}

bodyBytes, err := ioutil.ReadAll(req.Body)
if err != nil {
r.respondWithError(w, nil, err)
return
}

seldonPredictorProcess := predictor.NewPredictorProcess(ctx, r.Client, logf.Log.WithName(LoggingRestClientName), r.ServerUrl, r.Namespace, req.Header)
reqPayload, err := seldonPredictorProcess.Client.Unmarshall(bodyBytes)
if err != nil {
r.respondWithError(w, nil, err)
return
}

resPayload, err := seldonPredictorProcess.Feedback(r.predictor.Graph, reqPayload)
if err != nil {
r.respondWithError(w, resPayload, err)
return
}
r.respondWithSuccess(w, http.StatusOK, resPayload)
}

func (r *SeldonRestApi) predictions(w http.ResponseWriter, req *http.Request) {
r.Log.Info("Predictions called")

Expand Down
28 changes: 28 additions & 0 deletions executor/api/rest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,34 @@ func TestSeldonMetadata(t *testing.T) {
g.Expect(res.Body.String()).To(Equal(test.TestClientMetadataResponse))
}

func TestSeldonFeedback(t *testing.T) {
t.Logf("Started")
g := NewGomegaWithT(t)

model := v1.MODEL
p := v1.PredictorSpec{
Name: "p",
Graph: &v1.PredictiveUnit{
Type: &model,
Endpoint: &v1.Endpoint{
ServiceHost: "foo",
ServicePort: 9000,
Type: v1.REST,
},
},
}
url, _ := url.Parse("http://localhost")
r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics")
r.Initialise()
var data = ` {"data":{"ndarray":[1.1,2.0]}}`

req, _ := http.NewRequest("POST", "/api/v1.0/feedback", strings.NewReader(data))
req.Header = map[string][]string{"Content-Type": []string{"application/json"}}
res := httptest.NewRecorder()
r.Router.ServeHTTP(res, req)
g.Expect(res.Code).To(Equal(200))
}

func TestTensorflowMetadata(t *testing.T) {
t.Logf("Started")
g := NewGomegaWithT(t)
Expand Down
8 changes: 6 additions & 2 deletions executor/api/test/seldonmessage_test_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (s SeldonMessageTestClient) Feedback(ctx context.Context, modelName string,
if s.ErrMethod != nil && *s.ErrMethod == v1.SEND_FEEDBACK {
return nil, s.Err
}
resp := &payload.ProtoPayload{Msg: msg.GetPayload().(*proto.Feedback).Request}
return resp, nil
protoFeedback, ok := msg.GetPayload().(*proto.Feedback)
if ok == true {
resp := &payload.ProtoPayload{Msg: protoFeedback.Request}
return resp, nil
}
return msg, nil
}

0 comments on commit a00a124

Please sign in to comment.