Skip to content

Commit

Permalink
fix problem with aliquoting when using mini-batching, closes 3377 (#3404
Browse files Browse the repository at this point in the history
)

* fix problem with aliquoting when using mini-batching, closes 3377

* fix tests

* make licence linter happy
  • Loading branch information
RafalSkolasinski authored Jul 15, 2021
1 parent e0d8312 commit 3de8b4e
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 4 deletions.
2 changes: 1 addition & 1 deletion python/licenses/license_info.no_versions.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"cachetools","MIT License"
"certifi","Mozilla Public License 2.0 (MPL 2.0)"
"cffi","MIT License"
"chardet","GNU Library or Lesser General Public License (LGPL)"
"charset-normalizer","MIT License"
"click","BSD License"
"cryptography","Apache Software License; BSD License"
"flatbuffers","Apache Software License"
Expand Down
2 changes: 2 additions & 0 deletions python/seldon_core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ def _start_input_file_worker(
q_in.put(batch)
batch = []
enum_idx += 1
if batch:
q_in.put(batch)


def _start_output_file_worker(
Expand Down
2 changes: 1 addition & 1 deletion python/seldon_core/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __del__(self):
self._manager.shutdown()

def update_reward(self, reward: float):
""""Update metrics key corresponding to feedback reward counter."""
"""Update metrics key corresponding to feedback reward counter."""
if not reward or legacy_mode:
return
self.update(
Expand Down
17 changes: 15 additions & 2 deletions testing/scripts/test_batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
wait_for_status,
)

logging.basicConfig(level=logging.DEBUG)


class TestBatchWorker(object):
def test_batch_worker(self, namespace):
Expand All @@ -35,6 +37,8 @@ def test_batch_worker(self, namespace):
for i in range(batch_size):
f.write("[[1,2,3,4]]\n")

logging.info("Sending first batch: mini-batch size=1")

start_multithreaded_batch_worker(
"sklearn",
"istio",
Expand All @@ -54,13 +58,20 @@ def test_batch_worker(self, namespace):
str(uuid.uuid1()),
)

logging.info("Finished first batch. Checking.")

with open(output_data_path, "r") as f:
count = 0
for line in f:
count += 1
output = json.loads(line)
# Ensure all requests are successful
assert output.get("data", {}).get("ndarray", False)
assert count == batch_size

# Now test that a batch size of 10 works
logging.info("Sending first batch: mini-batch size=30")

# Now test that with a mini batch size of 30 works
start_multithreaded_batch_worker(
"sklearn",
"istio",
Expand All @@ -71,7 +82,7 @@ def test_batch_worker(self, namespace):
"ndarray",
100,
3,
10,
30,
input_data_path,
output_data_path,
"predict",
Expand All @@ -80,6 +91,8 @@ def test_batch_worker(self, namespace):
str(uuid.uuid1()),
)

logging.info("Finished first batch. Checking.")

with open(output_data_path, "r") as f:
count = 0
for line in f:
Expand Down

0 comments on commit 3de8b4e

Please sign in to comment.