Skip to content

Commit

Permalink
Raise an exception if any indexer workers fail their health check (#3726
Browse files Browse the repository at this point in the history
)

* Raise an exception if any indexer workers fail their health check

* Add tests for failure case

* Add tests for happy path

* Simplify pook assertions

Co-authored-by: sarayourfriend <[email protected]>

* Reword comment to be more accurate

---------

Co-authored-by: sarayourfriend <[email protected]>
  • Loading branch information
AetherUnbound and sarayourfriend authored Feb 8, 2024
1 parent 5b271f5 commit f8e6d35
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,17 @@ def _assign_work(db_conn, workers, model_name, table_name, target_index):

worker_url_template = "http://{}:8002"
# Wait for the workers to start.
failures = []
for worker in workers:
worker_url = worker_url_template.format(worker)
succeeded = _wait_for_healthcheck(f"{worker_url}/healthcheck")
if not succeeded:
return False
failures.append(worker)
if failures:
raise ValueError(
f"Some workers didn't respond to health check: {','.join(failures)}"
)

for idx, worker in enumerate(workers):
worker_url = worker_url_template.format(worker)
params = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from unittest import mock

import pook
import pytest

from ingestion_server import distributed_reindex_scheduler


@pytest.mark.parametrize(
"estimated_records, record_limit, workers, expected_ranges",
[
# One worker
(100, 1000, ["worker1"], [(0, 100)]),
# Multiple workers, even split
(100, 1000, ["worker1", "worker2"], [(0, 50), (50, 100)]),
# Multiple workers, uneven split
# NOTE: records get dropped in this case
(100, 1000, ["worker1", "worker2", "worker3"], [(0, 33), (33, 66), (66, 99)]),
# One worker, limited
(100, 55, ["worker1"], [(0, 55)]),
# Two workers, limited
(100, 50, ["worker1", "worker2"], [(0, 25), (25, 50)]),
],
)
def test_assign_work(estimated_records, record_limit, workers, expected_ranges):
# Checks for the parameters
assert len(workers) == len(
expected_ranges
), "Number of workers and expected ranges do not match, correct the test parameters"
# Set up database mock response
mock_db = mock.MagicMock()
mock_db.cursor.return_value.__enter__.return_value.fetchone.return_value = [
estimated_records
]
# Enable pook & mock other internal functions
with pook.use(), mock.patch(
"ingestion_server.distributed_reindex_scheduler.get_record_limit"
) as mock_get_record_limit, mock.patch(
"ingestion_server.distributed_reindex_scheduler._wait_for_healthcheck"
) as mock_wait_for_healthcheck:
mock_wait_for_healthcheck.return_value = True
mock_get_record_limit.return_value = record_limit

# Set up pook matches
for worker, (start_id, end_id) in zip(workers, expected_ranges):
pook.post(f"http://{worker}:8002/indexing_task").json(
{
"model_name": "sample_model",
"table_name": "sample_table",
"target_index": "sample_index",
"start_id": start_id,
"end_id": end_id,
}
)

distributed_reindex_scheduler._assign_work(
mock_db,
workers,
"sample_model",
"sample_table",
"sample_index",
)

# Raise an exception to ensure all pook requests were matched
assert pook.isdone()


def test_assign_work_workers_fail():
mock_db = mock.MagicMock()
mock_db.cursor.return_value.__enter__.return_value.fetchone.return_value = [100]
with mock.patch(
"ingestion_server.distributed_reindex_scheduler._wait_for_healthcheck"
) as mock_wait_for_healthcheck, pook.use(False):
mock_wait_for_healthcheck.return_value = False

with pytest.raises(ValueError, match="Some workers didn't respond"):
distributed_reindex_scheduler._assign_work(
mock_db,
["worker"] * 6,
"fake-model",
"fake-table",
"target-index",
)

0 comments on commit f8e6d35

Please sign in to comment.