Skip to content

Commit

Permalink
reader: Don't decrement total_rdy on message receipt. Adjust RDY
Browse files Browse the repository at this point in the history
redistribution logic accordingly.

This brings reader behavior into agreement with nsqd behavior (compare nsqio/nsq#404)
and removes an opportunity for max_in_flight violations (nsqio#177).
  • Loading branch information
Alp Aker committed May 10, 2017
1 parent 278f148 commit a3acfe4
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 28 deletions.
1 change: 0 additions & 1 deletion nsq/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ def _on_data(self, data, **kwargs):
frame, data = protocol.unpack_response(data)
if frame == protocol.FRAME_TYPE_MESSAGE:
self.last_msg_timestamp = time.time()
self.rdy = max(self.rdy - 1, 0)
self.in_flight += 1

message = protocol.decode_message(data)
Expand Down
45 changes: 19 additions & 26 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,21 +317,7 @@ def _on_message(self, conn, message, **kwargs):
logger.exception('[%s:%s] failed to handle_message() %r', conn.id, self.name, message)

def _handle_message(self, conn, message):
self.total_rdy = max(self.total_rdy - 1, 0)

rdy_conn = conn
if len(self.conns) > self.max_in_flight and time.time() - self.random_rdy_ts > 30:
# if all connections aren't getting RDY
# occsionally randomize which connection gets RDY
self.random_rdy_ts = time.time()
conns_with_no_rdy = [c for c in itervalues(self.conns) if not c.rdy]
if conns_with_no_rdy:
rdy_conn = random.choice(conns_with_no_rdy)
if rdy_conn is not conn:
logger.info('[%s:%s] redistributing RDY to %s',
conn.id, self.name, rdy_conn.id)

self._maybe_update_rdy(rdy_conn)
self._maybe_update_rdy(conn)

success = False
try:
Expand All @@ -358,7 +344,7 @@ def _maybe_update_rdy(self, conn):
if self.backoff_timer.get_interval() or self.max_in_flight == 0:
return

if conn.rdy <= 1 or conn.rdy < int(conn.last_rdy * 0.25):
if conn.rdy == 1:
self._send_rdy(conn, self._connection_max_in_flight())

def _finish_backoff_block(self):
Expand Down Expand Up @@ -452,15 +438,10 @@ def _send_rdy(self, conn, value):
if value > conn.max_rdy_count:
value = conn.max_rdy_count

if (self.total_rdy + value) > self.max_in_flight:
if not conn.rdy:
# if we're going from RDY 0 to non-0 and we couldn't because
# of the configured max in flight, try again
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 5, rdy_retry_callback)
new_rdy = max(self.total_rdy - conn.rdy + value, 0)
if new_rdy > self.max_in_flight:
return

new_rdy = max(self.total_rdy - conn.rdy + value, 0)
if conn.send_rdy(value):
self.total_rdy = new_rdy

Expand Down Expand Up @@ -657,18 +638,31 @@ def _redistribute_rdy_state(self):

# first set RDY 0 to all connections that have not received a message within
# a configurable timeframe (low_rdy_idle_timeout).
possible_conns, in_flight = [], []
for conn_id, conn in iteritems(self.conns):
last_message_duration = time.time() - conn.last_msg_timestamp
logger.debug('[%s:%s] rdy: %d (last message received %.02fs)',
conn.id, self.name, conn.rdy, last_message_duration)
if conn.rdy > 0 and last_message_duration > self.low_rdy_idle_timeout:
logger.info('[%s:%s] idle connection, giving up RDY count', conn.id, self.name)
self._send_rdy(conn, 0)
if not (conn.in_flight or conn.rdy):
possible_conns.append(conn)
elif conn.in_flight:
in_flight.append(conn)

if backoff_interval:
max_in_flight = 1 - self.total_rdy
max_in_flight = max(0, 1 - len(in_flight))
else:
max_in_flight = self.max_in_flight - self.total_rdy
max_in_flight = self.max_in_flight - len(in_flight)

# if moving any connections from RDY 0 to non-0 would violate in-flight
# constraints, set RDY 0 on some connection with a message in flight so that
# redistribution will eventually proceed
if not max_in_flight:
c = random.choice(in_flight)
logger.info('[%s:%s] too many msgs in flight, giving up RDY count', c.id, self.name)
self._send_rdy(c, 0)

# randomly walk the list of possible connections and send RDY 1 (up to our
# calculated "max_in_flight"). We only need to send RDY 1 because in both
Expand All @@ -677,7 +671,6 @@ def _redistribute_rdy_state(self):
# We also don't attempt to avoid the connections who previously might have had RDY 1
# because it would be overly complicated and not actually worth it (ie. given enough
# redistribution rounds it doesn't matter).
possible_conns = list(self.conns.values())
while possible_conns and max_in_flight:
max_in_flight -= 1
conn = possible_conns.pop(random.randrange(len(possible_conns)))
Expand Down
1 change: 1 addition & 0 deletions tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def _get_conn(reader):

def _send_message(conn):
msg = _get_message(conn)
conn.in_flight += 1
conn.trigger(event.MESSAGE, conn=conn, message=msg)
return msg

Expand Down
2 changes: 1 addition & 1 deletion tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_conn_messages(self):

def _on_message(*args, **kwargs):
self.msg_count += 1
if c.rdy == 0:
if c.in_flight == 5:
self.stop()

def _on_ready(*args, **kwargs):
Expand Down

0 comments on commit a3acfe4

Please sign in to comment.