Skip to content

Commit

Permalink
Merge pull request #254 from alpaker/respect-max-in-flight-on-startup
Browse files Browse the repository at this point in the history
reader: don't send RDY 1 on a new connection if that would violate max-in-flight
  • Loading branch information
mreiferson authored Oct 24, 2021
2 parents a1fac14 + 16abeb7 commit ab14efe
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 11 deletions.
32 changes: 27 additions & 5 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ def _maybe_update_rdy(self, conn):
# 2. After a change in connection count or max_in_flight we adjust to the new
# connection_max_in_flight.
conn_max_in_flight = self._connection_max_in_flight()
if conn.rdy == 1 or conn.rdy != conn_max_in_flight:
if (conn.rdy == 1 or conn.rdy != conn_max_in_flight) and \
self.total_rdy < self.max_in_flight:
self._send_rdy(conn, conn_max_in_flight)

def _finish_backoff_block(self):
Expand Down Expand Up @@ -402,11 +403,15 @@ def _on_backoff_resume(self, success, **kwargs):

def _complete_backoff_block(self):
self.backoff_block_completed = True
rdy = self._connection_max_in_flight()
logger.info('[%s] backoff complete, resuming normal operation (%d connections)',
self.name, len(self.conns))
for c in self.conns.values():
self._send_rdy(c, rdy)
if self.max_in_flight < len(self.conns):
self.need_rdy_redistributed = True
self._redistribute_rdy_state()
else:
rdy = self._connection_max_in_flight()
for c in self.conns.values():
self._send_rdy(c, rdy)

def _enter_continue_or_exit_backoff(self):
# Take care of backoff in the appropriate cases. When this
Expand Down Expand Up @@ -532,7 +537,8 @@ def _on_connection_ready(self, conn, **kwargs):
# *initially* starved since redistribute won't apply
# 2. `max_in_flight < num_conns` ensuring that we never exceed max_in_flight
# and rely on the fact that redistribute will handle balancing RDY across conns
if not self.backoff_timer.get_interval() or len(self.conns) == 1:
if (not self.backoff_timer.get_interval() or len(self.conns) == 1) and \
self.total_rdy < self.max_in_flight:
# only send RDY 1 if we're not in backoff (some other conn
# should be testing the waters)
# (but always send it if we're the first)
Expand Down Expand Up @@ -615,6 +621,11 @@ def query_lookupd(self):

def set_max_in_flight(self, max_in_flight):
"""Dynamically adjust the reader max_in_flight. Set to 0 to immediately disable a Reader"""
for conn in self.conns.values():
if conn.rdy_timeout is not None:
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None

assert isinstance(max_in_flight, int)
self.max_in_flight = max_in_flight

Expand Down Expand Up @@ -660,6 +671,17 @@ def _redistribute_rdy_state(self):
if self.need_rdy_redistributed:
self.need_rdy_redistributed = False

if self.total_rdy > self.max_in_flight:
conns = list(self.conns.values())
available_rdy = self.max_in_flight
while conns and available_rdy:
available_rdy -= 1
conn = conns.pop(random.randrange(len(conns)))
self._send_rdy(conn, 1)
while conns:
conn = conns.pop()
self._send_rdy(conn, 0)

# first set RDY 0 to all connections that have not received a message within
# a configurable timeframe (low_rdy_idle_timeout).
for conn_id, conn in iteritems(self.conns):
Expand Down
17 changes: 11 additions & 6 deletions tests/reader_unit_test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import

import collections
import time

from mock import patch, create_autospec
Expand All @@ -13,18 +14,22 @@


def get_reader(max_in_flight=5):
return nsq.Reader("test", "test",
message_handler=_message_handler,
lookupd_http_addresses=["http://localhost:4161"],
max_in_flight=max_in_flight,
max_backoff_duration=2.0,
)
r = nsq.Reader(
"test", "test",
message_handler=_message_handler,
lookupd_http_addresses=["http://localhost:4161"],
max_in_flight=max_in_flight,
max_backoff_duration=2.0,
)
r.conns = collections.OrderedDict()
return r


