diff --git a/nsq/reader.py b/nsq/reader.py
index 0c24245..1208893 100644
--- a/nsq/reader.py
+++ b/nsq/reader.py
@@ -368,7 +368,8 @@ def _maybe_update_rdy(self, conn):
         # 2. After a change in connection count or max_in_flight we adjust to the new
         # connection_max_in_flight.
         conn_max_in_flight = self._connection_max_in_flight()
-        if conn.rdy == 1 or conn.rdy != conn_max_in_flight:
+        if (conn.rdy == 1 or conn.rdy != conn_max_in_flight) and \
+                self.total_rdy < self.max_in_flight:
             self._send_rdy(conn, conn_max_in_flight)
 
     def _finish_backoff_block(self):
@@ -402,11 +403,15 @@ def _on_backoff_resume(self, success, **kwargs):
 
     def _complete_backoff_block(self):
         self.backoff_block_completed = True
-        rdy = self._connection_max_in_flight()
         logger.info('[%s] backoff complete, resuming normal operation (%d connections)',
                     self.name, len(self.conns))
-        for c in self.conns.values():
-            self._send_rdy(c, rdy)
+        if self.max_in_flight < len(self.conns):
+            self.need_rdy_redistributed = True
+            self._redistribute_rdy_state()
+        else:
+            rdy = self._connection_max_in_flight()
+            for c in self.conns.values():
+                self._send_rdy(c, rdy)
 
     def _enter_continue_or_exit_backoff(self):
         # Take care of backoff in the appropriate cases. When this
@@ -532,7 +537,8 @@ def _on_connection_ready(self, conn, **kwargs):
         # *initially* starved since redistribute won't apply
         # 2. `max_in_flight < num_conns` ensuring that we never exceed max_in_flight
         # and rely on the fact that redistribute will handle balancing RDY across conns
-        if not self.backoff_timer.get_interval() or len(self.conns) == 1:
+        if (not self.backoff_timer.get_interval() or len(self.conns) == 1) and \
+                self.total_rdy < self.max_in_flight:
             # only send RDY 1 if we're not in backoff (some other conn
             # should be testing the waters)
             # (but always send it if we're the first)
@@ -615,6 +621,11 @@ def query_lookupd(self):
 
     def set_max_in_flight(self, max_in_flight):
         """Dynamically adjust the reader max_in_flight. Set to 0 to immediately disable a Reader"""
+        for conn in self.conns.values():
+            if conn.rdy_timeout is not None:
+                self.io_loop.remove_timeout(conn.rdy_timeout)
+                conn.rdy_timeout = None
+
         assert isinstance(max_in_flight, int)
         self.max_in_flight = max_in_flight
 
@@ -660,6 +671,17 @@ def _redistribute_rdy_state(self):
         if self.need_rdy_redistributed:
             self.need_rdy_redistributed = False
 
+            if self.total_rdy > self.max_in_flight:
+                conns = list(self.conns.values())
+                available_rdy = self.max_in_flight
+                while conns and available_rdy:
+                    available_rdy -= 1
+                    conn = conns.pop(random.randrange(len(conns)))
+                    self._send_rdy(conn, 1)
+                while conns:
+                    conn = conns.pop()
+                    self._send_rdy(conn, 0)
+
             # first set RDY 0 to all connections that have not received a message within
             # a configurable timeframe (low_rdy_idle_timeout).
             for conn_id, conn in iteritems(self.conns):
diff --git a/tests/reader_unit_test_helpers.py b/tests/reader_unit_test_helpers.py
index a837982..dda8b77 100644
--- a/tests/reader_unit_test_helpers.py
+++ b/tests/reader_unit_test_helpers.py
@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 
+import collections
 import time
 
 from mock import patch, create_autospec
@@ -13,18 +14,22 @@
 
 
 def get_reader(max_in_flight=5):
-    return nsq.Reader("test", "test",
-                      message_handler=_message_handler,
-                      lookupd_http_addresses=["http://localhost:4161"],
-                      max_in_flight=max_in_flight,
-                      max_backoff_duration=2.0,
-                      )
+    r = nsq.Reader(
+        "test", "test",
+        message_handler=_message_handler,
+        lookupd_http_addresses=["http://localhost:4161"],
+        max_in_flight=max_in_flight,
+        max_backoff_duration=2.0,
+    )
+    r.conns = collections.OrderedDict()
+    return r
 
 
 def get_ioloop():
     ioloop = create_autospec(IOLoop, instance=True)
     ioloop.time.side_effect = time.time
     ioloop.call_later.side_effect = lambda dt, cb: ioloop.add_timeout(time.time() + dt, cb)
