Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tags backward compatibility in executor #1481

Merged
merged 3 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions python/seldon_core/seldon_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ def predict(
client_response = client_predict(
user_model, features, datadef.names, meta=meta
)
return construct_response(user_model, False, request, client_response)
return construct_response(user_model, False, request, client_response, meta)
else:
(features, meta, datadef, data_type) = extract_request_parts_json(request)
class_names = datadef["names"] if datadef and "names" in datadef else []
client_response = client_predict(
user_model, features, class_names, meta=meta
)
return construct_response_json(user_model, False, request, client_response)
return construct_response_json(
user_model, False, request, client_response, meta
)


def send_feedback(
Expand Down Expand Up @@ -162,14 +164,16 @@ def transform_input(
client_response = client_transform_input(
user_model, features, datadef.names, meta=meta
)
return construct_response(user_model, False, request, client_response)
return construct_response(user_model, False, request, client_response, meta)
else:
(features, meta, datadef, data_type) = extract_request_parts_json(request)
class_names = datadef["names"] if datadef and "names" in datadef else []
client_response = client_transform_input(
user_model, features, class_names, meta=meta
)
return construct_response_json(user_model, False, request, client_response)
return construct_response_json(
user_model, False, request, client_response, meta
)


def transform_output(
Expand Down Expand Up @@ -213,14 +217,16 @@ def transform_output(
client_response = client_transform_output(
user_model, features, datadef.names, meta=meta
)
return construct_response(user_model, False, request, client_response)
return construct_response(user_model, False, request, client_response, meta)
else:
(features, meta, datadef, data_type) = extract_request_parts_json(request)
class_names = datadef["names"] if datadef and "names" in datadef else []
client_response = client_transform_output(
user_model, features, class_names, meta=meta
)
return construct_response_json(user_model, False, request, client_response)
return construct_response_json(
user_model, False, request, client_response, meta
)


def route(
Expand Down Expand Up @@ -296,6 +302,14 @@ def aggregate(
Aggregated SeldonMessage proto

"""

def merge_meta(meta_list):
tags = {}
for meta in meta_list:
if meta:
tags.update(meta.get("tags", {}))
return {"tags": tags}

is_proto = isinstance(request, prediction_pb2.SeldonMessageList)

if hasattr(user_model, "aggregate_rest"):
Expand All @@ -314,15 +328,21 @@ def aggregate(
if is_proto:
features_list = []
names_list = []
meta_list = []

for msg in request.seldonMessages:
(features, meta, datadef, data_type) = extract_request_parts(msg)
features_list.append(features)
names_list.append(datadef.names)
meta_list.append(meta)

client_response = client_aggregate(user_model, features_list, names_list)
return construct_response(
RafalSkolasinski marked this conversation as resolved.
Show resolved Hide resolved
user_model, False, request.seldonMessages[0], client_response
user_model,
False,
request.seldonMessages[0],
client_response,
merge_meta(meta_list),
)
else:
features_list = []
Expand All @@ -339,14 +359,18 @@ def aggregate(
f"Invalid request data type: {request}"
)

meta_list = []
for msg in msgs:
(features, meta, datadef, data_type) = extract_request_parts_json(msg)
class_names = datadef["names"] if datadef and "names" in datadef else []
features_list.append(features)
names_list.append(class_names)
meta_list.append(meta)

client_response = client_aggregate(user_model, features_list, names_list)
return construct_response_json(user_model, False, msgs[0], client_response)
return construct_response_json(
user_model, False, msgs[0], client_response, merge_meta(meta_list)
)


def health_status(user_model: Any) -> Union[prediction_pb2.SeldonMessage, List, Dict]:
Expand Down
33 changes: 24 additions & 9 deletions python/seldon_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def construct_response_json(
is_request: bool,
client_request_raw: Union[List, Dict],
client_raw_response: Union[np.ndarray, str, bytes, dict],
meta: dict = None,
) -> Union[List, Dict]:
"""
This class converts a raw REST response into a JSON object that has the same structure as
Expand Down Expand Up @@ -410,8 +411,13 @@ def construct_response_json(
response["data"]["names"] = names

response["meta"] = {}
client_custom_tags(user_model)
tags = client_custom_tags(user_model)
if meta:
RafalSkolasinski marked this conversation as resolved.
Show resolved Hide resolved
tags = meta.get("tags", {})
else:
tags = {}
custom_tags = client_custom_tags(user_model)
if custom_tags:
tags.update(custom_tags)
if tags:
response["meta"]["tags"] = tags
metrics = client_custom_metrics(user_model)
Expand All @@ -429,6 +435,7 @@ def construct_response(
is_request: bool,
client_request: prediction_pb2.SeldonMessage,
client_raw_response: Union[np.ndarray, str, bytes, dict],
meta: dict = None,
) -> prediction_pb2.SeldonMessage:
"""

Expand All @@ -449,18 +456,26 @@ def construct_response(

"""
data_type = client_request.WhichOneof("data_oneof")
meta = prediction_pb2.Meta()
meta_pb = prediction_pb2.Meta()
meta_json: Dict = {}
tags = client_custom_tags(user_model)

if meta:
tags = meta.get("tags", {})
else:
tags = {}
custom_tags = client_custom_tags(user_model)
if custom_tags:
tags.update(custom_tags)
if tags:
meta_json["tags"] = tags

metrics = client_custom_metrics(user_model)
if metrics:
meta_json["metrics"] = metrics
if client_request.meta:
if client_request.meta.puid:
meta_json["puid"] = client_request.meta.puid
json_format.ParseDict(meta_json, meta)
json_format.ParseDict(meta_json, meta_pb)
if isinstance(client_raw_response, np.ndarray) or isinstance(
client_raw_response, list
):
Expand All @@ -482,16 +497,16 @@ def construct_response(
else:
default_data_type = "ndarray"
data = array_to_grpc_datadef(default_data_type, client_raw_response, names)
return prediction_pb2.SeldonMessage(data=data, meta=meta)
return prediction_pb2.SeldonMessage(data=data, meta=meta_pb)
elif isinstance(client_raw_response, str):
return prediction_pb2.SeldonMessage(strData=client_raw_response, meta=meta)
return prediction_pb2.SeldonMessage(strData=client_raw_response, meta=meta_pb)
elif isinstance(client_raw_response, dict):
jsonDataResponse = ParseDict(
client_raw_response, prediction_pb2.SeldonMessage().jsonData
)
return prediction_pb2.SeldonMessage(jsonData=jsonDataResponse, meta=meta)
return prediction_pb2.SeldonMessage(jsonData=jsonDataResponse, meta=meta_pb)
elif isinstance(client_raw_response, (bytes, bytearray)):
return prediction_pb2.SeldonMessage(binData=client_raw_response, meta=meta)
return prediction_pb2.SeldonMessage(binData=client_raw_response, meta=meta_pb)
else:
raise SeldonMicroserviceException(
"Unknown data type returned as payload:" + client_raw_response
Expand Down
67 changes: 67 additions & 0 deletions python/tests/test_combiner_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,33 @@ def test_aggreate_ok_seldon_messages():
assert j["data"]["ndarray"] == [1]


def test_aggreate_combines_tags():
user_object = UserObject()
app = get_rest_microservice(user_object)
client = app.test_client()
msgs = (
"["
'{"meta":{"tags":{"input-1":"yes","common":1}}, "data":{"ndarray":[0]}}, '
'{"meta":{"tags":{"input-2":"yes","common":2}}, "data":{"ndarray":[1]}}'
"]"
)
# Note: double "{{}}" used to escape for string formatting
rv = client.get('/aggregate?json={{"seldonMessages":{}}}'.format(msgs))
logging.info(rv)
j = json.loads(rv.data)
logging.info(j)
assert rv.status_code == 200
assert j["meta"]["tags"] == {
"common": 2,
"input-1": "yes",
"input-2": "yes",
"mytag": 1,
}
assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"]
assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"]
assert j["data"]["ndarray"] == [0]


def test_aggreate_ok_list():
user_object = UserObject()
app = get_rest_microservice(user_object)
Expand Down Expand Up @@ -265,6 +292,46 @@ def test_aggregate_proto_ok():
assert j["data"]["tensor"]["values"] == [1, 2]


def test_aggregate_proto_combines_tags():
user_object = UserObject()
app = SeldonModelGRPC(user_object)

arr1 = np.array([1, 2])
meta1 = prediction_pb2.Meta()
json_format.ParseDict({"tags": {"input-1": "yes", "common": 1}}, meta1)
datadef1 = prediction_pb2.DefaultData(
tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr1)
)

arr2 = np.array([3, 4])
meta2 = prediction_pb2.Meta()
json_format.ParseDict({"tags": {"input-2": "yes", "common": 2}}, meta2)
datadef2 = prediction_pb2.DefaultData(
tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr2)
)

msg1 = prediction_pb2.SeldonMessage(data=datadef1, meta=meta1)
msg2 = prediction_pb2.SeldonMessage(data=datadef2, meta=meta2)
request = prediction_pb2.SeldonMessageList(seldonMessages=[msg1, msg2])
resp = app.Aggregate(request, None)
jStr = json_format.MessageToJson(resp)
j = json.loads(jStr)
logging.info(j)

assert j["meta"]["tags"] == {
"common": 2,
"input-1": "yes",
"input-2": "yes",
"mytag": 1,
}

# add default type
assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"]
assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"]
assert j["data"]["tensor"]["shape"] == [2, 1]
assert j["data"]["tensor"]["values"] == [1, 2]


def test_aggregate_proto_bin_data():
user_object = UserObject()
app = SeldonModelGRPC(user_object)
Expand Down
46 changes: 44 additions & 2 deletions python/tests/test_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import numpy as np
import signal
import unittest.mock as mock
from google.protobuf import json_format


@contextmanager
Expand Down Expand Up @@ -54,8 +55,8 @@ def start_microservice(app_location, tracing=False, grpc=False, envs={}):
)
if tracing:
cmd = cmd + ("--tracing",)
logging.info("starting:", " ".join(cmd))
logging.info("cwd:", app_location)
logging.info("starting: %s", " ".join(cmd))
logging.info("cwd: %s", app_location)
# stdout=PIPE, stderr=PIPE,
p = Popen(cmd, cwd=app_location, env=env_vars, preexec_fn=os.setsid)

Expand Down Expand Up @@ -100,6 +101,22 @@ def test_model_template_app_rest(tracing):
assert response.json() == {"data": {"ndarray": []}, "meta": {}}


@pytest.mark.parametrize("tracing", [(False), (True)])
def test_model_template_app_rest_tags(tracing):
with start_microservice(
join(dirname(__file__), "model-template-app"), tracing=tracing
):
data = '{"meta":{"tags":{"foo":"bar"}},"data":{"names":["a","b"],"ndarray":[[1.0,2.0]]}}'
response = requests.get(
"http://127.0.0.1:5000/predict", params="json=%s" % data
)
response.raise_for_status()
assert response.json() == {
"data": {"names": ["t:0", "t:1"], "ndarray": [[1.0, 2.0]]},
"meta": {"tags": {"foo": "bar"}},
}


@pytest.mark.parametrize("tracing", [(False), (True)])
def test_model_template_app_rest_submodule(tracing):
with start_microservice(
Expand Down Expand Up @@ -154,6 +171,31 @@ def test_model_template_app_grpc(tracing):
response = stub.SendFeedback(request=request)


@pytest.mark.parametrize("tracing", [(False), (True)])
def test_model_template_app_grpc_tags(tracing):
with start_microservice(
join(dirname(__file__), "model-template-app"), tracing=tracing, grpc=True
):
data = np.array([[1, 2]])
datadef = prediction_pb2.DefaultData(
tensor=prediction_pb2.Tensor(shape=data.shape, values=data.flatten())
)

meta = prediction_pb2.Meta()
json_format.ParseDict({"tags": {"foo": "bar"}}, meta)

request = prediction_pb2.SeldonMessage(data=datadef, meta=meta)
channel = grpc.insecure_channel("0.0.0.0:5000")
stub = prediction_pb2_grpc.ModelStub(channel)
response = stub.Predict(request=request)
assert response.data.tensor.shape[0] == 1
assert response.data.tensor.shape[1] == 2
assert response.data.tensor.values[0] == 1
assert response.data.tensor.values[1] == 2

assert response.meta.tags["foo"].string_value == "bar"


def test_model_template_app_tracing_config():
envs = {
"JAEGER_CONFIG_PATH": join(dirname(__file__), "tracing_config/tracing.yaml")
Expand Down
Loading