Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch processor enhancemenst through raw data parameter #3718

Merged
merged 10 commits into from
Nov 5, 2021
126 changes: 67 additions & 59 deletions python/seldon_core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,68 @@ def _start_request_worker(
q_in.task_done()


def _multi_request_extract_raw_data(loaded_data, tags):
RafalSkolasinski marked this conversation as resolved.
Show resolved Hide resolved
raw_input_tags = [d.get("meta", {}).get("tags", {}) for d in loaded_data]
first_input = loaded_data[0]

# Raw input format in mini-batch mode only work for "data" format
if "data" not in first_input:
raise ValueError(
"raw input with predict in mini-batch mode requires data payload"
)
# If-block for ndarray case
elif "ndarray" in first_input["data"]:
payload_type = "ndarray"
names_list = [d["data"]["names"] for d in loaded_data]
arrays = [np.array(d["data"]["ndarray"]) for d in loaded_data]
if not all_equal(names_list):
raise ValueError("All names in mini-batch must be the same.")
for arr in arrays:
if arr.shape[0] != 1:
raise ValueError(
"When using mini-batching each row should contain single instance."
)
ndarray = np.concatenate(arrays)
raw_data = {
"data": {"names": names_list[0], "ndarray": ndarray.tolist()},
"meta": {"tags": tags},
}
return raw_data, payload_type, raw_input_tags

# If-block for tensor case
elif "tensor" in first_input["data"]:
payload_type = "tensor"
names_list = [d["data"]["names"] for d in loaded_data]
tensor_shapes = [d["data"]["tensor"]["shape"] for d in loaded_data]
tensor_values = [d["data"]["tensor"]["values"] for d in loaded_data]

if not all_equal(names_list):
raise ValueError("All names in mini-batch must be the same.")

dim_0 = 0
dim_1 = tensor_shapes[0][1]
for shape in tensor_shapes:
if shape[0] != 1:
raise ValueError(
"When using mini-batching each row should contain single instance."
)
dim_0 += shape[0]
if dim_1 != shape[1]:
raise ValueError(
"All instances in mini-batch must have same number of features."
)
values = sum(tensor_values, [])
shape = [dim_0, dim_1]
raw_data = {
"data": {
"names": names_list[0],
"tensor": {"shape": shape, "values": values},
},
"meta": {"tags": tags},
}
return raw_data, payload_type, raw_input_tags


def _send_batch_predict_multi_request(
input_data: [],
data_type: str,
Expand Down Expand Up @@ -336,64 +398,10 @@ def _send_batch_predict_multi_request(
try:
RafalSkolasinski marked this conversation as resolved.
Show resolved Hide resolved
# Process raw input format
if data_type == "raw":
input_tags = [d.get("meta", {}).get("tags", {}) for d in loaded_data]
first_input = loaded_data[0]

# Raw input format in mini-batch mode only work for "data" format
if "data" not in first_input:
raise ValueError(
"raw input with predict in mini-batch mode requires data payload"
)
# If-block for ndarray case
elif "ndarray" in first_input["data"]:
payload_type = "ndarray"
names_list = [d["data"]["names"] for d in loaded_data]
arrays = [np.array(d["data"]["ndarray"]) for d in loaded_data]
if not all_equal(names_list):
raise ValueError("All names in mini-batch must be the same.")
for arr in arrays:
if arr.shape[0] != 1:
raise ValueError(
"When using mini-batching each row should contain single instance."
)
ndarray = np.concatenate(arrays)
raw_data = {
"data": {"names": names_list[0], "ndarray": ndarray.tolist()},
"meta": {"tags": predict_kwargs["meta"]},
}
predict_kwargs["raw_data"] = raw_data
# If-block for tensor case
elif "tensor" in first_input["data"]:
payload_type = "tensor"
names_list = [d["data"]["names"] for d in loaded_data]
tensor_shapes = [d["data"]["tensor"]["shape"] for d in loaded_data]
tensor_values = [d["data"]["tensor"]["values"] for d in loaded_data]

if not all_equal(names_list):
raise ValueError("All names in mini-batch must be the same.")

dim_0 = 0
dim_1 = tensor_shapes[0][1]
for shape in tensor_shapes:
if shape[0] != 1:
raise ValueError(
"When using mini-batching each row should contain single instance."
)
dim_0 += shape[0]
if dim_1 != shape[1]:
raise ValueError(
"All instances in mini-batch must have same number of features."
)
values = sum(tensor_values, [])
shape = [dim_0, dim_1]
raw_data = {
"data": {
"names": names_list[0],
"tensor": {"shape": shape, "values": values},
},
"meta": {"tags": predict_kwargs["meta"]},
}
predict_kwargs["raw_data"] = raw_data
raw_data, payload_type, raw_input_tags = _multi_request_extract_raw_data(
loaded_data, predict_kwargs["meta"]
)
predict_kwargs["raw_data"] = raw_data
else:
# Initialise concatenated array for data
arrays = [np.array(arr) for arr in loaded_data]
Expand Down Expand Up @@ -448,7 +456,7 @@ def _send_batch_predict_multi_request(
try:
new_response = copy.deepcopy(response)
if data_type == "raw":
new_response["meta"]["tags"].update(input_tags[i])
new_response["meta"]["tags"].update(raw_input_tags[i])
if payload_type == "ndarray":
# Format new responses for each original prediction request
new_response["data"]["ndarray"] = [response["data"]["ndarray"][i]]
Expand Down