Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reader: don't send RDY 1 on a new connection if that would violate max-in-flight #254

Merged
merged 4 commits into from
Oct 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ def _maybe_update_rdy(self, conn):
# 2. After a change in connection count or max_in_flight we adjust to the new
# connection_max_in_flight.
conn_max_in_flight = self._connection_max_in_flight()
if conn.rdy == 1 or conn.rdy != conn_max_in_flight:
if (conn.rdy == 1 or conn.rdy != conn_max_in_flight) and \
self.total_rdy < self.max_in_flight:
self._send_rdy(conn, conn_max_in_flight)

def _finish_backoff_block(self):
Expand Down Expand Up @@ -402,11 +403,15 @@ def _on_backoff_resume(self, success, **kwargs):

def _complete_backoff_block(self):
    """Mark the backoff block as completed and hand RDY back to the connections.

    When ``max_in_flight`` is smaller than the number of connections, sending
    a RDY count to every connection could exceed ``max_in_flight``, so the
    work is delegated to ``_redistribute_rdy_state()``. Otherwise every
    connection is sent the value of ``_connection_max_in_flight()``.
    """
    self.backoff_block_completed = True
    logger.info('[%s] backoff complete, resuming normal operation (%d connections)',
                self.name, len(self.conns))
    if self.max_in_flight < len(self.conns):
        # Not enough RDY to go around: flag a redistribution and run it now
        # rather than sending RDY to every connection.
        self.need_rdy_redistributed = True
        self._redistribute_rdy_state()
    else:
        rdy = self._connection_max_in_flight()
        for c in self.conns.values():
            self._send_rdy(c, rdy)

def _enter_continue_or_exit_backoff(self):
# Take care of backoff in the appropriate cases. When this
Expand Down Expand Up @@ -532,7 +537,8 @@ def _on_connection_ready(self, conn, **kwargs):
# *initially* starved since redistribute won't apply
# 2. `max_in_flight < num_conns` ensuring that we never exceed max_in_flight
# and rely on the fact that redistribute will handle balancing RDY across conns
if not self.backoff_timer.get_interval() or len(self.conns) == 1:
if (not self.backoff_timer.get_interval() or len(self.conns) == 1) and \
self.total_rdy < self.max_in_flight:
# only send RDY 1 if we're not in backoff (some other conn
# should be testing the waters)
# (but always send it if we're the first)
Expand Down Expand Up @@ -615,6 +621,11 @@ def query_lookupd(self):

def set_max_in_flight(self, max_in_flight):
"""Dynamically adjust the reader max_in_flight. Set to 0 to immediately disable a Reader"""
for conn in self.conns.values():
if conn.rdy_timeout is not None:
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None

assert isinstance(max_in_flight, int)
self.max_in_flight = max_in_flight

Expand Down Expand Up @@ -660,6 +671,17 @@ def _redistribute_rdy_state(self):
if self.need_rdy_redistributed:
self.need_rdy_redistributed = False

if self.total_rdy > self.max_in_flight:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this block instead just set RDY 0 for all connections and let the rest of the logic below identify conns to receive non-zero RDY?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the current redistribution logic I think that change could make it unsafe to call set_max_in_flight() with messages in flight. Right now redistribution avoids giving positive RDY to a connection with a message in flight, so something like the following could happen:

  1. Let max-in-flight be > len(conns).
  2. With a message in flight on conn c, reduce max-in-flight but keep it >= len(conns).
  3. _redistribute_rdy_state() sets RDY 0 on all connections, including c.
  4. Because c has something in flight, this redistribution round leaves c at RDY 0.
  5. Subsequent redistribution rounds see max-in-flight >= len(conns) and so don't redistribute RDY, leaving c starved.

We could make the simplification you suggest if we allow redistribution to give RDY to a conn with something in flight:

--- a/nsq/reader.py
+++ b/nsq/reader.py
@@ -720,7 +720,7 @@ class Reader(Client):
             # We also don't attempt to avoid the connections who previously might have had RDY 1
             # because it would be overly complicated and not actually worth it (ie. given enough
             # redistribution rounds it doesn't matter).
-            possible_conns = [c for c in conns if not (c.in_flight or c.rdy)]
+            possible_conns = [c for c in conns if not c.rdy]
             while possible_conns and available_rdy:
                 available_rdy -= 1
                 conn = possible_conns.pop(random.randrange(len(possible_conns)))

which (I think) is fine from a correctness standpoint and will at worst increase the variance in how long it takes to service all connections equally.