def get_ioloop():
ioloop = create_autospec(IOLoop, instance=True)
ioloop.time.side_effect = time.time
ioloop.call_later.side_effect = lambda dt, cb: ioloop.add_timeout(time.time() + dt, cb)
ioloop.add_timeout.side_effect = lambda _, cb: cb
return ioloop


Expand Down
248 changes: 248 additions & 0 deletions tests/test_rdy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import pytest
from mock import patch

from nsq import event

from .reader_unit_test_helpers import (
get_reader,
get_conn,
get_ioloop,
send_message
)

Expand All @@ -22,3 +28,245 @@ def test_new_conn_throttles_down_existing_conns():
conn2 = get_conn(r)
assert conn2.rdy == 1
assert conn1.rdy == r._connection_max_in_flight()


def test_new_conn_respects_max_in_flight():
max_in_flight = 1
r = get_reader(max_in_flight)
get_conn(r)
get_conn(r)


@pytest.mark.parametrize(['conn_count', 'max_in_flight'], [
[5, 10],
[5, 4],
])
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_complete_backoff(ioloop_current_mock, conn_count, max_in_flight):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
r = get_reader(max_in_flight=max_in_flight)
conns = [get_conn(r) for _ in range(conn_count)]

msg = send_message(conns[0])
msg.trigger(event.REQUEUE, message=msg)

timeout_args, _ = ioloop_mock.add_timeout.call_args
conn = timeout_args[1]()
assert r.total_rdy == 1

msg = send_message(conn)
msg.trigger(event.FINISH, message=msg)
assert r.total_rdy == max_in_flight


@pytest.mark.parametrize(['conn_count', 'old_max', 'new_max'], [
[4, 8, 12],
[4, 3, 12],
[4, 2, 3],
[4, 12, 8],
[4, 12, 3],
[4, 3, 2],
])
def test_rdy_set_max_in_flight(old_max, new_max, conn_count):
r = get_reader(old_max)
conns = [get_conn(r) for _ in range(conn_count)]
for c in conns:
msg = send_message(c)
msg.trigger(event.FINISH, message=msg)
r.set_max_in_flight(new_max)
assert r.total_rdy <= new_max
for c in conns:
msg = send_message(c)
msg.trigger(event.FINISH, message=msg)
assert r.total_rdy == new_max


