Skip to content

Commit

Permalink
reader: Avoid max-in-flight violations when dynamically setting max-i…
Browse files Browse the repository at this point in the history
…n-flight.
  • Loading branch information
Alp Aker committed Sep 14, 2020
1 parent 6f2e662 commit 0dcb417
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 1 deletion.
18 changes: 17 additions & 1 deletion nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ def _maybe_update_rdy(self, conn):
# 2. After a change in connection count or max_in_flight we adjust to the new
# connection_max_in_flight.
conn_max_in_flight = self._connection_max_in_flight()
if conn.rdy == 1 or conn.rdy != conn_max_in_flight:
if (conn.rdy == 1 or conn.rdy != conn_max_in_flight) and \
self.total_rdy < self.max_in_flight:
self._send_rdy(conn, conn_max_in_flight)

def _finish_backoff_block(self):
Expand Down Expand Up @@ -631,6 +632,10 @@ def set_max_in_flight(self, max_in_flight):
self._send_rdy(conn, 0)
self.total_rdy = 0
else:
for conn in self.conns.values():
if conn.rdy_timeout is not None:
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None
self.need_rdy_redistributed = True
self._redistribute_rdy_state()

Expand Down Expand Up @@ -665,6 +670,17 @@ def _redistribute_rdy_state(self):
if self.need_rdy_redistributed:
self.need_rdy_redistributed = False

if self.total_rdy > self.max_in_flight:
conns = list(self.conns.values())
available_rdy = self.max_in_flight
while conns and available_rdy:
available_rdy -= 1
conn = conns.pop(random.randrange(len(conns)))
self._send_rdy(conn, 1)
while conns:
conn = conns.pop()
self._send_rdy(conn, 0)

# first set RDY 0 to all connections that have not received a message within
# a configurable timeframe (low_rdy_idle_timeout).
for conn_id, conn in iteritems(self.conns):
Expand Down
77 changes: 77 additions & 0 deletions tests/test_rdy.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,80 @@ def test_rdy_complete_backoff(ioloop_current_mock, conn_count, max_in_flight):
msg = send_message(conn)
msg.trigger(event.FINISH, message=msg)
assert r.total_rdy == max_in_flight


@pytest.mark.parametrize(['conn_count', 'old_max', 'new_max'], [
[4, 8, 12],
[4, 3, 12],
[4, 2, 3],
[4, 12, 8],
[4, 12, 3],
[4, 3, 2],
])
def test_rdy_set_max_in_flight(old_max, new_max, conn_count):
r = get_reader(old_max)
conns = [get_conn(r) for _ in range(conn_count)]
for c in conns:
msg = send_message(c)
msg.trigger(event.FINISH, message=msg)
r.set_max_in_flight(new_max)
assert r.total_rdy <= new_max
for c in conns:
msg = send_message(c)
msg.trigger(event.FINISH, message=msg)
assert r.total_rdy == new_max


def test_rdy_disable_enable():
max_in_flight = 10
r = get_reader(max_in_flight)
conns = [get_conn(r) for _ in range(max_in_flight // 2 - 1)]
msg = send_message(conns[0])

r.set_max_in_flight(0)
assert r.total_rdy == 0

conns.append(get_conn(r))
assert r.total_rdy == 0

msg.trigger(event.FINISH, message=msg)
assert r.total_rdy == 0

r.set_max_in_flight(max_in_flight)
assert r.total_rdy == len(conns)


@patch("tornado.ioloop.IOLoop.current")
def test_rdy_disable_enable_in_backoff(ioloop_current_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
max_in_flight = 10
r = get_reader(max_in_flight)
conns = [get_conn(r) for _ in range(2)]
msg = send_message(conns[0])

r.set_max_in_flight(0)
assert r.total_rdy == 0

msg.trigger(event.REQUEUE, message=msg)
timeout_args, _ = ioloop_mock.add_timeout.call_args
timeout_args[1]()
assert r.total_rdy == 0

r.set_max_in_flight(max_in_flight)
assert r.total_rdy == 1


@patch("random.randrange")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_retry(ioloop_current_mock, randrange_mock):
ioloop_current_mock.return_value = get_ioloop()
randrange_mock.return_value = 0
r = get_reader(max_in_flight=1)
get_conn(r)
r.set_max_in_flight(0)
c = get_conn(r)
r.set_max_in_flight(1)
if c.rdy_timeout is not None:
c.rdy_timeout()
assert r.total_rdy <= r.max_in_flight

0 comments on commit 0dcb417

Please sign in to comment.