conns = list(self.conns.values())
available_rdy = self.max_in_flight
while conns and available_rdy:
available_rdy -= 1
conn = conns.pop(random.randrange(len(conns)))
self._send_rdy(conn, 1)
while conns:
conn = conns.pop()
self._send_rdy(conn, 0)

# first set RDY 0 to all connections that have not received a message within
# a configurable timeframe (low_rdy_idle_timeout).
for conn_id, conn in iteritems(self.conns):
Expand Down
17 changes: 11 additions & 6 deletions tests/reader_unit_test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import

import collections
import time

from mock import patch, create_autospec
Expand All @@ -13,18 +14,22 @@


def get_reader(max_in_flight=5):
    """Create an ``nsq.Reader`` on topic/channel "test" for unit tests.

    :param max_in_flight: value passed through to the reader's ``max_in_flight``.
    :returns: the reader, with ``conns`` replaced by an ``OrderedDict``
        (presumably so tests see connections in a deterministic order).
    """
    r = nsq.Reader(
        "test", "test",
        message_handler=_message_handler,
        lookupd_http_addresses=["http://localhost:4161"],
        max_in_flight=max_in_flight,
        max_backoff_duration=2.0,
    )
    r.conns = collections.OrderedDict()
    return r


def get_ioloop():
    """Return an autospec'd ``IOLoop`` instance with its timer methods stubbed.

    ``time()`` delegates to ``time.time``; ``call_later(dt, cb)`` is routed
    through ``add_timeout`` with a deadline of ``time.time() + dt``; and
    ``add_timeout`` simply returns the callback it was given.
    """
    def _add_timeout(_, cb):
        return cb

    def _call_later(dt, cb):
        return loop.add_timeout(time.time() + dt, cb)

    loop = create_autospec(IOLoop, instance=True)
    loop.time.side_effect = time.time
    loop.call_later.side_effect = _call_later
    loop.add_timeout.side_effect = _add_timeout
    return loop


Expand Down
248 changes: 248 additions & 0 deletions tests/test_rdy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import pytest
from mock import patch

from nsq import event

from .reader_unit_test_helpers import (
get_reader,
get_conn,
get_ioloop,
send_message
)

Expand All @@ -22,3 +28,245 @@ def test_new_conn_throttles_down_existing_conns():
conn2 = get_conn(r)
assert conn2.rdy == 1
assert conn1.rdy == r._connection_max_in_flight()


def test_new_conn_respects_max_in_flight():
    """Adding a second connection must not push total RDY above max_in_flight."""
    max_in_flight = 1
    r = get_reader(max_in_flight)
    get_conn(r)
    get_conn(r)
    # Without an assertion this test could only fail by raising; pin the
    # invariant the test name describes.
    assert r.total_rdy <= max_in_flight


