Skip to content

Commit

Permalink
Incorporate changes to support binary and tftensor payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
jklaise committed Dec 4, 2018
1 parent eb4974f commit 57648ec
Show file tree
Hide file tree
Showing 12 changed files with 2,061 additions and 106 deletions.
99 changes: 71 additions & 28 deletions python/seldon_core/microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import time
import logging
import multiprocessing as mp
import tensorflow as tf
from tensorflow.core.framework.tensor_pb2 import TensorProto
from google.protobuf import json_format

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,14 +62,14 @@ def to_dict(self):
def sanity_check_request(req):
if not type(req) == dict:
raise SeldonMicroserviceException("Request must be a dictionary")
data = req.get("data")
if data is None:
if "data" in req:
data = req.get("data")
if not type(data) == dict:
raise SeldonMicroserviceException("data field must be a dictionary")
if data.get('ndarray') is None and data.get('tensor') is None and data.get('tftensor') is None:
raise SeldonMicroserviceException("Data dictionary has no 'tensor', 'ndarray' or 'tftensor' keyword.")
elif not ("binData" in req or "strData" in req):
raise SeldonMicroserviceException("Request must contain Default Data")
if not type(data) == dict:
raise SeldonMicroserviceException("Data must be a dictionary")
if data.get('ndarray') is None and data.get('tensor') is None:
raise SeldonMicroserviceException(
"Data dictionary has no 'ndarray' or 'tensor' keyword.")
# TODO: Should we check more things? Like shape not being None or empty for a tensor?


Expand Down Expand Up @@ -103,70 +107,109 @@ def array_to_list_value(array, lv=None):
return lv


def get_data_from_json(message):
if "data" in message:
datadef = message.get("data")
return rest_datadef_to_array(datadef)
elif "binData" in message:
return message["binData"]
elif "strData" in message:
return message["strData"]
else:
strJson = json.dumps(message)
raise SeldonMicroserviceException("Can't find data in json: "+strJson)


def rest_datadef_to_array(datadef):
if datadef.get("tensor") is not None:
features = np.array(datadef.get("tensor").get("values")).reshape(
datadef.get("tensor").get("shape"))
features = np.array(datadef.get("tensor").get("values")).reshape(datadef.get("tensor").get("shape"))
elif datadef.get("ndarray") is not None:
features = np.array(datadef.get("ndarray"))
elif datadef.get("tftensor") is not None:
tfp = TensorProto()
json_format.ParseDict(datadef.get("tftensor"), tfp, ignore_unknown_fields=False)
features = tf.make_ndarray(tfp)
else:
features = np.array([])
return features


def array_to_rest_datadef(array, names, original_datadef):
datadef = {"names": names}
def array_to_rest_datadef(array,names,original_datadef):
datadef = {"names":names}
if original_datadef.get("tensor") is not None:
datadef["tensor"] = {
"shape": array.shape,
"values": array.ravel().tolist()
"shape":array.shape,
"values":array.ravel().tolist()
}
elif original_datadef.get("ndarray") is not None:
datadef["ndarray"] = array.tolist()
elif original_datadef.get("tftensor") is not None:
tftensor = tf.make_tensor_proto(array)
jStrTensor = json_format.MessageToJson(tftensor)
jTensor = json.loads(jStrTensor)
datadef["tftensor"] = jTensor
else:
datadef["ndarray"] = array.tolist()
return datadef


def get_data_from_proto(request):
data_type = request.WhichOneof("data_oneof")
if data_type == "data":
datadef = request.data
return grpc_datadef_to_array(datadef)
elif data_type == "binData":
return request.binData
elif data_type == "strData":
return request.strData
else:
raise SeldonMicroserviceException("Unknown data in SeldonMessage")


def grpc_datadef_to_array(datadef):
data_type = datadef.WhichOneof("data_oneof")
if data_type == "tensor":
if (sys.version_info >= (3, 0)):
sz = np.prod(datadef.tensor.shape) # get number of float64 entries
c = datadef.tensor.SerializeToString() # get bytes
sz = np.prod(datadef.tensor.shape) # get number of float64 entries
c = datadef.tensor.SerializeToString() # get bytes
# create array from packed entries which are at end of bytes - assumes same endianness
features = np.frombuffer(memoryview(
c[-(sz * 8):]), dtype=np.float64, count=sz, offset=0)
features = np.frombuffer(memoryview(c[-(sz*8):]), dtype=np.float64, count=sz, offset=0)
features = features.reshape(datadef.tensor.shape)
else:
# Python 2 version which is slower
features = np.array(datadef.tensor.values).reshape(
datadef.tensor.shape)
features = np.array(datadef.tensor.values).reshape(datadef.tensor.shape)
elif data_type == "ndarray":
features = np.array(datadef.ndarray)
elif data_type == "tftensor":
features = tf.make_ndarray(datadef.tftensor)
else:
features = np.array([])
return features


