From 0dcb417bab0aadc6d1092a192f6754d40ad5b870 Mon Sep 17 00:00:00 2001 From: Alp Aker Date: Mon, 14 Sep 2020 00:32:35 -0700 Subject: [PATCH] reader: Avoid max-in-flight violations when dynamically setting max-in-flight. --- nsq/reader.py | 18 ++++++++++- tests/test_rdy.py | 77 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/nsq/reader.py b/nsq/reader.py index 85f18fb..90917cc 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -368,7 +368,8 @@ def _maybe_update_rdy(self, conn): # 2. After a change in connection count or max_in_flight we adjust to the new # connection_max_in_flight. conn_max_in_flight = self._connection_max_in_flight() - if conn.rdy == 1 or conn.rdy != conn_max_in_flight: + if (conn.rdy == 1 or conn.rdy != conn_max_in_flight) and \ + self.total_rdy < self.max_in_flight: self._send_rdy(conn, conn_max_in_flight) def _finish_backoff_block(self): @@ -631,6 +632,10 @@ def set_max_in_flight(self, max_in_flight): self._send_rdy(conn, 0) self.total_rdy = 0 else: + for conn in self.conns.values(): + if conn.rdy_timeout is not None: + self.io_loop.remove_timeout(conn.rdy_timeout) + conn.rdy_timeout = None self.need_rdy_redistributed = True self._redistribute_rdy_state() @@ -665,6 +670,17 @@ def _redistribute_rdy_state(self): if self.need_rdy_redistributed: self.need_rdy_redistributed = False + if self.total_rdy > self.max_in_flight: + conns = list(self.conns.values()) + available_rdy = self.max_in_flight + while conns and available_rdy: + available_rdy -= 1 + conn = conns.pop(random.randrange(len(conns))) + self._send_rdy(conn, 1) + while conns: + conn = conns.pop() + self._send_rdy(conn, 0) + # first set RDY 0 to all connections that have not received a message within # a configurable timeframe (low_rdy_idle_timeout). for conn_id, conn in iteritems(self.conns): diff --git a/tests/test_rdy.py b/tests/test_rdy.py index af5e597..fe1c952 100644 --- a/tests/test_rdy.py +++ b/tests/test_rdy.py @@ -58,3 +58,80 @@ def test_rdy_complete_backoff(ioloop_current_mock, conn_count, max_in_flight): msg = send_message(conn) msg.trigger(event.FINISH, message=msg) assert r.total_rdy == max_in_flight + + +@pytest.mark.parametrize(['conn_count', 'old_max', 'new_max'], [ + [4, 8, 12], + [4, 3, 12], + [4, 2, 3], + [4, 12, 8], + [4, 12, 3], + [4, 3, 2], +]) +def test_rdy_set_max_in_flight(old_max, new_max, conn_count): + r = get_reader(old_max) + conns = [get_conn(r) for _ in range(conn_count)] + for c in conns: + msg = send_message(c) + msg.trigger(event.FINISH, message=msg) + r.set_max_in_flight(new_max) + assert r.total_rdy <= new_max + for c in conns: + msg = send_message(c) + msg.trigger(event.FINISH, message=msg) + assert r.total_rdy == new_max + + +def test_rdy_disable_enable(): + max_in_flight = 10 + r = get_reader(max_in_flight) + conns = [get_conn(r) for _ in range(max_in_flight // 2 - 1)] + msg = send_message(conns[0]) + + r.set_max_in_flight(0) + assert r.total_rdy == 0 + + conns.append(get_conn(r)) + assert r.total_rdy == 0 + + msg.trigger(event.FINISH, message=msg) + assert r.total_rdy == 0 + + r.set_max_in_flight(max_in_flight) + assert r.total_rdy == len(conns) + + +@patch("tornado.ioloop.IOLoop.current") +def test_rdy_disable_enable_in_backoff(ioloop_current_mock): + ioloop_mock = get_ioloop() + ioloop_current_mock.return_value = ioloop_mock + max_in_flight = 10 + r = get_reader(max_in_flight) + conns = [get_conn(r) for _ in range(2)] + msg = send_message(conns[0]) + + r.set_max_in_flight(0) + assert r.total_rdy == 0 + + msg.trigger(event.REQUEUE, message=msg) + timeout_args, _ = ioloop_mock.add_timeout.call_args + timeout_args[1]() + assert r.total_rdy == 0 + + r.set_max_in_flight(max_in_flight) + assert r.total_rdy == 1 + + +@patch("random.randrange") +@patch("tornado.ioloop.IOLoop.current") +def test_rdy_retry(ioloop_current_mock, randrange_mock): + ioloop_current_mock.return_value = get_ioloop() + randrange_mock.return_value = 0 + r = get_reader(max_in_flight=1) + get_conn(r) + r.set_max_in_flight(0) + c = get_conn(r) + r.set_max_in_flight(1) + if c.rdy_timeout is not None: + c.rdy_timeout() + assert r.total_rdy <= r.max_in_flight