@pytest.mark.parametrize(('conn_count', 'max_in_flight'), [
    (5, 10),
    (5, 4),
])
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_complete_backoff(ioloop_current_mock, conn_count, max_in_flight):
    """After a REQUEUE then a FINISH, total RDY ends up equal to max_in_flight.

    Parametrized with max_in_flight both above and below the connection count.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop
    reader = get_reader(max_in_flight=max_in_flight)
    connections = [get_conn(reader) for _ in range(conn_count)]

    message = send_message(connections[0])
    message.trigger(event.REQUEUE, message=message)

    # Invoke by hand the callback most recently handed to add_timeout; its
    # return value is used as the connection for the next message.
    scheduled_args, _ = fake_loop.add_timeout.call_args
    probe_conn = scheduled_args[1]()
    assert reader.total_rdy == 1

    message = send_message(probe_conn)
    message.trigger(event.FINISH, message=message)
    assert reader.total_rdy == max_in_flight


@pytest.mark.parametrize(('conn_count', 'old_max', 'new_max'), [
    (4, 8, 12),
    (4, 3, 12),
    (4, 2, 3),
    (4, 12, 8),
    (4, 12, 3),
    (4, 3, 2),
])
def test_rdy_set_max_in_flight(old_max, new_max, conn_count):
    """Changing max_in_flight never overshoots and converges to the new value.

    Total RDY must be <= new_max right after ``set_max_in_flight`` and equal
    to new_max after one more message has been finished on every connection.
    """
    reader = get_reader(old_max)
    connections = [get_conn(reader) for _ in range(conn_count)]

    def finish_one_message_per_conn():
        # deliver and FINISH a single message on each connection, in order
        for connection in connections:
            message = send_message(connection)
            message.trigger(event.FINISH, message=message)

    finish_one_message_per_conn()
    reader.set_max_in_flight(new_max)
    assert reader.total_rdy <= new_max

    finish_one_message_per_conn()
    assert reader.total_rdy == new_max


def test_rdy_disable_enable():
    """max_in_flight 0 keeps total RDY at 0; restoring it gives RDY back.

    While disabled, neither a new connection nor finishing an in-flight
    message may raise total RDY above 0. After re-enabling, total RDY equals
    the number of connections.
    """
    limit = 10
    reader = get_reader(limit)
    connections = [get_conn(reader) for _ in range(limit // 2 - 1)]
    pending = send_message(connections[0])

    reader.set_max_in_flight(0)
    assert reader.total_rdy == 0

    # a connection added while disabled
    connections.append(get_conn(reader))
    assert reader.total_rdy == 0

    # finishing the message that was in flight before disabling
    pending.trigger(event.FINISH, message=pending)
    assert reader.total_rdy == 0

    reader.set_max_in_flight(limit)
    assert reader.total_rdy == len(connections)


@patch("tornado.ioloop.IOLoop.current")
def test_rdy_disable_enable_in_backoff(ioloop_current_mock):
    """With max_in_flight 0, a REQUEUE and its timeout callback leave RDY at 0.

    After max_in_flight is restored, total RDY is expected to be exactly 1.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop
    limit = 10
    reader = get_reader(limit)
    connections = [get_conn(reader) for _ in range(2)]
    pending = send_message(connections[0])

    reader.set_max_in_flight(0)
    assert reader.total_rdy == 0

    pending.trigger(event.REQUEUE, message=pending)
    # run the callback most recently handed to add_timeout
    scheduled_args, _ = fake_loop.add_timeout.call_args
    scheduled_args[1]()
    assert reader.total_rdy == 0

    reader.set_max_in_flight(limit)
    assert reader.total_rdy == 1


@patch("random.randrange")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_retry(ioloop_current_mock, randrange_mock):
    """A pending RDY timeout on a connection must not push RDY past max_in_flight.

    The second connection is created while max_in_flight is 0; once the limit
    is back to 1, any timeout still registered on that connection is fired by
    hand before the invariant is checked.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop
    randrange_mock.return_value = 0

    reader = get_reader(max_in_flight=1)
    get_conn(reader)
    reader.set_max_in_flight(0)
    late_conn = get_conn(reader)
    reader.set_max_in_flight(1)

    if late_conn.rdy_timeout is not None:
        scheduled_args, _ = fake_loop.add_timeout.call_args
        scheduled_args[1]()

    assert reader.total_rdy <= reader.max_in_flight


@patch("random.randrange")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_idle_conns_lose_rdy(ioloop_current_mock, time_mock, randrange_mock):
    """Redistribution takes RDY from a connection only after low_rdy_idle_timeout.

    Just under the timeout nothing changes; just over it, conn1 drops to
    RDY 0 and conn3 picks up RDY 1.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop

    # mocked clock: time.time() reports whatever ``now`` currently holds
    now = 1

    def _clock():
        return now

    time_mock.side_effect = _clock
    randrange_mock.return_value = 1

    reader = get_reader(max_in_flight=2)
    conn1 = get_conn(reader)
    conn2 = get_conn(reader)
    conn3 = get_conn(reader)
    assert conn1.rdy == 1
    assert conn2.rdy == 1
    assert conn3.rdy == 0

    message = send_message(conn1)
    message.trigger(event.FINISH, message=message)

    # 0.1s short of the idle timeout
    now += reader.low_rdy_idle_timeout - 0.1
    reader._redistribute_rdy_state()
    assert conn1.rdy == 1
    assert conn2.rdy == 1
    assert conn3.rdy == 0

    # now 0.1s past the idle timeout
    now += 0.2
    reader._redistribute_rdy_state()
    assert conn1.rdy == 0
    assert conn2.rdy == 1
    assert conn3.rdy == 1