+    ioloop.add_timeout.side_effect = lambda _, cb: cb
     return ioloop
 
 
diff --git a/tests/test_rdy.py b/tests/test_rdy.py
index 719df07..8450bae 100644
--- a/tests/test_rdy.py
+++ b/tests/test_rdy.py
@@ -1,6 +1,12 @@
+import pytest
+from mock import patch
+
+from nsq import event
+
 from .reader_unit_test_helpers import (
     get_reader,
     get_conn,
+    get_ioloop,
     send_message
 )
 
@@ -22,3 +28,246 @@ def test_new_conn_throttles_down_existing_conns():
     conn2 = get_conn(r)
     assert conn2.rdy == 1
     assert conn1.rdy == r._connection_max_in_flight()
+
+
+def test_new_conn_respects_max_in_flight():
+    max_in_flight = 1
+    r = get_reader(max_in_flight)
+    get_conn(r)
+    get_conn(r)
+    assert r.total_rdy <= max_in_flight
+
+
+@pytest.mark.parametrize(['conn_count', 'max_in_flight'], [
+    [5, 10],
+    [5, 4],
+])
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_complete_backoff(ioloop_current_mock, conn_count, max_in_flight):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    r = get_reader(max_in_flight=max_in_flight)
+    conns = [get_conn(r) for _ in range(conn_count)]
+
+    msg = send_message(conns[0])
+    msg.trigger(event.REQUEUE, message=msg)
+
+    timeout_args, _ = ioloop_mock.add_timeout.call_args
+    conn = timeout_args[1]()
+    assert r.total_rdy == 1
+
+    msg = send_message(conn)
+    msg.trigger(event.FINISH, message=msg)
+    assert r.total_rdy == max_in_flight
+
+
+@pytest.mark.parametrize(['conn_count', 'old_max', 'new_max'], [
+    [4, 8, 12],
+    [4, 3, 12],
+    [4, 2, 3],
+    [4, 12, 8],
+    [4, 12, 3],
+    [4, 3, 2],
+])
+def test_rdy_set_max_in_flight(old_max, new_max, conn_count):
+    r = get_reader(old_max)
+    conns = [get_conn(r) for _ in range(conn_count)]
+    for c in conns:
+        msg = send_message(c)
+        msg.trigger(event.FINISH, message=msg)
+    r.set_max_in_flight(new_max)
+    assert r.total_rdy <= new_max
+    for c in conns:
+        msg = send_message(c)
+        msg.trigger(event.FINISH, message=msg)
+    assert r.total_rdy == new_max
+
+
+def test_rdy_disable_enable():
+    max_in_flight = 10
+    r = get_reader(max_in_flight)
+    conns = [get_conn(r) for _ in range(max_in_flight // 2 - 1)]
+    msg = send_message(conns[0])
+
+    r.set_max_in_flight(0)
+    assert r.total_rdy == 0
+
+    conns.append(get_conn(r))
+    assert r.total_rdy == 0
+
+    msg.trigger(event.FINISH, message=msg)
+    assert r.total_rdy == 0
+
+    r.set_max_in_flight(max_in_flight)
+    assert r.total_rdy == len(conns)
+
+
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_disable_enable_in_backoff(ioloop_current_mock):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    max_in_flight = 10
+    r = get_reader(max_in_flight)
+    conns = [get_conn(r) for _ in range(2)]
+    msg = send_message(conns[0])
+
+    r.set_max_in_flight(0)
+    assert r.total_rdy == 0
+
+    msg.trigger(event.REQUEUE, message=msg)
+    timeout_args, _ = ioloop_mock.add_timeout.call_args
+    timeout_args[1]()
+    assert r.total_rdy == 0
+
+    r.set_max_in_flight(max_in_flight)
+    assert r.total_rdy == 1
+
+
+@patch("random.randrange")
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_retry(ioloop_current_mock, randrange_mock):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    randrange_mock.return_value = 0
+    r = get_reader(max_in_flight=1)
+    get_conn(r)
+    r.set_max_in_flight(0)
+    c = get_conn(r)
+    r.set_max_in_flight(1)
+    if c.rdy_timeout is not None:
+        timeout_args, _ = ioloop_mock.add_timeout.call_args
+        timeout_args[1]()
+    assert r.total_rdy <= r.max_in_flight
+
+
+@patch("random.randrange")
+@patch("time.time")
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_redistribution_idle_conns_lose_rdy(ioloop_current_mock, time_mock, randrange_mock):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    t = 1
+    time_mock.side_effect = lambda: t
+    randrange_mock.return_value = 1
+    r = get_reader(max_in_flight=2)
+    conn1 = get_conn(r)
+    conn2 = get_conn(r)
+    conn3 = get_conn(r)
+    assert conn1.rdy == conn2.rdy == 1
+    assert conn3.rdy == 0
+    msg = send_message(conn1)
+    msg.trigger(event.FINISH, message=msg)
+    t += r.low_rdy_idle_timeout - 0.1
+    r._redistribute_rdy_state()
+    assert conn1.rdy == conn2.rdy == 1
+    assert conn3.rdy == 0
+    t += 0.2
+    r._redistribute_rdy_state()
+    assert conn1.rdy == 0
+    assert conn2.rdy == conn3.rdy == 1
+
+
+@patch("time.time")
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_redistribution_doesnt_reallocate_rdy_in_use(ioloop_current_mock, time_mock):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    t = 1
+    time_mock.side_effect = lambda: t
+    r = get_reader(max_in_flight=2)
+    conn1 = get_conn(r)
+    conn2 = get_conn(r)
+    get_conn(r)
+    assert r.total_rdy == 2
+    msg1 = send_message(conn1)
+    msg2 = send_message(conn2)
+    msg1.trigger(event.FINISH, message=msg1)
+    t += r.low_rdy_idle_timeout + 0.1
+    r._redistribute_rdy_state()
+    assert r.total_rdy == 1
+    msg2.trigger(event.FINISH, message=msg2)
+    r._redistribute_rdy_state()
+    assert r.total_rdy == 2
+
+
+@patch("random.randrange")
+@patch("time.time")
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_redistribution_isnt_pinned_to_conns(ioloop_current_mock, time_mock, randrange_mock):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    t = 1
+    time_mock.side_effect = lambda: t
+    randrange_mock.return_value = 1
+    r = get_reader(max_in_flight=2)
+    conn1 = get_conn(r)
+    conn2 = get_conn(r)
+    conn3 = get_conn(r)
+    assert conn1.rdy == conn2.rdy == 1
+    assert conn3.rdy == 0
+    msg1 = send_message(conn1)
+    msg2 = send_message(conn2)
+    msg1.trigger(event.FINISH, message=msg1)
+    t += 1
+    r._redistribute_rdy_state()
+    assert conn1.rdy == 1
+    assert conn2.rdy == conn3.rdy == 0
+    t += 1
+    msg2.trigger(event.FINISH, message=msg2)
+    r._redistribute_rdy_state()
+    assert conn1.rdy == conn3.rdy == 1
+    assert conn2.rdy == 0
+
+
+@patch("random.choice")
+@patch("random.randrange")
+@patch("time.time")
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_redistribution_idle_conns_lose_rdy_in_backoff(
+        ioloop_current_mock, time_mock, randrange_mock, choice_mock):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    t = 1
+    time_mock.side_effect = lambda: t
+    randrange_mock.return_value = 2
+    r = get_reader(max_in_flight=2)
+    conn1 = get_conn(r)
+    conn2 = get_conn(r)
+    conn3 = get_conn(r)
+    choice_mock.return_value = conn1
+    assert conn1.rdy == conn2.rdy == 1
+    assert conn3.rdy == 0
+    msg = send_message(conn1)
+    msg.trigger(event.REQUEUE, message=msg)
+    timeout_args, _ = ioloop_mock.add_timeout.call_args
+    timeout_args[1]()
+    assert conn1.rdy == r.total_rdy == 1
+    t += r.low_rdy_idle_timeout + 0.1
+    r._redistribute_rdy_state()
+    assert conn3.rdy == r.total_rdy == 1
+
+
+@patch("random.choice")
+@patch("time.time")
+@patch("tornado.ioloop.IOLoop.current")
+def test_rdy_redistribution_doesnt_reallocate_rdy_in_use_in_backoff(
+        ioloop_current_mock, time_mock, choice_mock):
+    ioloop_mock = get_ioloop()
+    ioloop_current_mock.return_value = ioloop_mock
+    t = 1
+    time_mock.side_effect = lambda: t
+    r = get_reader(max_in_flight=2)
+    conn1 = get_conn(r)
+    conn2 = get_conn(r)
+    get_conn(r)
+    assert r.total_rdy == 2
+    msg = send_message(conn1)
+    send_message(conn2)
+    msg.trigger(event.REQUEUE, message=msg)
+    timeout_args, _ = ioloop_mock.add_timeout.call_args
+    choice_mock.return_value = conn1
+    timeout_args[1]()
+    assert r.total_rdy == 1
+    t += r.low_rdy_idle_timeout + 0.1
+    r._redistribute_rdy_state()
+    assert r.total_rdy == 0