def array_to_grpc_datadef(array, names, data_type):
def array_to_grpc_datadef(array,names,data_type):
if data_type == "tensor":
datadef = prediction_pb2.DefaultData(
names=names,
tensor=prediction_pb2.Tensor(
shape=array.shape,
values=array.ravel().tolist()
names = names,
tensor = prediction_pb2.Tensor(
shape = array.shape,
values = array.ravel().tolist()
)
)
elif data_type == "ndarray":
datadef = prediction_pb2.DefaultData(
names=names,
ndarray=array_to_list_value(array)
names = names,
ndarray = array_to_list_value(array)
)
elif data_type == "tftensor":
datadef = prediction_pb2.DefaultData(
names = names,
tftensor = tf.make_tensor_proto(array)
)
else:
datadef = prediction_pb2.DefaultData(
names=names,
ndarray=array_to_list_value(array)
names = names,
ndarray = array_to_list_value(array)
)

return datadef
Expand Down
67 changes: 40 additions & 27 deletions python/seldon_core/model_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from seldon_core.proto import prediction_pb2, prediction_pb2_grpc
from seldon_core.microservice import extract_message, sanity_check_request, rest_datadef_to_array, \
array_to_rest_datadef, grpc_datadef_to_array, array_to_grpc_datadef, \
SeldonMicroserviceException, get_custom_tags
SeldonMicroserviceException, get_custom_tags, get_data_from_json, get_data_from_proto
from seldon_core.metrics import get_custom_metrics
from seldon_core.seldon_flatbuffers import SeldonRPCToNumpyArray, NumpyArrayToSeldonRPC, CreateErrorMsg

Expand Down Expand Up @@ -66,26 +66,31 @@ def handle_invalid_usage(error):

@app.route("/seldon.json", methods=["GET"])
def openAPI():
return send_from_directory("openapi", "seldon.json")
return send_from_directory('', "seldon.json")

@app.route("/predict", methods=["GET", "POST"])
def Predict():
request = extract_message()
sanity_check_request(request)

datadef = request.get("data")
features = rest_datadef_to_array(datadef)

predictions = np.array(
predict(user_model, features, datadef.get("names")))
if len(predictions.shape) > 1:
class_names = get_class_names(user_model, predictions.shape[1])
features = get_data_from_json(request)
names = request.get("data", {}).get("names")

predictions = predict(user_model, features, names)

# If predictions is an numpy array or we used the default data then return as numpy array
if isinstance(predictions, np.ndarray) or "data" in request:
predictions = np.array(predictions)
if len(predictions.shape) > 1:
class_names = get_class_names(user_model, predictions.shape[1])
else:
class_names = []
data = array_to_rest_datadef(
predictions, class_names, request.get("data", {}))
response = {"data": data, "meta": {}}
else:
class_names = []

data = array_to_rest_datadef(predictions, class_names, datadef)
response = {"binData": predictions, "meta": {}}

response = {"data": data, "meta": {}}
tags = get_custom_tags(user_model)
if tags:
response["meta"]["tags"] = tags
Expand All @@ -101,8 +106,9 @@ def SendFeedback():
datadef_request = feedback.get("request", {}).get("data", {})
features = rest_datadef_to_array(datadef_request)

datadef_truth = feedback.get("truth",{}).get("data",{})
datadef_truth = feedback.get("truth", {}).get("data", {})
truth = rest_datadef_to_array(datadef_truth)

reward = feedback.get("reward")

send_feedback(user_model, features,
Expand All @@ -121,19 +127,10 @@ def __init__(self, user_model):
self.user_model = user_model

def Predict(self, request, context):
features = get_data_from_proto(request)
datadef = request.data
features = grpc_datadef_to_array(datadef)

predictions = np.array(
predict(self.user_model, features, datadef.names))
if len(predictions.shape) > 1:
class_names = get_class_names(
self.user_model, predictions.shape[1])
else:
class_names = []

data = array_to_grpc_datadef(
predictions, class_names, request.data.WhichOneof("data_oneof"))
data_type = request.WhichOneof("data_oneof")
predictions = predict(self.user_model, features, datadef.names)

# Construct meta data
meta = prediction_pb2.Meta()
Expand All @@ -146,7 +143,23 @@ def Predict(self, request, context):
metaJson["metrics"] = metrics
json_format.ParseDict(metaJson, meta)

return prediction_pb2.SeldonMessage(data=data, meta=meta)
if isinstance(predictions, np.ndarray) or data_type == "data":
predictions = np.array(predictions)
if len(predictions.shape) > 1:
class_names = get_class_names(
self.user_model, predictions.shape[1])
else:
class_names = []

if data_type == "data":
default_data_type = request.data.WhichOneof("data_oneof")
else:
default_data_type = "tensor"
data = array_to_grpc_datadef(
predictions, class_names, default_data_type)
return prediction_pb2.SeldonMessage(data=data, meta=meta)
else:
return prediction_pb2.SeldonMessage(binData=predictions, meta=meta)

def SendFeedback(self, feedback, context):
datadef_request = feedback.request.data
Expand Down
3 changes: 3 additions & 0 deletions python/seldon_core/proto/prediction.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";

import "google/protobuf/struct.proto";
import "tensorflow/core/framework/tensor.proto";

package seldon.protos;

Expand All @@ -25,6 +26,7 @@ message DefaultData {
oneof data_oneof {
Tensor tensor = 2;
google.protobuf.ListValue ndarray = 3;
tensorflow.TensorProto tftensor = 4;
}
}

Expand All @@ -50,6 +52,7 @@ message Metric {
string key = 1;
MetricType type = 2;
float value = 3;
map<string,string> tags = 4;
}

message SeldonMessageList {
Expand Down
Loading

0 comments on commit 57648ec

Please sign in to comment.