def test_rdy_disable_enable():
max_in_flight = 10
r = get_reader(max_in_flight)
conns = [get_conn(r) for _ in range(max_in_flight // 2 - 1)]
msg = send_message(conns[0])

r.set_max_in_flight(0)
assert r.total_rdy == 0

conns.append(get_conn(r))
assert r.total_rdy == 0

msg.trigger(event.FINISH, message=msg)
assert r.total_rdy == 0

r.set_max_in_flight(max_in_flight)
assert r.total_rdy == len(conns)


@patch("tornado.ioloop.IOLoop.current")
def test_rdy_disable_enable_in_backoff(ioloop_current_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
max_in_flight = 10
r = get_reader(max_in_flight)
conns = [get_conn(r) for _ in range(2)]
msg = send_message(conns[0])

r.set_max_in_flight(0)
assert r.total_rdy == 0

msg.trigger(event.REQUEUE, message=msg)
timeout_args, _ = ioloop_mock.add_timeout.call_args
timeout_args[1]()
assert r.total_rdy == 0

r.set_max_in_flight(max_in_flight)
assert r.total_rdy == 1


@patch("random.randrange")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_retry(ioloop_current_mock, randrange_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
randrange_mock.return_value = 0
r = get_reader(max_in_flight=1)
get_conn(r)
r.set_max_in_flight(0)
c = get_conn(r)
r.set_max_in_flight(1)
if c.rdy_timeout is not None:
timeout_args, _ = ioloop_mock.add_timeout.call_args
timeout_args[1]()
assert r.total_rdy <= r.max_in_flight


@patch("random.randrange")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_idle_conns_lose_rdy(ioloop_current_mock, time_mock, randrange_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
t = 1
time_mock.side_effect = lambda: t
randrange_mock.return_value = 1
r = get_reader(max_in_flight=2)
conn1 = get_conn(r)
conn2 = get_conn(r)
conn3 = get_conn(r)
assert conn1.rdy == conn2.rdy == 1
assert conn3.rdy == 0
msg = send_message(conn1)
msg.trigger(event.FINISH, message=msg)
t += r.low_rdy_idle_timeout - 0.1
r._redistribute_rdy_state()
assert conn1.rdy == conn2.rdy == 1
assert conn3.rdy == 0
t += 0.2
r._redistribute_rdy_state()
assert conn1.rdy == 0
assert conn2.rdy == conn3.rdy == 1


@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_doesnt_reallocate_rdy_in_use(ioloop_current_mock, time_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
t = 1
time_mock.side_effect = lambda: t
r = get_reader(max_in_flight=2)
conn1 = get_conn(r)
conn2 = get_conn(r)
get_conn(r)
assert r.total_rdy == 2
msg1 = send_message(conn1)
msg2 = send_message(conn2)
msg1.trigger(event.FINISH, message=msg1)
t += r.low_rdy_idle_timeout + 0.1
r._redistribute_rdy_state()
assert r.total_rdy == 1
msg2.trigger(event.FINISH, message=msg2)
r._redistribute_rdy_state()
assert r.total_rdy == 2


@patch("random.randrange")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_isnt_pinned_to_conns(ioloop_current_mock, time_mock, randrange_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
t = 1
time_mock.side_effect = lambda: t
randrange_mock.return_value = 1
r = get_reader(max_in_flight=2)
conn1 = get_conn(r)
conn2 = get_conn(r)
conn3 = get_conn(r)
assert conn1.rdy == conn2.rdy == 1
assert conn3.rdy == 0
msg1 = send_message(conn1)
msg2 = send_message(conn2)
msg1.trigger(event.FINISH, message=msg1)
t += 1
r._redistribute_rdy_state()
assert conn1.rdy == 1
assert conn2.rdy == conn3.rdy == 0
t += 1
msg2.trigger(event.FINISH, message=msg2)
r._redistribute_rdy_state()
assert conn1.rdy == conn3.rdy == 1
assert conn2.rdy == 0


@patch("random.choice")
@patch("random.randrange")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_idle_conns_lose_rdy_in_backoff(
ioloop_current_mock, time_mock, randrange_mock, choice_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
t = 1
time_mock.side_effect = lambda: t
randrange_mock.return_value = 2
r = get_reader(max_in_flight=2)
conn1 = get_conn(r)
conn2 = get_conn(r)
conn3 = get_conn(r)
choice_mock.return_value = conn1
assert conn1.rdy == conn2.rdy == 1
assert conn3.rdy == 0
msg = send_message(conn1)
msg.trigger(event.REQUEUE, message=msg)
timeout_args, _ = ioloop_mock.add_timeout.call_args
timeout_args[1]()
assert conn1.rdy == r.total_rdy == 1
t += r.low_rdy_idle_timeout + 0.1
r._redistribute_rdy_state()
assert conn3.rdy == r.total_rdy == 1


@patch("random.choice")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_doesnt_reallocate_rdy_in_use_in_backoff(
ioloop_current_mock, time_mock, choice_mock):
ioloop_mock = get_ioloop()
ioloop_current_mock.return_value = ioloop_mock
t = 1
time_mock.side_effect = lambda: t
r = get_reader(max_in_flight=2)
conn1 = get_conn(r)
conn2 = get_conn(r)
get_conn(r)
assert r.total_rdy == 2
msg = send_message(conn1)
send_message(conn2)
msg.trigger(event.REQUEUE, message=msg)
timeout_args, _ = ioloop_mock.add_timeout.call_args
choice_mock.return_value = conn1
timeout_args[1]()
assert r.total_rdy == 1
t += r.low_rdy_idle_timeout + 0.1
r._redistribute_rdy_state()
assert r.total_rdy == 0

0 comments on commit ab14efe

Please sign in to comment.