@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_doesnt_reallocate_rdy_in_use(ioloop_current_mock, time_mock):
    """Total RDY is 1 while a message is unfinished on conn2, and 2 once it finishes.

    Redistribution runs after the idle timeout has elapsed and again after
    the second message has been finished.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop

    # mocked clock: time.time() reports whatever ``now`` currently holds
    now = 1

    def _clock():
        return now

    time_mock.side_effect = _clock

    reader = get_reader(max_in_flight=2)
    conn1 = get_conn(reader)
    conn2 = get_conn(reader)
    get_conn(reader)
    assert reader.total_rdy == 2

    first = send_message(conn1)
    second = send_message(conn2)
    first.trigger(event.FINISH, message=first)

    now += reader.low_rdy_idle_timeout + 0.1
    reader._redistribute_rdy_state()
    assert reader.total_rdy == 1

    second.trigger(event.FINISH, message=second)
    reader._redistribute_rdy_state()
    assert reader.total_rdy == 2


@patch("random.randrange")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_isnt_pinned_to_conns(ioloop_current_mock, time_mock, randrange_mock):
    """RDY released by one connection can be handed to a different one.

    After the first redistribution only conn1 holds RDY; once conn2's message
    finishes, the next redistribution gives RDY 1 to conn3 and conn2 stays
    at RDY 0.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop

    # mocked clock: time.time() reports whatever ``now`` currently holds
    now = 1

    def _clock():
        return now

    time_mock.side_effect = _clock
    randrange_mock.return_value = 1

    reader = get_reader(max_in_flight=2)
    conn1 = get_conn(reader)
    conn2 = get_conn(reader)
    conn3 = get_conn(reader)
    assert conn1.rdy == 1
    assert conn2.rdy == 1
    assert conn3.rdy == 0

    first = send_message(conn1)
    second = send_message(conn2)
    first.trigger(event.FINISH, message=first)

    now += 1
    reader._redistribute_rdy_state()
    assert conn1.rdy == 1
    assert conn2.rdy == 0
    assert conn3.rdy == 0

    now += 1
    second.trigger(event.FINISH, message=second)
    reader._redistribute_rdy_state()
    assert conn1.rdy == 1
    assert conn3.rdy == 1
    assert conn2.rdy == 0


@patch("random.choice")
@patch("random.randrange")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_idle_conns_lose_rdy_in_backoff(
        ioloop_current_mock, time_mock, randrange_mock, choice_mock):
    """After a REQUEUE, the single RDY moves from conn1 to conn3 on redistribution.

    conn1 holds the only RDY after the timeout callback runs; once the idle
    timeout has elapsed, redistribution leaves conn3 with the only RDY.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop

    # mocked clock: time.time() reports whatever ``now`` currently holds
    now = 1

    def _clock():
        return now

    time_mock.side_effect = _clock
    randrange_mock.return_value = 2

    reader = get_reader(max_in_flight=2)
    conn1 = get_conn(reader)
    conn2 = get_conn(reader)
    conn3 = get_conn(reader)
    choice_mock.return_value = conn1
    assert conn1.rdy == 1
    assert conn2.rdy == 1
    assert conn3.rdy == 0

    message = send_message(conn1)
    message.trigger(event.REQUEUE, message=message)
    # run the callback most recently handed to add_timeout
    scheduled_args, _ = fake_loop.add_timeout.call_args
    scheduled_args[1]()
    assert conn1.rdy == 1
    assert reader.total_rdy == 1

    now += reader.low_rdy_idle_timeout + 0.1
    reader._redistribute_rdy_state()
    assert conn3.rdy == 1
    assert reader.total_rdy == 1


@patch("random.choice")
@patch("time.time")
@patch("tornado.ioloop.IOLoop.current")
def test_rdy_redistribution_doesnt_reallocate_rdy_in_use_in_backoff(
        ioloop_current_mock, time_mock, choice_mock):
    """After a REQUEUE with a message still unfinished on conn2, RDY goes 1 then 0.

    Total RDY is 1 after the timeout callback runs and 0 after a
    redistribution performed once the idle timeout has elapsed.
    """
    fake_loop = get_ioloop()
    ioloop_current_mock.return_value = fake_loop

    # mocked clock: time.time() reports whatever ``now`` currently holds
    now = 1

    def _clock():
        return now

    time_mock.side_effect = _clock

    reader = get_reader(max_in_flight=2)
    conn1 = get_conn(reader)
    conn2 = get_conn(reader)
    get_conn(reader)
    assert reader.total_rdy == 2

    message = send_message(conn1)
    send_message(conn2)  # never finished during this test
    message.trigger(event.REQUEUE, message=message)

    scheduled_args, _ = fake_loop.add_timeout.call_args
    choice_mock.return_value = conn1
    scheduled_args[1]()
    assert reader.total_rdy == 1

    now += reader.low_rdy_idle_timeout + 0.1
    reader._redistribute_rdy_state()
    assert reader.total_rdy == 0